From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
This SaveVMHandler helps device provide its own asynchronous transmission
of the remaining data at the end of a precopy phase via multifd channels,
in parallel with the transfer done by save_live_complete_precopy handlers.
These threads are launched only when multifd device state transfer is
supported.
Management of these threads in done in the multifd migration code,
wrapping them in the generic thread pool.
Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
include/migration/misc.h | 17 +++++++
include/migration/register.h | 19 +++++++
include/qemu/typedefs.h | 3 ++
migration/multifd-device-state.c | 85 ++++++++++++++++++++++++++++++++
migration/savevm.c | 35 ++++++++++++-
5 files changed, 158 insertions(+), 1 deletion(-)
diff --git a/include/migration/misc.h b/include/migration/misc.h
index 273ebfca6256..8fd36eba1da7 100644
--- a/include/migration/misc.h
+++ b/include/migration/misc.h
@@ -119,8 +119,25 @@ bool migrate_uri_parse(const char *uri, MigrationChannel **channel,
Error **errp);
/* migration/multifd-device-state.c */
+typedef struct SaveLiveCompletePrecopyThreadData {
+ SaveLiveCompletePrecopyThreadHandler hdlr;
+ char *idstr;
+ uint32_t instance_id;
+ void *handler_opaque;
+} SaveLiveCompletePrecopyThreadData;
+
bool multifd_queue_device_state(char *idstr, uint32_t instance_id,
char *data, size_t len);
bool multifd_device_state_supported(void);
+void
+multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
+ char *idstr, uint32_t instance_id,
+ void *opaque);
+
+bool multifd_device_state_save_thread_should_exit(void);
+
+void multifd_abort_device_state_save_threads(void);
+bool multifd_join_device_state_save_threads(void);
+
#endif
diff --git a/include/migration/register.h b/include/migration/register.h
index 58891aa54b76..c041ce32f2fc 100644
--- a/include/migration/register.h
+++ b/include/migration/register.h
@@ -105,6 +105,25 @@ typedef struct SaveVMHandlers {
*/
int (*save_live_complete_precopy)(QEMUFile *f, void *opaque);
+ /**
+ * @save_live_complete_precopy_thread (invoked in a separate thread)
+ *
+ * Called at the end of a precopy phase from a separate worker thread
+ * in configurations where multifd device state transfer is supported
+ * in order to perform asynchronous transmission of the remaining data in
+ * parallel with @save_live_complete_precopy handlers.
+ * When postcopy is enabled, devices that support postcopy will skip this
+ * step.
+ *
+ * @d: a #SaveLiveCompletePrecopyThreadData containing parameters that the
+ * handler may need, including this device section idstr and instance_id,
+ * and opaque data pointer passed to register_savevm_live().
+ * @errp: pointer to Error*, to store an error if it happens.
+ *
+ * Returns true to indicate success and false for errors.
+ */
+ SaveLiveCompletePrecopyThreadHandler save_live_complete_precopy_thread;
+
/* This runs both outside and inside the BQL. */
/**
diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index fd23ff7771b1..42ed4e6be150 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -108,6 +108,7 @@ typedef struct QString QString;
typedef struct RAMBlock RAMBlock;
typedef struct Range Range;
typedef struct ReservedRegion ReservedRegion;
+typedef struct SaveLiveCompletePrecopyThreadData SaveLiveCompletePrecopyThreadData;
typedef struct SHPCDevice SHPCDevice;
typedef struct SSIBus SSIBus;
typedef struct TCGCPUOps TCGCPUOps;
@@ -133,5 +134,7 @@ typedef struct IRQState *qemu_irq;
typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit,
Error **errp);
+typedef bool (*SaveLiveCompletePrecopyThreadHandler)(SaveLiveCompletePrecopyThreadData *d,
+ Error **errp);
#endif /* QEMU_TYPEDEFS_H */
diff --git a/migration/multifd-device-state.c b/migration/multifd-device-state.c
index 5de3cf27d6e8..63f021fb8dad 100644
--- a/migration/multifd-device-state.c
+++ b/migration/multifd-device-state.c
@@ -8,7 +8,10 @@
*/
#include "qemu/osdep.h"
+#include "qapi/error.h"
#include "qemu/lockable.h"
+#include "block/thread-pool.h"
+#include "migration.h"
#include "migration/misc.h"
#include "multifd.h"
#include "options.h"
@@ -17,6 +20,9 @@ static struct {
QemuMutex queue_job_mutex;
MultiFDSendData *send_data;
+
+ ThreadPool *threads;
+ bool threads_abort;
} *multifd_send_device_state;
void multifd_device_state_send_setup(void)
@@ -27,10 +33,14 @@ void multifd_device_state_send_setup(void)
qemu_mutex_init(&multifd_send_device_state->queue_job_mutex);
multifd_send_device_state->send_data = multifd_send_data_alloc();
+
+ multifd_send_device_state->threads = thread_pool_new();
+ multifd_send_device_state->threads_abort = false;
}
void multifd_device_state_send_cleanup(void)
{
+ g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free);
g_clear_pointer(&multifd_send_device_state->send_data,
multifd_send_data_free);
@@ -115,3 +125,78 @@ bool multifd_device_state_supported(void)
return migrate_multifd() && !migrate_mapped_ram() &&
migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE;
}
+
+static void multifd_device_state_save_thread_data_free(void *opaque)
+{
+ SaveLiveCompletePrecopyThreadData *data = opaque;
+
+ g_clear_pointer(&data->idstr, g_free);
+ g_free(data);
+}
+
+static int multifd_device_state_save_thread(void *opaque)
+{
+ SaveLiveCompletePrecopyThreadData *data = opaque;
+ g_autoptr(Error) local_err = NULL;
+
+ if (!data->hdlr(data, &local_err)) {
+ MigrationState *s = migrate_get_current();
+
+ assert(local_err);
+
+ /*
+ * In case of multiple save threads failing which thread error
+ * return we end setting is purely arbitrary.
+ */
+ migrate_set_error(s, local_err);
+ }
+
+ return 0;
+}
+
+bool multifd_device_state_save_thread_should_exit(void)
+{
+ return qatomic_read(&multifd_send_device_state->threads_abort);
+}
+
+void
+multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr,
+ char *idstr, uint32_t instance_id,
+ void *opaque)
+{
+ SaveLiveCompletePrecopyThreadData *data;
+
+ assert(multifd_device_state_supported());
+ assert(multifd_send_device_state);
+
+ assert(!qatomic_read(&multifd_send_device_state->threads_abort));
+
+ data = g_new(SaveLiveCompletePrecopyThreadData, 1);
+ data->hdlr = hdlr;
+ data->idstr = g_strdup(idstr);
+ data->instance_id = instance_id;
+ data->handler_opaque = opaque;
+
+ thread_pool_submit_immediate(multifd_send_device_state->threads,
+ multifd_device_state_save_thread,
+ data,
+ multifd_device_state_save_thread_data_free);
+}
+
+void multifd_abort_device_state_save_threads(void)
+{
+ assert(multifd_device_state_supported());
+
+ qatomic_set(&multifd_send_device_state->threads_abort, true);
+}
+
+bool multifd_join_device_state_save_threads(void)
+{
+ MigrationState *s = migrate_get_current();
+
+ assert(multifd_device_state_supported());
+
+ thread_pool_wait(multifd_send_device_state->threads);
+
+ return !migrate_has_error(s);
+}
diff --git a/migration/savevm.c b/migration/savevm.c
index e412d05657a1..9a1e0ac807a0 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -37,6 +37,7 @@
#include "migration/register.h"
#include "migration/global_state.h"
#include "migration/channel-block.h"
+#include "multifd.h"
#include "ram.h"
#include "qemu-file.h"
#include "savevm.h"
@@ -1527,6 +1528,24 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
int64_t start_ts_each, end_ts_each;
SaveStateEntry *se;
int ret;
+ bool multifd_device_state = multifd_device_state_supported();
+
+ if (multifd_device_state) {
+ QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
+ SaveLiveCompletePrecopyThreadHandler hdlr;
+
+ if (!se->ops || (in_postcopy && se->ops->has_postcopy &&
+ se->ops->has_postcopy(se->opaque)) ||
+ !se->ops->save_live_complete_precopy_thread) {
+ continue;
+ }
+
+ hdlr = se->ops->save_live_complete_precopy_thread;
+ multifd_spawn_device_state_save_thread(hdlr,
+ se->idstr, se->instance_id,
+ se->opaque);
+ }
+ }
QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
if (!se->ops ||
@@ -1552,16 +1571,30 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy)
save_section_footer(f, se);
if (ret < 0) {
qemu_file_set_error(f, ret);
- return -1;
+ goto ret_fail_abort_threads;
}
end_ts_each = qemu_clock_get_us(QEMU_CLOCK_REALTIME);
trace_vmstate_downtime_save("iterable", se->idstr, se->instance_id,
end_ts_each - start_ts_each);
}
+ if (multifd_device_state &&
+ !multifd_join_device_state_save_threads()) {
+ qemu_file_set_error(f, -EINVAL);
+ return -1;
+ }
+
trace_vmstate_downtime_checkpoint("src-iterable-saved");
return 0;
+
+ret_fail_abort_threads:
+ if (multifd_device_state) {
+ multifd_abort_device_state_save_threads();
+ multifd_join_device_state_save_threads();
+ }
+
+ return -1;
}
int qemu_savevm_state_complete_precopy_non_iterable(QEMUFile *f,
On Wed, Feb 19, 2025 at 09:33:59PM +0100, Maciej S. Szmigiero wrote: > From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com> > > This SaveVMHandler helps device provide its own asynchronous transmission > of the remaining data at the end of a precopy phase via multifd channels, > in parallel with the transfer done by save_live_complete_precopy handlers. > > These threads are launched only when multifd device state transfer is > supported. > > Management of these threads in done in the multifd migration code, > wrapping them in the generic thread pool. > > Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com> > --- > include/migration/misc.h | 17 +++++++ > include/migration/register.h | 19 +++++++ > include/qemu/typedefs.h | 3 ++ > migration/multifd-device-state.c | 85 ++++++++++++++++++++++++++++++++ > migration/savevm.c | 35 ++++++++++++- > 5 files changed, 158 insertions(+), 1 deletion(-) > > diff --git a/include/migration/misc.h b/include/migration/misc.h > index 273ebfca6256..8fd36eba1da7 100644 > --- a/include/migration/misc.h > +++ b/include/migration/misc.h > @@ -119,8 +119,25 @@ bool migrate_uri_parse(const char *uri, MigrationChannel **channel, > Error **errp); > > /* migration/multifd-device-state.c */ > +typedef struct SaveLiveCompletePrecopyThreadData { > + SaveLiveCompletePrecopyThreadHandler hdlr; > + char *idstr; > + uint32_t instance_id; > + void *handler_opaque; > +} SaveLiveCompletePrecopyThreadData; > + > bool multifd_queue_device_state(char *idstr, uint32_t instance_id, > char *data, size_t len); > bool multifd_device_state_supported(void); > > +void > +multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr, > + char *idstr, uint32_t instance_id, > + void *opaque); > + > +bool multifd_device_state_save_thread_should_exit(void); > + > +void multifd_abort_device_state_save_threads(void); > +bool multifd_join_device_state_save_threads(void); > + > #endif > diff --git a/include/migration/register.h b/include/migration/register.h > index 58891aa54b76..c041ce32f2fc 100644 > --- a/include/migration/register.h > +++ b/include/migration/register.h > @@ -105,6 +105,25 @@ typedef struct SaveVMHandlers { > */ > int (*save_live_complete_precopy)(QEMUFile *f, void *opaque); > > + /** > + * @save_live_complete_precopy_thread (invoked in a separate thread) > + * > + * Called at the end of a precopy phase from a separate worker thread > + * in configurations where multifd device state transfer is supported > + * in order to perform asynchronous transmission of the remaining data in > + * parallel with @save_live_complete_precopy handlers. > + * When postcopy is enabled, devices that support postcopy will skip this > + * step. > + * > + * @d: a #SaveLiveCompletePrecopyThreadData containing parameters that the > + * handler may need, including this device section idstr and instance_id, > + * and opaque data pointer passed to register_savevm_live(). > + * @errp: pointer to Error*, to store an error if it happens. > + * > + * Returns true to indicate success and false for errors. > + */ > + SaveLiveCompletePrecopyThreadHandler save_live_complete_precopy_thread; > + > /* This runs both outside and inside the BQL. */ > > /** > diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h > index fd23ff7771b1..42ed4e6be150 100644 > --- a/include/qemu/typedefs.h > +++ b/include/qemu/typedefs.h > @@ -108,6 +108,7 @@ typedef struct QString QString; > typedef struct RAMBlock RAMBlock; > typedef struct Range Range; > typedef struct ReservedRegion ReservedRegion; > +typedef struct SaveLiveCompletePrecopyThreadData SaveLiveCompletePrecopyThreadData; > typedef struct SHPCDevice SHPCDevice; > typedef struct SSIBus SSIBus; > typedef struct TCGCPUOps TCGCPUOps; > @@ -133,5 +134,7 @@ typedef struct IRQState *qemu_irq; > typedef void (*qemu_irq_handler)(void *opaque, int n, int level); > typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit, > Error **errp); > +typedef bool (*SaveLiveCompletePrecopyThreadHandler)(SaveLiveCompletePrecopyThreadData *d, > + Error **errp); > > #endif /* QEMU_TYPEDEFS_H */ > diff --git a/migration/multifd-device-state.c b/migration/multifd-device-state.c > index 5de3cf27d6e8..63f021fb8dad 100644 > --- a/migration/multifd-device-state.c > +++ b/migration/multifd-device-state.c > @@ -8,7 +8,10 @@ > */ > > #include "qemu/osdep.h" > +#include "qapi/error.h" > #include "qemu/lockable.h" > +#include "block/thread-pool.h" > +#include "migration.h" > #include "migration/misc.h" > #include "multifd.h" > #include "options.h" > @@ -17,6 +20,9 @@ static struct { > QemuMutex queue_job_mutex; > > MultiFDSendData *send_data; > + > + ThreadPool *threads; > + bool threads_abort; > } *multifd_send_device_state; > > void multifd_device_state_send_setup(void) > @@ -27,10 +33,14 @@ void multifd_device_state_send_setup(void) > qemu_mutex_init(&multifd_send_device_state->queue_job_mutex); > > multifd_send_device_state->send_data = multifd_send_data_alloc(); > + > + multifd_send_device_state->threads = thread_pool_new(); > + multifd_send_device_state->threads_abort = false; > } > > void multifd_device_state_send_cleanup(void) > { > + g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free); > g_clear_pointer(&multifd_send_device_state->send_data, > multifd_send_data_free); > > @@ -115,3 +125,78 @@ bool multifd_device_state_supported(void) > return migrate_multifd() && !migrate_mapped_ram() && > migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE; > } > + > +static void multifd_device_state_save_thread_data_free(void *opaque) > +{ > + SaveLiveCompletePrecopyThreadData *data = opaque; > + > + g_clear_pointer(&data->idstr, g_free); > + g_free(data); > +} > + > +static int multifd_device_state_save_thread(void *opaque) > +{ > + SaveLiveCompletePrecopyThreadData *data = opaque; > + g_autoptr(Error) local_err = NULL; > + > + if (!data->hdlr(data, &local_err)) { > + MigrationState *s = migrate_get_current(); > + > + assert(local_err); > + > + /* > + * In case of multiple save threads failing which thread error > + * return we end setting is purely arbitrary. > + */ > + migrate_set_error(s, local_err); Where did you kick off all the threads when one hit error? I wonder if migrate_set_error() should just set quit flag for everything, but for this series it might be easier to use multifd_abort_device_state_save_threads(). Other than that, looks good to me, thanks. > + } > + > + return 0; > +} > + > +bool multifd_device_state_save_thread_should_exit(void) > +{ > + return qatomic_read(&multifd_send_device_state->threads_abort); > +} > + > +void > +multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr, > + char *idstr, uint32_t instance_id, > + void *opaque) > +{ > + SaveLiveCompletePrecopyThreadData *data; > + > + assert(multifd_device_state_supported()); > + assert(multifd_send_device_state); > + > + assert(!qatomic_read(&multifd_send_device_state->threads_abort)); > + > + data = g_new(SaveLiveCompletePrecopyThreadData, 1); > + data->hdlr = hdlr; > + data->idstr = g_strdup(idstr); > + data->instance_id = instance_id; > + data->handler_opaque = opaque; > + > + thread_pool_submit_immediate(multifd_send_device_state->threads, > + multifd_device_state_save_thread, > + data, > + multifd_device_state_save_thread_data_free); > +} > + > +void multifd_abort_device_state_save_threads(void) > +{ > + assert(multifd_device_state_supported()); > + > + qatomic_set(&multifd_send_device_state->threads_abort, true); > +} > + > +bool multifd_join_device_state_save_threads(void) > +{ > + MigrationState *s = migrate_get_current(); > + > + assert(multifd_device_state_supported()); > + > + thread_pool_wait(multifd_send_device_state->threads); > + > + return !migrate_has_error(s); > +} > diff --git a/migration/savevm.c b/migration/savevm.c > index e412d05657a1..9a1e0ac807a0 100644 > --- a/migration/savevm.c > +++ b/migration/savevm.c > @@ -37,6 +37,7 @@ > #include "migration/register.h" > #include "migration/global_state.h" > #include "migration/channel-block.h" > +#include "multifd.h" > #include "ram.h" > #include "qemu-file.h" > #include "savevm.h" > @@ -1527,6 +1528,24 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy) > int64_t start_ts_each, end_ts_each; > SaveStateEntry *se; > int ret; > + bool multifd_device_state = multifd_device_state_supported(); > + > + if (multifd_device_state) { > + QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { > + SaveLiveCompletePrecopyThreadHandler hdlr; > + > + if (!se->ops || (in_postcopy && se->ops->has_postcopy && > + se->ops->has_postcopy(se->opaque)) || > + !se->ops->save_live_complete_precopy_thread) { > + continue; > + } > + > + hdlr = se->ops->save_live_complete_precopy_thread; > + multifd_spawn_device_state_save_thread(hdlr, > + se->idstr, se->instance_id, > + se->opaque); > + } > + } > > QTAILQ_FOREACH(se, &savevm_state.handlers, entry) { > if (!se->ops || > @@ -1552,16 +1571,30 @@ int qemu_savevm_state_complete_precopy_iterable(QEMUFile *f, bool in_postcopy) > save_section_footer(f, se); > if (ret < 0) { > qemu_file_set_error(f, ret); > - return -1; > + goto ret_fail_abort_threads; > } > end_ts_each = qemu_clock_get_us(QEMU_CLOCK_REALTIME); > trace_vmstate_downtime_save("iterable", se->idstr, se->instance_id, > end_ts_each - start_ts_each); > } > > + if (multifd_device_state && > + !multifd_join_device_state_save_threads()) { > + qemu_file_set_error(f, -EINVAL); > + return -1; > + } > + > trace_vmstate_downtime_checkpoint("src-iterable-saved"); > > return 0; > + > +ret_fail_abort_threads: > + if (multifd_device_state) { > + multifd_abort_device_state_save_threads(); > + multifd_join_device_state_save_threads(); > + } > + > + return -1; > } > > int qemu_savevm_state_complete_precopy_non_iterable(QEMUFile *f, > -- Peter Xu
On 26.02.2025 17:43, Peter Xu wrote: > On Wed, Feb 19, 2025 at 09:33:59PM +0100, Maciej S. Szmigiero wrote: >> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com> >> >> This SaveVMHandler helps device provide its own asynchronous transmission >> of the remaining data at the end of a precopy phase via multifd channels, >> in parallel with the transfer done by save_live_complete_precopy handlers. >> >> These threads are launched only when multifd device state transfer is >> supported. >> >> Management of these threads in done in the multifd migration code, >> wrapping them in the generic thread pool. >> >> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com> >> --- >> include/migration/misc.h | 17 +++++++ >> include/migration/register.h | 19 +++++++ >> include/qemu/typedefs.h | 3 ++ >> migration/multifd-device-state.c | 85 ++++++++++++++++++++++++++++++++ >> migration/savevm.c | 35 ++++++++++++- >> 5 files changed, 158 insertions(+), 1 deletion(-) >> >> diff --git a/include/migration/misc.h b/include/migration/misc.h >> index 273ebfca6256..8fd36eba1da7 100644 >> --- a/include/migration/misc.h >> +++ b/include/migration/misc.h >> @@ -119,8 +119,25 @@ bool migrate_uri_parse(const char *uri, MigrationChannel **channel, >> Error **errp); >> >> /* migration/multifd-device-state.c */ >> +typedef struct SaveLiveCompletePrecopyThreadData { >> + SaveLiveCompletePrecopyThreadHandler hdlr; >> + char *idstr; >> + uint32_t instance_id; >> + void *handler_opaque; >> +} SaveLiveCompletePrecopyThreadData; >> + >> bool multifd_queue_device_state(char *idstr, uint32_t instance_id, >> char *data, size_t len); >> bool multifd_device_state_supported(void); >> >> +void >> +multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr, >> + char *idstr, uint32_t instance_id, >> + void *opaque); >> + >> +bool multifd_device_state_save_thread_should_exit(void); >> + >> +void multifd_abort_device_state_save_threads(void); >> +bool multifd_join_device_state_save_threads(void); >> + >> #endif >> diff --git a/include/migration/register.h b/include/migration/register.h >> index 58891aa54b76..c041ce32f2fc 100644 >> --- a/include/migration/register.h >> +++ b/include/migration/register.h >> @@ -105,6 +105,25 @@ typedef struct SaveVMHandlers { >> */ >> int (*save_live_complete_precopy)(QEMUFile *f, void *opaque); >> >> + /** >> + * @save_live_complete_precopy_thread (invoked in a separate thread) >> + * >> + * Called at the end of a precopy phase from a separate worker thread >> + * in configurations where multifd device state transfer is supported >> + * in order to perform asynchronous transmission of the remaining data in >> + * parallel with @save_live_complete_precopy handlers. >> + * When postcopy is enabled, devices that support postcopy will skip this >> + * step. >> + * >> + * @d: a #SaveLiveCompletePrecopyThreadData containing parameters that the >> + * handler may need, including this device section idstr and instance_id, >> + * and opaque data pointer passed to register_savevm_live(). >> + * @errp: pointer to Error*, to store an error if it happens. >> + * >> + * Returns true to indicate success and false for errors. >> + */ >> + SaveLiveCompletePrecopyThreadHandler save_live_complete_precopy_thread; >> + >> /* This runs both outside and inside the BQL. */ >> >> /** >> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h >> index fd23ff7771b1..42ed4e6be150 100644 >> --- a/include/qemu/typedefs.h >> +++ b/include/qemu/typedefs.h >> @@ -108,6 +108,7 @@ typedef struct QString QString; >> typedef struct RAMBlock RAMBlock; >> typedef struct Range Range; >> typedef struct ReservedRegion ReservedRegion; >> +typedef struct SaveLiveCompletePrecopyThreadData SaveLiveCompletePrecopyThreadData; >> typedef struct SHPCDevice SHPCDevice; >> typedef struct SSIBus SSIBus; >> typedef struct TCGCPUOps TCGCPUOps; >> @@ -133,5 +134,7 @@ typedef struct IRQState *qemu_irq; >> typedef void (*qemu_irq_handler)(void *opaque, int n, int level); >> typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit, >> Error **errp); >> +typedef bool (*SaveLiveCompletePrecopyThreadHandler)(SaveLiveCompletePrecopyThreadData *d, >> + Error **errp); >> >> #endif /* QEMU_TYPEDEFS_H */ >> diff --git a/migration/multifd-device-state.c b/migration/multifd-device-state.c >> index 5de3cf27d6e8..63f021fb8dad 100644 >> --- a/migration/multifd-device-state.c >> +++ b/migration/multifd-device-state.c >> @@ -8,7 +8,10 @@ >> */ >> >> #include "qemu/osdep.h" >> +#include "qapi/error.h" >> #include "qemu/lockable.h" >> +#include "block/thread-pool.h" >> +#include "migration.h" >> #include "migration/misc.h" >> #include "multifd.h" >> #include "options.h" >> @@ -17,6 +20,9 @@ static struct { >> QemuMutex queue_job_mutex; >> >> MultiFDSendData *send_data; >> + >> + ThreadPool *threads; >> + bool threads_abort; >> } *multifd_send_device_state; >> >> void multifd_device_state_send_setup(void) >> @@ -27,10 +33,14 @@ void multifd_device_state_send_setup(void) >> qemu_mutex_init(&multifd_send_device_state->queue_job_mutex); >> >> multifd_send_device_state->send_data = multifd_send_data_alloc(); >> + >> + multifd_send_device_state->threads = thread_pool_new(); >> + multifd_send_device_state->threads_abort = false; >> } >> >> void multifd_device_state_send_cleanup(void) >> { >> + g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free); >> g_clear_pointer(&multifd_send_device_state->send_data, >> multifd_send_data_free); >> >> @@ -115,3 +125,78 @@ bool multifd_device_state_supported(void) >> return migrate_multifd() && !migrate_mapped_ram() && >> migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE; >> } >> + >> +static void multifd_device_state_save_thread_data_free(void *opaque) >> +{ >> + SaveLiveCompletePrecopyThreadData *data = opaque; >> + >> + g_clear_pointer(&data->idstr, g_free); >> + g_free(data); >> +} >> + >> +static int multifd_device_state_save_thread(void *opaque) >> +{ >> + SaveLiveCompletePrecopyThreadData *data = opaque; >> + g_autoptr(Error) local_err = NULL; >> + >> + if (!data->hdlr(data, &local_err)) { >> + MigrationState *s = migrate_get_current(); >> + >> + assert(local_err); >> + >> + /* >> + * In case of multiple save threads failing which thread error >> + * return we end setting is purely arbitrary. >> + */ >> + migrate_set_error(s, local_err); > > Where did you kick off all the threads when one hit error? I wonder if > migrate_set_error() should just set quit flag for everything, but for this > series it might be easier to use multifd_abort_device_state_save_threads(). I've now added call to multifd_abort_device_state_save_threads() if a migration error is already set to avoid needlessly waiting for the remaining threads to do all of their work. > Other than that, looks good to me, thanks. > Thanks, Maciej
On Tue, Mar 04, 2025 at 10:50:29PM +0100, Maciej S. Szmigiero wrote: > On 26.02.2025 17:43, Peter Xu wrote: > > On Wed, Feb 19, 2025 at 09:33:59PM +0100, Maciej S. Szmigiero wrote: > > > From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com> > > > > > > This SaveVMHandler helps device provide its own asynchronous transmission > > > of the remaining data at the end of a precopy phase via multifd channels, > > > in parallel with the transfer done by save_live_complete_precopy handlers. > > > > > > These threads are launched only when multifd device state transfer is > > > supported. > > > > > > Management of these threads in done in the multifd migration code, > > > wrapping them in the generic thread pool. > > > > > > Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com> > > > --- > > > include/migration/misc.h | 17 +++++++ > > > include/migration/register.h | 19 +++++++ > > > include/qemu/typedefs.h | 3 ++ > > > migration/multifd-device-state.c | 85 ++++++++++++++++++++++++++++++++ > > > migration/savevm.c | 35 ++++++++++++- > > > 5 files changed, 158 insertions(+), 1 deletion(-) > > > > > > diff --git a/include/migration/misc.h b/include/migration/misc.h > > > index 273ebfca6256..8fd36eba1da7 100644 > > > --- a/include/migration/misc.h > > > +++ b/include/migration/misc.h > > > @@ -119,8 +119,25 @@ bool migrate_uri_parse(const char *uri, MigrationChannel **channel, > > > Error **errp); > > > /* migration/multifd-device-state.c */ > > > +typedef struct SaveLiveCompletePrecopyThreadData { > > > + SaveLiveCompletePrecopyThreadHandler hdlr; > > > + char *idstr; > > > + uint32_t instance_id; > > > + void *handler_opaque; > > > +} SaveLiveCompletePrecopyThreadData; > > > + > > > bool multifd_queue_device_state(char *idstr, uint32_t instance_id, > > > char *data, size_t len); > > > bool multifd_device_state_supported(void); > > > +void > > > +multifd_spawn_device_state_save_thread(SaveLiveCompletePrecopyThreadHandler hdlr, > > > + char *idstr, uint32_t instance_id, > > > + void *opaque); > > > + > > > +bool multifd_device_state_save_thread_should_exit(void); > > > + > > > +void multifd_abort_device_state_save_threads(void); > > > +bool multifd_join_device_state_save_threads(void); > > > + > > > #endif > > > diff --git a/include/migration/register.h b/include/migration/register.h > > > index 58891aa54b76..c041ce32f2fc 100644 > > > --- a/include/migration/register.h > > > +++ b/include/migration/register.h > > > @@ -105,6 +105,25 @@ typedef struct SaveVMHandlers { > > > */ > > > int (*save_live_complete_precopy)(QEMUFile *f, void *opaque); > > > + /** > > > + * @save_live_complete_precopy_thread (invoked in a separate thread) > > > + * > > > + * Called at the end of a precopy phase from a separate worker thread > > > + * in configurations where multifd device state transfer is supported > > > + * in order to perform asynchronous transmission of the remaining data in > > > + * parallel with @save_live_complete_precopy handlers. > > > + * When postcopy is enabled, devices that support postcopy will skip this > > > + * step. > > > + * > > > + * @d: a #SaveLiveCompletePrecopyThreadData containing parameters that the > > > + * handler may need, including this device section idstr and instance_id, > > > + * and opaque data pointer passed to register_savevm_live(). > > > + * @errp: pointer to Error*, to store an error if it happens. > > > + * > > > + * Returns true to indicate success and false for errors. > > > + */ > > > + SaveLiveCompletePrecopyThreadHandler save_live_complete_precopy_thread; > > > + > > > /* This runs both outside and inside the BQL. */ > > > /** > > > diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h > > > index fd23ff7771b1..42ed4e6be150 100644 > > > --- a/include/qemu/typedefs.h > > > +++ b/include/qemu/typedefs.h > > > @@ -108,6 +108,7 @@ typedef struct QString QString; > > > typedef struct RAMBlock RAMBlock; > > > typedef struct Range Range; > > > typedef struct ReservedRegion ReservedRegion; > > > +typedef struct SaveLiveCompletePrecopyThreadData SaveLiveCompletePrecopyThreadData; > > > typedef struct SHPCDevice SHPCDevice; > > > typedef struct SSIBus SSIBus; > > > typedef struct TCGCPUOps TCGCPUOps; > > > @@ -133,5 +134,7 @@ typedef struct IRQState *qemu_irq; > > > typedef void (*qemu_irq_handler)(void *opaque, int n, int level); > > > typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit, > > > Error **errp); > > > +typedef bool (*SaveLiveCompletePrecopyThreadHandler)(SaveLiveCompletePrecopyThreadData *d, > > > + Error **errp); > > > #endif /* QEMU_TYPEDEFS_H */ > > > diff --git a/migration/multifd-device-state.c b/migration/multifd-device-state.c > > > index 5de3cf27d6e8..63f021fb8dad 100644 > > > --- a/migration/multifd-device-state.c > > > +++ b/migration/multifd-device-state.c > > > @@ -8,7 +8,10 @@ > > > */ > > > #include "qemu/osdep.h" > > > +#include "qapi/error.h" > > > #include "qemu/lockable.h" > > > +#include "block/thread-pool.h" > > > +#include "migration.h" > > > #include "migration/misc.h" > > > #include "multifd.h" > > > #include "options.h" > > > @@ -17,6 +20,9 @@ static struct { > > > QemuMutex queue_job_mutex; > > > MultiFDSendData *send_data; > > > + > > > + ThreadPool *threads; > > > + bool threads_abort; > > > } *multifd_send_device_state; > > > void multifd_device_state_send_setup(void) > > > @@ -27,10 +33,14 @@ void multifd_device_state_send_setup(void) > > > qemu_mutex_init(&multifd_send_device_state->queue_job_mutex); > > > multifd_send_device_state->send_data = multifd_send_data_alloc(); > > > + > > > + multifd_send_device_state->threads = thread_pool_new(); > > > + multifd_send_device_state->threads_abort = false; > > > } > > > void multifd_device_state_send_cleanup(void) > > > { > > > + g_clear_pointer(&multifd_send_device_state->threads, thread_pool_free); > > > g_clear_pointer(&multifd_send_device_state->send_data, > > > multifd_send_data_free); > > > @@ -115,3 +125,78 @@ bool multifd_device_state_supported(void) > > > return migrate_multifd() && !migrate_mapped_ram() && > > > migrate_multifd_compression() == MULTIFD_COMPRESSION_NONE; > > > } > > > + > > > +static void multifd_device_state_save_thread_data_free(void *opaque) > > > +{ > > > + SaveLiveCompletePrecopyThreadData *data = opaque; > > > + > > > + g_clear_pointer(&data->idstr, g_free); > > > + g_free(data); > > > +} > > > + > > > +static int multifd_device_state_save_thread(void *opaque) > > > +{ > > > + SaveLiveCompletePrecopyThreadData *data = opaque; > > > + g_autoptr(Error) local_err = NULL; > > > + > > > + if (!data->hdlr(data, &local_err)) { > > > + MigrationState *s = migrate_get_current(); > > > + > > > + assert(local_err); > > > + > > > + /* > > > + * In case of multiple save threads failing which thread error > > > + * return we end setting is purely arbitrary. > > > + */ > > > + migrate_set_error(s, local_err); > > > > Where did you kick off all the threads when one hit error? I wonder if > > migrate_set_error() should just set quit flag for everything, but for this > > series it might be easier to use multifd_abort_device_state_save_threads(). > > I've now added call to multifd_abort_device_state_save_threads() if a migration > error is already set to avoid needlessly waiting for the remaining threads to > do all of their work. With that, feel free to take: Reviewed-by: Peter Xu <peterx@redhat.com> -- Peter Xu
© 2016 - 2025 Red Hat, Inc.