From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
This SaveVMHandler helps a device provide its own asynchronous transmission
of the remaining data at the end of a precopy phase via multifd channels,
in parallel with the transfer done by save_live_complete_precopy handlers.
These threads are launched only when multifd device state transfer is
supported, after all save_live_complete_precopy_begin handlers have
already finished (for stream synchronization purposes).
Management of these threads is done in the multifd migration code,
wrapping them in the generic thread pool.
Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
include/migration/misc.h | 10 ++++
include/migration/register.h | 25 +++++++++
include/qemu/typedefs.h | 4 ++
migration/multifd-device-state.c | 87 ++++++++++++++++++++++++++++++++
migration/savevm.c | 40 ++++++++++++++-
5 files changed, 165 insertions(+), 1 deletion(-)
diff --git a/include/migration/misc.h b/include/migration/misc.h
index 189de6d02ad6..26f7f3140f03 100644
--- a/include/migration/misc.h
+++ b/include/migration/misc.h
@@ -116,4 +116,14 @@ bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
char *data, size_t len);
bool migration_has_device_state_support(void);
+void
+multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
+ char *idstr, uint32_t instance_id,
+ void *opaque);
+
+void multifd_launch_device_state_save_threads(int max_count);
+
+void multifd_abort_device_state_save_threads(void);
+int multifd_join_device_state_save_threads(void);
+
#endif
diff --git a/include/migration/register.h b/include/migration/register.h
index 44d8cf5192ae..ace2cfc0f75e 100644
--- a/include/migration/register.h
+++ b/include/migration/register.h
@@ -139,6 +139,31 @@ typedef struct SaveVMHandlers {
*/
int (*save_live_complete_precopy_end)(QEMUFile *f, void *opaque);
+ /* This runs in a separate thread. */
+
+ /**
+ * @save_live_complete_precopy_thread
+ *
+ * Called at the end of a precopy phase from a separate worker thread
+ * in configurations where multifd device state transfer is supported
+ * in order to perform asynchronous transmission of the remaining data in
+ * parallel with @save_live_complete_precopy handlers.
+ * The call happens after all @save_live_complete_precopy_begin handlers
+ * have finished.
+ * When postcopy is enabled, devices that support postcopy will skip this
+ * step.
+ *
+ * @idstr: this device section idstr
+ * @instance_id: this device section instance_id
+ * @abort_flag: flag indicating that the migration core wants to abort
+ * the transmission and so the handler should exit ASAP. To be read by
+ * qatomic_read() or similar.
+ * @opaque: data pointer passed to register_savevm_live()
+ *
+ * Returns zero to indicate success and negative for error
+ */
+ SaveLiveCompletePrecopyThreadHandler save_live_complete_precopy_thread;
+
/* This runs both outside and inside the BQL. */
/**
diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index 9d222dc37628..edd6e7b9c116 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -130,5 +130,9 @@ typedef struct IRQState *qemu_irq;
* Function types
*/
typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
+typedef int (*SaveLiveCompletePrecopyThreadHandler)(char *idstr,
+ uint32_t instance_id,
+ bool *abort_flag,
+ void *opaque);
#endif /* QEMU_TYPEDEFS_H */
diff --git a/migration/multifd-device-state.c b/migration/multifd-device-state.c
index 7b34fe736c7f..9b364e8ef33c 100644
--- a/migration/multifd-device-state.c
+++ b/migration/multifd-device-state.c
@@ -9,12 +9,17 @@
#include "qemu/osdep.h"
#include "qemu/lockable.h"
+#include "block/thread-pool.h"
#include "migration/misc.h"
#include "multifd.h"
#include "options.h"
static QemuMutex queue_job_mutex;
+ThreadPool *send_threads;
+int send_threads_ret;
+bool send_threads_abort;
+
static MultiFDSendData *device_state_send;
size_t multifd_device_state_payload_size(void)
@@ -27,6 +32,10 @@ void multifd_device_state_save_setup(void)
qemu_mutex_init(&queue_job_mutex);
device_state_send = multifd_send_data_alloc();
+
+ send_threads = thread_pool_new(NULL);
+ send_threads_ret = 0;
+ send_threads_abort = false;
}
void multifd_device_state_clear(MultiFDDeviceState_t *device_state)
@@ -37,6 +46,7 @@ void multifd_device_state_clear(MultiFDDeviceState_t *device_state)
void multifd_device_state_save_cleanup(void)
{
+ g_clear_pointer(&send_threads, thread_pool_free);
g_clear_pointer(&device_state_send, multifd_send_data_free);
qemu_mutex_destroy(&queue_job_mutex);
@@ -104,3 +114,80 @@ bool migration_has_device_state_support(void)
return migrate_multifd() && !migrate_mapped_ram() &&
migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
}
+
+static void multifd_device_state_save_thread_complete(void *opaque, int ret)
+{
+ if (ret && !send_threads_ret) {
+ send_threads_ret = ret;
+ }
+}
+
+struct MultiFDDSSaveThreadData {
+ SaveLiveCompletePrecopyThreadHandler hdlr;
+ char *idstr;
+ uint32_t instance_id;
+ void *opaque;
+};
+
+static void multifd_device_state_save_thread_data_free(void *opaque)
+{
+ struct MultiFDDSSaveThreadData *data = opaque;
+
+ g_clear_pointer(&data->idstr, g_free);
+ g_free(data);
+}
+
+static int multifd_device_state_save_thread(void *opaque)
+{
+ struct MultiFDDSSaveThreadData *data = opaque;
+
+ return data->hdlr(data->idstr, data->instance_id, &send_threads_abort,
+ data->opaque);
+}
+
+void
+multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
+ char *idstr, uint32_t instance_id,
+ void *opaque)
+{
+ struct MultiFDDSSaveThreadData *data;
+
+ assert(migration_has_device_state_support());
+
+ data = g_new(struct MultiFDDSSaveThreadData, 1);
+ data->hdlr = hdlr;
+ data->idstr = g_strdup(idstr);
+ data->instance_id = instance_id;
+ data->opaque = opaque;
+
+ thread_pool_submit(send_threads,
+ multifd_device_state_save_thread,
+ data, multifd_device_state_save_thread_data_free,
+ multifd_device_state_save_thread_complete, NULL);
+}
+
+void multifd_launch_device_state_save_threads(int max_count)
+{
+ assert(migration_has_device_state_support());
+
+ thread_pool_set_minmax_threads(send_threads,
+ 0, max_count);
+
+ thread_pool_poll(send_threads);
+}
+
+void multifd_abort_device_state_save_threads(void)
+{
+ assert(migration_has_device_state_support());
+
+ qatomic_set(&send_threads_abort, true);
+}
+
+int multifd_join_device_state_save_threads(void)
+{
+ assert(migration_has_device_state_support());
+
+ thread_pool_join(send_threads);
+
+ return send_threads_ret;
+}
diff --git a/migration/savevm.c b/migration/savevm.c
index 33c9200d1e78..a70f6ed006f2 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -1495,6 +1495,7 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
int64_t start_ts_each, end_ts_each;
SaveStateEntry *se;
int ret;
+ bool multifd_device_state = migration_has_device_state_support();
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
if (!se->ops || (in_postcopy && se->ops->has_postcopy &&
@@ -1517,6 +1518,27 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
}
}
+ if (multifd_device_state) {
+ int thread_count = 0;
+
+ QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
+ SaveLiveCompletePrecopyThreadHandler hdlr;
+
+ if (!se->ops || (in_postcopy && se->ops->has_postcopy &&
+ se->ops->has_postcopy(se->opaque)) ||
+ !se->ops->save_live_complete_precopy_thread) {
+ continue;
+ }
+
+ hdlr = se->ops->save_live_complete_precopy_thread;
+ multifd_spawn_device_state_save_thread(hdlr,
+ se->idstr, se->instance_id,
+ se->opaque);
+ thread_count++;
+ }
+ multifd_launch_device_state_save_threads(thread_count);
+ }
+
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
if (!se->ops ||
(in_postcopy && se->ops->has_postcopy &&
@@ -1541,13 +1563,21 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
save_section_footer(f, se);
if (ret < 0) {
qemu_file_set_error(f, ret);
- return -1;
+ goto ret_fail_abort_threads;
}
end_ts_each = qemu_clock_get_us(QEMU_CLOCK_REALTIME);
trace_vmstate_downtime_save("iterable", se->idstr, se->instance_id,
end_ts_each - start_ts_each);
}
+ if (multifd_device_state) {
+ ret = multifd_join_device_state_save_threads();
+ if (ret) {
+ qemu_file_set_error(f, ret);
+ return -1;
+ }
+ }
+
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
if (!se->ops || (in_postcopy && se->ops->has_postcopy &&
se->ops->has_postcopy(se->opaque)) ||
@@ -1565,6 +1595,14 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
trace_vmstate_downtime_checkpoint("src-iterable-saved");
return 0;
+
+ret_fail_abort_threads:
+ if (multifd_device_state) {
+ multifd_abort_device_state_save_threads();
+ multifd_join_device_state_save_threads();
+ }
+
+ return -1;
}
int qemu_savevm_state_complete_precopy_non_iterable(QEMUFile *f,