[PATCH v3 08/24] migration: Add thread pool of optional load threads

Maciej S. Szmigiero posted 24 patches 1 year, 2 months ago
There is a newer version of this series
[PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 2 months ago
From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>

Some drivers might want to make use of auxiliary helper threads during VM
state loading, for example to make sure that their blocking (sync) I/O
operations don't block the rest of the migration process.

Add a migration core managed thread pool to facilitate this use case.

The migration core will wait for these threads to finish before
(re)starting the VM at destination.

Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
 include/migration/misc.h |  3 ++
 include/qemu/typedefs.h  |  1 +
 migration/savevm.c       | 77 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 81 insertions(+)

diff --git a/include/migration/misc.h b/include/migration/misc.h
index 804eb23c0607..c92ca018ab3b 100644
--- a/include/migration/misc.h
+++ b/include/migration/misc.h
@@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
 /* migration/block.c */
 
 AnnounceParameters *migrate_announce_params(void);
+
 /* migration/savevm.c */
 
 void dump_vmstate_json_to_file(FILE *out_fp);
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+                                   void *opaque);
 
 /* migration/migration.c */
 void migration_object_init(void);
diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
index 3d84efcac47a..8c8ea5c2840d 100644
--- a/include/qemu/typedefs.h
+++ b/include/qemu/typedefs.h
@@ -131,5 +131,6 @@ typedef struct IRQState *qemu_irq;
  * Function types
  */
 typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
+typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);
 
 #endif /* QEMU_TYPEDEFS_H */
diff --git a/migration/savevm.c b/migration/savevm.c
index 1f58a2fa54ae..6ea9054c4083 100644
--- a/migration/savevm.c
+++ b/migration/savevm.c
@@ -54,6 +54,7 @@
 #include "qemu/job.h"
 #include "qemu/main-loop.h"
 #include "block/snapshot.h"
+#include "block/thread-pool.h"
 #include "qemu/cutils.h"
 #include "io/channel-buffer.h"
 #include "io/channel-file.h"
@@ -71,6 +72,10 @@
 
 const unsigned int postcopy_ram_discard_version;
 
+static ThreadPool *load_threads;
+static int load_threads_ret;
+static bool load_threads_abort;
+
 /* Subcommands for QEMU_VM_COMMAND */
 enum qemu_vm_cmd {
     MIG_CMD_INVALID = 0,   /* Must be 0 */
@@ -2788,6 +2793,12 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
     int ret;
 
     trace_loadvm_state_setup();
+
+    assert(!load_threads);
+    load_threads = thread_pool_new();
+    load_threads_ret = 0;
+    load_threads_abort = false;
+
     QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
         if (!se->ops || !se->ops->load_setup) {
             continue;
@@ -2806,19 +2817,72 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
             return ret;
         }
     }
+
+    return 0;
+}
+
+struct LoadThreadData {
+    MigrationLoadThread function;
+    void *opaque;
+};
+
+static int qemu_loadvm_load_thread(void *thread_opaque)
+{
+    struct LoadThreadData *data = thread_opaque;
+    int ret;
+
+    ret = data->function(&load_threads_abort, data->opaque);
+    if (ret && !qatomic_read(&load_threads_ret)) {
+        /*
+         * Racy with the above read but that's okay - which thread error
+         * return we report is purely arbitrary anyway.
+         */
+        qatomic_set(&load_threads_ret, ret);
+    }
+
     return 0;
 }
 
+void qemu_loadvm_start_load_thread(MigrationLoadThread function,
+                                   void *opaque)
+{
+    struct LoadThreadData *data;
+
+    /* We only set it from this thread so it's okay to read it directly */
+    assert(!load_threads_abort);
+
+    data = g_new(struct LoadThreadData, 1);
+    data->function = function;
+    data->opaque = opaque;
+
+    thread_pool_submit(load_threads, qemu_loadvm_load_thread,
+                       data, g_free);
+    thread_pool_adjust_max_threads_to_work(load_threads);
+}
+
 void qemu_loadvm_state_cleanup(void)
 {
     SaveStateEntry *se;
 
     trace_loadvm_state_cleanup();
+
     QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
         if (se->ops && se->ops->load_cleanup) {
             se->ops->load_cleanup(se->opaque);
         }
     }
+
+    /*
+     * We might be called even without earlier qemu_loadvm_state_setup()
+     * call if qemu_loadvm_state() fails very early.
+     */
+    if (load_threads) {
+        qatomic_set(&load_threads_abort, true);
+        bql_unlock(); /* Load threads might be waiting for BQL */
+        thread_pool_wait(load_threads);
+        bql_lock();
+        g_clear_pointer(&load_threads, thread_pool_free);
+    }
 }
 
 /* Return true if we should continue the migration, or false. */
@@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
         return ret;
     }
 
+    if (ret == 0) {
+        bql_unlock(); /* Let load threads do work requiring BQL */
+        thread_pool_wait(load_threads);
+        bql_lock();
+
+        ret = load_threads_ret;
+    }
+    /*
+     * Set this flag unconditionally so we'll catch further attempts to
+     * start additional threads via an appropriate assert()
+     */
+    qatomic_set(&load_threads_abort, true);
+
     if (ret == 0) {
         ret = qemu_file_get_error(f);
     }
Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Avihai Horon 1 year, 2 months ago
On 17/11/2024 21:20, Maciej S. Szmigiero wrote:
> External email: Use caution opening links or attachments
>
>
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Some drivers might want to make use of auxiliary helper threads during VM
> state loading, for example to make sure that their blocking (sync) I/O
> operations don't block the rest of the migration process.
>
> Add a migration core managed thread pool to facilitate this use case.
>
> The migration core will wait for these threads to finish before
> (re)starting the VM at destination.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>   include/migration/misc.h |  3 ++
>   include/qemu/typedefs.h  |  1 +
>   migration/savevm.c       | 77 ++++++++++++++++++++++++++++++++++++++++
>   3 files changed, 81 insertions(+)
>
> diff --git a/include/migration/misc.h b/include/migration/misc.h
> index 804eb23c0607..c92ca018ab3b 100644
> --- a/include/migration/misc.h
> +++ b/include/migration/misc.h
> @@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
>   /* migration/block.c */
>
>   AnnounceParameters *migrate_announce_params(void);
> +
>   /* migration/savevm.c */
>
>   void dump_vmstate_json_to_file(FILE *out_fp);
> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
> +                                   void *opaque);
>
>   /* migration/migration.c */
>   void migration_object_init(void);
> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
> index 3d84efcac47a..8c8ea5c2840d 100644
> --- a/include/qemu/typedefs.h
> +++ b/include/qemu/typedefs.h
> @@ -131,5 +131,6 @@ typedef struct IRQState *qemu_irq;
>    * Function types
>    */
>   typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
> +typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);
>
>   #endif /* QEMU_TYPEDEFS_H */
> diff --git a/migration/savevm.c b/migration/savevm.c
> index 1f58a2fa54ae..6ea9054c4083 100644
> --- a/migration/savevm.c
> +++ b/migration/savevm.c
> @@ -54,6 +54,7 @@
>   #include "qemu/job.h"
>   #include "qemu/main-loop.h"
>   #include "block/snapshot.h"
> +#include "block/thread-pool.h"
>   #include "qemu/cutils.h"
>   #include "io/channel-buffer.h"
>   #include "io/channel-file.h"
> @@ -71,6 +72,10 @@
>
>   const unsigned int postcopy_ram_discard_version;
>
> +static ThreadPool *load_threads;
> +static int load_threads_ret;
> +static bool load_threads_abort;
> +
>   /* Subcommands for QEMU_VM_COMMAND */
>   enum qemu_vm_cmd {
>       MIG_CMD_INVALID = 0,   /* Must be 0 */
> @@ -2788,6 +2793,12 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>       int ret;
>
>       trace_loadvm_state_setup();
> +
> +    assert(!load_threads);
> +    load_threads = thread_pool_new();
> +    load_threads_ret = 0;
> +    load_threads_abort = false;
> +
>       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>           if (!se->ops || !se->ops->load_setup) {
>               continue;
> @@ -2806,19 +2817,72 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>               return ret;
>           }
>       }
> +
> +    return 0;
> +}
> +
> +struct LoadThreadData {
> +    MigrationLoadThread function;
> +    void *opaque;
> +};
> +
> +static int qemu_loadvm_load_thread(void *thread_opaque)
> +{
> +    struct LoadThreadData *data = thread_opaque;
> +    int ret;
> +
> +    ret = data->function(&load_threads_abort, data->opaque);
> +    if (ret && !qatomic_read(&load_threads_ret)) {
> +        /*
> +         * Racy with the above read but that's okay - which thread error
> +         * return we report is purely arbitrary anyway.
> +         */
> +        qatomic_set(&load_threads_ret, ret);
> +    }

Can we use cmpxchg instead? E.g.:

if (ret) {
     qatomic_cmpxchg(&load_threads_ret, 0, ret);
}

> +
>       return 0;
>   }
>
> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
> +                                   void *opaque)
> +{
> +    struct LoadThreadData *data;
> +
> +    /* We only set it from this thread so it's okay to read it directly */
> +    assert(!load_threads_abort);
> +
> +    data = g_new(struct LoadThreadData, 1);
> +    data->function = function;
> +    data->opaque = opaque;
> +
> +    thread_pool_submit(load_threads, qemu_loadvm_load_thread,
> +                       data, g_free);
> +    thread_pool_adjust_max_threads_to_work(load_threads);
> +}
> +
>   void qemu_loadvm_state_cleanup(void)
>   {
>       SaveStateEntry *se;
>
>       trace_loadvm_state_cleanup();
> +
>       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>           if (se->ops && se->ops->load_cleanup) {
>               se->ops->load_cleanup(se->opaque);
>           }
>       }
> +
> +    /*
> +     * We might be called even without earlier qemu_loadvm_state_setup()
> +     * call if qemu_loadvm_state() fails very early.
> +     */
> +    if (load_threads) {
> +        qatomic_set(&load_threads_abort, true);
> +        bql_unlock(); /* Load threads might be waiting for BQL */
> +        thread_pool_wait(load_threads);
> +        bql_lock();
> +        g_clear_pointer(&load_threads, thread_pool_free);

Since thread_pool_free() also waits for pending jobs before returning, 
can we drop the explicit thread_pool_wait()? E.g.:

qatomic_set(&load_threads_abort, true);
bql_unlock(); /* Load threads might be waiting for BQL */
g_clear_pointer(&load_threads, thread_pool_free);
bql_lock();

Thanks.

> +    }
>   }
>
>   /* Return true if we should continue the migration, or false. */
> @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
>           return ret;
>       }
>
> +    if (ret == 0) {
> +        bql_unlock(); /* Let load threads do work requiring BQL */
> +        thread_pool_wait(load_threads);
> +        bql_lock();
> +
> +        ret = load_threads_ret;
> +    }
> +    /*
> +     * Set this flag unconditionally so we'll catch further attempts to
> +     * start additional threads via an appropriate assert()
> +     */
> +    qatomic_set(&load_threads_abort, true);
> +
>       if (ret == 0) {
>           ret = qemu_file_get_error(f);
>       }

Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 2 months ago
On 28.11.2024 11:26, Avihai Horon wrote:
> 
> On 17/11/2024 21:20, Maciej S. Szmigiero wrote:
>> External email: Use caution opening links or attachments
>>
>>
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> Some drivers might want to make use of auxiliary helper threads during VM
>> state loading, for example to make sure that their blocking (sync) I/O
>> operations don't block the rest of the migration process.
>>
>> Add a migration core managed thread pool to facilitate this use case.
>>
>> The migration core will wait for these threads to finish before
>> (re)starting the VM at destination.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>   include/migration/misc.h |  3 ++
>>   include/qemu/typedefs.h  |  1 +
>>   migration/savevm.c       | 77 ++++++++++++++++++++++++++++++++++++++++
>>   3 files changed, 81 insertions(+)
>>
>> diff --git a/include/migration/misc.h b/include/migration/misc.h
>> index 804eb23c0607..c92ca018ab3b 100644
>> --- a/include/migration/misc.h
>> +++ b/include/migration/misc.h
>> @@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
>>   /* migration/block.c */
>>
>>   AnnounceParameters *migrate_announce_params(void);
>> +
>>   /* migration/savevm.c */
>>
>>   void dump_vmstate_json_to_file(FILE *out_fp);
>> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
>> +                                   void *opaque);
>>
>>   /* migration/migration.c */
>>   void migration_object_init(void);
>> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
>> index 3d84efcac47a..8c8ea5c2840d 100644
>> --- a/include/qemu/typedefs.h
>> +++ b/include/qemu/typedefs.h
>> @@ -131,5 +131,6 @@ typedef struct IRQState *qemu_irq;
>>    * Function types
>>    */
>>   typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
>> +typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);
>>
>>   #endif /* QEMU_TYPEDEFS_H */
>> diff --git a/migration/savevm.c b/migration/savevm.c
>> index 1f58a2fa54ae..6ea9054c4083 100644
>> --- a/migration/savevm.c
>> +++ b/migration/savevm.c
>> @@ -54,6 +54,7 @@
>>   #include "qemu/job.h"
>>   #include "qemu/main-loop.h"
>>   #include "block/snapshot.h"
>> +#include "block/thread-pool.h"
>>   #include "qemu/cutils.h"
>>   #include "io/channel-buffer.h"
>>   #include "io/channel-file.h"
>> @@ -71,6 +72,10 @@
>>
>>   const unsigned int postcopy_ram_discard_version;
>>
>> +static ThreadPool *load_threads;
>> +static int load_threads_ret;
>> +static bool load_threads_abort;
>> +
>>   /* Subcommands for QEMU_VM_COMMAND */
>>   enum qemu_vm_cmd {
>>       MIG_CMD_INVALID = 0,   /* Must be 0 */
>> @@ -2788,6 +2793,12 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>>       int ret;
>>
>>       trace_loadvm_state_setup();
>> +
>> +    assert(!load_threads);
>> +    load_threads = thread_pool_new();
>> +    load_threads_ret = 0;
>> +    load_threads_abort = false;
>> +
>>       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>>           if (!se->ops || !se->ops->load_setup) {
>>               continue;
>> @@ -2806,19 +2817,72 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>>               return ret;
>>           }
>>       }
>> +
>> +    return 0;
>> +}
>> +
>> +struct LoadThreadData {
>> +    MigrationLoadThread function;
>> +    void *opaque;
>> +};
>> +
>> +static int qemu_loadvm_load_thread(void *thread_opaque)
>> +{
>> +    struct LoadThreadData *data = thread_opaque;
>> +    int ret;
>> +
>> +    ret = data->function(&load_threads_abort, data->opaque);
>> +    if (ret && !qatomic_read(&load_threads_ret)) {
>> +        /*
>> +         * Racy with the above read but that's okay - which thread error
>> +         * return we report is purely arbitrary anyway.
>> +         */
>> +        qatomic_set(&load_threads_ret, ret);
>> +    }
> 
> Can we use cmpxchg instead? E.g.:
> 
> if (ret) {
>      qatomic_cmpxchg(&load_threads_ret, 0, ret);
> }

cmpxchg always forces sequentially consistent ordering
while qatomic_read() and qatomic_set() have relaxed ordering.

As the comment above describes, there's no need for sequential
consistency since which thread error is returned is arbitrary
anyway.

>> +
>>       return 0;
>>   }
>>
>> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
>> +                                   void *opaque)
>> +{
>> +    struct LoadThreadData *data;
>> +
>> +    /* We only set it from this thread so it's okay to read it directly */
>> +    assert(!load_threads_abort);
>> +
>> +    data = g_new(struct LoadThreadData, 1);
>> +    data->function = function;
>> +    data->opaque = opaque;
>> +
>> +    thread_pool_submit(load_threads, qemu_loadvm_load_thread,
>> +                       data, g_free);
>> +    thread_pool_adjust_max_threads_to_work(load_threads);
>> +}
>> +
>>   void qemu_loadvm_state_cleanup(void)
>>   {
>>       SaveStateEntry *se;
>>
>>       trace_loadvm_state_cleanup();
>> +
>>       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>>           if (se->ops && se->ops->load_cleanup) {
>>               se->ops->load_cleanup(se->opaque);
>>           }
>>       }
>> +
>> +    /*
>> +     * We might be called even without earlier qemu_loadvm_state_setup()
>> +     * call if qemu_loadvm_state() fails very early.
>> +     */
>> +    if (load_threads) {
>> +        qatomic_set(&load_threads_abort, true);
>> +        bql_unlock(); /* Load threads might be waiting for BQL */
>> +        thread_pool_wait(load_threads);
>> +        bql_lock();
>> +        g_clear_pointer(&load_threads, thread_pool_free);
> 
> Since thread_pool_free() also waits for pending jobs before returning, can we drop the explicit thread_pool_wait()? E.g.:
> 
> qatomic_set(&load_threads_abort, true);
> bql_unlock(); /* Load threads might be waiting for BQL */
> g_clear_pointer(&load_threads, thread_pool_free);
> bql_lock();

If we document that thread_pool_free() also has wait semantics,
as Cédric has suggested, then we can indeed avoid the explicit
wait on cleanup.
  
> Thanks.

Thanks,
Maciej


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Peter Xu 1 year, 2 months ago
On Thu, Nov 28, 2024 at 01:11:53PM +0100, Maciej S. Szmigiero wrote:
> > > +static int qemu_loadvm_load_thread(void *thread_opaque)
> > > +{
> > > +    struct LoadThreadData *data = thread_opaque;
> > > +    int ret;
> > > +
> > > +    ret = data->function(&load_threads_abort, data->opaque);
> > > +    if (ret && !qatomic_read(&load_threads_ret)) {
> > > +        /*
> > > +         * Racy with the above read but that's okay - which thread error
> > > +         * return we report is purely arbitrary anyway.
> > > +         */
> > > +        qatomic_set(&load_threads_ret, ret);
> > > +    }
> > 
> > Can we use cmpxchg instead? E.g.:
> > 
> > if (ret) {
> >      qatomic_cmpxchg(&load_threads_ret, 0, ret);
> > }
> 
> cmpxchg always forces sequentially consistent ordering
> while qatomic_read() and qatomic_set() have relaxed ordering.
> 
> As the comment above describes, there's no need for sequential
> consistency since which thread error is returned is arbitrary
> anyway.

IMHO this is not a hot path, so mem ordering isn't an issue.  If we could
avoid any data race we still should try to.

I do feel uneasy about the current design where everybody shares the "whether
to quit" state via one bool, and any thread can set it... meanwhile we can't
stabilize the first error to report later.

E.g., ideally we want to capture the first error no matter where it came
from, then keep it with migrate_set_error() so that "query-migrate" on dest
later can tell us what was wrong.  I think libvirt generally uses that.

So as to support a string error, at least we'll need to allow Error** in
the thread fn:

typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit,
                                    Error **errp);

I also changed retval to bool, as I mentioned elsewhere QEMU tries to stick
with "bool SOME_FUNCTION(..., Error **errp)" kind of error reporting.

Then any thread should only report error to qemu_loadvm_load_thread(), and
the report should always be a local Error**, then it further reports to the
global error.  Something like:

static int qemu_loadvm_load_thread(void *thread_opaque)
{
    MigrationIncomingState *mis = migration_incoming_get_current();
    struct LoadThreadData *data = thread_opaque;
    Error *error = NULL;

    if (!data->function(data->opaque, &mis->should_quit, &error)) {
       migrate_set_error(migrate_get_current(), error);
    }

    return 0;
}

migrate_set_error() is thread-safe, and it'll only record the 1st error.
Then the thread should only read &should_quit, and only set &error.  If we
want, migrate_set_error() can set &should_quit.

PS: I wish we had a unified place to tell whether we should quit
incoming migration - we already have multifd_recv_state->exiting; we could
have had a global flag like that which we could already use.  But I know I'm
asking too much.. However, do you think it makes sense to still have at
least Error** report the error and record it?

-- 
Peter Xu


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 2 months ago
On 4.12.2024 23:43, Peter Xu wrote:
> On Thu, Nov 28, 2024 at 01:11:53PM +0100, Maciej S. Szmigiero wrote:
>>>> +static int qemu_loadvm_load_thread(void *thread_opaque)
>>>> +{
>>>> +    struct LoadThreadData *data = thread_opaque;
>>>> +    int ret;
>>>> +
>>>> +    ret = data->function(&load_threads_abort, data->opaque);
>>>> +    if (ret && !qatomic_read(&load_threads_ret)) {
>>>> +        /*
>>>> +         * Racy with the above read but that's okay - which thread error
>>>> +         * return we report is purely arbitrary anyway.
>>>> +         */
>>>> +        qatomic_set(&load_threads_ret, ret);
>>>> +    }
>>>
>>> Can we use cmpxchg instead? E.g.:
>>>
>>> if (ret) {
>>>       qatomic_cmpxchg(&load_threads_ret, 0, ret);
>>> }
>>
>> cmpxchg always forces sequentially consistent ordering
>> while qatomic_read() and qatomic_set() have relaxed ordering.
>>
>> As the comment above describes, there's no need for sequential
>> consistency since which thread error is returned is arbitrary
>> anyway.
> 
> IMHO this is not a hot path, so mem ordering isn't an issue.  If we could
> avoid any data race we still should try to.
> 
> I do feel uneasy on the current design where everybody shares the "whether
> to quit" via one bool, and any thread can set it... meanwhile we can't
> stablize the first error to report later.
> 
> E.g., ideally we want to capture the first error no matter where it came
> from, then keep it with migrate_set_error() so that "query-migrate" on dest
> later can tell us what was wrong.  I think libvirt generally uses that.
> 
> So as to support a string error, at least we'll need to allow Error** in
> the thread fn:
> 
> typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit,
>                                      Error **errp);
> 
> I also changed retval to bool, as I mentioned elsewhere QEMU tries to stick
> with "bool SOME_FUNCTION(..., Error **errp)" kind of error reporting.
> 
> Then any thread should only report error to qemu_loadvm_load_thread(), and
> the report should always be a local Error**, then it further reports to the
> global error.  Something like:
> 
> static int qemu_loadvm_load_thread(void *thread_opaque)
> {
>      MigrationIncomingState *mis = migration_incoming_get_current();
>      struct LoadThreadData *data = thread_opaque;
>      Error *error = NULL;
> 
>      if (!data->function(data->opaque, &mis->should_quit, &error)) {
>         migrate_set_error(migrate_get_current(), error);
>      }
> 
>      return 0;
> }
> 
> migrate_set_error() is thread-safe, and it'll only record the 1st error.
>
> Then the thread should only read &should_quit, and only set &error.  If we
> want, migrate_set_error() can set &should_quit.
> 
> PS: I wished we have an unified place to tell whether we should quit
> incoming migration - we already have multifd_recv_state->exiting, we could
> have had a global flag like that then we can already use.  But I know I'm
> asking too much.. However would you think it make sense to still have at
> least Error** report the error and record it?
> 

This could work with the following changes/caveats:
* Needs g_autoptr(Error) otherwise these Error objects will leak.

* "1st error" here is as arbitrary as with my current code since which
thread first acquires the mutex in migrate_set_error() is unspecified.

* We still need to test this new error flag (now as migrate_has_error())
in qemu_loadvm_state() to see whether we proceed forward with the
migration.

-------------------------------------------------------------------

Also, I am not in favor of replacing load_threads_abort with something
else since we still want to ask threads to quit for other reasons, like
earlier (non-load threads related) failure in the migration process.

That's why we set this flag unconditionally in qemu_loadvm_state() -
see also my answer about that flag in the next message.

Thanks,
Maciej



Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Peter Xu 1 year, 1 month ago
On Wed, Dec 11, 2024 at 12:05:03AM +0100, Maciej S. Szmigiero wrote:
> On 4.12.2024 23:43, Peter Xu wrote:
> > On Thu, Nov 28, 2024 at 01:11:53PM +0100, Maciej S. Szmigiero wrote:
> > > > > +static int qemu_loadvm_load_thread(void *thread_opaque)
> > > > > +{
> > > > > +    struct LoadThreadData *data = thread_opaque;
> > > > > +    int ret;
> > > > > +
> > > > > +    ret = data->function(&load_threads_abort, data->opaque);
> > > > > +    if (ret && !qatomic_read(&load_threads_ret)) {
> > > > > +        /*
> > > > > +         * Racy with the above read but that's okay - which thread error
> > > > > +         * return we report is purely arbitrary anyway.
> > > > > +         */
> > > > > +        qatomic_set(&load_threads_ret, ret);
> > > > > +    }
> > > > 
> > > > Can we use cmpxchg instead? E.g.:
> > > > 
> > > > if (ret) {
> > > >       qatomic_cmpxchg(&load_threads_ret, 0, ret);
> > > > }
> > > 
> > > cmpxchg always forces sequentially consistent ordering
> > > while qatomic_read() and qatomic_set() have relaxed ordering.
> > > 
> > > As the comment above describes, there's no need for sequential
> > > consistency since which thread error is returned is arbitrary
> > > anyway.
> > 
> > IMHO this is not a hot path, so mem ordering isn't an issue.  If we could
> > avoid any data race we still should try to.
> > 
> > I do feel uneasy on the current design where everybody shares the "whether
> > to quit" via one bool, and any thread can set it... meanwhile we can't
> > stablize the first error to report later.
> > 
> > E.g., ideally we want to capture the first error no matter where it came
> > from, then keep it with migrate_set_error() so that "query-migrate" on dest
> > later can tell us what was wrong.  I think libvirt generally uses that.
> > 
> > So as to support a string error, at least we'll need to allow Error** in
> > the thread fn:
> > 
> > typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit,
> >                                      Error **errp);
> > 
> > I also changed retval to bool, as I mentioned elsewhere QEMU tries to stick
> > with "bool SOME_FUNCTION(..., Error **errp)" kind of error reporting.
> > 
> > Then any thread should only report error to qemu_loadvm_load_thread(), and
> > the report should always be a local Error**, then it further reports to the
> > global error.  Something like:
> > 
> > static int qemu_loadvm_load_thread(void *thread_opaque)
> > {
> >      MigrationIncomingState *mis = migration_incoming_get_current();
> >      struct LoadThreadData *data = thread_opaque;
> >      Error *error = NULL;
> > 
> >      if (!data->function(data->opaque, &mis->should_quit, &error)) {
> >         migrate_set_error(migrate_get_current(), error);
> >      }
> > 
> >      return 0;
> > }
> > 
> > migrate_set_error() is thread-safe, and it'll only record the 1st error.
> > 
> > Then the thread should only read &should_quit, and only set &error.  If we
> > want, migrate_set_error() can set &should_quit.
> > 
> > PS: I wished we have an unified place to tell whether we should quit
> > incoming migration - we already have multifd_recv_state->exiting, we could
> > have had a global flag like that then we can already use.  But I know I'm
> > asking too much.. However would you think it make sense to still have at
> > least Error** report the error and record it?
> > 
> 
> This could work with the following changes/caveats:
> * Needs g_autoptr(Error) otherwise these Error objects will leak.

True.. or just error_free() it after set.

> 
> * "1st error" here is as arbitrary as with my current code since which
> thread first acquires the mutex in migrate_set_error() is unspecified.

Yes, that's still a step forward in being verbose about errors, which is almost
always more helpful than a bool..

It's OK if it's not exactly the 1st error in time sequence - we don't strongly
ask for that, e.g. if two threads error at nearly the same time it's OK that we
only record one of them no matter which one is first.  That's unusual to start
with.

OTOH what matters is that we fail other threads only _after_ we set_error()
for the first error.  If so, it's almost always the case that the captured
error will be valid and the real 1st error.

> 
> * We still need to test this new error flag (now as migrate_has_error())
> in qemu_loadvm_state() to see whether we proceed forward with the
> migration.

Yes, or just to work like what this patch does: set mis->should_quit within
the 1st setup of migrate_set_error().  For the longer term, maybe we need
to do more to put together all error setup/detection for migration.. but
for now we can at least do that for this series to set should_quit=true
there only.  It should work like your series, only that the boolean won't
be writable to data->function() but read-only there, for the sake of
capturing the Error string.

> 
> -------------------------------------------------------------------
> 
> Also, I am not in favor of replacing load_threads_abort with something
> else since we still want to ask threads to quit for other reasons, like
> earlier (non-load threads related) failure in the migration process.
> 
> That's why we set this flag unconditionally in qemu_loadvm_state() -
> see also my answer about that flag in the next message.

I'm not against having a boolean to say quit, maybe we should have that for
!vfio use case too, and I'm ok we introduce one.  But I hope two things can
work out:

  - Capture Error* and persist it in query-migrate (aka, use
    migrate_set_error).

  - Avoid setting load_threads_abort explicitly in vmstate load path.  It
    should really be part of destroy(), IMHO, as I mentioned in the other
    email, to recycle load threads in a failure case.

Thanks,

-- 
Peter Xu


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 1 month ago
On 12.12.2024 17:55, Peter Xu wrote:
> On Wed, Dec 11, 2024 at 12:05:03AM +0100, Maciej S. Szmigiero wrote:
>> On 4.12.2024 23:43, Peter Xu wrote:
>>> On Thu, Nov 28, 2024 at 01:11:53PM +0100, Maciej S. Szmigiero wrote:
>>>>>> +static int qemu_loadvm_load_thread(void *thread_opaque)
>>>>>> +{
>>>>>> +    struct LoadThreadData *data = thread_opaque;
>>>>>> +    int ret;
>>>>>> +
>>>>>> +    ret = data->function(&load_threads_abort, data->opaque);
>>>>>> +    if (ret && !qatomic_read(&load_threads_ret)) {
>>>>>> +        /*
>>>>>> +         * Racy with the above read but that's okay - which thread error
>>>>>> +         * return we report is purely arbitrary anyway.
>>>>>> +         */
>>>>>> +        qatomic_set(&load_threads_ret, ret);
>>>>>> +    }
>>>>>
>>>>> Can we use cmpxchg instead? E.g.:
>>>>>
>>>>> if (ret) {
>>>>>        qatomic_cmpxchg(&load_threads_ret, 0, ret);
>>>>> }
>>>>
>>>> cmpxchg always forces sequentially consistent ordering
>>>> while qatomic_read() and qatomic_set() have relaxed ordering.
>>>>
>>>> As the comment above describes, there's no need for sequential
>>>> consistency since which thread error is returned is arbitrary
>>>> anyway.
>>>
>>> IMHO this is not a hot path, so mem ordering isn't an issue.  If we could
>>> avoid any data race we still should try to.
>>>
>>> I do feel uneasy on the current design where everybody shares the "whether
>>> to quit" via one bool, and any thread can set it... meanwhile we can't
>>> stablize the first error to report later.
>>>
>>> E.g., ideally we want to capture the first error no matter where it came
>>> from, then keep it with migrate_set_error() so that "query-migrate" on dest
>>> later can tell us what was wrong.  I think libvirt generally uses that.
>>>
>>> So as to support a string error, at least we'll need to allow Error** in
>>> the thread fn:
>>>
>>> typedef bool (*MigrationLoadThread)(void *opaque, bool *should_quit,
>>>                                       Error **errp);
>>>
>>> I also changed retval to bool, as I mentioned elsewhere QEMU tries to stick
>>> with "bool SOME_FUNCTION(..., Error **errp)" kind of error reporting.
>>>
>>> Then any thread should only report error to qemu_loadvm_load_thread(), and
>>> the report should always be a local Error**, then it further reports to the
>>> global error.  Something like:
>>>
>>> static int qemu_loadvm_load_thread(void *thread_opaque)
>>> {
>>>       MigrationIncomingState *mis = migration_incoming_get_current();
>>>       struct LoadThreadData *data = thread_opaque;
>>>       Error *error = NULL;
>>>
>>>       if (!data->function(data->opaque, &mis->should_quit, &error)) {
>>>          migrate_set_error(migrate_get_current(), error);
>>>       }
>>>
>>>       return 0;
>>> }
>>>
>>> migrate_set_error() is thread-safe, and it'll only record the 1st error.
>>>
>>> Then the thread should only read &should_quit, and only set &error.  If we
>>> want, migrate_set_error() can set &should_quit.
>>>
>>> PS: I wished we have an unified place to tell whether we should quit
>>> incoming migration - we already have multifd_recv_state->exiting, we could
>>> have had a global flag like that then we can already use.  But I know I'm
>>> asking too much.. However would you think it make sense to still have at
>>> least Error** report the error and record it?
>>>
>>
>> This could work with the following changes/caveats:
>> * Needs g_autoptr(Error) otherwise these Error objects will leak.
> 
> True.. or just error_free() it after set.
> 
>>
>> * "1st error" here is as arbitrary as with my current code since which
>> thread first acquires the mutex in migrate_set_error() is unspecified.
> 
> Yes that's still a step forward on being verbose of errors, which is almost
> always more helpful than a bool..
> 
> Not exactly the 1st error in time sequence is ok - we don't strongly ask
> for that, e.g. if two threads error at merely the same time it's ok we only
> record one of them no matter which one is first.  That's unusual to start
> with.
> 
> OTOH it matters on that we fail other threads only _after_ we set_error()
> for the first error.  If so it's mostly always the case the captured error
> will be valid and the real 1st error.
> 
>>
>> * We still need to test this new error flag (now as migrate_has_error())
>> in qemu_loadvm_state() to see whether we proceed forward with the
>> migration.
> 
> Yes, or just to work like what this patch does: set mis->should_quit within
> the 1st setup of migrate_set_error().  For the longer term, maybe we need
> to do more to put together all error setup/detection for migration.. but
> for now we can at least do that for this series to set should_quit=true
> there only.  It should work like your series, only that the boolean won't
> be writable to data->function() but read-only there, for the sake of
> capturing the Error string.

migrate_set_error() wouldn't be called until qemu_loadvm_state() exits
into process_incoming_migration_co().

Also, this does not account for other qemu_loadvm_state() callers like
qmp_xen_load_devices_state() or load_snapshot().

While these other callers might not use load threads currently, it feels
wrong to wait for these threads in qemu_loadvm_state() but set their
termination/abort flag as a side effect of a completely different function
(migrate_set_error()).

Having a dedicated abort flag also makes the semantics easy to infer
from code since one can simply grep for this flag name (load_threads_abort)
to see where it is being written.

Its name is also pretty descriptive making it easy to immediately tell
what it does.

>>
>> -------------------------------------------------------------------
>>
>> Also, I am not in favor of replacing load_threads_abort with something
>> else since we still want to ask threads to quit for other reasons, like
>> earlier (non-load threads related) failure in the migration process.
>>
>> That's why we set this flag unconditionally in qemu_loadvm_state() -
>> see also my answer about that flag in the next message.
> 
> I'm not against having a boolean to say quit, maybe we should have that for
> !vfio use case too, and I'm ok we introduce one.  But I hope two things can
> work out:
> 
>    - Capture Error* and persist it in query-migrate (aka, use
>      migrate_set_error).

Will do.

>    - Avoid setting load_threads_abort explicitly in vmstate load path.  It
>      should really be part of destroy(), IMHO, as I mentioned in the other
>      email, to recycle load threads in a failure case.

That's the same thread abort flag issue as in the first block of my reply
above.

> Thanks,
> 

Thanks,
Maciej


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Peter Xu 1 year, 1 month ago
On Thu, Dec 12, 2024 at 11:53:42PM +0100, Maciej S. Szmigiero wrote:
> migrate_set_error() wouldn't be called until qemu_loadvm_state() exits
> into process_incoming_migration_co().
> 
> Also this does not account other qemu_loadvm_state() callers like
> qmp_xen_load_devices_state() or load_snapshot().
> 
> While these other callers might not use load threads currently, it feels
> wrong to wait for these threads in qemu_loadvm_state() but set their
> termination/abort flag as a side effect of completely different function
> (migrate_set_error()).
> 
> Having a dedicated abort flag also makes the semantics easy to infer
> from code since once can simply grep for this flag name (load_threads_abort)
> to see where it is being written.
> 
> Its name is also pretty descriptive making it easy to immediately tell
> what it does.

That's fine.  As long as we can at least report an Error** and remember it,
that's OK with me.

Thanks,

-- 
Peter Xu
Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 1 month ago
On 16.12.2024 17:33, Peter Xu wrote:
> On Thu, Dec 12, 2024 at 11:53:42PM +0100, Maciej S. Szmigiero wrote:
>> migrate_set_error() wouldn't be called until qemu_loadvm_state() exits
>> into process_incoming_migration_co().
>>
>> Also this does not account other qemu_loadvm_state() callers like
>> qmp_xen_load_devices_state() or load_snapshot().
>>
>> While these other callers might not use load threads currently, it feels
>> wrong to wait for these threads in qemu_loadvm_state() but set their
>> termination/abort flag as a side effect of completely different function
>> (migrate_set_error()).
>>
>> Having a dedicated abort flag also makes the semantics easy to infer
>> from code since once can simply grep for this flag name (load_threads_abort)
>> to see where it is being written.
>>
>> Its name is also pretty descriptive making it easy to immediately tell
>> what it does.
> 
> That's fine. As long as we can at least report an Error** and remember that
> it's OK to me.

I think the above will be a good design indeed.

> Thanks,
> 

Thanks,
Maciej
Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Cédric Le Goater 1 year, 2 months ago
On 11/17/24 20:20, Maciej S. Szmigiero wrote:
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
> 
> Some drivers might want to make use of auxiliary helper threads during VM
> state loading, for example to make sure that their blocking (sync) I/O
> operations don't block the rest of the migration process.
> 
> Add a migration core managed thread pool to facilitate this use case.
> 
> The migration core will wait for these threads to finish before
> (re)starting the VM at destination.
> 
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>   include/migration/misc.h |  3 ++
>   include/qemu/typedefs.h  |  1 +
>   migration/savevm.c       | 77 ++++++++++++++++++++++++++++++++++++++++
>   3 files changed, 81 insertions(+)
> 
> diff --git a/include/migration/misc.h b/include/migration/misc.h
> index 804eb23c0607..c92ca018ab3b 100644
> --- a/include/migration/misc.h
> +++ b/include/migration/misc.h
> @@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
>   /* migration/block.c */
>   
>   AnnounceParameters *migrate_announce_params(void);
> +
>   /* migration/savevm.c */
>   
>   void dump_vmstate_json_to_file(FILE *out_fp);
> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
> +                                   void *opaque);
>   
>   /* migration/migration.c */
>   void migration_object_init(void);
> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
> index 3d84efcac47a..8c8ea5c2840d 100644
> --- a/include/qemu/typedefs.h
> +++ b/include/qemu/typedefs.h
> @@ -131,5 +131,6 @@ typedef struct IRQState *qemu_irq;
>    * Function types
>    */
>   typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
> +typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);
>   
>   #endif /* QEMU_TYPEDEFS_H */
> diff --git a/migration/savevm.c b/migration/savevm.c
> index 1f58a2fa54ae..6ea9054c4083 100644
> --- a/migration/savevm.c
> +++ b/migration/savevm.c
> @@ -54,6 +54,7 @@
>   #include "qemu/job.h"
>   #include "qemu/main-loop.h"
>   #include "block/snapshot.h"
> +#include "block/thread-pool.h"
>   #include "qemu/cutils.h"
>   #include "io/channel-buffer.h"
>   #include "io/channel-file.h"
> @@ -71,6 +72,10 @@
>   
>   const unsigned int postcopy_ram_discard_version;
>   
> +static ThreadPool *load_threads;
> +static int load_threads_ret;
> +static bool load_threads_abort;
> +
>   /* Subcommands for QEMU_VM_COMMAND */
>   enum qemu_vm_cmd {
>       MIG_CMD_INVALID = 0,   /* Must be 0 */
> @@ -2788,6 +2793,12 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>       int ret;
>   
>       trace_loadvm_state_setup();
> +
> +    assert(!load_threads);
> +    load_threads = thread_pool_new();
> +    load_threads_ret = 0;
> +    load_threads_abort = false;

I would introduce a qemu_loadvm_thread_pool_create() helper.

Why is the thread pool always created?  Might be OK.


> +
>       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>           if (!se->ops || !se->ops->load_setup) {
>               continue;
> @@ -2806,19 +2817,72 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>               return ret;
>           }
>       }
> +
> +    return 0;
> +}
> +
> +struct LoadThreadData {
> +    MigrationLoadThread function;
> +    void *opaque;
> +};
> +
> +static int qemu_loadvm_load_thread(void *thread_opaque)
> +{
> +    struct LoadThreadData *data = thread_opaque;
> +    int ret;
> +
> +    ret = data->function(&load_threads_abort, data->opaque);
> +    if (ret && !qatomic_read(&load_threads_ret)) {
> +        /*
> +         * Racy with the above read but that's okay - which thread error
> +         * return we report is purely arbitrary anyway.
> +         */
> +        qatomic_set(&load_threads_ret, ret);
> +    }
> +
>       return 0;
>   }
>   
> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
> +                                   void *opaque)
> +{
> +    struct LoadThreadData *data;
> +
> +    /* We only set it from this thread so it's okay to read it directly */
> +    assert(!load_threads_abort);
> +
> +    data = g_new(struct LoadThreadData, 1);
> +    data->function = function;
> +    data->opaque = opaque;
> +
> +    thread_pool_submit(load_threads, qemu_loadvm_load_thread,
> +                       data, g_free);
> +    thread_pool_adjust_max_threads_to_work(load_threads);
> +}
> +
>   void qemu_loadvm_state_cleanup(void)
>   {
>       SaveStateEntry *se;
>   
>       trace_loadvm_state_cleanup();
> +
>       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>           if (se->ops && se->ops->load_cleanup) {
>               se->ops->load_cleanup(se->opaque);
>           }
>       }
> +
> +    /*
> +     * We might be called even without earlier qemu_loadvm_state_setup()
> +     * call if qemu_loadvm_state() fails very early.
> +     */
> +    if (load_threads) {
> +        qatomic_set(&load_threads_abort, true);
> +        bql_unlock(); /* Load threads might be waiting for BQL */
> +        thread_pool_wait(load_threads);
> +        bql_lock();
> +        g_clear_pointer(&load_threads, thread_pool_free);
> +    }

I would introduce a qemu_loadvm_thread_pool_destroy() helper

>   }
>   
>   /* Return true if we should continue the migration, or false. */
> @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
>           return ret;
>       }
>   
> +    if (ret == 0) {
> +        bql_unlock(); /* Let load threads do work requiring BQL */
> +        thread_pool_wait(load_threads);
> +        bql_lock();
> +
> +        ret = load_threads_ret;
> +    }
> +    /*
> +     * Set this flag unconditionally so we'll catch further attempts to
> +     * start additional threads via an appropriate assert()
> +     */
> +    qatomic_set(&load_threads_abort, true);
> +


I would introduce a qemu_loadvm_thread_pool_wait() helper

>       if (ret == 0) {
>           ret = qemu_file_get_error(f);
>       }
> 

I think we could hide the implementation in a new component of
the migration subsystem or, at least, we could group the
implementation at the top of the file. It would help the uninitiated
reader to become familiar with the migration area.

Thanks,

C.
Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 2 months ago
On 27.11.2024 10:13, Cédric Le Goater wrote:
> On 11/17/24 20:20, Maciej S. Szmigiero wrote:
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> Some drivers might want to make use of auxiliary helper threads during VM
>> state loading, for example to make sure that their blocking (sync) I/O
>> operations don't block the rest of the migration process.
>>
>> Add a migration core managed thread pool to facilitate this use case.
>>
>> The migration core will wait for these threads to finish before
>> (re)starting the VM at destination.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>   include/migration/misc.h |  3 ++
>>   include/qemu/typedefs.h  |  1 +
>>   migration/savevm.c       | 77 ++++++++++++++++++++++++++++++++++++++++
>>   3 files changed, 81 insertions(+)
>>
>> diff --git a/include/migration/misc.h b/include/migration/misc.h
>> index 804eb23c0607..c92ca018ab3b 100644
>> --- a/include/migration/misc.h
>> +++ b/include/migration/misc.h
>> @@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
>>   /* migration/block.c */
>>   AnnounceParameters *migrate_announce_params(void);
>> +
>>   /* migration/savevm.c */
>>   void dump_vmstate_json_to_file(FILE *out_fp);
>> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
>> +                                   void *opaque);
>>   /* migration/migration.c */
>>   void migration_object_init(void);
>> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
>> index 3d84efcac47a..8c8ea5c2840d 100644
>> --- a/include/qemu/typedefs.h
>> +++ b/include/qemu/typedefs.h
>> @@ -131,5 +131,6 @@ typedef struct IRQState *qemu_irq;
>>    * Function types
>>    */
>>   typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
>> +typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);
>>   #endif /* QEMU_TYPEDEFS_H */
>> diff --git a/migration/savevm.c b/migration/savevm.c
>> index 1f58a2fa54ae..6ea9054c4083 100644
>> --- a/migration/savevm.c
>> +++ b/migration/savevm.c
>> @@ -54,6 +54,7 @@
>>   #include "qemu/job.h"
>>   #include "qemu/main-loop.h"
>>   #include "block/snapshot.h"
>> +#include "block/thread-pool.h"
>>   #include "qemu/cutils.h"
>>   #include "io/channel-buffer.h"
>>   #include "io/channel-file.h"
>> @@ -71,6 +72,10 @@
>>   const unsigned int postcopy_ram_discard_version;
>> +static ThreadPool *load_threads;
>> +static int load_threads_ret;
>> +static bool load_threads_abort;
>> +
>>   /* Subcommands for QEMU_VM_COMMAND */
>>   enum qemu_vm_cmd {
>>       MIG_CMD_INVALID = 0,   /* Must be 0 */
>> @@ -2788,6 +2793,12 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>>       int ret;
>>       trace_loadvm_state_setup();
>> +
>> +    assert(!load_threads);
>> +    load_threads = thread_pool_new();
>> +    load_threads_ret = 0;
>> +    load_threads_abort = false;
> 
> I would introduce a qemu_loadvm_thread_pool_create() helper.

Will do.

> Why is the thead pool always created ? Might be OK.
> 

This functionality provides a generic auxiliary load helper threads
pool, not necessarily tied to the multifd device state transfer.

That's why the pool is created unconditionally.

>> +
>>       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>>           if (!se->ops || !se->ops->load_setup) {
>>               continue;
>> @@ -2806,19 +2817,72 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>>               return ret;
>>           }
>>       }
>> +
>> +    return 0;
>> +}
>> +
>> +struct LoadThreadData {
>> +    MigrationLoadThread function;
>> +    void *opaque;
>> +};
>> +
>> +static int qemu_loadvm_load_thread(void *thread_opaque)
>> +{
>> +    struct LoadThreadData *data = thread_opaque;
>> +    int ret;
>> +
>> +    ret = data->function(&load_threads_abort, data->opaque);
>> +    if (ret && !qatomic_read(&load_threads_ret)) {
>> +        /*
>> +         * Racy with the above read but that's okay - which thread error
>> +         * return we report is purely arbitrary anyway.
>> +         */
>> +        qatomic_set(&load_threads_ret, ret);
>> +    }
>> +
>>       return 0;
>>   }
>> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
>> +                                   void *opaque)
>> +{
>> +    struct LoadThreadData *data;
>> +
>> +    /* We only set it from this thread so it's okay to read it directly */
>> +    assert(!load_threads_abort);
>> +
>> +    data = g_new(struct LoadThreadData, 1);
>> +    data->function = function;
>> +    data->opaque = opaque;
>> +
>> +    thread_pool_submit(load_threads, qemu_loadvm_load_thread,
>> +                       data, g_free);
>> +    thread_pool_adjust_max_threads_to_work(load_threads);
>> +}
>> +
>>   void qemu_loadvm_state_cleanup(void)
>>   {
>>       SaveStateEntry *se;
>>       trace_loadvm_state_cleanup();
>> +
>>       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>>           if (se->ops && se->ops->load_cleanup) {
>>               se->ops->load_cleanup(se->opaque);
>>           }
>>       }
>> +
>> +    /*
>> +     * We might be called even without earlier qemu_loadvm_state_setup()
>> +     * call if qemu_loadvm_state() fails very early.
>> +     */
>> +    if (load_threads) {
>> +        qatomic_set(&load_threads_abort, true);
>> +        bql_unlock(); /* Load threads might be waiting for BQL */
>> +        thread_pool_wait(load_threads);
>> +        bql_lock();
>> +        g_clear_pointer(&load_threads, thread_pool_free);
>> +    }
> 
> I would introduce a qemu_loadvm_thread_pool_destroy() helper

Will do.

>>   }
>>   /* Return true if we should continue the migration, or false. */
>> @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
>>           return ret;
>>       }
>> +    if (ret == 0) {
>> +        bql_unlock(); /* Let load threads do work requiring BQL */
>> +        thread_pool_wait(load_threads);
>> +        bql_lock();
>> +
>> +        ret = load_threads_ret;
>> +    }
>> +    /*
>> +     * Set this flag unconditionally so we'll catch further attempts to
>> +     * start additional threads via an appropriate assert()
>> +     */
>> +    qatomic_set(&load_threads_abort, true);
>> +
> 
> 
> I would introduce a qemu_loadvm_thread_pool_wait() helper

Will do.

>>       if (ret == 0) {
>>           ret = qemu_file_get_error(f);
>>       }
>>
> 
> I think we could hide the implementation in a new component of
> the migration subsystem or, at least, we could group the
> implementation at the top the file. It would help the uninitiated
> reader to become familiar with the migration area.

I will move these new helpers to a separate area of the "savevm.c"
file, marked/separated by an appropriate comment.

> Thanks,
> 
> C.
> 

Thanks,
Maciej


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Peter Xu 1 year, 2 months ago
On Wed, Nov 27, 2024 at 09:16:49PM +0100, Maciej S. Szmigiero wrote:
> On 27.11.2024 10:13, Cédric Le Goater wrote:
> > On 11/17/24 20:20, Maciej S. Szmigiero wrote:
> > > From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
> > > 
> > > Some drivers might want to make use of auxiliary helper threads during VM
> > > state loading, for example to make sure that their blocking (sync) I/O
> > > operations don't block the rest of the migration process.
> > > 
> > > Add a migration core managed thread pool to facilitate this use case.
> > > 
> > > The migration core will wait for these threads to finish before
> > > (re)starting the VM at destination.
> > > 
> > > Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> > > ---
> > >   include/migration/misc.h |  3 ++
> > >   include/qemu/typedefs.h  |  1 +
> > >   migration/savevm.c       | 77 ++++++++++++++++++++++++++++++++++++++++
> > >   3 files changed, 81 insertions(+)
> > > 
> > > diff --git a/include/migration/misc.h b/include/migration/misc.h
> > > index 804eb23c0607..c92ca018ab3b 100644
> > > --- a/include/migration/misc.h
> > > +++ b/include/migration/misc.h
> > > @@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
> > >   /* migration/block.c */
> > >   AnnounceParameters *migrate_announce_params(void);
> > > +
> > >   /* migration/savevm.c */
> > >   void dump_vmstate_json_to_file(FILE *out_fp);
> > > +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
> > > +                                   void *opaque);
> > >   /* migration/migration.c */
> > >   void migration_object_init(void);
> > > diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
> > > index 3d84efcac47a..8c8ea5c2840d 100644
> > > --- a/include/qemu/typedefs.h
> > > +++ b/include/qemu/typedefs.h
> > > @@ -131,5 +131,6 @@ typedef struct IRQState *qemu_irq;
> > >    * Function types
> > >    */
> > >   typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
> > > +typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);
> > >   #endif /* QEMU_TYPEDEFS_H */
> > > diff --git a/migration/savevm.c b/migration/savevm.c
> > > index 1f58a2fa54ae..6ea9054c4083 100644
> > > --- a/migration/savevm.c
> > > +++ b/migration/savevm.c
> > > @@ -54,6 +54,7 @@
> > >   #include "qemu/job.h"
> > >   #include "qemu/main-loop.h"
> > >   #include "block/snapshot.h"
> > > +#include "block/thread-pool.h"
> > >   #include "qemu/cutils.h"
> > >   #include "io/channel-buffer.h"
> > >   #include "io/channel-file.h"
> > > @@ -71,6 +72,10 @@
> > >   const unsigned int postcopy_ram_discard_version;
> > > +static ThreadPool *load_threads;
> > > +static int load_threads_ret;
> > > +static bool load_threads_abort;
> > > +
> > >   /* Subcommands for QEMU_VM_COMMAND */
> > >   enum qemu_vm_cmd {
> > >       MIG_CMD_INVALID = 0,   /* Must be 0 */
> > > @@ -2788,6 +2793,12 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
> > >       int ret;
> > >       trace_loadvm_state_setup();
> > > +
> > > +    assert(!load_threads);
> > > +    load_threads = thread_pool_new();
> > > +    load_threads_ret = 0;
> > > +    load_threads_abort = false;
> > 
> > I would introduce a qemu_loadvm_thread_pool_create() helper.
> 
> Will do.

On top of Cedric's suggestion..

Maybe move it over to migration_object_init()?  Then qemu_loadvm_state_setup()
would only invoke the load_setup()s.

> 
> > Why is the thead pool always created ? Might be OK.
> > 
> 
> This functionality provides a generic auxiliary load helper threads
> pool, not necessarily tied to the multifd device state transfer.
> 
> That's why the pool is created unconditionally.
> 
> > > +
> > >       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
> > >           if (!se->ops || !se->ops->load_setup) {
> > >               continue;
> > > @@ -2806,19 +2817,72 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
> > >               return ret;
> > >           }
> > >       }
> > > +
> > > +    return 0;
> > > +}
> > > +
> > > +struct LoadThreadData {
> > > +    MigrationLoadThread function;
> > > +    void *opaque;
> > > +};
> > > +
> > > +static int qemu_loadvm_load_thread(void *thread_opaque)
> > > +{
> > > +    struct LoadThreadData *data = thread_opaque;
> > > +    int ret;
> > > +
> > > +    ret = data->function(&load_threads_abort, data->opaque);
> > > +    if (ret && !qatomic_read(&load_threads_ret)) {
> > > +        /*
> > > +         * Racy with the above read but that's okay - which thread error
> > > +         * return we report is purely arbitrary anyway.
> > > +         */
> > > +        qatomic_set(&load_threads_ret, ret);
> > > +    }
> > > +
> > >       return 0;
> > >   }
> > > +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
> > > +                                   void *opaque)
> > > +{
> > > +    struct LoadThreadData *data;
> > > +
> > > +    /* We only set it from this thread so it's okay to read it directly */
> > > +    assert(!load_threads_abort);
> > > +
> > > +    data = g_new(struct LoadThreadData, 1);
> > > +    data->function = function;
> > > +    data->opaque = opaque;
> > > +
> > > +    thread_pool_submit(load_threads, qemu_loadvm_load_thread,
> > > +                       data, g_free);
> > > +    thread_pool_adjust_max_threads_to_work(load_threads);
> > > +}
> > > +
> > >   void qemu_loadvm_state_cleanup(void)
> > >   {
> > >       SaveStateEntry *se;
> > >       trace_loadvm_state_cleanup();
> > > +
> > >       QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
> > >           if (se->ops && se->ops->load_cleanup) {
> > >               se->ops->load_cleanup(se->opaque);
> > >           }
> > >       }
> > > +
> > > +    /*
> > > +     * We might be called even without earlier qemu_loadvm_state_setup()
> > > +     * call if qemu_loadvm_state() fails very early.
> > > +     */
> > > +    if (load_threads) {
> > > +        qatomic_set(&load_threads_abort, true);
> > > +        bql_unlock(); /* Load threads might be waiting for BQL */
> > > +        thread_pool_wait(load_threads);
> > > +        bql_lock();
> > > +        g_clear_pointer(&load_threads, thread_pool_free);
> > > +    }
> > 
> > I would introduce a qemu_loadvm_thread_pool_destroy() helper
> 
> Will do.

Then this one may belong to migration_incoming_state_destroy().

> 
> > >   }
> > >   /* Return true if we should continue the migration, or false. */
> > > @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
> > >           return ret;
> > >       }
> > > +    if (ret == 0) {
> > > +        bql_unlock(); /* Let load threads do work requiring BQL */
> > > +        thread_pool_wait(load_threads);
> > > +        bql_lock();
> > > +
> > > +        ret = load_threads_ret;
> > > +    }
> > > +    /*
> > > +     * Set this flag unconditionally so we'll catch further attempts to
> > > +     * start additional threads via an appropriate assert()
> > > +     */
> > > +    qatomic_set(&load_threads_abort, true);

I assume this is only for debugging purpose and not required.

Setting "abort all threads" to make sure "nobody will add more thread
tasks" is pretty awkward, IMHO.  If we really want to protect against it
and fail hard, it might be easier to free the pool directly after the
thread_pool_wait() (destroy() will see NULL so it'll skip; we still need to
free there in case migration failed before this).  Then any enqueue will
dereference a NULL pool pointer.

-- 
Peter Xu


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 2 months ago
On 4.12.2024 23:48, Peter Xu wrote:
> On Wed, Nov 27, 2024 at 09:16:49PM +0100, Maciej S. Szmigiero wrote:
>> On 27.11.2024 10:13, Cédric Le Goater wrote:
>>> On 11/17/24 20:20, Maciej S. Szmigiero wrote:
>>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>>
>>>> Some drivers might want to make use of auxiliary helper threads during VM
>>>> state loading, for example to make sure that their blocking (sync) I/O
>>>> operations don't block the rest of the migration process.
>>>>
>>>> Add a migration core managed thread pool to facilitate this use case.
>>>>
>>>> The migration core will wait for these threads to finish before
>>>> (re)starting the VM at destination.
>>>>
>>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>>> ---
>>>>    include/migration/misc.h |  3 ++
>>>>    include/qemu/typedefs.h  |  1 +
>>>>    migration/savevm.c       | 77 ++++++++++++++++++++++++++++++++++++++++
>>>>    3 files changed, 81 insertions(+)
>>>>
>>>> diff --git a/include/migration/misc.h b/include/migration/misc.h
>>>> index 804eb23c0607..c92ca018ab3b 100644
>>>> --- a/include/migration/misc.h
>>>> +++ b/include/migration/misc.h
>>>> @@ -45,9 +45,12 @@ bool migrate_ram_is_ignored(RAMBlock *block);
>>>>    /* migration/block.c */
>>>>    AnnounceParameters *migrate_announce_params(void);
>>>> +
>>>>    /* migration/savevm.c */
>>>>    void dump_vmstate_json_to_file(FILE *out_fp);
>>>> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
>>>> +                                   void *opaque);
>>>>    /* migration/migration.c */
>>>>    void migration_object_init(void);
>>>> diff --git a/include/qemu/typedefs.h b/include/qemu/typedefs.h
>>>> index 3d84efcac47a..8c8ea5c2840d 100644
>>>> --- a/include/qemu/typedefs.h
>>>> +++ b/include/qemu/typedefs.h
>>>> @@ -131,5 +131,6 @@ typedef struct IRQState *qemu_irq;
>>>>     * Function types
>>>>     */
>>>>    typedef void (*qemu_irq_handler)(void *opaque, int n, int level);
>>>> +typedef int (*MigrationLoadThread)(bool *abort_flag, void *opaque);
>>>>    #endif /* QEMU_TYPEDEFS_H */
>>>> diff --git a/migration/savevm.c b/migration/savevm.c
>>>> index 1f58a2fa54ae..6ea9054c4083 100644
>>>> --- a/migration/savevm.c
>>>> +++ b/migration/savevm.c
>>>> @@ -54,6 +54,7 @@
>>>>    #include "qemu/job.h"
>>>>    #include "qemu/main-loop.h"
>>>>    #include "block/snapshot.h"
>>>> +#include "block/thread-pool.h"
>>>>    #include "qemu/cutils.h"
>>>>    #include "io/channel-buffer.h"
>>>>    #include "io/channel-file.h"
>>>> @@ -71,6 +72,10 @@
>>>>    const unsigned int postcopy_ram_discard_version;
>>>> +static ThreadPool *load_threads;
>>>> +static int load_threads_ret;
>>>> +static bool load_threads_abort;
>>>> +
>>>>    /* Subcommands for QEMU_VM_COMMAND */
>>>>    enum qemu_vm_cmd {
>>>>        MIG_CMD_INVALID = 0,   /* Must be 0 */
>>>> @@ -2788,6 +2793,12 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>>>>        int ret;
>>>>        trace_loadvm_state_setup();
>>>> +
>>>> +    assert(!load_threads);
>>>> +    load_threads = thread_pool_new();
>>>> +    load_threads_ret = 0;
>>>> +    load_threads_abort = false;
>>>
>>> I would introduce a qemu_loadvm_thread_pool_create() helper.
>>
>> Will do.
> 
> On top of Cedric's suggestion..
> 
> Maybe move it over to migration_object_init()?  Then we keep
> qemu_loadvm_state_setup() only invoke the load_setup()s.

AFAIK migration_object_init() is called unconditionally
at QEMU startup even if there won't be any migration done?

Creating a load thread pool there seems wasteful if no
incoming migration will ever take place (or will but only
much later).

>>
>>> Why is the thread pool always created? Might be OK.
>>>
>>
>> This functionality provides a generic auxiliary load helper threads
>> pool, not necessarily tied to the multifd device state transfer.
>>
>> That's why the pool is created unconditionally.
>>
>>>> +
>>>>        QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>>>>            if (!se->ops || !se->ops->load_setup) {
>>>>                continue;
>>>> @@ -2806,19 +2817,72 @@ static int qemu_loadvm_state_setup(QEMUFile *f, Error **errp)
>>>>                return ret;
>>>>            }
>>>>        }
>>>> +
>>>> +    return 0;
>>>> +}
>>>> +
>>>> +struct LoadThreadData {
>>>> +    MigrationLoadThread function;
>>>> +    void *opaque;
>>>> +};
>>>> +
>>>> +static int qemu_loadvm_load_thread(void *thread_opaque)
>>>> +{
>>>> +    struct LoadThreadData *data = thread_opaque;
>>>> +    int ret;
>>>> +
>>>> +    ret = data->function(&load_threads_abort, data->opaque);
>>>> +    if (ret && !qatomic_read(&load_threads_ret)) {
>>>> +        /*
>>>> +         * Racy with the above read but that's okay - which thread error
>>>> +         * return we report is purely arbitrary anyway.
>>>> +         */
>>>> +        qatomic_set(&load_threads_ret, ret);
>>>> +    }
>>>> +
>>>>        return 0;
>>>>    }
>>>> +void qemu_loadvm_start_load_thread(MigrationLoadThread function,
>>>> +                                   void *opaque)
>>>> +{
>>>> +    struct LoadThreadData *data;
>>>> +
>>>> +    /* We only set it from this thread so it's okay to read it directly */
>>>> +    assert(!load_threads_abort);
>>>> +
>>>> +    data = g_new(struct LoadThreadData, 1);
>>>> +    data->function = function;
>>>> +    data->opaque = opaque;
>>>> +
>>>> +    thread_pool_submit(load_threads, qemu_loadvm_load_thread,
>>>> +                       data, g_free);
>>>> +    thread_pool_adjust_max_threads_to_work(load_threads);
>>>> +}
>>>> +
>>>>    void qemu_loadvm_state_cleanup(void)
>>>>    {
>>>>        SaveStateEntry *se;
>>>>        trace_loadvm_state_cleanup();
>>>> +
>>>>        QTAILQ_FOREACH(se, &savevm_state.handlers, entry) {
>>>>            if (se->ops && se->ops->load_cleanup) {
>>>>                se->ops->load_cleanup(se->opaque);
>>>>            }
>>>>        }
>>>> +
>>>> +    /*
>>>> +     * We might be called even without earlier qemu_loadvm_state_setup()
>>>> +     * call if qemu_loadvm_state() fails very early.
>>>> +     */
>>>> +    if (load_threads) {
>>>> +        qatomic_set(&load_threads_abort, true);
>>>> +        bql_unlock(); /* Load threads might be waiting for BQL */
>>>> +        thread_pool_wait(load_threads);
>>>> +        bql_lock();
>>>> +        g_clear_pointer(&load_threads, thread_pool_free);
>>>> +    }
>>>
>>> I would introduce a qemu_loadvm_thread_pool_destroy() helper
>>
>> Will do.
> 
> Then this one may belong to migration_incoming_state_destroy().
> 
>>
>>>>    }
>>>>    /* Return true if we should continue the migration, or false. */
>>>> @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
>>>>            return ret;
>>>>        }
>>>> +    if (ret == 0) {
>>>> +        bql_unlock(); /* Let load threads do work requiring BQL */
>>>> +        thread_pool_wait(load_threads);
>>>> +        bql_lock();
>>>> +
>>>> +        ret = load_threads_ret;
>>>> +    }
>>>> +    /*
>>>> +     * Set this flag unconditionally so we'll catch further attempts to
>>>> +     * start additional threads via an appropriate assert()
>>>> +     */
>>>> +    qatomic_set(&load_threads_abort, true);
> 
> I assume this is only for debugging purpose and not required.
> 
> Setting "abort all threads" to make sure "nobody will add more thread
> tasks" is pretty awkward, IMHO.  If we really want to protect against it
> and fail hard, it might be easier after the thread_pool_wait() we free the
> pool directly (destroy() will see NULL so it'll skip; still need to free
> there in case migration failed before this).  Then any enqueue will access
> null pointer on the pool.

We don't want to destroy the thread pool in the path where the downtime
is still counting.

That's why we only do cleanup after the migration is complete.

The above setting of load_threads_abort flag also makes sure that we abort
load threads if the migration is going to fail for other reasons (non-load
threads related) - in other words, when the above block with thread_pool_wait()
isn't even entered due to ret already containing an earlier error.

Thanks,
Maciej


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Peter Xu 1 year, 1 month ago
On Wed, Dec 11, 2024 at 12:05:23AM +0100, Maciej S. Szmigiero wrote:
> > Maybe move it over to migration_object_init()?  Then we keep
> > qemu_loadvm_state_setup() only invoke the load_setup()s.
> 
> AFAIK migration_object_init() is called unconditionally
> at QEMU startup even if there won't me any migration done?
> 
> Creating a load thread pool there seems wasteful if no
> incoming migration will ever take place (or will but only
> much later).

I was expecting an empty pool not to consume significant resources, but if
that's a concern, yes, we can defer creating it until later.

[...]

> > > > > @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
> > > > >            return ret;
> > > > >        }
> > > > > +    if (ret == 0) {
> > > > > +        bql_unlock(); /* Let load threads do work requiring BQL */
> > > > > +        thread_pool_wait(load_threads);
> > > > > +        bql_lock();
> > > > > +
> > > > > +        ret = load_threads_ret;
> > > > > +    }
> > > > > +    /*
> > > > > +     * Set this flag unconditionally so we'll catch further attempts to
> > > > > +     * start additional threads via an appropriate assert()
> > > > > +     */
> > > > > +    qatomic_set(&load_threads_abort, true);
> > 
> > I assume this is only for debugging purpose and not required.
> > 
> > Setting "abort all threads" to make sure "nobody will add more thread
> > tasks" is pretty awkward, IMHO.  If we really want to protect against it
> > and fail hard, it might be easier after the thread_pool_wait() we free the
> > pool directly (destroy() will see NULL so it'll skip; still need to free
> > there in case migration failed before this).  Then any enqueue will access
> > null pointer on the pool.
> 
> We don't want to destroy the thread pool in the path where the downtime
> is still counting.

Yeah this makes sense.

> 
> That's why we only do cleanup after the migration is complete.
> 
> The above setting of load_threads_abort flag also makes sure that we abort
> load threads if the migration is going to fail for other reasons (non-load
> threads related) - in other words, when the above block with thread_pool_wait()
> isn't even entered due to ret already containing an earlier error.

In that case IIUC we should clean up the load threads in destroy(), not
here?  Especially since the comment makes this even more confusing.

-- 
Peter Xu


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 1 month ago
On 12.12.2024 17:38, Peter Xu wrote:
> On Wed, Dec 11, 2024 at 12:05:23AM +0100, Maciej S. Szmigiero wrote:
>>> Maybe move it over to migration_object_init()?  Then we keep
>>> qemu_loadvm_state_setup() only invoke the load_setup()s.
>>
>> AFAIK migration_object_init() is called unconditionally
>> at QEMU startup even if there won't me any migration done?
>>
>> Creating a load thread pool there seems wasteful if no
>> incoming migration will ever take place (or will but only
>> much later).
> 
> I was expecting an empty pool to not be a major resource, but if that's a
> concern, yes we can do that until later.
> 
> [...]
> 
>>>>>> @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
>>>>>>             return ret;
>>>>>>         }
>>>>>> +    if (ret == 0) {
>>>>>> +        bql_unlock(); /* Let load threads do work requiring BQL */
>>>>>> +        thread_pool_wait(load_threads);
>>>>>> +        bql_lock();
>>>>>> +
>>>>>> +        ret = load_threads_ret;
>>>>>> +    }
>>>>>> +    /*
>>>>>> +     * Set this flag unconditionally so we'll catch further attempts to
>>>>>> +     * start additional threads via an appropriate assert()
>>>>>> +     */
>>>>>> +    qatomic_set(&load_threads_abort, true);
>>>
>>> I assume this is only for debugging purpose and not required.
>>>
>>> Setting "abort all threads" to make sure "nobody will add more thread
>>> tasks" is pretty awkward, IMHO.  If we really want to protect against it
>>> and fail hard, it might be easier after the thread_pool_wait() we free the
>>> pool directly (destroy() will see NULL so it'll skip; still need to free
>>> there in case migration failed before this).  Then any enqueue will access
>>> null pointer on the pool.
>>
>> We don't want to destroy the thread pool in the path where the downtime
>> is still counting.
> 
> Yeah this makes sense.
> 
>>
>> That's why we only do cleanup after the migration is complete.
>>
>> The above setting of load_threads_abort flag also makes sure that we abort
>> load threads if the migration is going to fail for other reasons (non-load
>> threads related) - in other words, when the above block with thread_pool_wait()
>> isn't even entered due to ret already containing an earlier error.
> 
> In that case IIUC we should cleanup the load threads in destroy(), not
> here?  Especially with the comment that's even more confusing.
> 

This flag only asks the threads in the pool that are still running to exit ASAP
(without waiting for them in the "fail for other reasons"
qemu_loadvm_state() code flow).

Setting this flag does *not* do the cleanup of the whole thread pool - this
only happens in qemu_loadvm_state_cleanup().

Thanks,
Maciej


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Peter Xu 1 year, 1 month ago
On Thu, Dec 12, 2024 at 11:53:24PM +0100, Maciej S. Szmigiero wrote:
> On 12.12.2024 17:38, Peter Xu wrote:
> > On Wed, Dec 11, 2024 at 12:05:23AM +0100, Maciej S. Szmigiero wrote:
> > > > Maybe move it over to migration_object_init()?  Then we keep
> > > > qemu_loadvm_state_setup() only invoke the load_setup()s.
> > > 
> > > AFAIK migration_object_init() is called unconditionally
> > > at QEMU startup even if there won't me any migration done?
> > > 
> > > Creating a load thread pool there seems wasteful if no
> > > incoming migration will ever take place (or will but only
> > > much later).
> > 
> > I was expecting an empty pool to not be a major resource, but if that's a
> > concern, yes we can do that until later.
> > 
> > [...]
> > 
> > > > > > > @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
> > > > > > >             return ret;
> > > > > > >         }
> > > > > > > +    if (ret == 0) {
> > > > > > > +        bql_unlock(); /* Let load threads do work requiring BQL */
> > > > > > > +        thread_pool_wait(load_threads);
> > > > > > > +        bql_lock();
> > > > > > > +
> > > > > > > +        ret = load_threads_ret;
> > > > > > > +    }
> > > > > > > +    /*
> > > > > > > +     * Set this flag unconditionally so we'll catch further attempts to
> > > > > > > +     * start additional threads via an appropriate assert()
> > > > > > > +     */
> > > > > > > +    qatomic_set(&load_threads_abort, true);
> > > > 
> > > > I assume this is only for debugging purpose and not required.
> > > > 
> > > > Setting "abort all threads" to make sure "nobody will add more thread
> > > > tasks" is pretty awkward, IMHO.  If we really want to protect against it
> > > > and fail hard, it might be easier after the thread_pool_wait() we free the
> > > > pool directly (destroy() will see NULL so it'll skip; still need to free
> > > > there in case migration failed before this).  Then any enqueue will access
> > > > null pointer on the pool.
> > > 
> > > We don't want to destroy the thread pool in the path where the downtime
> > > is still counting.
> > 
> > Yeah this makes sense.
> > 
> > > 
> > > That's why we only do cleanup after the migration is complete.
> > > 
> > > The above setting of load_threads_abort flag also makes sure that we abort
> > > load threads if the migration is going to fail for other reasons (non-load
> > > threads related) - in other words, when the above block with thread_pool_wait()
> > > isn't even entered due to ret already containing an earlier error.
> > 
> > In that case IIUC we should cleanup the load threads in destroy(), not
> > here?  Especially with the comment that's even more confusing.
> > 
> 
> This flag only asks the threads in pool which are still running to exit ASAP
> (without waiting for them in the "fail for other reasons"
> qemu_loadvm_state() code flow).

I thought we could switch to an Error** model as we discussed elsewhere; then
the thread that hits the error should set the quit flag, IIUC.

Even without it..

> 
> Setting this flag does *not* do the cleanup of the whole thread pool - this
> only happens in qemu_loadvm_state_cleanup().

... we have two cases here:

Either no error at all, then thread_pool_wait() will wait for all threads
until finished.  When reaching here setting this flag shouldn't matter for
the threads because they're all finished.

Or there's an error in some thread; then QEMU should be stuck at
thread_pool_wait() anyway, until all threads quit.  Again, I thought it
could be qemu_loadvm_load_thread() that sets the quit flag (rather than
here) so the failed thread will notify all threads to quit.

I still don't see how it helps to set it after thread_pool_wait(), since
all threads have already finished by the time that call returns.  That goes
back to my question of whether it was only for debugging (so that no new
threads are created after this), rather than a flag to tell all threads to
quit.

Thanks,

-- 
Peter Xu


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 1 month ago
On 16.12.2024 17:29, Peter Xu wrote:
> On Thu, Dec 12, 2024 at 11:53:24PM +0100, Maciej S. Szmigiero wrote:
>> On 12.12.2024 17:38, Peter Xu wrote:
>>> On Wed, Dec 11, 2024 at 12:05:23AM +0100, Maciej S. Szmigiero wrote:
>>>>> Maybe move it over to migration_object_init()?  Then we keep
>>>>> qemu_loadvm_state_setup() only invoke the load_setup()s.
>>>>
>>>> AFAIK migration_object_init() is called unconditionally
>>>> at QEMU startup even if there won't me any migration done?
>>>>
>>>> Creating a load thread pool there seems wasteful if no
>>>> incoming migration will ever take place (or will but only
>>>> much later).
>>>
>>> I was expecting an empty pool to not be a major resource, but if that's a
>>> concern, yes we can do that until later.
>>>
>>> [...]
>>>
>>>>>>>> @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
>>>>>>>>              return ret;
>>>>>>>>          }
>>>>>>>> +    if (ret == 0) {
>>>>>>>> +        bql_unlock(); /* Let load threads do work requiring BQL */
>>>>>>>> +        thread_pool_wait(load_threads);
>>>>>>>> +        bql_lock();
>>>>>>>> +
>>>>>>>> +        ret = load_threads_ret;
>>>>>>>> +    }
>>>>>>>> +    /*
>>>>>>>> +     * Set this flag unconditionally so we'll catch further attempts to
>>>>>>>> +     * start additional threads via an appropriate assert()
>>>>>>>> +     */
>>>>>>>> +    qatomic_set(&load_threads_abort, true);
>>>>>
>>>>> I assume this is only for debugging purpose and not required.
>>>>>
>>>>> Setting "abort all threads" to make sure "nobody will add more thread
>>>>> tasks" is pretty awkward, IMHO.  If we really want to protect against it
>>>>> and fail hard, it might be easier after the thread_pool_wait() we free the
>>>>> pool directly (destroy() will see NULL so it'll skip; still need to free
>>>>> there in case migration failed before this).  Then any enqueue will access
>>>>> null pointer on the pool.
>>>>
>>>> We don't want to destroy the thread pool in the path where the downtime
>>>> is still counting.
>>>
>>> Yeah this makes sense.
>>>
>>>>
>>>> That's why we only do cleanup after the migration is complete.
>>>>
>>>> The above setting of load_threads_abort flag also makes sure that we abort
>>>> load threads if the migration is going to fail for other reasons (non-load
>>>> threads related) - in other words, when the above block with thread_pool_wait()
>>>> isn't even entered due to ret already containing an earlier error.
>>>
>>> In that case IIUC we should cleanup the load threads in destroy(), not
>>> here?  Especially with the comment that's even more confusing.
>>>
>>
>> This flag only asks the threads in pool which are still running to exit ASAP
>> (without waiting for them in the "fail for other reasons"
>> qemu_loadvm_state() code flow).
> 
> I thought we could switch to an Error** model as we talked elsewhere, then
> the thread who hits the error should set the quit flag, IIUC.
> 
> Even without it..
> 
>>
>> Setting this flag does *not* do the cleanup of the whole thread pool - this
>> only happens in qemu_loadvm_state_cleanup().
> 
> ... we have two cases here:
> 
> Either no error at all, then thread_pool_wait() will wait for all threads
> until finished.  When reaching here setting this flag shouldn't matter for
> the threads because they're all finished.
> 
> Or there's error in some thread, then QEMU should be stuck at
> thread_pool_wait() anyway, until all threads quit.  Again, I thought it
> could be the qemu_loadvm_load_thread() that sets the quit flag (rather than
> here) so the failed thread will notify all threads to quit.
> 
> I just still don't see what's the help of setting it after
> thread_pool_wait(), which already marked all threads finished at its
> return.  That goes back to my question on whether it was only for debugging
> (so no new threads to be created after this), rather than the flag to tell
> all threads to quit.

There's also the possibility of an earlier error in qemu_loadvm_state()
(not in the load threads themselves), for example if qemu_loadvm_state_main()
returns an error.

In this case thread_pool_wait() *won't* be called but the load threads
would still be running needlessly - setting load_threads_abort flag makes
them stop.

The debugging benefit of assert()ing when someone tries to create
a load thread after that point comes essentially for free then.

> Thanks,
> 

Thanks,
Maciej


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Peter Xu 1 year, 1 month ago
On Tue, Dec 17, 2024 at 12:15:36AM +0100, Maciej S. Szmigiero wrote:
> On 16.12.2024 17:29, Peter Xu wrote:
> > On Thu, Dec 12, 2024 at 11:53:24PM +0100, Maciej S. Szmigiero wrote:
> > > On 12.12.2024 17:38, Peter Xu wrote:
> > > > On Wed, Dec 11, 2024 at 12:05:23AM +0100, Maciej S. Szmigiero wrote:
> > > > > > Maybe move it over to migration_object_init()?  Then we keep
> > > > > > qemu_loadvm_state_setup() only invoke the load_setup()s.
> > > > > 
> > > > > AFAIK migration_object_init() is called unconditionally
> > > > > at QEMU startup even if there won't me any migration done?
> > > > > 
> > > > > Creating a load thread pool there seems wasteful if no
> > > > > incoming migration will ever take place (or will but only
> > > > > much later).
> > > > 
> > > > I was expecting an empty pool to not be a major resource, but if that's a
> > > > concern, yes we can do that until later.
> > > > 
> > > > [...]
> > > > 
> > > > > > > > > @@ -3007,6 +3071,19 @@ int qemu_loadvm_state(QEMUFile *f)
> > > > > > > > >              return ret;
> > > > > > > > >          }
> > > > > > > > > +    if (ret == 0) {
> > > > > > > > > +        bql_unlock(); /* Let load threads do work requiring BQL */
> > > > > > > > > +        thread_pool_wait(load_threads);
> > > > > > > > > +        bql_lock();
> > > > > > > > > +
> > > > > > > > > +        ret = load_threads_ret;
> > > > > > > > > +    }
> > > > > > > > > +    /*
> > > > > > > > > +     * Set this flag unconditionally so we'll catch further attempts to
> > > > > > > > > +     * start additional threads via an appropriate assert()
> > > > > > > > > +     */
> > > > > > > > > +    qatomic_set(&load_threads_abort, true);
> > > > > > 
> > > > > > I assume this is only for debugging purpose and not required.
> > > > > > 
> > > > > > Setting "abort all threads" to make sure "nobody will add more thread
> > > > > > tasks" is pretty awkward, IMHO.  If we really want to protect against it
> > > > > > and fail hard, it might be easier after the thread_pool_wait() we free the
> > > > > > pool directly (destroy() will see NULL so it'll skip; still need to free
> > > > > > there in case migration failed before this).  Then any enqueue will access
> > > > > > null pointer on the pool.
> > > > > 
> > > > > We don't want to destroy the thread pool in the path where the downtime
> > > > > is still counting.
> > > > 
> > > > Yeah this makes sense.
> > > > 
> > > > > 
> > > > > That's why we only do cleanup after the migration is complete.
> > > > > 
> > > > > The above setting of load_threads_abort flag also makes sure that we abort
> > > > > load threads if the migration is going to fail for other reasons (non-load
> > > > > threads related) - in other words, when the above block with thread_pool_wait()
> > > > > isn't even entered due to ret already containing an earlier error.
> > > > 
> > > > In that case IIUC we should cleanup the load threads in destroy(), not
> > > > here?  Especially with the comment that's even more confusing.
> > > > 
> > > 
> > > This flag only asks the threads in pool which are still running to exit ASAP
> > > (without waiting for them in the "fail for other reasons"
> > > qemu_loadvm_state() code flow).
> > 
> > I thought we could switch to an Error** model as we talked elsewhere, then
> > the thread who hits the error should set the quit flag, IIUC.
> > 
> > Even without it..
> > 
> > > 
> > > Setting this flag does *not* do the cleanup of the whole thread pool - this
> > > only happens in qemu_loadvm_state_cleanup().
> > 
> > ... we have two cases here:
> > 
> > Either no error at all, then thread_pool_wait() will wait for all threads
> > until finished.  When reaching here setting this flag shouldn't matter for
> > the threads because they're all finished.
> > 
> > Or there's error in some thread, then QEMU should be stuck at
> > thread_pool_wait() anyway, until all threads quit.  Again, I thought it
> > could be the qemu_loadvm_load_thread() that sets the quit flag (rather than
> > here) so the failed thread will notify all threads to quit.
> > 
> > I just still don't see what's the help of setting it after
> > thread_pool_wait(), which already marked all threads finished at its
> > return.  That goes back to my question on whether it was only for debugging
> > (so no new threads to be created after this), rather than the flag to tell
> > all threads to quit.
> 
> There's also a possibility of earlier error in qemu_loadvm_state()
> (not in the load threads themselves), for example if qemu_loadvm_state_main()
> returns an error.
> 
> In this case thread_pool_wait() *won't* be called but the load threads
> would still be running needlessly - setting load_threads_abort flag makes
> them stop.
> 
> The debugging benefit of assert()ing when someone tries to create
> a load thread after that point comes essentially for free then.

In that case, IMHO we should put all the cleanup into the cleanup
function, like migration_incoming_state_destroy().  OTOH, I'd hope we don't
keep it here only for such a debugging purpose.

This step can easily be overlooked when this function grows more logic.
Personally, I'm not a fan of using "if (ret==0) {do something}" and of
continuing to write code like that.. but that's not an issue of this patch
alone, so we can leave that for later.  Even so, having it here is still
error prone (e.g. consider a "goto" being added in the future before this
step; logically it should skip all following steps if a prior one fails).

Thanks,

-- 
Peter Xu


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Peter Xu 1 year, 2 months ago
On Wed, Dec 04, 2024 at 05:48:52PM -0500, Peter Xu wrote:
> > > > @@ -71,6 +72,10 @@
> > > >   const unsigned int postcopy_ram_discard_version;
> > > > +static ThreadPool *load_threads;
> > > > +static int load_threads_ret;
> > > > +static bool load_threads_abort;

One thing I forgot to mention in the previous reply..

We should avoid adding random global vars.  I hope we can still move these
into MigrationIncomingState.

-- 
Peter Xu


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Maciej S. Szmigiero 1 year, 2 months ago
On 5.12.2024 17:15, Peter Xu wrote:
> On Wed, Dec 04, 2024 at 05:48:52PM -0500, Peter Xu wrote:
>>>>> @@ -71,6 +72,10 @@
>>>>>    const unsigned int postcopy_ram_discard_version;
>>>>> +static ThreadPool *load_threads;
>>>>> +static int load_threads_ret;
>>>>> +static bool load_threads_abort;
> 
> One thing I forgot to mention in the previous reply..
> 
> We should avoid adding random global vars.  I hope we can still move these
> into MigrationIncomingState.
> 

Sure, this should be possible even if the thread pool
initialization happens in qemu_loadvm_state_setup().

Thanks,
Maciej


Re: [PATCH v3 08/24] migration: Add thread pool of optional load threads
Posted by Fabiano Rosas 1 year, 2 months ago
"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:

> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Some drivers might want to make use of auxiliary helper threads during VM
> state loading, for example to make sure that their blocking (sync) I/O
> operations don't block the rest of the migration process.
>
> Add a migration core managed thread pool to facilitate this use case.
>
> The migration core will wait for these threads to finish before
> (re)starting the VM at destination.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>

Reviewed-by: Fabiano Rosas <farosas@suse.de>