Since commit f9fc8932b1 ("thread-posix: remove the posix semaphore
support", 2022-04-06) QemuSemaphore has its own mutex and condition
variable; this adds unnecessary overhead on I/O with small block sizes.
Check the QTAILQ directly instead of adding the indirection of a
semaphore's count. Using a semaphore has not been necessary since
qemu_cond_timedwait was introduced; the new code has to be careful about
spurious wakeups but it is simpler, for example thread_pool_cancel does
not have to worry about synchronizing the semaphore count with the number
of elements of pool->request_list.
Note that the return value of qemu_cond_timedwait (0 for timeout, 1 for
signal or spurious wakeup) is different from that of qemu_sem_timedwait
(-1 for timeout, 0 for success).
Reported-by: Lukáš Doktor <ldoktor@redhat.com>
Suggested-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
util/thread-pool.c | 68 +++++++++++++++++++---------------------------
1 file changed, 28 insertions(+), 40 deletions(-)
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 4979f30ca3..6bbf24754a 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -57,7 +57,7 @@ struct ThreadPool {
QEMUBH *completion_bh;
QemuMutex lock;
QemuCond worker_stopped;
- QemuSemaphore sem;
+ QemuCond request_cond;
QEMUBH *new_thread_bh;
/* The following variables are only accessed from one AioContext. */
@@ -74,23 +74,6 @@ struct ThreadPool {
int max_threads;
};
-static inline bool back_to_sleep(ThreadPool *pool, int ret)
-{
- /*
- * The semaphore timed out, we should exit the loop except when:
- * - There is work to do, we raced with the signal.
- * - The max threads threshold just changed, we raced with the signal.
- * - The thread pool forces a minimum number of readily available threads.
- */
- if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
- pool->cur_threads > pool->max_threads ||
- pool->cur_threads <= pool->min_threads)) {
- return true;
- }
-
- return false;
-}
-
static void *worker_thread(void *opaque)
{
ThreadPool *pool = opaque;
@@ -99,20 +82,25 @@ static void *worker_thread(void *opaque)
pool->pending_threads--;
do_spawn_thread(pool);
- while (!pool->stopping) {
+ while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
ThreadPoolElement *req;
int ret;
- do {
+ if (QTAILQ_EMPTY(&pool->request_list)) {
pool->idle_threads++;
- qemu_mutex_unlock(&pool->lock);
- ret = qemu_sem_timedwait(&pool->sem, 10000);
- qemu_mutex_lock(&pool->lock);
+ ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
pool->idle_threads--;
- } while (back_to_sleep(pool, ret));
- if (ret == -1 || pool->stopping ||
- pool->cur_threads > pool->max_threads) {
- break;
+ if (ret == 0 &&
+ QTAILQ_EMPTY(&pool->request_list) &&
+ pool->cur_threads > pool->min_threads) {
+ /* Timed out + no work to do + no need for warm threads = exit. */
+ break;
+ }
+ /*
+ * Even if there was some work to do, check if there aren't
+ * too many worker threads before picking it up.
+ */
+ continue;
}
req = QTAILQ_FIRST(&pool->request_list);
@@ -134,6 +122,12 @@ static void *worker_thread(void *opaque)
pool->cur_threads--;
qemu_cond_signal(&pool->worker_stopped);
qemu_mutex_unlock(&pool->lock);
+
+ /*
+ * Wake up another thread, in case we got a wakeup but decided
+ * to exit due to pool->cur_threads > pool->max_threads.
+ */
+ qemu_cond_signal(&pool->worker_stopped);
return NULL;
}
@@ -229,13 +223,7 @@ static void thread_pool_cancel(BlockAIOCB *acb)
trace_thread_pool_cancel(elem, elem->common.opaque);
QEMU_LOCK_GUARD(&pool->lock);
- if (elem->state == THREAD_QUEUED &&
- /* No thread has yet started working on elem. we can try to "steal"
- * the item from the worker if we can get a signal from the
- * semaphore. Because this is non-blocking, we can do it with
- * the lock taken and ensure that elem will remain THREAD_QUEUED.
- */
- qemu_sem_timedwait(&pool->sem, 0) == 0) {
+ if (elem->state == THREAD_QUEUED) {
QTAILQ_REMOVE(&pool->request_list, elem, reqs);
qemu_bh_schedule(pool->completion_bh);
@@ -280,7 +268,7 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
}
QTAILQ_INSERT_TAIL(&pool->request_list, req, reqs);
qemu_mutex_unlock(&pool->lock);
- qemu_sem_post(&pool->sem);
+ qemu_cond_signal(&pool->request_cond);
return &req->common;
}
@@ -323,7 +311,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
* We either have to:
* - Increase the number available of threads until over the min_threads
* threshold.
- * - Decrease the number of available threads until under the max_threads
+ * - Bump the worker threads so that they exit, until under the max_threads
* threshold.
* - Do nothing. The current number of threads fall in between the min and
* max thresholds. We'll let the pool manage itself.
@@ -333,7 +321,7 @@ void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
}
for (int i = pool->cur_threads; i > pool->max_threads; i--) {
- qemu_sem_post(&pool->sem);
+ qemu_cond_signal(&pool->request_cond);
}
qemu_mutex_unlock(&pool->lock);
@@ -350,7 +338,7 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
pool->completion_bh = aio_bh_new(ctx, thread_pool_completion_bh, pool);
qemu_mutex_init(&pool->lock);
qemu_cond_init(&pool->worker_stopped);
- qemu_sem_init(&pool->sem, 0);
+ qemu_cond_init(&pool->request_cond);
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
QLIST_INIT(&pool->head);
@@ -383,15 +371,15 @@ void thread_pool_free(ThreadPool *pool)
/* Wait for worker threads to terminate */
pool->stopping = true;
+ qemu_cond_broadcast(&pool->request_cond);
while (pool->cur_threads > 0) {
- qemu_sem_post(&pool->sem);
qemu_cond_wait(&pool->worker_stopped, &pool->lock);
}
qemu_mutex_unlock(&pool->lock);
qemu_bh_delete(pool->completion_bh);
- qemu_sem_destroy(&pool->sem);
+ qemu_cond_destroy(&pool->request_cond);
qemu_cond_destroy(&pool->worker_stopped);
qemu_mutex_destroy(&pool->lock);
g_free(pool);
--
2.36.0
On Sat, May 14, 2022 at 08:50:11AM +0200, Paolo Bonzini wrote:
> @@ -134,6 +122,12 @@ static void *worker_thread(void *opaque)
> pool->cur_threads--;
> qemu_cond_signal(&pool->worker_stopped);
> qemu_mutex_unlock(&pool->lock);
> +
> + /*
> + * Wake up another thread, in case we got a wakeup but decided
> + * to exit due to pool->cur_threads > pool->max_threads.
> + */
> + qemu_cond_signal(&pool->worker_stopped);
&pool->worker_stopped? Was this supposed to be &pool->request_cond?
On Tue, May 17, 2022 at 5:20 PM Stefan Hajnoczi <stefanha@redhat.com> wrote:
>
> On Sat, May 14, 2022 at 08:50:11AM +0200, Paolo Bonzini wrote:
> > @@ -134,6 +122,12 @@ static void *worker_thread(void *opaque)
> > pool->cur_threads--;
> > qemu_cond_signal(&pool->worker_stopped);
> > qemu_mutex_unlock(&pool->lock);
> > +
> > + /*
> > + * Wake up another thread, in case we got a wakeup but decided
> > + * to exit due to pool->cur_threads > pool->max_threads.
> > + */
> > + qemu_cond_signal(&pool->worker_stopped);
>
> &pool->worker_stopped? Was this supposed to be &pool->request_cond?
Yes, of course.
Paolo
Hi Paolo,
On Sat, 2022-05-14 at 08:50 +0200, Paolo Bonzini wrote:
[...]
> static void *worker_thread(void *opaque)
> {
> ThreadPool *pool = opaque;
> @@ -99,20 +82,25 @@ static void *worker_thread(void *opaque)
> pool->pending_threads--;
> do_spawn_thread(pool);
>
> - while (!pool->stopping) {
> + while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
> ThreadPoolElement *req;
> int ret;
>
> - do {
> + if (QTAILQ_EMPTY(&pool->request_list)) {
> pool->idle_threads++;
> - qemu_mutex_unlock(&pool->lock);
> - ret = qemu_sem_timedwait(&pool->sem, 10000);
> - qemu_mutex_lock(&pool->lock);
> + ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
> pool->idle_threads--;
> - } while (back_to_sleep(pool, ret));
> - if (ret == -1 || pool->stopping ||
> - pool->cur_threads > pool->max_threads) {
> - break;
> + if (ret == 0 &&
> + QTAILQ_EMPTY(&pool->request_list) &&
> + pool->cur_threads > pool->min_threads) {
> + /* Timed out + no work to do + no need for warm threads = exit. */
> + break;
> + }
Some comments:
- A completely idle pool will now never be able to lose its threads, as the
'pool->cur_threads <= pool->max_threads' condition is only checked after a
non-timeout wakeup.
- You don't take into account the possibility of being woken up with an empty
queue, which I believe is possible:
CPU0:                                             CPU1:
// in worker_thread(), queue empty
qemu_cond_timedwait();
                                                  acb = thread_pool_submit_aio();
                                                  ...
                                                  qemu_cond_signal();
                                                  thread_pool_cancel(acb);
// wakes-up, ret == 1
req = QTAILQ_FIRST(&pool->request_list);
QTAILQ_REMOVE(&pool->request_list, req, reqs);
// segmentation fault....
------------------------------------------------------
CPU0:                                             CPU1:
// in worker_thread(), queue empty
qemu_cond_timedwait();
                                                  thread_pool_free()
                                                  ...
                                                  qemu_cond_broadcast();
// wakes-up, ret == 1
req = QTAILQ_FIRST(&pool->request_list);
QTAILQ_REMOVE(&pool->request_list, req, reqs);
// segmentation fault....
Regards,
--
Nicolás Sáenz
On 5/17/22 14:46, Nicolas Saenz Julienne wrote:
>> - while (!pool->stopping) {
>> + while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
>> ThreadPoolElement *req;
>> int ret;
>>
>> - do {
>> + if (QTAILQ_EMPTY(&pool->request_list)) {
>> pool->idle_threads++;
>> - qemu_mutex_unlock(&pool->lock);
>> - ret = qemu_sem_timedwait(&pool->sem, 10000);
>> - qemu_mutex_lock(&pool->lock);
>> + ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
>> pool->idle_threads--;
>> - } while (back_to_sleep(pool, ret));
>> - if (ret == -1 || pool->stopping ||
>> - pool->cur_threads > pool->max_threads) {
>> - break;
>> + if (ret == 0 &&
>> + QTAILQ_EMPTY(&pool->request_list) &&
>> + pool->cur_threads > pool->min_threads) {
>> + /* Timed out + no work to do + no need for warm threads = exit. */
>> + break;
>> + }
>
> Some comments:
>
> - A completely idle pool will now never be able to lose its threads, as the
> 'pool->cur_threads <= pool->max_threads' condition is only checked after a
> non-timeout wakeup.
Are you sure? The full code is:
ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
pool->idle_threads--;
if (ret == 0 &&
QTAILQ_EMPTY(&pool->request_list) &&
pool->cur_threads > pool->min_threads) {
/* Timed out + no work to do + no need for warm threads = exit. */
break;
}
/*
* Even if there was some work to do, check if there aren't
* too many worker threads before picking it up.
*/
continue;
That is, it won't immediately pick up the job after _any_ wait,
whether successful or not. It will first of all "continue" to
check pool->cur_threads <= pool->max_threads.
This is also the reason why I had to add a qemu_cond_signal() at the
bottom of the worker thread (because maybe it got a signal to act on a
non-empty queue, but decided to exit instead).
> - You don't take into account the possibility of being woken up with an empty
> queue. Which I belive possible:
It's absolutely possible, but the difference between v2 and v3 _should_
be the fix. Of course I could have screwed up, but it seems correct
this time.
Paolo
On Tue, 2022-05-17 at 16:18 +0200, Paolo Bonzini wrote:
> On 5/17/22 14:46, Nicolas Saenz Julienne wrote:
> > > - while (!pool->stopping) {
> > > + while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
> > > ThreadPoolElement *req;
> > > int ret;
> > >
> > > - do {
> > > + if (QTAILQ_EMPTY(&pool->request_list)) {
> > > pool->idle_threads++;
> > > - qemu_mutex_unlock(&pool->lock);
> > > - ret = qemu_sem_timedwait(&pool->sem, 10000);
> > > - qemu_mutex_lock(&pool->lock);
> > > + ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
> > > pool->idle_threads--;
> > > - } while (back_to_sleep(pool, ret));
> > > - if (ret == -1 || pool->stopping ||
> > > - pool->cur_threads > pool->max_threads) {
> > > - break;
> > > + if (ret == 0 &&
> > > + QTAILQ_EMPTY(&pool->request_list) &&
> > > + pool->cur_threads > pool->min_threads) {
> > > + /* Timed out + no work to do + no need for warm threads = exit. */
> > > + break;
> > > + }
> >
> > Some comments:
> >
> > - A completely idle pool will now never be able to lose its threads, as the
> > 'pool->cur_threads <= pool->max_threads' condition is only checked after a
> > non-timeout wakeup.
>
> Are you sure? The full code is:
>
> ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
> pool->idle_threads--;
> if (ret == 0 &&
> QTAILQ_EMPTY(&pool->request_list) &&
> pool->cur_threads > pool->min_threads) {
> /* Timed out + no work to do + no need for warm threads = exit. */
> break;
> }
> /*
> * Even if there was some work to do, check if there aren't
> * too many worker threads before picking it up.
> */
> continue;
>
OK, I somehow missed this 'continue', I wonder if I managed to re-review v2 by
accident... Anyway, it looks fine to me now.
Regards,
--
Nicolás Sáenz
© 2016 - 2026 Red Hat, Inc.