[PATCH 3/3] util/event-loop: Introduce options to set the thread pool size

Posted by Nicolas Saenz Julienne 3 years, 11 months ago
The thread pool regulates itself: when idle, it kills threads until
empty; when in demand, it creates new threads until full. This behaviour
doesn't play well with latency-sensitive workloads, where the price of
creating a new thread is too high. For example, when paired with QEMU's
'-mlock', or when using safety features like SafeStack, creating a new
thread has been measured to take multiple milliseconds.

In order to mitigate this, let's introduce new 'EventLoopBackend'
properties to set the thread pool size. The threads will be created
during the pool's initialization, remain available during its lifetime
regardless of demand, and be destroyed when the pool is freed. A
properly characterized workload will then be able to configure the pool
to avoid latency spikes.
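
For illustration, a fixed-size pool could then be requested with
something like the following (hypothetical command line, shown only as a
sketch; the exact object names and syntax depend on the rest of this
series):

    qemu-system-x86_64 \
        -object iothread,id=iothread0,thread-pool-min=16,thread-pool-max=16 \
        ...

With min equal to max, the pool stays at a constant size, so the
per-request thread creation cost described above never shows up.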

Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
---
 include/block/aio.h | 11 +++++++++++
 qapi/qom.json       |  4 +++-
 util/async.c        |  3 +++
 util/event-loop.c   | 15 ++++++++++++++-
 util/event-loop.h   |  4 ++++
 util/main-loop.c    | 13 +++++++++++++
 util/thread-pool.c  | 41 +++++++++++++++++++++++++++++++++++++----
 7 files changed, 85 insertions(+), 6 deletions(-)

diff --git a/include/block/aio.h b/include/block/aio.h
index 5634173b12..331483d1d1 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -192,6 +192,8 @@ struct AioContext {
     QSLIST_HEAD(, Coroutine) scheduled_coroutines;
     QEMUBH *co_schedule_bh;
 
+    int pool_min;
+    int pool_max;
     /* Thread pool for performing work and receiving completion callbacks.
      * Has its own locking.
      */
@@ -769,4 +771,13 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
                                 Error **errp);
 
+/**
+ * aio_context_set_thread_pool_params:
+ * @ctx: the aio context
+ * @min: Thread pool's min size, 0 by default
+ * @max: Thread pool's max size, 64 by default
+ */
+void aio_context_set_thread_pool_params(AioContext *ctx, uint64_t min,
+                                        uint64_t max, Error **errp);
+
 #endif
diff --git a/qapi/qom.json b/qapi/qom.json
index e7730ef62f..79c141b6bf 100644
--- a/qapi/qom.json
+++ b/qapi/qom.json
@@ -526,7 +526,9 @@
   'data': { '*poll-max-ns': 'int',
             '*poll-grow': 'int',
             '*poll-shrink': 'int',
-            '*aio-max-batch': 'int' } }
+            '*aio-max-batch': 'int',
+            '*thread-pool-min': 'int',
+            '*thread-pool-max': 'int' } }
 
 ##
 # @MemoryBackendProperties:
diff --git a/util/async.c b/util/async.c
index 08d25feef5..58ef2e2ee5 100644
--- a/util/async.c
+++ b/util/async.c
@@ -562,6 +562,9 @@ AioContext *aio_context_new(Error **errp)
 
     ctx->aio_max_batch = 0;
 
+    ctx->pool_min = 0;
+    ctx->pool_max = 64;
+
     return ctx;
 fail:
     g_source_destroy(&ctx->source);
diff --git a/util/event-loop.c b/util/event-loop.c
index c0ddd61f20..f2532e7d31 100644
--- a/util/event-loop.c
+++ b/util/event-loop.c
@@ -51,6 +51,12 @@ static EventLoopBackendParamInfo poll_shrink_info = {
 static EventLoopBackendParamInfo aio_max_batch_info = {
     "aio-max-batch", offsetof(EventLoopBackend, aio_max_batch),
 };
+static EventLoopBackendParamInfo thread_pool_min_info = {
+    "thread-pool-min", offsetof(EventLoopBackend, thread_pool_min),
+};
+static EventLoopBackendParamInfo thread_pool_max_info = {
+    "thread-pool-max", offsetof(EventLoopBackend, thread_pool_max),
+};
 
 static void event_loop_backend_get_param(Object *obj, Visitor *v,
         const char *name, void *opaque, Error **errp)
@@ -84,7 +90,6 @@ static void event_loop_backend_set_param(Object *obj, Visitor *v,
     *field = value;
 
     return;
-
 }
 
 static void
@@ -132,6 +137,14 @@ static void event_loop_backend_class_init(ObjectClass *klass, void *class_data)
                               event_loop_backend_get_param,
                               event_loop_backend_set_param,
                               NULL, &aio_max_batch_info);
+    object_class_property_add(klass, "thread-pool-min", "int",
+                              event_loop_backend_get_param,
+                              event_loop_backend_set_param,
+                              NULL, &thread_pool_min_info);
+    object_class_property_add(klass, "thread-pool-max", "int",
+                              event_loop_backend_get_param,
+                              event_loop_backend_set_param,
+                              NULL, &thread_pool_max_info);
 }
 
 static const TypeInfo event_loop_backend_info = {
diff --git a/util/event-loop.h b/util/event-loop.h
index 34cf9309af..0f4255ee7b 100644
--- a/util/event-loop.h
+++ b/util/event-loop.h
@@ -37,5 +37,9 @@ struct EventLoopBackend {
 
     /* AioContext AIO engine parameters */
     int64_t aio_max_batch;
+
+    /* AioContext thread pool parameters */
+    int64_t thread_pool_min;
+    int64_t thread_pool_max;
 };
 #endif
diff --git a/util/main-loop.c b/util/main-loop.c
index 395fd9bd3e..266a9c72d8 100644
--- a/util/main-loop.c
+++ b/util/main-loop.c
@@ -190,12 +190,25 @@ MainLoop *mloop;
 static void main_loop_init(EventLoopBackend *bc, Error **errp)
 {
     MainLoop *m = MAIN_LOOP(bc);
+    Error *local_error = NULL;
+
+    if (!qemu_aio_context) {
+        error_setg(errp, "qemu aio context not ready");
+        return;
+    }
 
     if (mloop) {
         error_setg(errp, "only one main-loop instance allowed");
         return;
     }
 
+    aio_context_set_thread_pool_params(qemu_aio_context, bc->thread_pool_min,
+                                       bc->thread_pool_max, &local_error);
+    if (local_error) {
+        error_propagate(errp, local_error);
+        return;
+    }
+
     mloop = m;
     return;
 }
diff --git a/util/thread-pool.c b/util/thread-pool.c
index d763cea505..95c339cb00 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -21,6 +21,7 @@
 #include "trace.h"
 #include "block/thread-pool.h"
 #include "qemu/main-loop.h"
+#include "qapi/error.h"
 
 static void do_spawn_thread(ThreadPool *pool);
 
@@ -58,7 +59,6 @@ struct ThreadPool {
     QemuMutex lock;
     QemuCond worker_stopped;
     QemuSemaphore sem;
-    int max_threads;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -76,6 +76,7 @@ struct ThreadPool {
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
+    AioContext *ctx = pool->ctx;
 
     qemu_mutex_lock(&pool->lock);
     pool->pending_threads--;
@@ -91,7 +92,8 @@ static void *worker_thread(void *opaque)
             ret = qemu_sem_timedwait(&pool->sem, 10000);
             qemu_mutex_lock(&pool->lock);
             pool->idle_threads--;
-        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
+        } while (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
+                 pool->cur_threads <= ctx->pool_min));
         if (ret == -1 || pool->stopping) {
             break;
         }
@@ -244,6 +246,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
         ThreadPoolFunc *func, void *arg,
         BlockCompletionFunc *cb, void *opaque)
 {
+    AioContext *ctx = pool->ctx;
     ThreadPoolElement *req;
 
     req = qemu_aio_get(&thread_pool_aiocb_info, NULL, cb, opaque);
@@ -257,7 +260,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
     trace_thread_pool_submit(pool, req, arg);
 
     qemu_mutex_lock(&pool->lock);
-    if (pool->idle_threads == 0 && pool->cur_threads < pool->max_threads) {
+    if (pool->idle_threads == 0 && pool->cur_threads < ctx->pool_max) {
         spawn_thread(pool);
     }
     QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
@@ -306,11 +309,16 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
     qemu_sem_init(&pool->sem, 0);
-    pool->max_threads = 64;
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
     QTAILQ_INIT(&pool->request_list);
+
+    qemu_mutex_lock(&pool->lock);
+    for (int i = pool->cur_threads; i < ctx->pool_min; i++) {
+        spawn_thread(pool);
+    }
+    qemu_mutex_unlock(&pool->lock);
 }
 
 ThreadPool *thread_pool_new(AioContext *ctx)
@@ -350,3 +358,28 @@ void thread_pool_free(ThreadPool *pool)
     qemu_mutex_destroy(&pool->lock);
     g_free(pool);
 }
+
+void aio_context_set_thread_pool_params(AioContext *ctx, uint64_t min,
+                                        uint64_t max, Error **errp)
+{
+    ThreadPool *pool = ctx->thread_pool;
+
+    if (min > max || !max) {
+        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
+        return;
+    }
+
+    if (pool) {
+        qemu_mutex_lock(&pool->lock);
+    }
+
+    ctx->pool_min = min;
+    ctx->pool_max = max;
+
+    if (pool) {
+        for (int i = pool->cur_threads; i < ctx->pool_min; i++) {
+            spawn_thread(pool);
+        }
+        qemu_mutex_unlock(&pool->lock);
+    }
+}
-- 
2.35.1


Re: [PATCH 3/3] util/event-loop: Introduce options to set the thread pool size
Posted by Stefan Hajnoczi 3 years, 11 months ago
On Mon, Feb 21, 2022 at 06:08:45PM +0100, Nicolas Saenz Julienne wrote:
> The thread pool regulates itself: when idle, it kills threads until
> empty; when in demand, it creates new threads until full. This behaviour
> doesn't play well with latency-sensitive workloads, where the price of
> creating a new thread is too high. For example, when paired with QEMU's
> '-mlock', or when using safety features like SafeStack, creating a new
> thread has been measured to take multiple milliseconds.
> 
> In order to mitigate this, let's introduce new 'EventLoopBackend'
> properties to set the thread pool size. The threads will be created
> during the pool's initialization, remain available during its lifetime
> regardless of demand, and be destroyed when the pool is freed. A
> properly characterized workload will then be able to configure the pool
> to avoid latency spikes.
> 
> Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
> ---
>  include/block/aio.h | 11 +++++++++++
>  qapi/qom.json       |  4 +++-
>  util/async.c        |  3 +++
>  util/event-loop.c   | 15 ++++++++++++++-
>  util/event-loop.h   |  4 ++++
>  util/main-loop.c    | 13 +++++++++++++
>  util/thread-pool.c  | 41 +++++++++++++++++++++++++++++++++++++----
>  7 files changed, 85 insertions(+), 6 deletions(-)
> 
> diff --git a/include/block/aio.h b/include/block/aio.h
> index 5634173b12..331483d1d1 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -192,6 +192,8 @@ struct AioContext {
>      QSLIST_HEAD(, Coroutine) scheduled_coroutines;
>      QEMUBH *co_schedule_bh;
>  
> +    int pool_min;
> +    int pool_max;

Are these fields protected by ThreadPool->lock? Please document. This is
a clue that maybe these fields belong in ThreadPool.

Regarding the field names: the AioContext thread pool field is called
thread_pool and the user-visible parameters are thread-pool-min/max. I
suggest calling the fields thread_pool_min/max too so it's clear which
pool we're talking about and there is a correspondence to user-visible
parameters.
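
For instance (sketch of the suggested rename and locking comment, not
part of the posted patch):

    /* Minimum and maximum size of the AIO worker thread pool.  Accessed
     * under ThreadPool->lock, see util/thread-pool.c. */
    int thread_pool_min;
    int thread_pool_max;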

> @@ -350,3 +358,28 @@ void thread_pool_free(ThreadPool *pool)
>      qemu_mutex_destroy(&pool->lock);
>      g_free(pool);
>  }
> +
> +void aio_context_set_thread_pool_params(AioContext *ctx, uint64_t min,
> +                                        uint64_t max, Error **errp)
> +{
> +    ThreadPool *pool = ctx->thread_pool;
> +
> +    if (min > max || !max) {

ctx->pool_min/max are int while the min/max arguments are uint64_t.
Please add an INT_MAX check to detect overflow.
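
Something along these lines would do (untested sketch, not part of the
posted patch):

    if (min > INT_MAX || max > INT_MAX || min > max || max == 0) {
        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
        return;
    }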

> +        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
> +        return;
> +    }
> +
> +    if (pool) {
> +        qemu_mutex_lock(&pool->lock);
> +    }

This code belongs in util/thread-pool.c. I guess the reason for keeping
the fields in AioContext instead of ThreadPool is because the ThreadPool
is created on demand and we'd have nowhere to store the parameter value.
I suggest we bite the bullet and keep an extra copy of the variables in
AioContext with a clean ThreadPool interface (thread_pool_set_params())
instead of letting AioContext and ThreadPool access each other's
internals.
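
A rough sketch of that split (hypothetical code; thread_pool_set_params()
and the AioContext field names follow the suggestions above, the
ThreadPool min_threads/max_threads fields are made up):

    /* util/thread-pool.c */
    void thread_pool_set_params(ThreadPool *pool, int min, int max)
    {
        qemu_mutex_lock(&pool->lock);
        pool->min_threads = min;
        pool->max_threads = max;
        /* Grow immediately if the new minimum demands it. */
        for (int i = pool->cur_threads; i < min; i++) {
            spawn_thread(pool);
        }
        qemu_mutex_unlock(&pool->lock);
    }

    /* util/async.c */
    void aio_context_set_thread_pool_params(AioContext *ctx, uint64_t min,
                                            uint64_t max, Error **errp)
    {
        if (min > INT_MAX || max > INT_MAX || min > max || max == 0) {
            error_setg(errp, "bad thread-pool-min/thread-pool-max values");
            return;
        }
        /* Cache the values so pools created later pick them up. */
        ctx->thread_pool_min = min;
        ctx->thread_pool_max = max;
        if (ctx->thread_pool) {
            thread_pool_set_params(ctx->thread_pool, min, max);
        }
    }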

> +
> +    ctx->pool_min = min;
> +    ctx->pool_max = max;
> +
> +    if (pool) {
> +        for (int i = pool->cur_threads; i < ctx->pool_min; i++) {
> +            spawn_thread(pool);
> +        }

What about the reverse: when min is lowered and there are a bunch of
idle worker threads, we could wake them up so they terminate until
->pool_min is reached again?
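
One conceptual way to do that in aio_context_set_thread_pool_params()
(untested sketch; worker_stop_requests is a hypothetical ThreadPool
field, and worker_thread() would have to honour it before dequeuing a
request):

    /* When lowering the minimum, nudge excess idle workers awake. */
    int excess = MIN(pool->idle_threads, pool->cur_threads - ctx->pool_min);
    pool->worker_stop_requests += excess;
    for (int i = 0; i < excess; i++) {
        qemu_sem_post(&pool->sem);
    }

    /* In worker_thread(), after the wait loop, still under pool->lock: */
    if (ret == -1 || pool->stopping || pool->worker_stop_requests > 0) {
        if (pool->worker_stop_requests > 0) {
            pool->worker_stop_requests--;
        }
        break;
    }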
Re: [PATCH 3/3] util/event-loop: Introduce options to set the thread pool size
Posted by Nicolas Saenz Julienne 3 years, 11 months ago
On Thu, 2022-02-24 at 10:40 +0000, Stefan Hajnoczi wrote:
> On Mon, Feb 21, 2022 at 06:08:45PM +0100, Nicolas Saenz Julienne wrote:
> > The thread pool regulates itself: when idle, it kills threads until
> > empty; when in demand, it creates new threads until full. This behaviour
> > doesn't play well with latency-sensitive workloads, where the price of
> > creating a new thread is too high. For example, when paired with QEMU's
> > '-mlock', or when using safety features like SafeStack, creating a new
> > thread has been measured to take multiple milliseconds.
> > 
> > In order to mitigate this, let's introduce new 'EventLoopBackend'
> > properties to set the thread pool size. The threads will be created
> > during the pool's initialization, remain available during its lifetime
> > regardless of demand, and be destroyed when the pool is freed. A
> > properly characterized workload will then be able to configure the pool
> > to avoid latency spikes.
> > 
> > Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
> > ---
> >  include/block/aio.h | 11 +++++++++++
> >  qapi/qom.json       |  4 +++-
> >  util/async.c        |  3 +++
> >  util/event-loop.c   | 15 ++++++++++++++-
> >  util/event-loop.h   |  4 ++++
> >  util/main-loop.c    | 13 +++++++++++++
> >  util/thread-pool.c  | 41 +++++++++++++++++++++++++++++++++++++----
> >  7 files changed, 85 insertions(+), 6 deletions(-)
> > 
> > diff --git a/include/block/aio.h b/include/block/aio.h
> > index 5634173b12..331483d1d1 100644
> > --- a/include/block/aio.h
> > +++ b/include/block/aio.h
> > @@ -192,6 +192,8 @@ struct AioContext {
> >      QSLIST_HEAD(, Coroutine) scheduled_coroutines;
> >      QEMUBH *co_schedule_bh;
> >  
> > +    int pool_min;
> > +    int pool_max;
> 
> Are these fields protected by ThreadPool->lock? Please document. This is
> a clue that maybe these fields belong in ThreadPool.

Yes they are. I'll document it properly.

> Regarding the field names: the AioContext thread pool field is called
> thread_pool and the user-visible parameters are thread-pool-min/max. I
> suggest calling the fields thread_pool_min/max too so it's clear which
> pool we're talking about and there is a correspondence to user-visible
> parameters.

Noted.

> > @@ -350,3 +358,28 @@ void thread_pool_free(ThreadPool *pool)
> >      qemu_mutex_destroy(&pool->lock);
> >      g_free(pool);
> >  }
> > +
> > +void aio_context_set_thread_pool_params(AioContext *ctx, uint64_t min,
> > +                                        uint64_t max, Error **errp)
> > +{
> > +    ThreadPool *pool = ctx->thread_pool;
> > +
> > +    if (min > max || !max) {
> 
> ctx->pool_min/max are int while the min/max arguments are uint64_t.
> Please add an INT_MAX check to detect overflow.

Noted.

> > +        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
> > +        return;
> > +    }
> > +
> > +    if (pool) {
> > +        qemu_mutex_lock(&pool->lock);
> > +    }
> 
> This code belongs in util/thread-pool.c. I guess the reason for keeping
> the fields in AioContext instead of ThreadPool is because the ThreadPool
> is created on demand and we'd have nowhere to store the parameter value.

Indeed.

> I suggest we bite the bullet and keep an extra copy of the variables in
> AioContext with a clean ThreadPool interface (thread_pool_set_params())
> instead of letting AioContext and ThreadPool access each other's
> internals.

OK!

> > +
> > +    ctx->pool_min = min;
> > +    ctx->pool_max = max;
> > +
> > +    if (pool) {
> > +        for (int i = pool->cur_threads; i < ctx->pool_min; i++) {
> > +            spawn_thread(pool);
> > +        }
> 
> What about the reverse: when min is lowered and there are a bunch of
> idle worker threads, we could wake them up so they terminate until
> ->pool_min is reached again?

Makes sense, I'll look into it.

-- 
Nicolás Sáenz