Allow multifd to open file-backed channels. This will be used when
enabling the fixed-ram migration stream format which expects a
seekable transport.
The QIOChannel read and write methods will use the preadv/pwritev
versions which don't update the file offset at each call so we can
reuse the fd without re-opening for every channel.
Note that this is just setup code and multifd cannot yet make use of
the file channels.
Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
- open multifd channels with O_WRONLY and no mode
- stop cancelling migration and propagate error via qio_task
---
migration/file.c | 47 +++++++++++++++++++++++++++++++++++++++++--
migration/file.h | 5 +++++
migration/multifd.c | 14 +++++++++++--
migration/options.c | 7 +++++++
migration/options.h | 1 +
migration/qemu-file.h | 1 -
6 files changed, 70 insertions(+), 5 deletions(-)
diff --git a/migration/file.c b/migration/file.c
index 5d4975f43e..67d6f42da7 100644
--- a/migration/file.c
+++ b/migration/file.c
@@ -17,6 +17,10 @@
#define OFFSET_OPTION ",offset="
+static struct FileOutgoingArgs { /* state kept so more channels can be opened on the outgoing file */
+ char *fname; /* copy of the file name: set in file_start_outgoing_migration(), freed in file_send_channel_destroy() */
+} outgoing_args;
+
/* Remove the offset option from @filespec and return it in @offsetp. */
int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp)
@@ -36,6 +40,42 @@ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp)
return 0;
}
+static void qio_channel_file_connect_worker(QIOTask *task, gpointer opaque)
+{
+ /* noop: file_send_channel_create() opens the file before creating the task, so no work is left for the worker */
+}
+
+int file_send_channel_destroy(QIOChannel *ioc)
+{
+ if (ioc) { /* @ioc may be NULL; then only the saved file name is released */
+ qio_channel_close(ioc, NULL); /* close failures are ignored: NULL errp, return value unused */
+ object_unref(OBJECT(ioc));
+ }
+ g_free(outgoing_args.fname); /* one name for all channels; repeated calls find NULL here */
+ outgoing_args.fname = NULL;
+
+ return 0; /* unconditionally 0 */
+}
+
+void file_send_channel_create(QIOTaskFunc f, void *data)
+{
+ /* Open another channel on the outgoing migration file; @f is called with @data on completion. */
+ QIOChannelFile *ioc;
+ QIOTask *task;
+ Error *err = NULL;
+ int flags = O_WRONLY; /* no O_CREAT: file_start_outgoing_migration() already created the file */
+
+ ioc = qio_channel_file_new_path(outgoing_args.fname, flags, 0, &err);
+ task = qio_task_new(OBJECT(ioc), f, (gpointer)data, NULL);
+ if (!ioc) {
+ /* No early return: the task must still be run so that @f is invoked and sees @err. */
+ qio_task_set_error(task, err);
+ }
+
+ qio_task_run_in_thread(task, qio_channel_file_connect_worker,
+ (gpointer)data, NULL, NULL);
+}
+
void file_start_outgoing_migration(MigrationState *s,
FileMigrationArgs *file_args, Error **errp)
{
@@ -43,15 +83,18 @@ void file_start_outgoing_migration(MigrationState *s,
g_autofree char *filename = g_strdup(file_args->filename);
uint64_t offset = file_args->offset;
QIOChannel *ioc;
+ int flags = O_CREAT | O_TRUNC | O_WRONLY;
+ mode_t mode = 0660;
trace_migration_file_outgoing(filename);
- fioc = qio_channel_file_new_path(filename, O_CREAT | O_WRONLY | O_TRUNC,
- 0600, errp);
+ fioc = qio_channel_file_new_path(filename, flags, mode, errp);
if (!fioc) {
return;
}
+ outgoing_args.fname = g_strdup(filename);
+
ioc = QIO_CHANNEL(fioc);
if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
return;
diff --git a/migration/file.h b/migration/file.h
index 37d6a08bfc..511019b319 100644
--- a/migration/file.h
+++ b/migration/file.h
@@ -9,10 +9,15 @@
#define QEMU_MIGRATION_FILE_H
#include "qapi/qapi-types-migration.h"
+#include "io/task.h"
+#include "channel.h"
void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp);
void file_start_outgoing_migration(MigrationState *s,
FileMigrationArgs *file_args, Error **errp);
int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp);
+
+void file_send_channel_create(QIOTaskFunc f, void *data);
+int file_send_channel_destroy(QIOChannel *ioc);
#endif
diff --git a/migration/multifd.c b/migration/multifd.c
index 123ff0dec0..427740aab6 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -17,6 +17,7 @@
#include "exec/ramblock.h"
#include "qemu/error-report.h"
#include "qapi/error.h"
+#include "file.h"
#include "ram.h"
#include "migration.h"
#include "migration-stats.h"
@@ -28,6 +29,7 @@
#include "threadinfo.h"
#include "options.h"
#include "qemu/yank.h"
+#include "io/channel-file.h"
#include "io/channel-socket.h"
#include "yank_functions.h"
@@ -511,7 +513,11 @@ static void multifd_send_terminate_threads(Error *err)
static int multifd_send_channel_destroy(QIOChannel *send)
{
- return socket_send_channel_destroy(send);
+ if (migrate_to_file()) { /* choose the teardown helper matching how the channel was created */
+ return file_send_channel_destroy(send);
+ } else {
+ return socket_send_channel_destroy(send);
+ }
}
void multifd_save_cleanup(void)
@@ -904,7 +910,11 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque)
static void multifd_new_send_channel_create(gpointer opaque)
{
- socket_send_channel_create(multifd_new_send_channel_async, opaque);
+ if (migrate_to_file()) { /* both paths report back through multifd_new_send_channel_async() */
+ file_send_channel_create(multifd_new_send_channel_async, opaque);
+ } else {
+ socket_send_channel_create(multifd_new_send_channel_async, opaque);
+ }
}
int multifd_save_setup(Error **errp)
diff --git a/migration/options.c b/migration/options.c
index 10730b13ba..f671e24758 100644
--- a/migration/options.c
+++ b/migration/options.c
@@ -409,6 +409,13 @@ bool migrate_tls(void)
return s->parameters.tls_creds && *s->parameters.tls_creds;
}
+bool migrate_to_file(void)
+{
+ MigrationState *s = migrate_get_current();
+
+ return qemu_file_is_seekable(s->to_dst_file); /* a seekable outgoing stream is taken to mean a file migration */
+}
+
typedef enum WriteTrackingSupport {
WT_SUPPORT_UNKNOWN = 0,
WT_SUPPORT_ABSENT,
diff --git a/migration/options.h b/migration/options.h
index 8a19d6939c..84628a76e8 100644
--- a/migration/options.h
+++ b/migration/options.h
@@ -60,6 +60,7 @@ bool migrate_multifd_packets(void);
bool migrate_postcopy(void);
bool migrate_rdma(void);
bool migrate_tls(void);
+bool migrate_to_file(void);
/* capabilities helpers */
diff --git a/migration/qemu-file.h b/migration/qemu-file.h
index 32fd4a34fd..78ea21ab98 100644
--- a/migration/qemu-file.h
+++ b/migration/qemu-file.h
@@ -83,5 +83,4 @@ size_t qemu_get_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen,
off_t pos);
QIOChannel *qemu_file_get_ioc(QEMUFile *file);
-
#endif
--
2.35.3
On Mon, Nov 27, 2023 at 05:25:55PM -0300, Fabiano Rosas wrote: > Allow multifd to open file-backed channels. This will be used when > enabling the fixed-ram migration stream format which expects a > seekable transport. > > The QIOChannel read and write methods will use the preadv/pwritev > versions which don't update the file offset at each call so we can > reuse the fd without re-opening for every channel. > > Note that this is just setup code and multifd cannot yet make use of > the file channels. > > Signed-off-by: Fabiano Rosas <farosas@suse.de> > --- > - open multifd channels with O_WRONLY and no mode > - stop cancelling migration and propagate error via qio_task > --- > migration/file.c | 47 +++++++++++++++++++++++++++++++++++++++++-- > migration/file.h | 5 +++++ > migration/multifd.c | 14 +++++++++++-- > migration/options.c | 7 +++++++ > migration/options.h | 1 + > migration/qemu-file.h | 1 - > 6 files changed, 70 insertions(+), 5 deletions(-) > > diff --git a/migration/file.c b/migration/file.c > index 5d4975f43e..67d6f42da7 100644 > --- a/migration/file.c > +++ b/migration/file.c > @@ -17,6 +17,10 @@ > > #define OFFSET_OPTION ",offset=" > > +static struct FileOutgoingArgs { > + char *fname; > +} outgoing_args; > + > /* Remove the offset option from @filespec and return it in @offsetp. 
*/ > > int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) > @@ -36,6 +40,42 @@ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) > return 0; > } > > +static void qio_channel_file_connect_worker(QIOTask *task, gpointer opaque) > +{ > + /* noop */ > +} > + > +int file_send_channel_destroy(QIOChannel *ioc) > +{ > + if (ioc) { > + qio_channel_close(ioc, NULL); > + object_unref(OBJECT(ioc)); > + } > + g_free(outgoing_args.fname); > + outgoing_args.fname = NULL; > + > + return 0; > +} > + > +void file_send_channel_create(QIOTaskFunc f, void *data) > +{ > + QIOChannelFile *ioc; > + QIOTask *task; > + Error *err = NULL; > + int flags = O_WRONLY; > + > + ioc = qio_channel_file_new_path(outgoing_args.fname, flags, 0, &err); > + > + task = qio_task_new(OBJECT(ioc), f, (gpointer)data, NULL); > + if (!ioc) { > + qio_task_set_error(task, err); > + return; > + } > + > + qio_task_run_in_thread(task, qio_channel_file_connect_worker, > + (gpointer)data, NULL, NULL); This is pretty weird. This invokes a thread, but it'll run a noop. It seems meaningless to me. I assume you wanted to keep using the same async model as the socket typed multifd, but I don't think that works anyway, because file open blocks at qio_channel_file_new_path() so it's sync anyway. AFAICT we still share the code, as long as the file path properly invokes multifd_channel_connect() after the iochannel is setup. 
> +} > + > void file_start_outgoing_migration(MigrationState *s, > FileMigrationArgs *file_args, Error **errp) > { > @@ -43,15 +83,18 @@ void file_start_outgoing_migration(MigrationState *s, > g_autofree char *filename = g_strdup(file_args->filename); > uint64_t offset = file_args->offset; > QIOChannel *ioc; > + int flags = O_CREAT | O_TRUNC | O_WRONLY; > + mode_t mode = 0660; > > trace_migration_file_outgoing(filename); > > - fioc = qio_channel_file_new_path(filename, O_CREAT | O_WRONLY | O_TRUNC, > - 0600, errp); > + fioc = qio_channel_file_new_path(filename, flags, mode, errp); > if (!fioc) { > return; > } > > + outgoing_args.fname = g_strdup(filename); > + > ioc = QIO_CHANNEL(fioc); > if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) { > return; > diff --git a/migration/file.h b/migration/file.h > index 37d6a08bfc..511019b319 100644 > --- a/migration/file.h > +++ b/migration/file.h > @@ -9,10 +9,15 @@ > #define QEMU_MIGRATION_FILE_H > > #include "qapi/qapi-types-migration.h" > +#include "io/task.h" > +#include "channel.h" > > void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp); > > void file_start_outgoing_migration(MigrationState *s, > FileMigrationArgs *file_args, Error **errp); > int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp); > + > +void file_send_channel_create(QIOTaskFunc f, void *data); > +int file_send_channel_destroy(QIOChannel *ioc); > #endif > diff --git a/migration/multifd.c b/migration/multifd.c > index 123ff0dec0..427740aab6 100644 > --- a/migration/multifd.c > +++ b/migration/multifd.c > @@ -17,6 +17,7 @@ > #include "exec/ramblock.h" > #include "qemu/error-report.h" > #include "qapi/error.h" > +#include "file.h" > #include "ram.h" > #include "migration.h" > #include "migration-stats.h" > @@ -28,6 +29,7 @@ > #include "threadinfo.h" > #include "options.h" > #include "qemu/yank.h" > +#include "io/channel-file.h" > #include "io/channel-socket.h" > #include "yank_functions.h" > > @@ 
-511,7 +513,11 @@ static void multifd_send_terminate_threads(Error *err) > > static int multifd_send_channel_destroy(QIOChannel *send) > { > - return socket_send_channel_destroy(send); > + if (migrate_to_file()) { > + return file_send_channel_destroy(send); > + } else { > + return socket_send_channel_destroy(send); > + } > } > > void multifd_save_cleanup(void) > @@ -904,7 +910,11 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) > > static void multifd_new_send_channel_create(gpointer opaque) > { > - socket_send_channel_create(multifd_new_send_channel_async, opaque); > + if (migrate_to_file()) { > + file_send_channel_create(multifd_new_send_channel_async, opaque); > + } else { > + socket_send_channel_create(multifd_new_send_channel_async, opaque); > + } > } > > int multifd_save_setup(Error **errp) > diff --git a/migration/options.c b/migration/options.c > index 10730b13ba..f671e24758 100644 > --- a/migration/options.c > +++ b/migration/options.c > @@ -409,6 +409,13 @@ bool migrate_tls(void) > return s->parameters.tls_creds && *s->parameters.tls_creds; > } > > +bool migrate_to_file(void) > +{ > + MigrationState *s = migrate_get_current(); > + > + return qemu_file_is_seekable(s->to_dst_file); > +} Would this migrate_to_file() == migrate_multifd_packets()? Maybe we can keep using the other one and drop migrate_to_file? 
> + > typedef enum WriteTrackingSupport { > WT_SUPPORT_UNKNOWN = 0, > WT_SUPPORT_ABSENT, > diff --git a/migration/options.h b/migration/options.h > index 8a19d6939c..84628a76e8 100644 > --- a/migration/options.h > +++ b/migration/options.h > @@ -60,6 +60,7 @@ bool migrate_multifd_packets(void); > bool migrate_postcopy(void); > bool migrate_rdma(void); > bool migrate_tls(void); > +bool migrate_to_file(void); > > /* capabilities helpers */ > > diff --git a/migration/qemu-file.h b/migration/qemu-file.h > index 32fd4a34fd..78ea21ab98 100644 > --- a/migration/qemu-file.h > +++ b/migration/qemu-file.h > @@ -83,5 +83,4 @@ size_t qemu_get_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen, > off_t pos); > > QIOChannel *qemu_file_get_ioc(QEMUFile *file); > - > #endif > -- > 2.35.3 > -- Peter Xu
Peter Xu <peterx@redhat.com> writes: > On Mon, Nov 27, 2023 at 05:25:55PM -0300, Fabiano Rosas wrote: >> Allow multifd to open file-backed channels. This will be used when >> enabling the fixed-ram migration stream format which expects a >> seekable transport. >> >> The QIOChannel read and write methods will use the preadv/pwritev >> versions which don't update the file offset at each call so we can >> reuse the fd without re-opening for every channel. >> >> Note that this is just setup code and multifd cannot yet make use of >> the file channels. >> >> Signed-off-by: Fabiano Rosas <farosas@suse.de> >> --- >> - open multifd channels with O_WRONLY and no mode >> - stop cancelling migration and propagate error via qio_task >> --- >> migration/file.c | 47 +++++++++++++++++++++++++++++++++++++++++-- >> migration/file.h | 5 +++++ >> migration/multifd.c | 14 +++++++++++-- >> migration/options.c | 7 +++++++ >> migration/options.h | 1 + >> migration/qemu-file.h | 1 - >> 6 files changed, 70 insertions(+), 5 deletions(-) >> >> diff --git a/migration/file.c b/migration/file.c >> index 5d4975f43e..67d6f42da7 100644 >> --- a/migration/file.c >> +++ b/migration/file.c >> @@ -17,6 +17,10 @@ >> >> #define OFFSET_OPTION ",offset=" >> >> +static struct FileOutgoingArgs { >> + char *fname; >> +} outgoing_args; >> + >> /* Remove the offset option from @filespec and return it in @offsetp. 
*/ >> >> int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) >> @@ -36,6 +40,42 @@ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) >> return 0; >> } >> >> +static void qio_channel_file_connect_worker(QIOTask *task, gpointer opaque) >> +{ >> + /* noop */ >> +} >> + >> +int file_send_channel_destroy(QIOChannel *ioc) >> +{ >> + if (ioc) { >> + qio_channel_close(ioc, NULL); >> + object_unref(OBJECT(ioc)); >> + } >> + g_free(outgoing_args.fname); >> + outgoing_args.fname = NULL; >> + >> + return 0; >> +} >> + >> +void file_send_channel_create(QIOTaskFunc f, void *data) >> +{ >> + QIOChannelFile *ioc; >> + QIOTask *task; >> + Error *err = NULL; >> + int flags = O_WRONLY; >> + >> + ioc = qio_channel_file_new_path(outgoing_args.fname, flags, 0, &err); >> + >> + task = qio_task_new(OBJECT(ioc), f, (gpointer)data, NULL); >> + if (!ioc) { >> + qio_task_set_error(task, err); >> + return; >> + } >> + >> + qio_task_run_in_thread(task, qio_channel_file_connect_worker, >> + (gpointer)data, NULL, NULL); > > This is pretty weird. This invokes a thread, but it'll run a noop. It > seems meaningless to me. > That's QIOTask weirdness isn't it? It will run the worker in the thread, but it also schedules the completion function as a glib event. So that's when multifd_new_send_channel_async() will run. The crucial aspect here is that it gets dispatched by glib on the main loop. I'm just keeping the model, except that I don't have work to do during the "connection" phase. > I assume you wanted to keep using the same async model as the socket typed > multifd, but I don't think that works anyway, because file open blocks at > qio_channel_file_new_path() so it's sync anyway. It's async regarding multifd_channel_connect(). The connections will be happening while multifd_save_setup() continues execution, IIUC. > > AFAICT we still share the code, as long as the file path properly invokes > multifd_channel_connect() after the iochannel is setup. 
> I don't see the point in moving any of that logic into the URI implementation. We already have the TLS handshake code which can also call multifd_channel_connect() and that is a mess. IMO we should be keeping the interface between multifd and the frontends as boilerplate as possible. >> +} >> + >> void file_start_outgoing_migration(MigrationState *s, >> FileMigrationArgs *file_args, Error **errp) >> { >> @@ -43,15 +83,18 @@ void file_start_outgoing_migration(MigrationState *s, >> g_autofree char *filename = g_strdup(file_args->filename); >> uint64_t offset = file_args->offset; >> QIOChannel *ioc; >> + int flags = O_CREAT | O_TRUNC | O_WRONLY; >> + mode_t mode = 0660; >> >> trace_migration_file_outgoing(filename); >> >> - fioc = qio_channel_file_new_path(filename, O_CREAT | O_WRONLY | O_TRUNC, >> - 0600, errp); >> + fioc = qio_channel_file_new_path(filename, flags, mode, errp); >> if (!fioc) { >> return; >> } >> >> + outgoing_args.fname = g_strdup(filename); >> + >> ioc = QIO_CHANNEL(fioc); >> if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) { >> return; >> diff --git a/migration/file.h b/migration/file.h >> index 37d6a08bfc..511019b319 100644 >> --- a/migration/file.h >> +++ b/migration/file.h >> @@ -9,10 +9,15 @@ >> #define QEMU_MIGRATION_FILE_H >> >> #include "qapi/qapi-types-migration.h" >> +#include "io/task.h" >> +#include "channel.h" >> >> void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp); >> >> void file_start_outgoing_migration(MigrationState *s, >> FileMigrationArgs *file_args, Error **errp); >> int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp); >> + >> +void file_send_channel_create(QIOTaskFunc f, void *data); >> +int file_send_channel_destroy(QIOChannel *ioc); >> #endif >> diff --git a/migration/multifd.c b/migration/multifd.c >> index 123ff0dec0..427740aab6 100644 >> --- a/migration/multifd.c >> +++ b/migration/multifd.c >> @@ -17,6 +17,7 @@ >> #include "exec/ramblock.h" >> #include 
"qemu/error-report.h" >> #include "qapi/error.h" >> +#include "file.h" >> #include "ram.h" >> #include "migration.h" >> #include "migration-stats.h" >> @@ -28,6 +29,7 @@ >> #include "threadinfo.h" >> #include "options.h" >> #include "qemu/yank.h" >> +#include "io/channel-file.h" >> #include "io/channel-socket.h" >> #include "yank_functions.h" >> >> @@ -511,7 +513,11 @@ static void multifd_send_terminate_threads(Error *err) >> >> static int multifd_send_channel_destroy(QIOChannel *send) >> { >> - return socket_send_channel_destroy(send); >> + if (migrate_to_file()) { >> + return file_send_channel_destroy(send); >> + } else { >> + return socket_send_channel_destroy(send); >> + } >> } >> >> void multifd_save_cleanup(void) >> @@ -904,7 +910,11 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) >> >> static void multifd_new_send_channel_create(gpointer opaque) >> { >> - socket_send_channel_create(multifd_new_send_channel_async, opaque); >> + if (migrate_to_file()) { >> + file_send_channel_create(multifd_new_send_channel_async, opaque); >> + } else { >> + socket_send_channel_create(multifd_new_send_channel_async, opaque); >> + } >> } >> >> int multifd_save_setup(Error **errp) >> diff --git a/migration/options.c b/migration/options.c >> index 10730b13ba..f671e24758 100644 >> --- a/migration/options.c >> +++ b/migration/options.c >> @@ -409,6 +409,13 @@ bool migrate_tls(void) >> return s->parameters.tls_creds && *s->parameters.tls_creds; >> } >> >> +bool migrate_to_file(void) >> +{ >> + MigrationState *s = migrate_get_current(); >> + >> + return qemu_file_is_seekable(s->to_dst_file); >> +} > > Would this migrate_to_file() == migrate_multifd_packets()? > > Maybe we can keep using the other one and drop migrate_to_file? > Possibly the other way around as you mention. I'll take a look. 
>> + >> typedef enum WriteTrackingSupport { >> WT_SUPPORT_UNKNOWN = 0, >> WT_SUPPORT_ABSENT, >> diff --git a/migration/options.h b/migration/options.h >> index 8a19d6939c..84628a76e8 100644 >> --- a/migration/options.h >> +++ b/migration/options.h >> @@ -60,6 +60,7 @@ bool migrate_multifd_packets(void); >> bool migrate_postcopy(void); >> bool migrate_rdma(void); >> bool migrate_tls(void); >> +bool migrate_to_file(void); >> >> /* capabilities helpers */ >> >> diff --git a/migration/qemu-file.h b/migration/qemu-file.h >> index 32fd4a34fd..78ea21ab98 100644 >> --- a/migration/qemu-file.h >> +++ b/migration/qemu-file.h >> @@ -83,5 +83,4 @@ size_t qemu_get_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen, >> off_t pos); >> >> QIOChannel *qemu_file_get_ioc(QEMUFile *file); >> - >> #endif >> -- >> 2.35.3 >>
On Tue, Jan 16, 2024 at 10:37:48AM -0300, Fabiano Rosas wrote: > Peter Xu <peterx@redhat.com> writes: > > > On Mon, Nov 27, 2023 at 05:25:55PM -0300, Fabiano Rosas wrote: > >> Allow multifd to open file-backed channels. This will be used when > >> enabling the fixed-ram migration stream format which expects a > >> seekable transport. > >> > >> The QIOChannel read and write methods will use the preadv/pwritev > >> versions which don't update the file offset at each call so we can > >> reuse the fd without re-opening for every channel. > >> > >> Note that this is just setup code and multifd cannot yet make use of > >> the file channels. > >> > >> Signed-off-by: Fabiano Rosas <farosas@suse.de> > >> --- > >> - open multifd channels with O_WRONLY and no mode > >> - stop cancelling migration and propagate error via qio_task > >> --- > >> migration/file.c | 47 +++++++++++++++++++++++++++++++++++++++++-- > >> migration/file.h | 5 +++++ > >> migration/multifd.c | 14 +++++++++++-- > >> migration/options.c | 7 +++++++ > >> migration/options.h | 1 + > >> migration/qemu-file.h | 1 - > >> 6 files changed, 70 insertions(+), 5 deletions(-) > >> > >> diff --git a/migration/file.c b/migration/file.c > >> index 5d4975f43e..67d6f42da7 100644 > >> --- a/migration/file.c > >> +++ b/migration/file.c > >> @@ -17,6 +17,10 @@ > >> > >> #define OFFSET_OPTION ",offset=" > >> > >> +static struct FileOutgoingArgs { > >> + char *fname; > >> +} outgoing_args; > >> + > >> /* Remove the offset option from @filespec and return it in @offsetp. 
*/ > >> > >> int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) > >> @@ -36,6 +40,42 @@ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) > >> return 0; > >> } > >> > >> +static void qio_channel_file_connect_worker(QIOTask *task, gpointer opaque) > >> +{ > >> + /* noop */ > >> +} > >> + > >> +int file_send_channel_destroy(QIOChannel *ioc) > >> +{ > >> + if (ioc) { > >> + qio_channel_close(ioc, NULL); > >> + object_unref(OBJECT(ioc)); > >> + } > >> + g_free(outgoing_args.fname); > >> + outgoing_args.fname = NULL; > >> + > >> + return 0; > >> +} > >> + > >> +void file_send_channel_create(QIOTaskFunc f, void *data) > >> +{ > >> + QIOChannelFile *ioc; > >> + QIOTask *task; > >> + Error *err = NULL; > >> + int flags = O_WRONLY; > >> + > >> + ioc = qio_channel_file_new_path(outgoing_args.fname, flags, 0, &err); > >> + > >> + task = qio_task_new(OBJECT(ioc), f, (gpointer)data, NULL); > >> + if (!ioc) { > >> + qio_task_set_error(task, err); > >> + return; > >> + } > >> + > >> + qio_task_run_in_thread(task, qio_channel_file_connect_worker, > >> + (gpointer)data, NULL, NULL); > > > > This is pretty weird. This invokes a thread, but it'll run a noop. It > > seems meaningless to me. > > > > That's QIOTask weirdness isn't it? It will run the worker in the thread, > but it also schedules the completion function as a glib event. So that's > when multifd_new_send_channel_async() will run. The crucial aspect here > is that it gets dispatched by glib on the main loop. I'm just keeping > the model, except that I don't have work to do during the "connection" > phase. The question is why do we need that if "file:" can be done synchronously. Please see below. > > > I assume you wanted to keep using the same async model as the socket typed > > multifd, but I don't think that works anyway, because file open blocks at > > qio_channel_file_new_path() so it's sync anyway. > > It's async regarding multifd_channel_connect(). 
The connections will be > happening while multifd_save_setup() continues execution, IIUC. Yes. But I'm wondering whether we can start to simplify at least the "file:" for this process. We all know that we _may_ have created too many threads each doing very light work, which might not be needed. We haven't yet resolved the "how to kill a thread along this process if migration cancels during when one thread got blocked in a syscall" issue. We'll need to start recording tids for every thread, and that'll be a mess for sure when there're tons of threads. > > > > > AFAICT we still share the code, as long as the file path properly invokes > > multifd_channel_connect() after the iochannel is setup. > > > > I don't see the point in moving any of that logic into the URI > implementation. We already have the TLS handshake code which can also > call multifd_channel_connect() and that is a mess. IMO we should be > keeping the interface between multifd and the frontends as boilerplate > as possible. Hmm, I don't think it's a mess? At least multifd_channel_connect(). AFAICT multifd_channel_connect() can be called in any context. multifd_channel_connect() always creates yet another thread, no matter it's for tls handshake, or it's one of the multifd send thread. Here this series already treat file/socket differently: static void multifd_new_send_channel_create(gpointer opaque) { if (migrate_to_file()) { file_send_channel_create(multifd_new_send_channel_async, opaque); } else { socket_send_channel_create(multifd_new_send_channel_async, opaque); } } What I am thinking is it could be much simpler if multifd_new_send_channel_create() can create the multifd channels synchronously here, then directly call multifd_channel_connect(), further that'll create threads for whatever purposes. When TLS is not enabled, I'd expect if with that change and with a "file:" URI, after multifd_save_setup() completes, all send threads will be created already. 
I think multifd_new_send_channel_create() can already take "MultiFDSendParams *p" as parameter, then: static void multifd_new_send_channel_create(MultiFDSendParams *p) { if (migrate_to_file()) { file_send_channel_create(p); } else { socket_send_channel_create(multifd_new_send_channel_async, p); } } Where file_send_channel_create() can call multifd_channel_connect() directly upon the ioc created. Would that work for us, and much cleaner? -- Peter Xu
Peter Xu <peterx@redhat.com> writes: > On Tue, Jan 16, 2024 at 10:37:48AM -0300, Fabiano Rosas wrote: >> Peter Xu <peterx@redhat.com> writes: >> >> > On Mon, Nov 27, 2023 at 05:25:55PM -0300, Fabiano Rosas wrote: >> >> Allow multifd to open file-backed channels. This will be used when >> >> enabling the fixed-ram migration stream format which expects a >> >> seekable transport. >> >> >> >> The QIOChannel read and write methods will use the preadv/pwritev >> >> versions which don't update the file offset at each call so we can >> >> reuse the fd without re-opening for every channel. >> >> >> >> Note that this is just setup code and multifd cannot yet make use of >> >> the file channels. >> >> >> >> Signed-off-by: Fabiano Rosas <farosas@suse.de> >> >> --- >> >> - open multifd channels with O_WRONLY and no mode >> >> - stop cancelling migration and propagate error via qio_task >> >> --- >> >> migration/file.c | 47 +++++++++++++++++++++++++++++++++++++++++-- >> >> migration/file.h | 5 +++++ >> >> migration/multifd.c | 14 +++++++++++-- >> >> migration/options.c | 7 +++++++ >> >> migration/options.h | 1 + >> >> migration/qemu-file.h | 1 - >> >> 6 files changed, 70 insertions(+), 5 deletions(-) >> >> >> >> diff --git a/migration/file.c b/migration/file.c >> >> index 5d4975f43e..67d6f42da7 100644 >> >> --- a/migration/file.c >> >> +++ b/migration/file.c >> >> @@ -17,6 +17,10 @@ >> >> >> >> #define OFFSET_OPTION ",offset=" >> >> >> >> +static struct FileOutgoingArgs { >> >> + char *fname; >> >> +} outgoing_args; >> >> + >> >> /* Remove the offset option from @filespec and return it in @offsetp. 
*/ >> >> >> >> int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) >> >> @@ -36,6 +40,42 @@ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) >> >> return 0; >> >> } >> >> >> >> +static void qio_channel_file_connect_worker(QIOTask *task, gpointer opaque) >> >> +{ >> >> + /* noop */ >> >> +} >> >> + >> >> +int file_send_channel_destroy(QIOChannel *ioc) >> >> +{ >> >> + if (ioc) { >> >> + qio_channel_close(ioc, NULL); >> >> + object_unref(OBJECT(ioc)); >> >> + } >> >> + g_free(outgoing_args.fname); >> >> + outgoing_args.fname = NULL; >> >> + >> >> + return 0; >> >> +} >> >> + >> >> +void file_send_channel_create(QIOTaskFunc f, void *data) >> >> +{ >> >> + QIOChannelFile *ioc; >> >> + QIOTask *task; >> >> + Error *err = NULL; >> >> + int flags = O_WRONLY; >> >> + >> >> + ioc = qio_channel_file_new_path(outgoing_args.fname, flags, 0, &err); >> >> + >> >> + task = qio_task_new(OBJECT(ioc), f, (gpointer)data, NULL); >> >> + if (!ioc) { >> >> + qio_task_set_error(task, err); >> >> + return; >> >> + } >> >> + >> >> + qio_task_run_in_thread(task, qio_channel_file_connect_worker, >> >> + (gpointer)data, NULL, NULL); >> > >> > This is pretty weird. This invokes a thread, but it'll run a noop. It >> > seems meaningless to me. >> > >> >> That's QIOTask weirdness isn't it? It will run the worker in the thread, >> but it also schedules the completion function as a glib event. So that's >> when multifd_new_send_channel_async() will run. The crucial aspect here >> is that it gets dispatched by glib on the main loop. I'm just keeping >> the model, except that I don't have work to do during the "connection" >> phase. > > The question is why do we need that if "file:" can be done synchronously. I guess I tend to avoid changing existing patterns when adding a new feature. But you're right, we don't really need this. > Please see below. 
> >> >> > I assume you wanted to keep using the same async model as the socket typed >> > multifd, but I don't think that works anyway, because file open blocks at >> > qio_channel_file_new_path() so it's sync anyway. >> >> It's async regarding multifd_channel_connect(). The connections will be >> happening while multifd_save_setup() continues execution, IIUC. > > Yes. But I'm wondering whether we can start to simplify at least the > "file:" for this process. We all know that we _may_ have created too many > threads each doing very light work, which might not be needed. We haven't > yet resolved the "how to kill a thread along this process if migration > cancels during when one thread got blocked in a syscall" issue. We'll need > to start recording tids for every thread, and that'll be a mess for sure > when there're tons of threads. > >> >> > >> > AFAICT we still share the code, as long as the file path properly invokes >> > multifd_channel_connect() after the iochannel is setup. >> > >> >> I don't see the point in moving any of that logic into the URI >> implementation. We already have the TLS handshake code which can also >> call multifd_channel_connect() and that is a mess. IMO we should be >> keeping the interface between multifd and the frontends as boilerplate >> as possible. > > Hmm, I don't think it's a mess? At least multifd_channel_connect(). AFAICT > multifd_channel_connect() can be called in any context. Well this sequence: multifd_new_send_channel_async() -> multifd_channel_connect() -> multifd_tls_channel_connect() -> new thread -> multifd_tls_handshake_thread() -> new task -> multifd_tls_outgoing_handshake() -> multifd_channel_connect() ...is not what I would call intuitive. Specifically with multifd_channel_connect() being called more than the number of multifd channels. 
This would be "not a mess" IMO: for (i = 0; i < migrate_multifd_channels(); i++) { multifd_tls_channel_connect(); multifd_channel_connect() -> qemu_thread_create(..., multifd_send_thread); } > multifd_channel_connect() always creates yet another thread, no matter whether it's > for the TLS handshake, or it's one of the multifd send threads. > > Here this series already treats file/socket differently: > > static void multifd_new_send_channel_create(gpointer opaque) > { > if (migrate_to_file()) { > file_send_channel_create(multifd_new_send_channel_async, opaque); > } else { > socket_send_channel_create(multifd_new_send_channel_async, opaque); > } > } > > What I am thinking is it could be much simpler if > multifd_new_send_channel_create() can create the multifd channels > synchronously here, then directly call multifd_channel_connect(), which further > will create threads for whatever purposes. > > When TLS is not enabled, I'd expect that, with that change and with a "file:" > URI, after multifd_save_setup() completes, all send threads will be created > already. > > I think multifd_new_send_channel_create() can already take > "MultiFDSendParams *p" as a parameter, then: > > static void multifd_new_send_channel_create(MultiFDSendParams *p) > { > if (migrate_to_file()) { > file_send_channel_create(p); > } else { > socket_send_channel_create(multifd_new_send_channel_async, p); > } > } > > Where file_send_channel_create() can call multifd_channel_connect() > directly upon the ioc created. > > Would that work for us, and be much cleaner? Looks cleaner indeed, let me give it a try.
On Wed, Jan 17, 2024 at 02:34:18PM -0300, Fabiano Rosas wrote: > Well this sequence: > > multifd_new_send_channel_async() -> multifd_channel_connect() -> > multifd_tls_channel_connect() -> new thread -> > multifd_tls_handshake_thread() -> new task -> > multifd_tls_outgoing_handshake() -> multifd_channel_connect() > > ...is not what I would call intuitive. Specifically with > multifd_channel_connect() being called more times than the number of multifd > channels. > > This would be "not a mess" IMO: > > for (i = 0; i < migrate_multifd_channels(); i++) { > multifd_tls_channel_connect(); > multifd_channel_connect() -> > qemu_thread_create(..., multifd_send_thread); > } Ah, I see what you meant now; yes, I agree. Let's see whether we can have a simple procedure for file first, and then consider the possibility of making the socket path closer to the file one. TLS could be another story; I'm guessing Dan could have good reasons to do it like that, but we can rethink it after we settle the file-specific paths. -- Peter Xu
On Tue, Jan 16, 2024 at 12:05:57PM +0800, Peter Xu wrote: > On Mon, Nov 27, 2023 at 05:25:55PM -0300, Fabiano Rosas wrote: > > Allow multifd to open file-backed channels. This will be used when > > enabling the fixed-ram migration stream format which expects a > > seekable transport. > > > > The QIOChannel read and write methods will use the preadv/pwritev > > versions which don't update the file offset at each call so we can > > reuse the fd without re-opening for every channel. > > > > Note that this is just setup code and multifd cannot yet make use of > > the file channels. > > > > Signed-off-by: Fabiano Rosas <farosas@suse.de> > > --- > > - open multifd channels with O_WRONLY and no mode > > - stop cancelling migration and propagate error via qio_task > > --- > > migration/file.c | 47 +++++++++++++++++++++++++++++++++++++++++-- > > migration/file.h | 5 +++++ > > migration/multifd.c | 14 +++++++++++-- > > migration/options.c | 7 +++++++ > > migration/options.h | 1 + > > migration/qemu-file.h | 1 - > > 6 files changed, 70 insertions(+), 5 deletions(-) > > > > diff --git a/migration/file.c b/migration/file.c > > index 5d4975f43e..67d6f42da7 100644 > > --- a/migration/file.c > > +++ b/migration/file.c > > @@ -17,6 +17,10 @@ > > > > #define OFFSET_OPTION ",offset=" > > > > +static struct FileOutgoingArgs { > > + char *fname; > > +} outgoing_args; > > + > > /* Remove the offset option from @filespec and return it in @offsetp. 
*/ > > > > int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) > > @@ -36,6 +40,42 @@ int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp) > > return 0; > > } > > > > +static void qio_channel_file_connect_worker(QIOTask *task, gpointer opaque) > > +{ > > + /* noop */ > > +} > > + > > +int file_send_channel_destroy(QIOChannel *ioc) > > +{ > > + if (ioc) { > > + qio_channel_close(ioc, NULL); > > + object_unref(OBJECT(ioc)); > > + } > > + g_free(outgoing_args.fname); > > + outgoing_args.fname = NULL; > > + > > + return 0; > > +} > > + > > +void file_send_channel_create(QIOTaskFunc f, void *data) > > +{ > > + QIOChannelFile *ioc; > > + QIOTask *task; > > + Error *err = NULL; > > + int flags = O_WRONLY; > > + > > + ioc = qio_channel_file_new_path(outgoing_args.fname, flags, 0, &err); > > + > > + task = qio_task_new(OBJECT(ioc), f, (gpointer)data, NULL); > > + if (!ioc) { > > + qio_task_set_error(task, err); > > + return; > > + } > > + > > + qio_task_run_in_thread(task, qio_channel_file_connect_worker, > > + (gpointer)data, NULL, NULL); > > This is pretty weird. This invokes a thread, but it'll run a noop. It > seems meaningless to me. > > I assume you wanted to keep using the same async model as the socket typed > multifd, but I don't think that works anyway, because file open blocks at > qio_channel_file_new_path() so it's sync anyway. > > AFAICT we still share the code, as long as the file path properly invokes > multifd_channel_connect() after the iochannel is setup. 
> > > +} > > + > > void file_start_outgoing_migration(MigrationState *s, > > FileMigrationArgs *file_args, Error **errp) > > { > > @@ -43,15 +83,18 @@ void file_start_outgoing_migration(MigrationState *s, > > g_autofree char *filename = g_strdup(file_args->filename); > > uint64_t offset = file_args->offset; > > QIOChannel *ioc; > > + int flags = O_CREAT | O_TRUNC | O_WRONLY; > > + mode_t mode = 0660; > > > > trace_migration_file_outgoing(filename); > > > > - fioc = qio_channel_file_new_path(filename, O_CREAT | O_WRONLY | O_TRUNC, > > - 0600, errp); > > + fioc = qio_channel_file_new_path(filename, flags, mode, errp); > > if (!fioc) { > > return; > > } > > > > + outgoing_args.fname = g_strdup(filename); > > + > > ioc = QIO_CHANNEL(fioc); > > if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) { > > return; > > diff --git a/migration/file.h b/migration/file.h > > index 37d6a08bfc..511019b319 100644 > > --- a/migration/file.h > > +++ b/migration/file.h > > @@ -9,10 +9,15 @@ > > #define QEMU_MIGRATION_FILE_H > > > > #include "qapi/qapi-types-migration.h" > > +#include "io/task.h" > > +#include "channel.h" > > > > void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp); > > > > void file_start_outgoing_migration(MigrationState *s, > > FileMigrationArgs *file_args, Error **errp); > > int file_parse_offset(char *filespec, uint64_t *offsetp, Error **errp); > > + > > +void file_send_channel_create(QIOTaskFunc f, void *data); > > +int file_send_channel_destroy(QIOChannel *ioc); > > #endif > > diff --git a/migration/multifd.c b/migration/multifd.c > > index 123ff0dec0..427740aab6 100644 > > --- a/migration/multifd.c > > +++ b/migration/multifd.c > > @@ -17,6 +17,7 @@ > > #include "exec/ramblock.h" > > #include "qemu/error-report.h" > > #include "qapi/error.h" > > +#include "file.h" > > #include "ram.h" > > #include "migration.h" > > #include "migration-stats.h" > > @@ -28,6 +29,7 @@ > > #include "threadinfo.h" > > #include "options.h" > > 
#include "qemu/yank.h" > > +#include "io/channel-file.h" > > #include "io/channel-socket.h" > > #include "yank_functions.h" > > > > @@ -511,7 +513,11 @@ static void multifd_send_terminate_threads(Error *err) > > > > static int multifd_send_channel_destroy(QIOChannel *send) > > { > > - return socket_send_channel_destroy(send); > > + if (migrate_to_file()) { > > + return file_send_channel_destroy(send); > > + } else { > > + return socket_send_channel_destroy(send); > > + } > > } > > > > void multifd_save_cleanup(void) > > @@ -904,7 +910,11 @@ static void multifd_new_send_channel_async(QIOTask *task, gpointer opaque) > > > > static void multifd_new_send_channel_create(gpointer opaque) > > { > > - socket_send_channel_create(multifd_new_send_channel_async, opaque); > > + if (migrate_to_file()) { > > + file_send_channel_create(multifd_new_send_channel_async, opaque); > > + } else { > > + socket_send_channel_create(multifd_new_send_channel_async, opaque); > > + } > > } > > > > int multifd_save_setup(Error **errp) > > diff --git a/migration/options.c b/migration/options.c > > index 10730b13ba..f671e24758 100644 > > --- a/migration/options.c > > +++ b/migration/options.c > > @@ -409,6 +409,13 @@ bool migrate_tls(void) > > return s->parameters.tls_creds && *s->parameters.tls_creds; > > } > > > > +bool migrate_to_file(void) > > +{ > > + MigrationState *s = migrate_get_current(); > > + > > + return qemu_file_is_seekable(s->to_dst_file); > > +} > > Would this migrate_to_file() == migrate_multifd_packets()? > > Maybe we can keep using the other one and drop migrate_to_file? Or perhaps the other way round; as migrate_to_file() is a migration global helper, so applicable without multifd. 
> > > + > > typedef enum WriteTrackingSupport { > > WT_SUPPORT_UNKNOWN = 0, > > WT_SUPPORT_ABSENT, > > diff --git a/migration/options.h b/migration/options.h > > index 8a19d6939c..84628a76e8 100644 > > --- a/migration/options.h > > +++ b/migration/options.h > > @@ -60,6 +60,7 @@ bool migrate_multifd_packets(void); > > bool migrate_postcopy(void); > > bool migrate_rdma(void); > > bool migrate_tls(void); > > +bool migrate_to_file(void); > > > > /* capabilities helpers */ > > > > diff --git a/migration/qemu-file.h b/migration/qemu-file.h > > index 32fd4a34fd..78ea21ab98 100644 > > --- a/migration/qemu-file.h > > +++ b/migration/qemu-file.h > > @@ -83,5 +83,4 @@ size_t qemu_get_buffer_at(QEMUFile *f, const uint8_t *buf, size_t buflen, > > off_t pos); > > > > QIOChannel *qemu_file_get_ioc(QEMUFile *file); > > - > > #endif > > -- > > 2.35.3 > > > > -- > Peter Xu -- Peter Xu
© 2016 - 2024 Red Hat, Inc.