[PATCH 15/17] migration: Add new migration channel connect API

Avihai Horon posted 17 patches 10 months ago
Maintainers: Peter Xu <peterx@redhat.com>, Fabiano Rosas <farosas@suse.de>
[PATCH 15/17] migration: Add new migration channel connect API
Posted by Avihai Horon 10 months ago
Add a new API for connecting migration channels other than the main
migration channel. The API hides the transport type handling and the
TLS upgrade logic from its callers, and thus simplifies migration
channel connection.

It will be used in the next patches to connect multifd and postcopy
preempt channels.

Export migration_channels_and_transport_compatible(), as it is now also
used outside of migration.c (in channel.c).

Signed-off-by: Avihai Horon <avihaih@nvidia.com>
---
 migration/channel.h    | 24 ++++++++++++++
 migration/migration.h  |  2 ++
 migration/channel.c    | 74 ++++++++++++++++++++++++++++++++++++++++++
 migration/migration.c  |  5 ++-
 migration/trace-events |  3 ++
 5 files changed, 105 insertions(+), 3 deletions(-)

diff --git a/migration/channel.h b/migration/channel.h
index 1e36bdd866..f0fa94ad9e 100644
--- a/migration/channel.h
+++ b/migration/channel.h
@@ -27,4 +27,28 @@ int migration_channel_read_peek(QIOChannel *ioc,
                                 const char *buf,
                                 const size_t buflen,
                                 Error **errp);
+
+typedef void (*MigChannelCallback)(QIOChannel *ioc, void *opaque, Error *err);
+
+/**
+ * migration_channel_connect:
+ * @callback: The callback to invoke when completed
+ * @name: The name of the channel
+ * @opaque: Opaque data to pass to @callback
+ * @tls_in_thread: Whether to run the TLS handshake in a new thread (only
+ *                 relevant if a TLS upgrade is needed).
+ * @errp: Pointer to a NULL-initialized error object pointer
+ *
+ * Establishes a new migration channel and upgrades it to TLS if needed. If
+ * this function succeeds, @callback will be invoked upon completion and
+ * success/failure will be reported to it via the Error object.
+ * In case multiple channels are established in parallel, @tls_in_thread should
+ * be set to true so the TLS handshake will be performed in a new thread, to
+ * avoid a potential risk of migration hang.
+ *
+ * Returns: True on successful initiation of channel establishment process, or
+ * false on failure.
+ */
+bool migration_channel_connect(MigChannelCallback callback, const char *name,
+                               void *opaque, bool tls_in_thread, Error **errp);
 #endif
diff --git a/migration/migration.h b/migration/migration.h
index dc370ab3e8..52b340e00b 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -523,6 +523,8 @@ bool check_dirty_bitmap_mig_alias_map(const BitmapMigrationNodeAliasList *bbm,
 void migrate_add_address(SocketAddress *address);
 bool migrate_uri_parse(const char *uri, MigrationChannel **channel,
                        Error **errp);
+bool migration_channels_and_transport_compatible(MigrationAddress *addr,
+                                                 Error **errp);
 int foreach_not_ignored_block(RAMBlockIterFunc func, void *opaque);
 
 #define qemu_ram_foreach_block \
diff --git a/migration/channel.c b/migration/channel.c
index c1f7c6d556..741974279f 100644
--- a/migration/channel.c
+++ b/migration/channel.c
@@ -21,6 +21,7 @@
 #include "io/channel-socket.h"
 #include "qemu/yank.h"
 #include "yank_functions.h"
+#include "socket.h"
 
 /**
  * @migration_channel_process_incoming - Create new incoming migration channel
@@ -101,6 +102,79 @@ void migration_channel_connect_main(MigrationState *s, QIOChannel *ioc,
     error_free(error);
 }
 
+typedef struct {
+    MigChannelCallback callback; /* invoked once the connect completes */
+    void *opaque;                /* passed back to @callback */
+    char *name;                  /* owned copy of the channel name */
+    bool tls_in_thread;          /* forwarded to the TLS connect call */
+} MigChannelData;
+
+static void migration_channel_connect_tls_handshake(QIOChannel *ioc,
+                                                    void *opaque, Error *err)
+{
+    MigChannelData *data = opaque;
+
+    data->callback(ioc, data->opaque, err); /* report handshake result */
+    g_free(data->name);
+    g_free(data); /* allocated in migration_channel_connect() */
+}
+
+static void migration_channel_connect_callback(QIOTask *task, void *opaque)
+{
+    QIOChannel *ioc = QIO_CHANNEL(qio_task_get_source(task));
+    MigChannelData *data = opaque;
+    MigrationState *s = migrate_get_current();
+    Error *err = NULL;
+
+    if (qio_task_propagate_error(task, &err)) {
+        trace_migration_channel_connect_error(data->name,
+                                              error_get_pretty(err));
+        goto out; /* connect failed: report err to the callback */
+    }
+
+    trace_migration_channel_connect_complete(data->name);
+    if (!migrate_channel_requires_tls_upgrade(ioc)) {
+        goto out; /* no TLS upgrade needed: report success right away */
+    }
+
+    if (migration_tls_channel_connect(ioc, data->name, s->hostname,
+                                      migration_channel_connect_tls_handshake,
+                                      data, data->tls_in_thread, &err)) {
+        object_unref(OBJECT(ioc));
+        /* the handshake callback invokes data->callback and frees data */
+        return;
+    }
+
+out:
+    data->callback(ioc, data->opaque, err); /* err is NULL on success */
+    g_free(data->name);
+    g_free(data);
+}
+
+bool migration_channel_connect(MigChannelCallback callback, const char *name,
+                               void *opaque, bool tls_in_thread, Error **errp)
+{
+    MigrationState *s = migrate_get_current();
+    MigChannelData *data;
+
+    g_assert(s->address);
+    g_assert(migration_channels_and_transport_compatible(s->address, NULL));
+
+    data = g_new0(MigChannelData, 1);
+    data->callback = callback;
+    data->opaque = opaque;
+    data->name = g_strdup(name); /* freed after the callback has run */
+    data->tls_in_thread = tls_in_thread;
+
+    trace_migration_channel_connect_start(s->hostname, name);
+    /*
+     * Currently, creating migration channels other than the main channel is
+     * supported only with socket transport.
+     */
+    socket_send_channel_create(migration_channel_connect_callback, data);
+
+    return true; /* no failure path yet; @errp is currently unused */
+}
 
 /**
  * @migration_channel_read_peek - Peek at migration channel, without
diff --git a/migration/migration.c b/migration/migration.c
index deaa79ff14..6f985e7f74 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -141,9 +141,8 @@ static bool transport_supports_multi_channels(MigrationAddress *addr)
     return false;
 }
 
-static bool
-migration_channels_and_transport_compatible(MigrationAddress *addr,
-                                            Error **errp)
+bool migration_channels_and_transport_compatible(MigrationAddress *addr,
+                                                 Error **errp)
 {
     if (migration_needs_multiple_sockets() &&
         !transport_supports_multi_channels(addr)) {
diff --git a/migration/trace-events b/migration/trace-events
index 9a8ec67115..6c915d8567 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -195,6 +195,9 @@ migration_transferred_bytes(uint64_t qemu_file, uint64_t multifd, uint64_t rdma)
 # channel.c
 migration_set_incoming_channel(void *ioc, const char *ioctype) "ioc=%p ioctype=%s"
 migration_set_outgoing_channel(void *ioc, const char *ioctype, const char *hostname, void *err)  "ioc=%p ioctype=%s hostname=%s err=%p"
+migration_channel_connect_start(const char *hostname, const char *name) "hostname=%s, name=%s"
+migration_channel_connect_error(const char *name, const char *err) "name=%s, err=%s"
+migration_channel_connect_complete(const char *name) "name=%s"
 
 # global_state.c
 migrate_state_too_big(void) ""
-- 
2.26.3