[PATCH v3 2/3] thread-pool: replace semaphore with condition variable

Paolo Bonzini posted 3 patches 3 years, 8 months ago
[PATCH v3 2/3] thread-pool: replace semaphore with condition variable
Posted by Paolo Bonzini 3 years, 8 months ago
Since commit f9fc8932b1 ("thread-posix: remove the posix semaphore
support", 2022-04-06) QemuSemaphore has its own mutex and condition
variable; this adds unnecessary overhead on I/O with small block sizes.

Check the QTAILQ directly instead of adding the indirection of a
semaphore's count.  Using a semaphore has not been necessary since
qemu_cond_timedwait was introduced; the new code has to be careful about
spurious wakeups but it is simpler, for example thread_pool_cancel does
not have to worry about synchronizing the semaphore count with the number
of elements of pool->request_list.

Note that the return value of qemu_cond_timedwait (0 for timeout, 1 for
signal or spurious wakeup) is different from that of qemu_sem_timedwait
(-1 for timeout, 0 for success).

Reported-by: Lukáš Doktor <ldoktor@redhat.com>
Suggested-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 util/thread-pool.c | 68 +++++++++++++++++++---------------------------
 1 file changed, 28 insertions(+), 40 deletions(-)

diff --git a/util/thread-pool.c b/util/thread-pool.c
index 4979f30ca3..6bbf24754a 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -57,7 +57,7 @@ struct ThreadPool {
     QEMUBH *completion_bh;
     QemuMutex lock;
     QemuCond worker_stopped;
-    QemuSemaphore sem;
+    QemuCond request_cond;
     QEMUBH *new_thread_bh;
 
     /* The following variables are only accessed from one AioContext. */
@@ -74,23 +74,6 @@ struct ThreadPool {
     int max_threads;
 };
 
-static inline bool back_to_sleep(ThreadPool *pool, int ret)
-{
-    /*
-     * The semaphore timed out, we should exit the loop except when:
-     *  - There is work to do, we raced with the signal.
-     *  - The max threads threshold just changed, we raced with the signal.
-     *  - The thread pool forces a minimum number of readily available threads.
-     */
-    if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
-            pool->cur_threads > pool->max_threads ||
-            pool->cur_threads <= pool->min_threads)) {
-            return true;
-    }
-
-    return false;
-}
-
 static void *worker_thread(void *opaque)
 {
     ThreadPool *pool = opaque;
@@ -99,20 +82,25 @@ static void *worker_thread(void *opaque)
     pool->pending_threads--;
     do_spawn_thread(pool);
 
-    while (!pool->stopping) {
+    while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
         ThreadPoolElement *req;
         int ret;
 
-        do {
+        if (QTAILQ_EMPTY(&pool->request_list)) {
             pool->idle_threads++;
-            qemu_mutex_unlock(&pool->lock);
-            ret = qemu_sem_timedwait(&pool->sem, 10000);
-            qemu_mutex_lock(&pool->lock);
+            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
             pool->idle_threads--;
-        } while (back_to_sleep(pool, ret));
-        if (ret == -1 || pool->stopping ||
-            pool->cur_threads > pool->max_threads) {
-            break;
+            if (ret == 0 &&
+                QTAILQ_EMPTY(&pool->request_list) &&
+                pool->cur_threads > pool->min_threads) {
+                /* Timed out + no work to do + no need for warm threads = exit.  */
+                break;
+            }
+            /*
+             * Even if there was some work to do, check if there aren't
+             * too many worker threads before picking it up.
+             */
+            continue;
         }
 
         req = QTAILQ_FIRST(&pool->request_list);
@@ -134,6 +122,12 @@ static void *worker_thread(void *opaque)
     pool->cur_threads--;
     qemu_cond_signal(&pool->worker_stopped);
     qemu_mutex_unlock(&pool->lock);
+
+    /*
+     * Wake up another thread, in case we got a wakeup but decided
+     * to exit due to pool->cur_threads > pool->max_threads.
+     */
+    qemu_cond_signal(&pool->worker_stopped);
     return NULL;
 }
 
@@ -229,13 +223,7 @@ static void thread_pool_cancel(BlockAIOCB *acb)
     trace_thread_pool_cancel(elem, elem->common.opaque);
 
     QEMU_LOCK_GUARD(&pool->lock);
-    if (elem->state == THREAD_QUEUED &&
-        /* No thread has yet started working on elem. we can try to "steal"
-         * the item from the worker if we can get a signal from the
-         * semaphore.  Because this is non-blocking, we can do it with
-         * the lock taken and ensure that elem will remain THREAD_QUEUED.
-         */
-        qemu_sem_timedwait(&pool->sem, 0) == 0) {
+    if (elem->state == THREAD_QUEUED) {
         QTAILQ_REMOVE(&pool->request_list, elem, reqs);
         qemu_bh_schedule(pool->completion_bh);
 
@@ -280,7 +268,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
     }
     QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
     qemu_mutex_unlock(&pool->lock);
-    qemu_sem_post(&pool->sem);
+    qemu_cond_signal(&pool->request_cond);
     return &req->common;
 }
 
@@ -323,7 +311,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
      * We either have to:
      *  - Increase the number available of threads until over the min_threads
      *    threshold.
-     *  - Decrease the number of available threads until under the max_threads
+     *  - Bump the worker threads so that they exit, until under the max_threads
      *    threshold.
      *  - Do nothing. The current number of threads fall in between the min and
      *    max thresholds. We'll let the pool manage itself.
@@ -333,7 +321,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
     }
 
     for (int i = pool->cur_threads; i > pool->max_threads; i--) {
-        qemu_sem_post(&pool->sem);
+        qemu_cond_signal(&pool->request_cond);
     }
 
     qemu_mutex_unlock(&pool->lock);
@@ -350,7 +338,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
     pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
     qemu_mutex_init(&pool->lock);
     qemu_cond_init(&pool->worker_stopped);
-    qemu_sem_init(&pool->sem, 0);
+    qemu_cond_init(&pool->request_cond);
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
 
     QLIST_INIT(&pool->head);
@@ -383,15 +371,15 @@ void thread_pool_free(ThreadPool *pool)
 
     /* Wait for worker threads to terminate */
     pool->stopping = true;
+    qemu_cond_broadcast(&pool->request_cond);
     while (pool->cur_threads > 0) {
-        qemu_sem_post(&pool->sem);
         qemu_cond_wait(&pool->worker_stopped, &pool->lock);
     }
 
     qemu_mutex_unlock(&pool->lock);
 
     qemu_bh_delete(pool->completion_bh);
-    qemu_sem_destroy(&pool->sem);
+    qemu_cond_destroy(&pool->request_cond);
     qemu_cond_destroy(&pool->worker_stopped);
     qemu_mutex_destroy(&pool->lock);
     g_free(pool);
-- 
2.36.0


Re: [PATCH v3 2/3] thread-pool: replace semaphore with condition variable
Posted by Stefan Hajnoczi 3 years, 8 months ago
On Sat, May 14, 2022 at 08:50:11AM +0200, Paolo Bonzini wrote:
> @@ -134,6 +122,12 @@ static void *worker_thread(void *opaque)
>      pool->cur_threads--;
>      qemu_cond_signal(&pool->worker_stopped);
>      qemu_mutex_unlock(&pool->lock);
> +
> +    /*
> +     * Wake up another thread, in case we got a wakeup but decided
> +     * to exit due to pool->cur_threads > pool->max_threads.
> +     */
> +    qemu_cond_signal(&pool->worker_stopped);

&pool->worker_stopped? Was this supposed to be &pool->request_cond?
Re: [PATCH v3 2/3] thread-pool: replace semaphore with condition variable
Posted by Paolo Bonzini 3 years, 8 months ago
On Tue, May 17, 2022 at 5:20 PM Stefan Hajnoczi <stefanha@redhat.com> wrote:
>
> On Sat, May 14, 2022 at 08:50:11AM +0200, Paolo Bonzini wrote:
> > @@ -134,6 +122,12 @@ static void *worker_thread(void *opaque)
> >      pool->cur_threads--;
> >      qemu_cond_signal(&pool->worker_stopped);
> >      qemu_mutex_unlock(&pool->lock);
> > +
> > +    /*
> > +     * Wake up another thread, in case we got a wakeup but decided
> > +     * to exit due to pool->cur_threads > pool->max_threads.
> > +     */
> > +    qemu_cond_signal(&pool->worker_stopped);
>
> &pool->worker_stopped? Was this supposed to be &pool->request_cond?

Yes, of course.

Paolo
Re: [PATCH v3 2/3] thread-pool: replace semaphore with condition variable
Posted by Nicolas Saenz Julienne 3 years, 8 months ago
Hi Paolo,

On Sat, 2022-05-14 at 08:50 +0200, Paolo Bonzini wrote:

[...]

>  static void *worker_thread(void *opaque)
>  {
>      ThreadPool *pool = opaque;
> @@ -99,20 +82,25 @@ static void *worker_thread(void *opaque)
>      pool->pending_threads--;
>      do_spawn_thread(pool);
>  
> -    while (!pool->stopping) {
> +    while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
>          ThreadPoolElement *req;
>          int ret;
>  
> -        do {
> +        if (QTAILQ_EMPTY(&pool->request_list)) {
>              pool->idle_threads++;
> -            qemu_mutex_unlock(&pool->lock);
> -            ret = qemu_sem_timedwait(&pool->sem, 10000);
> -            qemu_mutex_lock(&pool->lock);
> +            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
>              pool->idle_threads--;
> -        } while (back_to_sleep(pool, ret));
> -        if (ret == -1 || pool->stopping ||
> -            pool->cur_threads > pool->max_threads) {
> -            break;
> +            if (ret == 0 &&
> +                QTAILQ_EMPTY(&pool->request_list) &&
> +                pool->cur_threads > pool->min_threads) {
> +                /* Timed out + no work to do + no need for warm threads = exit.  */
> +                break;
> +            }

 Some comments:

- A completely idle pool will now never be able to lose its threads, as the
  'pool->cur_threads <= pool->max_threads' condition is only checked after a
  non-timeout wakeup.

- You don't take into account the possibility of being woken up with an empty
  queue. Which I belive possible:

	CPU0:					      CPU1:
						   // in worker_thread(), queue empty
						   qemu_cond_timedwait();

	acb = thread_pool_submit_aio();
		    ...
	    qemu_cond_signal();
	thread_pool_cancel(acb);
						   // wakes-up, ret == 1
					           req = QTAILQ_FIRST(&pool->request_list);
					           QTAILQ_REMOVE(&pool->request_list, req, reqs);
						   // segmentation fault....

		------------------------------------------------------

	CPU0:					      CPU1:
						   // in worker_thread(), queue empty
						   qemu_cond_timedwait();
	thread_pool_free()
		...
            qemu_cond_broadcast();
						   // wakes-up, ret == 1
					           req = QTAILQ_FIRST(&pool->request_list);
					           QTAILQ_REMOVE(&pool->request_list, req, reqs);
						   // segmentation fault....

Regards,

-- 
Nicolás Sáenz
Re: [PATCH v3 2/3] thread-pool: replace semaphore with condition variable
Posted by Paolo Bonzini 3 years, 8 months ago
On 5/17/22 14:46, Nicolas Saenz Julienne wrote:
>> -    while (!pool->stopping) {
>> +    while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
>>           ThreadPoolElement *req;
>>           int ret;
>>   
>> -        do {
>> +        if (QTAILQ_EMPTY(&pool->request_list)) {
>>               pool->idle_threads++;
>> -            qemu_mutex_unlock(&pool->lock);
>> -            ret = qemu_sem_timedwait(&pool->sem, 10000);
>> -            qemu_mutex_lock(&pool->lock);
>> +            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
>>               pool->idle_threads--;
>> -        } while (back_to_sleep(pool, ret));
>> -        if (ret == -1 || pool->stopping ||
>> -            pool->cur_threads > pool->max_threads) {
>> -            break;
>> +            if (ret == 0 &&
>> +                QTAILQ_EMPTY(&pool->request_list) &&
>> +                pool->cur_threads > pool->min_threads) {
>> +                /* Timed out + no work to do + no need for warm threads = exit.  */
>> +                break;
>> +            }
> 
>   Some comments:
> 
> - A completely idle pool will now never be able to lose its threads, as the
>    'pool->cur_threads <= pool->max_threads' condition is only checked after a
>    non-timeout wakeup.

Are you sure?  The full code is:

             ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
             pool->idle_threads--;
             if (ret == 0 &&
                 QTAILQ_EMPTY(&pool->request_list) &&
                 pool->cur_threads > pool->min_threads) {
                 /* Timed out + no work to do + no need for warm threads  exit.  */
                 break;
             }
             /*
              * Even if there was some work to do, check if there aren't
              * too many worker threads before picking it up.
              */
             continue;

That is, it won't immediately pick up the job after _any_ wait,
whether successful or not.  It will first of all "continue" to
check pool->cur_threads <= pool->max_threads.

This is also the reason why I had to add a qemu_cond_signal() at the
bottom of the worker thread (because maybe it got a signal to act on a
non-empty queue, but decided to exit instead).

> - You don't take into account the possibility of being woken up with an empty
>    queue. Which I belive possible:

It's absolutely possible, but the difference between v2 and v3 _should_
be the fix.  Of course I could have screwed up, but it seems correct
this time.

Paolo
Re: [PATCH v3 2/3] thread-pool: replace semaphore with condition variable
Posted by Nicolas Saenz Julienne 3 years, 8 months ago
On Tue, 2022-05-17 at 16:18 +0200, Paolo Bonzini wrote:
> On 5/17/22 14:46, Nicolas Saenz Julienne wrote:
> > > -    while (!pool->stopping) {
> > > +    while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
> > >           ThreadPoolElement *req;
> > >           int ret;
> > >   
> > > -        do {
> > > +        if (QTAILQ_EMPTY(&pool->request_list)) {
> > >               pool->idle_threads++;
> > > -            qemu_mutex_unlock(&pool->lock);
> > > -            ret = qemu_sem_timedwait(&pool->sem, 10000);
> > > -            qemu_mutex_lock(&pool->lock);
> > > +            ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
> > >               pool->idle_threads--;
> > > -        } while (back_to_sleep(pool, ret));
> > > -        if (ret == -1 || pool->stopping ||
> > > -            pool->cur_threads > pool->max_threads) {
> > > -            break;
> > > +            if (ret == 0 &&
> > > +                QTAILQ_EMPTY(&pool->request_list) &&
> > > +                pool->cur_threads > pool->min_threads) {
> > > +                /* Timed out + no work to do + no need for warm threads = exit.  */
> > > +                break;
> > > +            }
> > 
> >   Some comments:
> > 
> > - A completely idle pool will now never be able to lose its threads, as the
> >    'pool->cur_threads <= pool->max_threads' condition is only checked after a
> >    non-timeout wakeup.
> 
> Are you sure?  The full code is:
> 
>              ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
>              pool->idle_threads--;
>              if (ret == 0 &&
>                  QTAILQ_EMPTY(&pool->request_list) &&
>                  pool->cur_threads > pool->min_threads) {
>                  /* Timed out + no work to do + no need for warm threads  exit.  */
>                  break;
>              }
>              /*
>               * Even if there was some work to do, check if there aren't
>               * too many worker threads before picking it up.
>               */
>              continue;
> 

OK, I somehow missed this 'continue', I wonder if I managed to re-review v2 by
accident... Anyway, it looks fine to me now.

Regards,

-- 
Nicolás Sáenz