[RFC PATCH v3 14/30] migration/multifd: Add incoming QIOChannelFile support

Fabiano Rosas posted 30 patches 1 year ago
Maintainers: Juan Quintela <quintela@redhat.com>, Peter Xu <peterx@redhat.com>, Fabiano Rosas <farosas@suse.de>, Leonardo Bras <leobras@redhat.com>, Paolo Bonzini <pbonzini@redhat.com>, David Hildenbrand <david@redhat.com>, "Philippe Mathieu-Daudé" <philmd@linaro.org>, "Daniel P. Berrangé" <berrange@redhat.com>, Eric Blake <eblake@redhat.com>, Markus Armbruster <armbru@redhat.com>, Thomas Huth <thuth@redhat.com>, Laurent Vivier <lvivier@redhat.com>
There is a newer version of this series
[RFC PATCH v3 14/30] migration/multifd: Add incoming QIOChannelFile support
Posted by Fabiano Rosas 1 year ago
On the receiving side we don't need to differentiate between the main
channel and the threads, so whichever channel is defined first gets to
be the main one. And since there are no packets, use the atomic channel
count to index into the params array.

Signed-off-by: Fabiano Rosas <farosas@suse.de>
---
- stop setting offset in secondary channels
- check for packets when peeking
---
 migration/file.c      | 36 ++++++++++++++++++++++++++++--------
 migration/migration.c |  3 ++-
 migration/multifd.c   |  3 +--
 migration/multifd.h   |  1 +
 4 files changed, 32 insertions(+), 11 deletions(-)

diff --git a/migration/file.c b/migration/file.c
index 67d6f42da7..62ba994109 100644
--- a/migration/file.c
+++ b/migration/file.c
@@ -7,12 +7,14 @@
 
 #include "qemu/osdep.h"
 #include "qemu/cutils.h"
+#include "qemu/error-report.h"
 #include "qapi/error.h"
 #include "channel.h"
 #include "file.h"
 #include "migration.h"
 #include "io/channel-file.h"
 #include "io/channel-util.h"
+#include "options.h"
 #include "trace.h"
 
 #define OFFSET_OPTION ",offset="
@@ -117,22 +119,40 @@ void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp)
     g_autofree char *filename = g_strdup(file_args->filename);
     QIOChannelFile *fioc = NULL;
     uint64_t offset = file_args->offset;
-    QIOChannel *ioc;
+    int channels = 1;
+    int i = 0, fd;
 
     trace_migration_file_incoming(filename);
 
     fioc = qio_channel_file_new_path(filename, O_RDONLY, 0, errp);
     if (!fioc) {
+        goto out;
+    }
+
+    if (offset &&
+        qio_channel_io_seek(QIO_CHANNEL(fioc), offset, SEEK_SET, errp) < 0) {
         return;
     }
 
-    ioc = QIO_CHANNEL(fioc);
-    if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
+    if (migrate_multifd()) {
+        channels += migrate_multifd_channels();
+    }
+
+    fd = fioc->fd;
+
+    do {
+        QIOChannel *ioc = QIO_CHANNEL(fioc);
+
+        qio_channel_set_name(ioc, "migration-file-incoming");
+        qio_channel_add_watch_full(ioc, G_IO_IN,
+                                   file_accept_incoming_migration,
+                                   NULL, NULL,
+                                   g_main_context_get_thread_default());
+    } while (++i < channels && (fioc = qio_channel_file_new_fd(fd)));
+
+out:
+    if (!fioc) {
+        error_setg(errp, "Error creating migration incoming channel");
         return;
     }
-    qio_channel_set_name(QIO_CHANNEL(ioc), "migration-file-incoming");
-    qio_channel_add_watch_full(ioc, G_IO_IN,
-                               file_accept_incoming_migration,
-                               NULL, NULL,
-                               g_main_context_get_thread_default());
 }
diff --git a/migration/migration.c b/migration/migration.c
index 897ed1db67..16689171ab 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -838,7 +838,8 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
     uint32_t channel_magic = 0;
     int ret = 0;
 
-    if (migrate_multifd() && !migrate_postcopy_ram() &&
+    if (migrate_multifd() && migrate_multifd_packets() &&
+        !migrate_postcopy_ram() &&
         qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
         /*
          * With multiple channels, it is possible that we receive channels
diff --git a/migration/multifd.c b/migration/multifd.c
index 427740aab6..3476fac49f 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -1283,8 +1283,7 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
         /* initial packet */
         num_packets = 1;
     } else {
-        /* next patch gives this a meaningful value */
-        id = 0;
+        id = qatomic_read(&multifd_recv_state->count);
     }
 
     p = &multifd_recv_state->params[id];
diff --git a/migration/multifd.h b/migration/multifd.h
index a835643b48..a112ec7ac6 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -18,6 +18,7 @@ void multifd_save_cleanup(void);
 int multifd_load_setup(Error **errp);
 void multifd_load_cleanup(void);
 void multifd_load_shutdown(void);
+bool multifd_recv_first_channel(void);
 bool multifd_recv_all_channels_created(void);
 void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
 void multifd_recv_sync_main(void);
-- 
2.35.3
Re: [RFC PATCH v3 14/30] migration/multifd: Add incoming QIOChannelFile support
Posted by Peter Xu 10 months, 2 weeks ago
On Mon, Nov 27, 2023 at 05:25:56PM -0300, Fabiano Rosas wrote:
> On the receiving side we don't need to differentiate between main
> channel and threads, so whichever channel is defined first gets to be
> the main one. And since there are no packets, use the atomic channel
> count to index into the params array.
> 
> Signed-off-by: Fabiano Rosas <farosas@suse.de>
> ---
> - stop setting offset in secondary channels
> - check for packets when peeking
> ---
>  migration/file.c      | 36 ++++++++++++++++++++++++++++--------
>  migration/migration.c |  3 ++-
>  migration/multifd.c   |  3 +--
>  migration/multifd.h   |  1 +
>  4 files changed, 32 insertions(+), 11 deletions(-)
> 
> diff --git a/migration/file.c b/migration/file.c
> index 67d6f42da7..62ba994109 100644
> --- a/migration/file.c
> +++ b/migration/file.c
> @@ -7,12 +7,14 @@
>  
>  #include "qemu/osdep.h"
>  #include "qemu/cutils.h"
> +#include "qemu/error-report.h"
>  #include "qapi/error.h"
>  #include "channel.h"
>  #include "file.h"
>  #include "migration.h"
>  #include "io/channel-file.h"
>  #include "io/channel-util.h"
> +#include "options.h"
>  #include "trace.h"
>  
>  #define OFFSET_OPTION ",offset="
> @@ -117,22 +119,40 @@ void file_start_incoming_migration(FileMigrationArgs *file_args, Error **errp)
>      g_autofree char *filename = g_strdup(file_args->filename);
>      QIOChannelFile *fioc = NULL;
>      uint64_t offset = file_args->offset;
> -    QIOChannel *ioc;
> +    int channels = 1;
> +    int i = 0, fd;
>  
>      trace_migration_file_incoming(filename);
>  
>      fioc = qio_channel_file_new_path(filename, O_RDONLY, 0, errp);
>      if (!fioc) {
> +        goto out;

Shouldn't this be a "return"?  qio_channel_file_new_path() has already set
errp on failure, so won't "goto out" call error_setg() on it a second time
and crash?

It looks like that label can be dropped.

> +    }
> +
> +    if (offset &&
> +        qio_channel_io_seek(QIO_CHANNEL(fioc), offset, SEEK_SET, errp) < 0) {
>          return;
>      }
>  
> -    ioc = QIO_CHANNEL(fioc);
> -    if (offset && qio_channel_io_seek(ioc, offset, SEEK_SET, errp) < 0) {
> +    if (migrate_multifd()) {
> +        channels += migrate_multifd_channels();
> +    }
> +
> +    fd = fioc->fd;
> +
> +    do {
> +        QIOChannel *ioc = QIO_CHANNEL(fioc);
> +
> +        qio_channel_set_name(ioc, "migration-file-incoming");
> +        qio_channel_add_watch_full(ioc, G_IO_IN,
> +                                   file_accept_incoming_migration,
> +                                   NULL, NULL,
> +                                   g_main_context_get_thread_default());
> +    } while (++i < channels && (fioc = qio_channel_file_new_fd(fd)));
> +
> +out:
> +    if (!fioc) {
> +        error_setg(errp, "Error creating migration incoming channel");
>          return;
>      }
> -    qio_channel_set_name(QIO_CHANNEL(ioc), "migration-file-incoming");
> -    qio_channel_add_watch_full(ioc, G_IO_IN,
> -                               file_accept_incoming_migration,
> -                               NULL, NULL,
> -                               g_main_context_get_thread_default());
>  }
> diff --git a/migration/migration.c b/migration/migration.c
> index 897ed1db67..16689171ab 100644
> --- a/migration/migration.c
> +++ b/migration/migration.c
> @@ -838,7 +838,8 @@ void migration_ioc_process_incoming(QIOChannel *ioc, Error **errp)
>      uint32_t channel_magic = 0;
>      int ret = 0;
>  
> -    if (migrate_multifd() && !migrate_postcopy_ram() &&
> +    if (migrate_multifd() && migrate_multifd_packets() &&
> +        !migrate_postcopy_ram() &&
>          qio_channel_has_feature(ioc, QIO_CHANNEL_FEATURE_READ_MSG_PEEK)) {
>          /*
>           * With multiple channels, it is possible that we receive channels
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 427740aab6..3476fac49f 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -1283,8 +1283,7 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error **errp)
>          /* initial packet */
>          num_packets = 1;
>      } else {
> -        /* next patch gives this a meaningful value */
> -        id = 0;
> +        id = qatomic_read(&multifd_recv_state->count);
>      }
>  
>      p = &multifd_recv_state->params[id];
> diff --git a/migration/multifd.h b/migration/multifd.h
> index a835643b48..a112ec7ac6 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -18,6 +18,7 @@ void multifd_save_cleanup(void);
>  int multifd_load_setup(Error **errp);
>  void multifd_load_cleanup(void);
>  void multifd_load_shutdown(void);
> +bool multifd_recv_first_channel(void);

Can this be dropped?  Nothing in this patch defines or uses it.

>  bool multifd_recv_all_channels_created(void);
>  void multifd_recv_new_channel(QIOChannel *ioc, Error **errp);
>  void multifd_recv_sync_main(void);
> -- 
> 2.35.3
> 

-- 
Peter Xu