From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
Migration code wants to manage device data sending threads in one place.
QEMU has an existing thread pool implementation, however it is limited
to queuing AIO operations only and essentially has a 1:1 mapping between
the current AioContext and the AIO ThreadPool in use.
Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
GThreadPool.
This brings a few new operations on a pool:
* thread_pool_wait() operation waits until all the submitted work requests
have finished.
* thread_pool_set_max_threads() explicitly sets the maximum thread count
in the pool.
* thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
in the pool to equal the number of still waiting in queue or unfinished work.
Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
include/block/thread-pool.h | 9 +++
util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
2 files changed, 118 insertions(+)
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 6f27eb085b45..3f9f66307b65 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
+typedef struct ThreadPool ThreadPool;
+
+ThreadPool *thread_pool_new(void);
+void thread_pool_free(ThreadPool *pool);
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
+ void *opaque, GDestroyNotify opaque_destroy);
+void thread_pool_wait(ThreadPool *pool);
+bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
+bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
#endif
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 908194dc070f..d80c4181c897 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
qemu_mutex_destroy(&pool->lock);
g_free(pool);
}
+
+struct ThreadPool { /* type safety */
+ GThreadPool *t;
+ size_t unfinished_el_ctr;
+ QemuMutex unfinished_el_ctr_mutex;
+ QemuCond unfinished_el_ctr_zero_cond;
+};
+
+typedef struct {
+ ThreadPoolFunc *func;
+ void *opaque;
+ GDestroyNotify opaque_destroy;
+} ThreadPoolElement;
+
+static void thread_pool_func(gpointer data, gpointer user_data)
+{
+ ThreadPool *pool = user_data;
+ g_autofree ThreadPoolElement *el = data;
+
+ el->func(el->opaque);
+
+ if (el->opaque_destroy) {
+ el->opaque_destroy(el->opaque);
+ }
+
+ QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
+
+ assert(pool->unfinished_el_ctr > 0);
+ pool->unfinished_el_ctr--;
+
+ if (pool->unfinished_el_ctr == 0) {
+ qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
+ }
+}
+
+ThreadPool *thread_pool_new(void)
+{
+ ThreadPool *pool = g_new(ThreadPool, 1);
+
+ pool->unfinished_el_ctr = 0;
+ qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
+ qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
+
+ pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
+ /*
+ * g_thread_pool_new() can only return errors if initial thread(s)
+ * creation fails but we ask for 0 initial threads above.
+ */
+ assert(pool->t);
+
+ return pool;
+}
+
+void thread_pool_free(ThreadPool *pool)
+{
+ g_thread_pool_free(pool->t, FALSE, TRUE);
+
+ qemu_cond_destroy(&pool->unfinished_el_ctr_zero_cond);
+ qemu_mutex_destroy(&pool->unfinished_el_ctr_mutex);
+
+ g_free(pool);
+}
+
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
+ void *opaque, GDestroyNotify opaque_destroy)
+{
+ ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
+
+ el->func = func;
+ el->opaque = opaque;
+ el->opaque_destroy = opaque_destroy;
+
+ WITH_QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex) {
+ pool->unfinished_el_ctr++;
+ }
+
+ /*
+ * Ignore the return value since this function can only return errors
+ * if creation of an additional thread fails but even in this case the
+ * provided work is still getting queued (just for the existing threads).
+ */
+ g_thread_pool_push(pool->t, el, NULL);
+}
+
+void thread_pool_wait(ThreadPool *pool)
+{
+ QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
+
+ if (pool->unfinished_el_ctr > 0) {
+ qemu_cond_wait(&pool->unfinished_el_ctr_zero_cond,
+ &pool->unfinished_el_ctr_mutex);
+ assert(pool->unfinished_el_ctr == 0);
+ }
+}
+
+bool thread_pool_set_max_threads(ThreadPool *pool,
+ int max_threads)
+{
+ assert(max_threads > 0);
+
+ return g_thread_pool_set_max_threads(pool->t, max_threads, NULL);
+}
+
+bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool)
+{
+ QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
+
+ return thread_pool_set_max_threads(pool, pool->unfinished_el_ctr);
+}
On Sun, Nov 17, 2024 at 08:19:59PM +0100, Maciej S. Szmigiero wrote: > From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com> > > Migration code wants to manage device data sending threads in one place. > > QEMU has an existing thread pool implementation, however it is limited > to queuing AIO operations only and essentially has a 1:1 mapping between > the current AioContext and the AIO ThreadPool in use. > > Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's > GThreadPool. > > This brings a few new operations on a pool: > * thread_pool_wait() operation waits until all the submitted work requests > have finished. > > * thread_pool_set_max_threads() explicitly sets the maximum thread count > in the pool. > > * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count > in the pool to equal the number of still waiting in queue or unfinished work. > > Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com> All the comments so far make sense to me too, so if you address all of them, feel free to take this alone: Reviewed-by: Peter Xu <peterx@redhat.com> -- Peter Xu
Hi Maciej,
On 17/11/2024 21:19, Maciej S. Szmigiero wrote:
> External email: Use caution opening links or attachments
>
>
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Migration code wants to manage device data sending threads in one place.
>
> QEMU has an existing thread pool implementation, however it is limited
> to queuing AIO operations only and essentially has a 1:1 mapping between
> the current AioContext and the AIO ThreadPool in use.
>
> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
> GThreadPool.
>
> This brings a few new operations on a pool:
> * thread_pool_wait() operation waits until all the submitted work requests
> have finished.
>
> * thread_pool_set_max_threads() explicitly sets the maximum thread count
> in the pool.
>
> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
> in the pool to equal the number of still waiting in queue or unfinished work.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
> include/block/thread-pool.h | 9 +++
> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
> 2 files changed, 118 insertions(+)
>
> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
> index 6f27eb085b45..3f9f66307b65 100644
> --- a/include/block/thread-pool.h
> +++ b/include/block/thread-pool.h
> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>
> +typedef struct ThreadPool ThreadPool;
> +
> +ThreadPool *thread_pool_new(void);
> +void thread_pool_free(ThreadPool *pool);
> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> + void *opaque, GDestroyNotify opaque_destroy);
> +void thread_pool_wait(ThreadPool *pool);
> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>
> #endif
> diff --git a/util/thread-pool.c b/util/thread-pool.c
> index 908194dc070f..d80c4181c897 100644
> --- a/util/thread-pool.c
> +++ b/util/thread-pool.c
> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
> qemu_mutex_destroy(&pool->lock);
> g_free(pool);
> }
> +
> +struct ThreadPool { /* type safety */
> + GThreadPool *t;
> + size_t unfinished_el_ctr;
> + QemuMutex unfinished_el_ctr_mutex;
> + QemuCond unfinished_el_ctr_zero_cond;
> +};
> +
> +typedef struct {
> + ThreadPoolFunc *func;
> + void *opaque;
> + GDestroyNotify opaque_destroy;
> +} ThreadPoolElement;
> +
> +static void thread_pool_func(gpointer data, gpointer user_data)
> +{
> + ThreadPool *pool = user_data;
> + g_autofree ThreadPoolElement *el = data;
> +
> + el->func(el->opaque);
> +
> + if (el->opaque_destroy) {
> + el->opaque_destroy(el->opaque);
> + }
> +
> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
> +
> + assert(pool->unfinished_el_ctr > 0);
> + pool->unfinished_el_ctr--;
> +
> + if (pool->unfinished_el_ctr == 0) {
> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
> + }
> +}
> +
> +ThreadPool *thread_pool_new(void)
> +{
> + ThreadPool *pool = g_new(ThreadPool, 1);
> +
> + pool->unfinished_el_ctr = 0;
> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
> +
> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
> + /*
> + * g_thread_pool_new() can only return errors if initial thread(s)
> + * creation fails but we ask for 0 initial threads above.
> + */
> + assert(pool->t);
> +
> + return pool;
> +}
> +
> +void thread_pool_free(ThreadPool *pool)
> +{
> + g_thread_pool_free(pool->t, FALSE, TRUE);
> +
> + qemu_cond_destroy(&pool->unfinished_el_ctr_zero_cond);
> + qemu_mutex_destroy(&pool->unfinished_el_ctr_mutex);
> +
> + g_free(pool);
> +}
> +
> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> + void *opaque, GDestroyNotify opaque_destroy)
> +{
> + ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
> +
> + el->func = func;
> + el->opaque = opaque;
> + el->opaque_destroy = opaque_destroy;
> +
> + WITH_QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex) {
> + pool->unfinished_el_ctr++;
> + }
> +
> + /*
> + * Ignore the return value since this function can only return errors
> + * if creation of an additional thread fails but even in this case the
> + * provided work is still getting queued (just for the existing threads).
> + */
> + g_thread_pool_push(pool->t, el, NULL);
> +}
> +
> +void thread_pool_wait(ThreadPool *pool)
> +{
> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
> +
> + if (pool->unfinished_el_ctr > 0) {
> + qemu_cond_wait(&pool->unfinished_el_ctr_zero_cond,
> + &pool->unfinished_el_ctr_mutex);
> + assert(pool->unfinished_el_ctr == 0);
> + }
Shouldn't we put the condition in a while loop and remove the assert (as
the wait may wake up spuriously)?
Thanks.
On 28.11.2024 11:08, Avihai Horon wrote:
> Hi Maciej,
>
> On 17/11/2024 21:19, Maciej S. Szmigiero wrote:
>> External email: Use caution opening links or attachments
>>
>>
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> Migration code wants to manage device data sending threads in one place.
>>
>> QEMU has an existing thread pool implementation, however it is limited
>> to queuing AIO operations only and essentially has a 1:1 mapping between
>> the current AioContext and the AIO ThreadPool in use.
>>
>> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
>> GThreadPool.
>>
>> This brings a few new operations on a pool:
>> * thread_pool_wait() operation waits until all the submitted work requests
>> have finished.
>>
>> * thread_pool_set_max_threads() explicitly sets the maximum thread count
>> in the pool.
>>
>> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
>> in the pool to equal the number of still waiting in queue or unfinished work.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>> include/block/thread-pool.h | 9 +++
>> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
>> 2 files changed, 118 insertions(+)
>>
>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>> index 6f27eb085b45..3f9f66307b65 100644
>> --- a/include/block/thread-pool.h
>> +++ b/include/block/thread-pool.h
>> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>>
>> +typedef struct ThreadPool ThreadPool;
>> +
>> +ThreadPool *thread_pool_new(void);
>> +void thread_pool_free(ThreadPool *pool);
>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>> + void *opaque, GDestroyNotify opaque_destroy);
>> +void thread_pool_wait(ThreadPool *pool);
>> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
>> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>>
>> #endif
>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>> index 908194dc070f..d80c4181c897 100644
>> --- a/util/thread-pool.c
>> +++ b/util/thread-pool.c
>> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
>> qemu_mutex_destroy(&pool->lock);
>> g_free(pool);
>> }
>> +
>> +struct ThreadPool { /* type safety */
>> + GThreadPool *t;
>> + size_t unfinished_el_ctr;
>> + QemuMutex unfinished_el_ctr_mutex;
>> + QemuCond unfinished_el_ctr_zero_cond;
>> +};
>> +
>> +typedef struct {
>> + ThreadPoolFunc *func;
>> + void *opaque;
>> + GDestroyNotify opaque_destroy;
>> +} ThreadPoolElement;
>> +
>> +static void thread_pool_func(gpointer data, gpointer user_data)
>> +{
>> + ThreadPool *pool = user_data;
>> + g_autofree ThreadPoolElement *el = data;
>> +
>> + el->func(el->opaque);
>> +
>> + if (el->opaque_destroy) {
>> + el->opaque_destroy(el->opaque);
>> + }
>> +
>> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
>> +
>> + assert(pool->unfinished_el_ctr > 0);
>> + pool->unfinished_el_ctr--;
>> +
>> + if (pool->unfinished_el_ctr == 0) {
>> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
>> + }
>> +}
>> +
>> +ThreadPool *thread_pool_new(void)
>> +{
>> + ThreadPool *pool = g_new(ThreadPool, 1);
>> +
>> + pool->unfinished_el_ctr = 0;
>> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
>> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
>> +
>> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
>> + /*
>> + * g_thread_pool_new() can only return errors if initial thread(s)
>> + * creation fails but we ask for 0 initial threads above.
>> + */
>> + assert(pool->t);
>> +
>> + return pool;
>> +}
>> +
>> +void thread_pool_free(ThreadPool *pool)
>> +{
>> + g_thread_pool_free(pool->t, FALSE, TRUE);
>> +
>> + qemu_cond_destroy(&pool->unfinished_el_ctr_zero_cond);
>> + qemu_mutex_destroy(&pool->unfinished_el_ctr_mutex);
>> +
>> + g_free(pool);
>> +}
>> +
>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>> + void *opaque, GDestroyNotify opaque_destroy)
>> +{
>> + ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
>> +
>> + el->func = func;
>> + el->opaque = opaque;
>> + el->opaque_destroy = opaque_destroy;
>> +
>> + WITH_QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex) {
>> + pool->unfinished_el_ctr++;
>> + }
>> +
>> + /*
>> + * Ignore the return value since this function can only return errors
>> + * if creation of an additional thread fails but even in this case the
>> + * provided work is still getting queued (just for the existing threads).
>> + */
>> + g_thread_pool_push(pool->t, el, NULL);
>> +}
>> +
>> +void thread_pool_wait(ThreadPool *pool)
>> +{
>> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
>> +
>> + if (pool->unfinished_el_ctr > 0) {
>> + qemu_cond_wait(&pool->unfinished_el_ctr_zero_cond,
>> + &pool->unfinished_el_ctr_mutex);
>> + assert(pool->unfinished_el_ctr == 0);
>> + }
>
> Shouldn't we put the condition in a while loop and remove the assert (as the wait may wake up spuriously)?
You're right - spurious wake-ups can theoretically happen.
> Thanks.
>
Thanks,
Maciej
On 11/17/24 20:19, Maciej S. Szmigiero wrote:
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Migration code wants to manage device data sending threads in one place.
>
> QEMU has an existing thread pool implementation, however it is limited
> to queuing AIO operations only and essentially has a 1:1 mapping between
> the current AioContext and the AIO ThreadPool in use.
>
> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
> GThreadPool.
>
> This brings a few new operations on a pool:
> * thread_pool_wait() operation waits until all the submitted work requests
> have finished.
>
> * thread_pool_set_max_threads() explicitly sets the maximum thread count
> in the pool.
>
> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
> in the pool to equal the number of still waiting in queue or unfinished work.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
> include/block/thread-pool.h | 9 +++
> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
> 2 files changed, 118 insertions(+)
>
> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
> index 6f27eb085b45..3f9f66307b65 100644
> --- a/include/block/thread-pool.h
> +++ b/include/block/thread-pool.h
> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>
> +typedef struct ThreadPool ThreadPool;
> +
> +ThreadPool *thread_pool_new(void);
> +void thread_pool_free(ThreadPool *pool);
> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> + void *opaque, GDestroyNotify opaque_destroy);
> +void thread_pool_wait(ThreadPool *pool);
> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
We should add documentation for these routines.
> #endif
> diff --git a/util/thread-pool.c b/util/thread-pool.c
> index 908194dc070f..d80c4181c897 100644
> --- a/util/thread-pool.c
> +++ b/util/thread-pool.c
> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
> qemu_mutex_destroy(&pool->lock);
> g_free(pool);
> }
> +
> +struct ThreadPool { /* type safety */
> + GThreadPool *t;
> + size_t unfinished_el_ctr;
> + QemuMutex unfinished_el_ctr_mutex;
> + QemuCond unfinished_el_ctr_zero_cond;
> +};
I find the naming of the attributes a little confusing. Could we
use names similar to ThreadPoolAio. Something like :
struct ThreadPool { /* type safety */
GThreadPool *t;
int cur_threads;
QemuMutex lock;
QemuCond finished_cond;
};
> +
> +typedef struct {
> + ThreadPoolFunc *func;
> + void *opaque;
> + GDestroyNotify opaque_destroy;
> +} ThreadPoolElement;
> +
> +static void thread_pool_func(gpointer data, gpointer user_data)
> +{
> + ThreadPool *pool = user_data;
> + g_autofree ThreadPoolElement *el = data;
> +
> + el->func(el->opaque);
> +
> + if (el->opaque_destroy) {
> + el->opaque_destroy(el->opaque);
> + }
> +
> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
> +
> + assert(pool->unfinished_el_ctr > 0);
> + pool->unfinished_el_ctr--;
> +
> + if (pool->unfinished_el_ctr == 0) {
> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
> + }
> +}
> +
> +ThreadPool *thread_pool_new(void)
> +{
> + ThreadPool *pool = g_new(ThreadPool, 1);
> +
> + pool->unfinished_el_ctr = 0;
> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
> +
> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
> + /*
> + * g_thread_pool_new() can only return errors if initial thread(s)
> + * creation fails but we ask for 0 initial threads above.
> + */
> + assert(pool->t);
> +
> + return pool;
> +}
> +
> +void thread_pool_free(ThreadPool *pool)
> +{
> + g_thread_pool_free(pool->t, FALSE, TRUE);
> +
> + qemu_cond_destroy(&pool->unfinished_el_ctr_zero_cond);
> + qemu_mutex_destroy(&pool->unfinished_el_ctr_mutex);
> +
> + g_free(pool);
> +}
> +
> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> + void *opaque, GDestroyNotify opaque_destroy)
> +{
> + ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
Where are the ThreadPool elements freed ? I am missing something
may be.
Thanks,
C.
> +
> + el->func = func;
> + el->opaque = opaque;
> + el->opaque_destroy = opaque_destroy;
> +
> + WITH_QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex) {
> + pool->unfinished_el_ctr++;
> + }
> +
> + /*
> + * Ignore the return value since this function can only return errors
> + * if creation of an additional thread fails but even in this case the
> + * provided work is still getting queued (just for the existing threads).
> + */
> + g_thread_pool_push(pool->t, el, NULL);
> +}
> +
> +void thread_pool_wait(ThreadPool *pool)
> +{
> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
> +
> + if (pool->unfinished_el_ctr > 0) {
> + qemu_cond_wait(&pool->unfinished_el_ctr_zero_cond,
> + &pool->unfinished_el_ctr_mutex);
> + assert(pool->unfinished_el_ctr == 0);
> + }
> +}
> +
> +bool thread_pool_set_max_threads(ThreadPool *pool,
> + int max_threads)
> +{
> + assert(max_threads > 0);
> +
> + return g_thread_pool_set_max_threads(pool->t, max_threads, NULL);
> +}
> +
> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool)
> +{
> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
> +
> + return thread_pool_set_max_threads(pool, pool->unfinished_el_ctr);
> +}
>
On 26.11.2024 20:29, Cédric Le Goater wrote:
> On 11/17/24 20:19, Maciej S. Szmigiero wrote:
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> Migration code wants to manage device data sending threads in one place.
>>
>> QEMU has an existing thread pool implementation, however it is limited
>> to queuing AIO operations only and essentially has a 1:1 mapping between
>> the current AioContext and the AIO ThreadPool in use.
>>
>> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
>> GThreadPool.
>>
>> This brings a few new operations on a pool:
>> * thread_pool_wait() operation waits until all the submitted work requests
>> have finished.
>>
>> * thread_pool_set_max_threads() explicitly sets the maximum thread count
>> in the pool.
>>
>> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
>> in the pool to equal the number of still waiting in queue or unfinished work.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>> include/block/thread-pool.h | 9 +++
>> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
>> 2 files changed, 118 insertions(+)
>>
>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>> index 6f27eb085b45..3f9f66307b65 100644
>> --- a/include/block/thread-pool.h
>> +++ b/include/block/thread-pool.h
>> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>> +typedef struct ThreadPool ThreadPool;
>> +
>> +ThreadPool *thread_pool_new(void);
>> +void thread_pool_free(ThreadPool *pool);
>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>> + void *opaque, GDestroyNotify opaque_destroy);
>> +void thread_pool_wait(ThreadPool *pool);
>> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
>> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>
> We should add documentation for these routines.
Ack.
>> #endif
>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>> index 908194dc070f..d80c4181c897 100644
>> --- a/util/thread-pool.c
>> +++ b/util/thread-pool.c
>> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
>> qemu_mutex_destroy(&pool->lock);
>> g_free(pool);
>> }
>> +
>> +struct ThreadPool { /* type safety */
>> + GThreadPool *t;
>> + size_t unfinished_el_ctr;
>> + QemuMutex unfinished_el_ctr_mutex;
>> + QemuCond unfinished_el_ctr_zero_cond;
>> +};
>
>
> I find the naming of the attributes a little confusing. Could we
> use names similar to ThreadPoolAio. Something like :
>
> struct ThreadPool { /* type safety */
> GThreadPool *t;
> int cur_threads;
"cur_work" would probably be more accurate since the code that
decrements this counter is still running inside a worker thread
so by the time this reaches zero technically there are still
threads running.
> QemuMutex lock;
This lock only protects the counter above, not the rest of the
structure so I guess "cur_work_lock" would be more accurate.
> QemuCond finished_cond;
I would go for "all_finished_cond", since it's only signaled once
all of the work is finished (the counter above reaches zero).
> };
>
>
>
>> +
>> +typedef struct {
>> + ThreadPoolFunc *func;
>> + void *opaque;
>> + GDestroyNotify opaque_destroy;
>> +} ThreadPoolElement;
>> +
>> +static void thread_pool_func(gpointer data, gpointer user_data)
>> +{
>> + ThreadPool *pool = user_data;
>> + g_autofree ThreadPoolElement *el = data;
>> +
>> + el->func(el->opaque);
>> +
>> + if (el->opaque_destroy) {
>> + el->opaque_destroy(el->opaque);
>> + }
>> +
>> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
>> +
>> + assert(pool->unfinished_el_ctr > 0);
>> + pool->unfinished_el_ctr--;
>> +
>> + if (pool->unfinished_el_ctr == 0) {
>> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
>> + }
>> +}
>> +
>> +ThreadPool *thread_pool_new(void)
>> +{
>> + ThreadPool *pool = g_new(ThreadPool, 1);
>> +
>> + pool->unfinished_el_ctr = 0;
>> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
>> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
>> +
>> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
>> + /*
>> + * g_thread_pool_new() can only return errors if initial thread(s)
>> + * creation fails but we ask for 0 initial threads above.
>> + */
>> + assert(pool->t);
>> +
>> + return pool;
>> +}
>> +
>> +void thread_pool_free(ThreadPool *pool)
>> +{
>> + g_thread_pool_free(pool->t, FALSE, TRUE);
>> +
>> + qemu_cond_destroy(&pool->unfinished_el_ctr_zero_cond);
>> + qemu_mutex_destroy(&pool->unfinished_el_ctr_mutex);
>> +
>> + g_free(pool);
>> +}
>> +
>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>> + void *opaque, GDestroyNotify opaque_destroy)
>> +{
>> + ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
>
> Where are the ThreadPool elements freed ? I am missing something
> may be.
At the entry to thread_pool_func(), the initialization of
automatic storage duration variable "ThreadPoolElement *el" takes
ownership of this object (RAII) and frees it when this variable
goes out of scope (that is, when this function exits) since it is
marked as a g_autofree.
> Thanks,
>
> C.
Thanks,
Maciej
On 11/26/24 22:22, Maciej S. Szmigiero wrote:
> On 26.11.2024 20:29, Cédric Le Goater wrote:
>> On 11/17/24 20:19, Maciej S. Szmigiero wrote:
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> Migration code wants to manage device data sending threads in one place.
>>>
>>> QEMU has an existing thread pool implementation, however it is limited
>>> to queuing AIO operations only and essentially has a 1:1 mapping between
>>> the current AioContext and the AIO ThreadPool in use.
>>>
>>> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
>>> GThreadPool.
>>>
>>> This brings a few new operations on a pool:
>>> * thread_pool_wait() operation waits until all the submitted work requests
>>> have finished.
>>>
>>> * thread_pool_set_max_threads() explicitly sets the maximum thread count
>>> in the pool.
>>>
>>> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
>>> in the pool to equal the number of still waiting in queue or unfinished work.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>> include/block/thread-pool.h | 9 +++
>>> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
>>> 2 files changed, 118 insertions(+)
>>>
>>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>>> index 6f27eb085b45..3f9f66307b65 100644
>>> --- a/include/block/thread-pool.h
>>> +++ b/include/block/thread-pool.h
>>> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>>> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>>> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>>> +typedef struct ThreadPool ThreadPool;
>>> +
>>> +ThreadPool *thread_pool_new(void);
>>> +void thread_pool_free(ThreadPool *pool);
>>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> + void *opaque, GDestroyNotify opaque_destroy);
>>> +void thread_pool_wait(ThreadPool *pool);
>>> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
>>> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>>
>> We should add documentation for these routines.
>
> Ack.
>
>>> #endif
>>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>>> index 908194dc070f..d80c4181c897 100644
>>> --- a/util/thread-pool.c
>>> +++ b/util/thread-pool.c
>>> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
>>> qemu_mutex_destroy(&pool->lock);
>>> g_free(pool);
>>> }
>>> +
>>> +struct ThreadPool { /* type safety */
>>> + GThreadPool *t;
>>> + size_t unfinished_el_ctr;
>>> + QemuMutex unfinished_el_ctr_mutex;
>>> + QemuCond unfinished_el_ctr_zero_cond;
>>> +};
>>
>>
>> I find the naming of the attributes a little confusing. Could we
>> use names similar to ThreadPoolAio. Something like :
>>
>> struct ThreadPool { /* type safety */
>> GThreadPool *t;
>> int cur_threads;
>
> "cur_work" would probably be more accurate since the code that
> decrements this counter is still running inside a worker thread
> so by the time this reaches zero technically there are still
> threads running.
>
>> QemuMutex lock;
>
> This lock only protects the counter above, not the rest of the
> structure so I guess "cur_work_lock" would be more accurate.
>
>> QemuCond finished_cond;
>
> I would go for "all_finished_cond", since it's only signaled once
> all of the work is finished (the counter above reaches zero).
All good for me.
>
>> };
>>
>>
>>
>>> +
>>> +typedef struct {
>>> + ThreadPoolFunc *func;
>>> + void *opaque;
>>> + GDestroyNotify opaque_destroy;
>>> +} ThreadPoolElement;
>>> +
>>> +static void thread_pool_func(gpointer data, gpointer user_data)
>>> +{
>>> + ThreadPool *pool = user_data;
>>> + g_autofree ThreadPoolElement *el = data;
>>> +
>>> + el->func(el->opaque);
>>> +
>>> + if (el->opaque_destroy) {
>>> + el->opaque_destroy(el->opaque);
>>> + }
>>> +
>>> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
>>> +
>>> + assert(pool->unfinished_el_ctr > 0);
>>> + pool->unfinished_el_ctr--;
>>> +
>>> + if (pool->unfinished_el_ctr == 0) {
>>> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
>>> + }
>>> +}
>>> +
>>> +ThreadPool *thread_pool_new(void)
>>> +{
>>> + ThreadPool *pool = g_new(ThreadPool, 1);
>>> +
>>> + pool->unfinished_el_ctr = 0;
>>> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
>>> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
>>> +
>>> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
>>> + /*
>>> + * g_thread_pool_new() can only return errors if initial thread(s)
>>> + * creation fails but we ask for 0 initial threads above.
>>> + */
>>> + assert(pool->t);
>>> +
>>> + return pool;
>>> +}
>>> +
>>> +void thread_pool_free(ThreadPool *pool)
>>> +{
>>> + g_thread_pool_free(pool->t, FALSE, TRUE);
>>> +
>>> + qemu_cond_destroy(&pool->unfinished_el_ctr_zero_cond);
>>> + qemu_mutex_destroy(&pool->unfinished_el_ctr_mutex);
>>> +
>>> + g_free(pool);
>>> +}
>>> +
>>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> + void *opaque, GDestroyNotify opaque_destroy)
>>> +{
>>> + ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
>>
>> Where are the ThreadPool elements freed ? I am missing something
>> may be.
>
> At the entry to thread_pool_func(), the initialization of
> automatic storage duration variable "ThreadPoolElement *el" takes
> ownership of this object (RAII) and frees it when this variable
> goes out of scope (that is, when this function exits) since it is
> marked as a g_autofree.
OK. I missed it.
Thanks,
C.
"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> Migration code wants to manage device data sending threads in one place.
>
> QEMU has an existing thread pool implementation, however it is limited
> to queuing AIO operations only and essentially has a 1:1 mapping between
> the current AioContext and the AIO ThreadPool in use.
>
> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
> GThreadPool.
>
> This brings a few new operations on a pool:
> * thread_pool_wait() operation waits until all the submitted work requests
> have finished.
>
> * thread_pool_set_max_threads() explicitly sets the maximum thread count
> in the pool.
>
> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
> in the pool to equal the number of still waiting in queue or unfinished work.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
> include/block/thread-pool.h | 9 +++
> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
> 2 files changed, 118 insertions(+)
>
> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
> index 6f27eb085b45..3f9f66307b65 100644
> --- a/include/block/thread-pool.h
> +++ b/include/block/thread-pool.h
> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>
> +typedef struct ThreadPool ThreadPool;
> +
> +ThreadPool *thread_pool_new(void);
> +void thread_pool_free(ThreadPool *pool);
> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> + void *opaque, GDestroyNotify opaque_destroy);
> +void thread_pool_wait(ThreadPool *pool);
> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>
> #endif
> diff --git a/util/thread-pool.c b/util/thread-pool.c
> index 908194dc070f..d80c4181c897 100644
> --- a/util/thread-pool.c
> +++ b/util/thread-pool.c
> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
> qemu_mutex_destroy(&pool->lock);
> g_free(pool);
> }
> +
> +struct ThreadPool { /* type safety */
> + GThreadPool *t;
> + size_t unfinished_el_ctr;
> + QemuMutex unfinished_el_ctr_mutex;
> + QemuCond unfinished_el_ctr_zero_cond;
> +};
> +
> +typedef struct {
> + ThreadPoolFunc *func;
> + void *opaque;
> + GDestroyNotify opaque_destroy;
> +} ThreadPoolElement;
> +
> +static void thread_pool_func(gpointer data, gpointer user_data)
> +{
> + ThreadPool *pool = user_data;
> + g_autofree ThreadPoolElement *el = data;
> +
> + el->func(el->opaque);
> +
> + if (el->opaque_destroy) {
> + el->opaque_destroy(el->opaque);
> + }
> +
> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
> +
> + assert(pool->unfinished_el_ctr > 0);
> + pool->unfinished_el_ctr--;
> +
> + if (pool->unfinished_el_ctr == 0) {
> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
> + }
> +}
> +
> +ThreadPool *thread_pool_new(void)
> +{
> + ThreadPool *pool = g_new(ThreadPool, 1);
> +
> + pool->unfinished_el_ctr = 0;
> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
> +
> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
> + /*
> + * g_thread_pool_new() can only return errors if initial thread(s)
> + * creation fails but we ask for 0 initial threads above.
> + */
> + assert(pool->t);
> +
> + return pool;
> +}
> +
> +void thread_pool_free(ThreadPool *pool)
> +{
> + g_thread_pool_free(pool->t, FALSE, TRUE);
Should we make it an error to call thread_poll_free without first
calling thread_poll_wait? I worry the current usage will lead to having
two different ways of waiting with one of them (this one) being quite
implicit.
> +
> + qemu_cond_destroy(&pool->unfinished_el_ctr_zero_cond);
> + qemu_mutex_destroy(&pool->unfinished_el_ctr_mutex);
> +
> + g_free(pool);
> +}
> +
> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
> + void *opaque, GDestroyNotify opaque_destroy)
> +{
> + ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
> +
> + el->func = func;
> + el->opaque = opaque;
> + el->opaque_destroy = opaque_destroy;
> +
> + WITH_QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex) {
> + pool->unfinished_el_ctr++;
> + }
> +
> + /*
> + * Ignore the return value since this function can only return errors
> + * if creation of an additional thread fails but even in this case the
> + * provided work is still getting queued (just for the existing threads).
> + */
> + g_thread_pool_push(pool->t, el, NULL);
> +}
> +
> +void thread_pool_wait(ThreadPool *pool)
> +{
> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
> +
> + if (pool->unfinished_el_ctr > 0) {
> + qemu_cond_wait(&pool->unfinished_el_ctr_zero_cond,
> + &pool->unfinished_el_ctr_mutex);
> + assert(pool->unfinished_el_ctr == 0);
> + }
> +}
> +
> +bool thread_pool_set_max_threads(ThreadPool *pool,
> + int max_threads)
> +{
> + assert(max_threads > 0);
> +
> + return g_thread_pool_set_max_threads(pool->t, max_threads, NULL);
> +}
> +
> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool)
> +{
> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
> +
> + return thread_pool_set_max_threads(pool, pool->unfinished_el_ctr);
> +}
On 25.11.2024 20:41, Fabiano Rosas wrote:
> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> Migration code wants to manage device data sending threads in one place.
>>
>> QEMU has an existing thread pool implementation, however it is limited
>> to queuing AIO operations only and essentially has a 1:1 mapping between
>> the current AioContext and the AIO ThreadPool in use.
>>
>> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
>> GThreadPool.
>>
>> This brings a few new operations on a pool:
>> * thread_pool_wait() operation waits until all the submitted work requests
>> have finished.
>>
>> * thread_pool_set_max_threads() explicitly sets the maximum thread count
>> in the pool.
>>
>> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
>> in the pool to equal the number of still waiting in queue or unfinished work.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>> include/block/thread-pool.h | 9 +++
>> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
>> 2 files changed, 118 insertions(+)
>>
>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>> index 6f27eb085b45..3f9f66307b65 100644
>> --- a/include/block/thread-pool.h
>> +++ b/include/block/thread-pool.h
>> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>>
>> +typedef struct ThreadPool ThreadPool;
>> +
>> +ThreadPool *thread_pool_new(void);
>> +void thread_pool_free(ThreadPool *pool);
>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>> + void *opaque, GDestroyNotify opaque_destroy);
>> +void thread_pool_wait(ThreadPool *pool);
>> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
>> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>>
>> #endif
>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>> index 908194dc070f..d80c4181c897 100644
>> --- a/util/thread-pool.c
>> +++ b/util/thread-pool.c
>> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
>> qemu_mutex_destroy(&pool->lock);
>> g_free(pool);
>> }
>> +
>> +struct ThreadPool { /* type safety */
>> + GThreadPool *t;
>> + size_t unfinished_el_ctr;
>> + QemuMutex unfinished_el_ctr_mutex;
>> + QemuCond unfinished_el_ctr_zero_cond;
>> +};
>> +
>> +typedef struct {
>> + ThreadPoolFunc *func;
>> + void *opaque;
>> + GDestroyNotify opaque_destroy;
>> +} ThreadPoolElement;
>> +
>> +static void thread_pool_func(gpointer data, gpointer user_data)
>> +{
>> + ThreadPool *pool = user_data;
>> + g_autofree ThreadPoolElement *el = data;
>> +
>> + el->func(el->opaque);
>> +
>> + if (el->opaque_destroy) {
>> + el->opaque_destroy(el->opaque);
>> + }
>> +
>> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
>> +
>> + assert(pool->unfinished_el_ctr > 0);
>> + pool->unfinished_el_ctr--;
>> +
>> + if (pool->unfinished_el_ctr == 0) {
>> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
>> + }
>> +}
>> +
>> +ThreadPool *thread_pool_new(void)
>> +{
>> + ThreadPool *pool = g_new(ThreadPool, 1);
>> +
>> + pool->unfinished_el_ctr = 0;
>> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
>> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
>> +
>> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
>> + /*
>> + * g_thread_pool_new() can only return errors if initial thread(s)
>> + * creation fails but we ask for 0 initial threads above.
>> + */
>> + assert(pool->t);
>> +
>> + return pool;
>> +}
>> +
>> +void thread_pool_free(ThreadPool *pool)
>> +{
>> + g_thread_pool_free(pool->t, FALSE, TRUE);
>
> Should we make it an error to call thread_poll_free without first
> calling thread_poll_wait? I worry the current usage will lead to having
> two different ways of waiting with one of them (this one) being quite
> implicit.
>
thread_pool_wait() can be used as a barrier between two sets of
tasks executed on a thread pool without destroying it or in a performance
sensitive path where we want to just wait for task completion while
deferring the free operation for later, less sensitive time.
I don't think requiring explicit thread_pool_wait() before
thread_pool_free() actually gives any advantage, while at the same
time it's making this API usage slightly more complex in cases
where the consumer is fine with having combined wait+free semantics
for thread_pool_free().
Thanks,
Maciej
On 11/25/24 20:55, Maciej S. Szmigiero wrote:
> On 25.11.2024 20:41, Fabiano Rosas wrote:
>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>>
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> Migration code wants to manage device data sending threads in one place.
>>>
>>> QEMU has an existing thread pool implementation, however it is limited
>>> to queuing AIO operations only and essentially has a 1:1 mapping between
>>> the current AioContext and the AIO ThreadPool in use.
>>>
>>> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
>>> GThreadPool.
>>>
>>> This brings a few new operations on a pool:
>>> * thread_pool_wait() operation waits until all the submitted work requests
>>> have finished.
>>>
>>> * thread_pool_set_max_threads() explicitly sets the maximum thread count
>>> in the pool.
>>>
>>> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
>>> in the pool to equal the number of still waiting in queue or unfinished work.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>> include/block/thread-pool.h | 9 +++
>>> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
>>> 2 files changed, 118 insertions(+)
>>>
>>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>>> index 6f27eb085b45..3f9f66307b65 100644
>>> --- a/include/block/thread-pool.h
>>> +++ b/include/block/thread-pool.h
>>> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>>> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>>> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>>> +typedef struct ThreadPool ThreadPool;
>>> +
>>> +ThreadPool *thread_pool_new(void);
>>> +void thread_pool_free(ThreadPool *pool);
>>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> + void *opaque, GDestroyNotify opaque_destroy);
>>> +void thread_pool_wait(ThreadPool *pool);
>>> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
>>> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>>> #endif
>>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>>> index 908194dc070f..d80c4181c897 100644
>>> --- a/util/thread-pool.c
>>> +++ b/util/thread-pool.c
>>> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
>>> qemu_mutex_destroy(&pool->lock);
>>> g_free(pool);
>>> }
>>> +
>>> +struct ThreadPool { /* type safety */
>>> + GThreadPool *t;
>>> + size_t unfinished_el_ctr;
>>> + QemuMutex unfinished_el_ctr_mutex;
>>> + QemuCond unfinished_el_ctr_zero_cond;
>>> +};
>>> +
>>> +typedef struct {
>>> + ThreadPoolFunc *func;
>>> + void *opaque;
>>> + GDestroyNotify opaque_destroy;
>>> +} ThreadPoolElement;
>>> +
>>> +static void thread_pool_func(gpointer data, gpointer user_data)
>>> +{
>>> + ThreadPool *pool = user_data;
>>> + g_autofree ThreadPoolElement *el = data;
>>> +
>>> + el->func(el->opaque);
>>> +
>>> + if (el->opaque_destroy) {
>>> + el->opaque_destroy(el->opaque);
>>> + }
>>> +
>>> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
>>> +
>>> + assert(pool->unfinished_el_ctr > 0);
>>> + pool->unfinished_el_ctr--;
>>> +
>>> + if (pool->unfinished_el_ctr == 0) {
>>> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
>>> + }
>>> +}
>>> +
>>> +ThreadPool *thread_pool_new(void)
>>> +{
>>> + ThreadPool *pool = g_new(ThreadPool, 1);
>>> +
>>> + pool->unfinished_el_ctr = 0;
>>> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
>>> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
>>> +
>>> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
>>> + /*
>>> + * g_thread_pool_new() can only return errors if initial thread(s)
>>> + * creation fails but we ask for 0 initial threads above.
>>> + */
>>> + assert(pool->t);
>>> +
>>> + return pool;
>>> +}
>>> +
>>> +void thread_pool_free(ThreadPool *pool)
>>> +{
>>> + g_thread_pool_free(pool->t, FALSE, TRUE);
>>
>> Should we make it an error to call thread_poll_free without first
>> calling thread_poll_wait? I worry the current usage will lead to having
>> two different ways of waiting with one of them (this one) being quite
>> implicit.
>>
>
> thread_pool_wait() can be used as a barrier between two sets of
> tasks executed on a thread pool without destroying it or in a performance
> sensitive path where we want to just wait for task completion while
> deferring the free operation for later, less sensitive time.
A comment above g_thread_pool_free() would be good to have since
the wait_ argument is TRUE and g_thread_pool_free() effectively
waits for all threads to complete.
Thanks,
C.
>
> I don't think requiring explicit thread_pool_wait() before
> thread_pool_free() actually gives any advantage, while at the same
> time it's making this API usage slightly more complex in cases
> where the consumer is fine with having combined wait+free semantics
> for thread_pool_free().
>
> Thanks,
> Maciej
>
On 26.11.2024 20:25, Cédric Le Goater wrote:
> On 11/25/24 20:55, Maciej S. Szmigiero wrote:
>> On 25.11.2024 20:41, Fabiano Rosas wrote:
>>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>>>
>>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>>
>>>> Migration code wants to manage device data sending threads in one place.
>>>>
>>>> QEMU has an existing thread pool implementation, however it is limited
>>>> to queuing AIO operations only and essentially has a 1:1 mapping between
>>>> the current AioContext and the AIO ThreadPool in use.
>>>>
>>>> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
>>>> GThreadPool.
>>>>
>>>> This brings a few new operations on a pool:
>>>> * thread_pool_wait() operation waits until all the submitted work requests
>>>> have finished.
>>>>
>>>> * thread_pool_set_max_threads() explicitly sets the maximum thread count
>>>> in the pool.
>>>>
>>>> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
>>>> in the pool to equal the number of still waiting in queue or unfinished work.
>>>>
>>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>>> ---
>>>> include/block/thread-pool.h | 9 +++
>>>> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
>>>> 2 files changed, 118 insertions(+)
>>>>
>>>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>>>> index 6f27eb085b45..3f9f66307b65 100644
>>>> --- a/include/block/thread-pool.h
>>>> +++ b/include/block/thread-pool.h
>>>> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>>>> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>>>> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>>>> +typedef struct ThreadPool ThreadPool;
>>>> +
>>>> +ThreadPool *thread_pool_new(void);
>>>> +void thread_pool_free(ThreadPool *pool);
>>>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>>> + void *opaque, GDestroyNotify opaque_destroy);
>>>> +void thread_pool_wait(ThreadPool *pool);
>>>> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
>>>> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>>>> #endif
>>>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>>>> index 908194dc070f..d80c4181c897 100644
>>>> --- a/util/thread-pool.c
>>>> +++ b/util/thread-pool.c
>>>> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
>>>> qemu_mutex_destroy(&pool->lock);
>>>> g_free(pool);
>>>> }
>>>> +
>>>> +struct ThreadPool { /* type safety */
>>>> + GThreadPool *t;
>>>> + size_t unfinished_el_ctr;
>>>> + QemuMutex unfinished_el_ctr_mutex;
>>>> + QemuCond unfinished_el_ctr_zero_cond;
>>>> +};
>>>> +
>>>> +typedef struct {
>>>> + ThreadPoolFunc *func;
>>>> + void *opaque;
>>>> + GDestroyNotify opaque_destroy;
>>>> +} ThreadPoolElement;
>>>> +
>>>> +static void thread_pool_func(gpointer data, gpointer user_data)
>>>> +{
>>>> + ThreadPool *pool = user_data;
>>>> + g_autofree ThreadPoolElement *el = data;
>>>> +
>>>> + el->func(el->opaque);
>>>> +
>>>> + if (el->opaque_destroy) {
>>>> + el->opaque_destroy(el->opaque);
>>>> + }
>>>> +
>>>> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
>>>> +
>>>> + assert(pool->unfinished_el_ctr > 0);
>>>> + pool->unfinished_el_ctr--;
>>>> +
>>>> + if (pool->unfinished_el_ctr == 0) {
>>>> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
>>>> + }
>>>> +}
>>>> +
>>>> +ThreadPool *thread_pool_new(void)
>>>> +{
>>>> + ThreadPool *pool = g_new(ThreadPool, 1);
>>>> +
>>>> + pool->unfinished_el_ctr = 0;
>>>> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
>>>> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
>>>> +
>>>> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
>>>> + /*
>>>> + * g_thread_pool_new() can only return errors if initial thread(s)
>>>> + * creation fails but we ask for 0 initial threads above.
>>>> + */
>>>> + assert(pool->t);
>>>> +
>>>> + return pool;
>>>> +}
>>>> +
>>>> +void thread_pool_free(ThreadPool *pool)
>>>> +{
>>>> + g_thread_pool_free(pool->t, FALSE, TRUE);
>>>
>>> Should we make it an error to call thread_poll_free without first
>>> calling thread_poll_wait? I worry the current usage will lead to having
>>> two different ways of waiting with one of them (this one) being quite
>>> implicit.
>>>
>>
>> thread_pool_wait() can be used as a barrier between two sets of
>> tasks executed on a thread pool without destroying it or in a performance
>> sensitive path where we want to just wait for task completion while
>> deferring the free operation for later, less sensitive time.
>
> A comment above g_thread_pool_free() would be good to have since
> the wait_ argument is TRUE and g_thread_pool_free() effectively
> waits for all threads to complete.
>
Will add an appropriate comment there.
> Thanks,
>
> C.
>
Thanks,
Maciej
"Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
> On 25.11.2024 20:41, Fabiano Rosas wrote:
>> "Maciej S. Szmigiero" <mail@maciej.szmigiero.name> writes:
>>
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> Migration code wants to manage device data sending threads in one place.
>>>
>>> QEMU has an existing thread pool implementation, however it is limited
>>> to queuing AIO operations only and essentially has a 1:1 mapping between
>>> the current AioContext and the AIO ThreadPool in use.
>>>
>>> Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
>>> GThreadPool.
>>>
>>> This brings a few new operations on a pool:
>>> * thread_pool_wait() operation waits until all the submitted work requests
>>> have finished.
>>>
>>> * thread_pool_set_max_threads() explicitly sets the maximum thread count
>>> in the pool.
>>>
>>> * thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
>>> in the pool to equal the number of still waiting in queue or unfinished work.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>> include/block/thread-pool.h | 9 +++
>>> util/thread-pool.c | 109 ++++++++++++++++++++++++++++++++++++
>>> 2 files changed, 118 insertions(+)
>>>
>>> diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
>>> index 6f27eb085b45..3f9f66307b65 100644
>>> --- a/include/block/thread-pool.h
>>> +++ b/include/block/thread-pool.h
>>> @@ -38,5 +38,14 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func, void *arg,
>>> int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
>>> void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
>>>
>>> +typedef struct ThreadPool ThreadPool;
>>> +
>>> +ThreadPool *thread_pool_new(void);
>>> +void thread_pool_free(ThreadPool *pool);
>>> +void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
>>> + void *opaque, GDestroyNotify opaque_destroy);
>>> +void thread_pool_wait(ThreadPool *pool);
>>> +bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
>>> +bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
>>>
>>> #endif
>>> diff --git a/util/thread-pool.c b/util/thread-pool.c
>>> index 908194dc070f..d80c4181c897 100644
>>> --- a/util/thread-pool.c
>>> +++ b/util/thread-pool.c
>>> @@ -374,3 +374,112 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
>>> qemu_mutex_destroy(&pool->lock);
>>> g_free(pool);
>>> }
>>> +
>>> +struct ThreadPool { /* type safety */
>>> + GThreadPool *t;
>>> + size_t unfinished_el_ctr;
>>> + QemuMutex unfinished_el_ctr_mutex;
>>> + QemuCond unfinished_el_ctr_zero_cond;
>>> +};
>>> +
>>> +typedef struct {
>>> + ThreadPoolFunc *func;
>>> + void *opaque;
>>> + GDestroyNotify opaque_destroy;
>>> +} ThreadPoolElement;
>>> +
>>> +static void thread_pool_func(gpointer data, gpointer user_data)
>>> +{
>>> + ThreadPool *pool = user_data;
>>> + g_autofree ThreadPoolElement *el = data;
>>> +
>>> + el->func(el->opaque);
>>> +
>>> + if (el->opaque_destroy) {
>>> + el->opaque_destroy(el->opaque);
>>> + }
>>> +
>>> + QEMU_LOCK_GUARD(&pool->unfinished_el_ctr_mutex);
>>> +
>>> + assert(pool->unfinished_el_ctr > 0);
>>> + pool->unfinished_el_ctr--;
>>> +
>>> + if (pool->unfinished_el_ctr == 0) {
>>> + qemu_cond_signal(&pool->unfinished_el_ctr_zero_cond);
>>> + }
>>> +}
>>> +
>>> +ThreadPool *thread_pool_new(void)
>>> +{
>>> + ThreadPool *pool = g_new(ThreadPool, 1);
>>> +
>>> + pool->unfinished_el_ctr = 0;
>>> + qemu_mutex_init(&pool->unfinished_el_ctr_mutex);
>>> + qemu_cond_init(&pool->unfinished_el_ctr_zero_cond);
>>> +
>>> + pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
>>> + /*
>>> + * g_thread_pool_new() can only return errors if initial thread(s)
>>> + * creation fails but we ask for 0 initial threads above.
>>> + */
>>> + assert(pool->t);
>>> +
>>> + return pool;
>>> +}
>>> +
>>> +void thread_pool_free(ThreadPool *pool)
>>> +{
>>> + g_thread_pool_free(pool->t, FALSE, TRUE);
>>
>> Should we make it an error to call thread_poll_free without first
>> calling thread_poll_wait? I worry the current usage will lead to having
>> two different ways of waiting with one of them (this one) being quite
>> implicit.
>>
>
> thread_pool_wait() can be used as a barrier between two sets of
> tasks executed on a thread pool without destroying it or in a performance
> sensitive path where we want to just wait for task completion while
> deferring the free operation for later, less sensitive time.
>
> I don't think requiring explicit thread_pool_wait() before
> thread_pool_free() actually gives any advantage, while at the same
> time it's making this API usage slightly more complex in cases
> where the consumer is fine with having combined wait+free semantics
> for thread_pool_free().
Fair enough,
Reviewed-by: Fabiano Rosas <farosas@suse.de>
© 2016 - 2026 Red Hat, Inc.