A feature of the current rwlock is that if multiple coroutines hold a
reader lock, all must be runnable. The unlock implementation relies on
this, choosing to wake a single coroutine when the final read lock
holder exits the critical section, assuming that it will wake a
coroutine attempting to acquire a write lock.
The downgrade implementation violates this assumption by creating a
read lock owning coroutine that is exclusively runnable - any other
coroutines that are waiting to acquire a read lock are *not* made
runnable when the write lock holder converts its ownership to read
only.
To fix this, keep the queue of waiters explicitly in the CoRwLock
instead of using CoQueue, and store for each whether it is a
potential reader or a writer. This way, downgrade can look at the
first queued coroutines and wake it if it is a reader, causing
all other readers to be released in turn.
Reported-by: David Edmondson <david.edmondson@oracle.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
include/qemu/coroutine.h | 10 ++-
util/qemu-coroutine-lock.c | 150 ++++++++++++++++++++++++-------------
2 files changed, 106 insertions(+), 54 deletions(-)
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 84eab6e3bf..ae62d4bc8d 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -237,11 +237,15 @@ bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock);
bool qemu_co_queue_empty(CoQueue *queue);
+typedef struct CoRwTicket CoRwTicket;
typedef struct CoRwlock {
- int pending_writer;
- int reader;
CoMutex mutex;
- CoQueue queue;
+
+ /* Number of readers, of -1 if owned for writing. */
+ int owner;
+
+ /* Waiting coroutines. */
+ QSIMPLEQ_HEAD(, CoRwTicket) tickets;
} CoRwlock;
/**
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index eb73cf11dc..655634d185 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -327,11 +327,70 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
trace_qemu_co_mutex_unlock_return(mutex, self);
}
+struct CoRwTicket {
+ bool read;
+ Coroutine *co;
+ QSIMPLEQ_ENTRY(CoRwTicket) next;
+};
+
void qemu_co_rwlock_init(CoRwlock *lock)
{
- memset(lock, 0, sizeof(*lock));
- qemu_co_queue_init(&lock->queue);
qemu_co_mutex_init(&lock->mutex);
+ lock->owner = 0;
+ QSIMPLEQ_INIT(&lock->tickets);
+}
+
+/* Releases the internal CoMutex. */
+static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
+{
+ CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
+ Coroutine *co = NULL;
+
+ /*
+ * Setting lock->owner here prevents rdlock and wrlock from
+ * sneaking in between unlock and wake.
+ */
+
+ if (tkt) {
+ if (tkt->read) {
+ if (lock->owner >= 0) {
+ lock->owner++;
+ co = tkt->co;
+ }
+ } else {
+ if (lock->owner == 0) {
+ lock->owner = -1;
+ co = tkt->co;
+ }
+ }
+ }
+
+ if (co) {
+ QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
+ qemu_co_mutex_unlock(&lock->mutex);
+ aio_co_wake(co);
+ } else {
+ qemu_co_mutex_unlock(&lock->mutex);
+ }
+}
+
+/* Releases the internal CoMutex. */
+static void qemu_co_rwlock_sleep(bool read, CoRwlock *lock)
+{
+ CoRwTicket my_ticket = { read, qemu_coroutine_self() };
+
+ QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+ qemu_co_mutex_unlock(&lock->mutex);
+ qemu_coroutine_yield();
+
+ if (read) {
+ /* Possibly wake another reader, which will wake the next in line. */
+ assert(lock->owner >= 1);
+ qemu_co_mutex_lock(&lock->mutex);
+ qemu_co_rwlock_maybe_wake_one(lock);
+ } else {
+ assert(lock->owner == -1);
+ }
}
void qemu_co_rwlock_rdlock(CoRwlock *lock)
@@ -339,13 +398,13 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
qemu_co_mutex_lock(&lock->mutex);
/* For fairness, wait if a writer is in line. */
- while (lock->pending_writer) {
- qemu_co_queue_wait(&lock->queue, &lock->mutex);
+ if (lock->owner == 0 || (lock->owner > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
+ lock->owner++;
+ qemu_co_mutex_unlock(&lock->mutex);
+ } else {
+ qemu_co_rwlock_sleep(true, lock);
}
- lock->reader++;
- qemu_co_mutex_unlock(&lock->mutex);
- /* The rest of the read-side critical section is run without the mutex. */
self->locks_held++;
}
@@ -355,69 +413,58 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
Coroutine *self = qemu_coroutine_self();
assert(qemu_in_coroutine());
- if (!lock->reader) {
- /* The critical section started in qemu_co_rwlock_wrlock. */
- qemu_co_queue_restart_all(&lock->queue);
+ self->locks_held--;
+
+ qemu_co_mutex_lock(&lock->mutex);
+ if (lock->owner == -1) {
+ lock->owner = 0;
} else {
- self->locks_held--;
+ lock->owner--;
+ }
- qemu_co_mutex_lock(&lock->mutex);
- lock->reader--;
- assert(lock->reader >= 0);
- /* Wakeup only one waiting writer */
- if (!lock->reader) {
- qemu_co_queue_next(&lock->queue);
- }
+ if (lock->owner == 0) {
+ qemu_co_rwlock_maybe_wake_one(lock);
+ } else {
+ qemu_co_mutex_unlock(&lock->mutex);
}
- qemu_co_mutex_unlock(&lock->mutex);
}
void qemu_co_rwlock_downgrade(CoRwlock *lock)
{
- Coroutine *self = qemu_coroutine_self();
-
- /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
- * qemu_co_rwlock_upgrade.
- */
- assert(lock->reader == 0);
- lock->reader++;
- qemu_co_mutex_unlock(&lock->mutex);
+ qemu_co_mutex_lock(&lock->mutex);
+ assert(lock->owner == -1);
+ lock->owner = 1;
- /* The rest of the read-side critical section is run without the mutex. */
- self->locks_held++;
+ /* Possibly wake another reader, which will wake the next in line. */
+ qemu_co_rwlock_maybe_wake_one(lock);
}
void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
+ Coroutine *self = qemu_coroutine_self();
+
qemu_co_mutex_lock(&lock->mutex);
- lock->pending_writer++;
- while (lock->reader) {
- qemu_co_queue_wait(&lock->queue, &lock->mutex);
+ if (lock->owner == 0) {
+ lock->owner = -1;
+ qemu_co_mutex_unlock(&lock->mutex);
+ } else {
+ qemu_co_rwlock_sleep(false, lock);
}
- lock->pending_writer--;
- /* The rest of the write-side critical section is run with
- * the mutex taken, so that lock->reader remains zero.
- * There is no need to update self->locks_held.
- */
+ self->locks_held++;
}
void qemu_co_rwlock_upgrade(CoRwlock *lock)
{
- Coroutine *self = qemu_coroutine_self();
-
qemu_co_mutex_lock(&lock->mutex);
- assert(lock->reader > 0);
- lock->reader--;
- lock->pending_writer++;
- while (lock->reader) {
- qemu_co_queue_wait(&lock->queue, &lock->mutex);
+ assert(lock->owner > 0);
+ /* For fairness, wait if a writer is in line. */
+ if (lock->owner == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
+ lock->owner = -1;
+ qemu_co_mutex_unlock(&lock->mutex);
+ } else {
+ lock->owner--;
+ qemu_co_rwlock_sleep(false, lock);
}
- lock->pending_writer--;
- /* The rest of the write-side critical section is run with
- * the mutex taken, similar to qemu_co_rwlock_wrlock. Do
- * not account for the lock twice in self->locks_held.
- */
- self->locks_held--;
}
--
2.29.2
On Tuesday, 2021-03-16 at 17:00:06 +01, Paolo Bonzini wrote:
> A feature of the current rwlock is that if multiple coroutines hold a
> reader lock, all must be runnable. The unlock implementation relies on
> this, choosing to wake a single coroutine when the final read lock
> holder exits the critical section, assuming that it will wake a
> coroutine attempting to acquire a write lock.
>
> The downgrade implementation violates this assumption by creating a
> read lock owning coroutine that is exclusively runnable - any other
> coroutines that are waiting to acquire a read lock are *not* made
> runnable when the write lock holder converts its ownership to read
> only.
>
> To fix this, keep the queue of waiters explicitly in the CoRwLock
> instead of using CoQueue, and store for each whether it is a
> potential reader or a writer. This way, downgrade can look at the
> first queued coroutines and wake it if it is a reader, causing
> all other readers to be released in turn.
>
> Reported-by: David Edmondson <david.edmondson@oracle.com>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
> include/qemu/coroutine.h | 10 ++-
> util/qemu-coroutine-lock.c | 150 ++++++++++++++++++++++++-------------
> 2 files changed, 106 insertions(+), 54 deletions(-)
>
> diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
> index 84eab6e3bf..ae62d4bc8d 100644
> --- a/include/qemu/coroutine.h
> +++ b/include/qemu/coroutine.h
> @@ -237,11 +237,15 @@ bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock);
> bool qemu_co_queue_empty(CoQueue *queue);
>
>
> +typedef struct CoRwTicket CoRwTicket;
> typedef struct CoRwlock {
> - int pending_writer;
> - int reader;
> CoMutex mutex;
> - CoQueue queue;
> +
> + /* Number of readers, of -1 if owned for writing. */
s/, of/, or/
> + int owner;
> +
> + /* Waiting coroutines. */
> + QSIMPLEQ_HEAD(, CoRwTicket) tickets;
> } CoRwlock;
Isn't this...
* ... Also, @qemu_co_rwlock_upgrade
* only overrides CoRwlock fairness if there are no concurrent readers, so
* another writer might run while @qemu_co_rwlock_upgrade blocks.
...now incorrect?
> /**
> diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
> index eb73cf11dc..655634d185 100644
> --- a/util/qemu-coroutine-lock.c
> +++ b/util/qemu-coroutine-lock.c
> @@ -327,11 +327,70 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
> trace_qemu_co_mutex_unlock_return(mutex, self);
> }
>
> +struct CoRwTicket {
> + bool read;
> + Coroutine *co;
> + QSIMPLEQ_ENTRY(CoRwTicket) next;
> +};
> +
> void qemu_co_rwlock_init(CoRwlock *lock)
> {
> - memset(lock, 0, sizeof(*lock));
> - qemu_co_queue_init(&lock->queue);
> qemu_co_mutex_init(&lock->mutex);
> + lock->owner = 0;
> + QSIMPLEQ_INIT(&lock->tickets);
> +}
> +
> +/* Releases the internal CoMutex. */
> +static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
> +{
> + CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
> + Coroutine *co = NULL;
> +
> + /*
> + * Setting lock->owner here prevents rdlock and wrlock from
> + * sneaking in between unlock and wake.
> + */
> +
> + if (tkt) {
> + if (tkt->read) {
> + if (lock->owner >= 0) {
> + lock->owner++;
> + co = tkt->co;
> + }
> + } else {
> + if (lock->owner == 0) {
> + lock->owner = -1;
> + co = tkt->co;
> + }
> + }
> + }
> +
> + if (co) {
> + QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
> + qemu_co_mutex_unlock(&lock->mutex);
> + aio_co_wake(co);
> + } else {
> + qemu_co_mutex_unlock(&lock->mutex);
> + }
> +}
> +
> +/* Releases the internal CoMutex. */
> +static void qemu_co_rwlock_sleep(bool read, CoRwlock *lock)
> +{
> + CoRwTicket my_ticket = { read, qemu_coroutine_self() };
> +
> + QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
> + qemu_co_mutex_unlock(&lock->mutex);
> + qemu_coroutine_yield();
> +
> + if (read) {
> + /* Possibly wake another reader, which will wake the next in line. */
> + assert(lock->owner >= 1);
> + qemu_co_mutex_lock(&lock->mutex);
> + qemu_co_rwlock_maybe_wake_one(lock);
> + } else {
> + assert(lock->owner == -1);
> + }
> }
>
> void qemu_co_rwlock_rdlock(CoRwlock *lock)
> @@ -339,13 +398,13 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
>
> qemu_co_mutex_lock(&lock->mutex);
> /* For fairness, wait if a writer is in line. */
> - while (lock->pending_writer) {
> - qemu_co_queue_wait(&lock->queue, &lock->mutex);
> + if (lock->owner == 0 || (lock->owner > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
> + lock->owner++;
> + qemu_co_mutex_unlock(&lock->mutex);
> + } else {
> + qemu_co_rwlock_sleep(true, lock);
> }
> - lock->reader++;
> - qemu_co_mutex_unlock(&lock->mutex);
>
> - /* The rest of the read-side critical section is run without the mutex. */
> self->locks_held++;
> }
>
> @@ -355,69 +413,58 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
> Coroutine *self = qemu_coroutine_self();
>
> assert(qemu_in_coroutine());
> - if (!lock->reader) {
> - /* The critical section started in qemu_co_rwlock_wrlock. */
> - qemu_co_queue_restart_all(&lock->queue);
> + self->locks_held--;
> +
> + qemu_co_mutex_lock(&lock->mutex);
> + if (lock->owner == -1) {
> + lock->owner = 0;
> } else {
> - self->locks_held--;
> + lock->owner--;
> + }
>
> - qemu_co_mutex_lock(&lock->mutex);
> - lock->reader--;
> - assert(lock->reader >= 0);
> - /* Wakeup only one waiting writer */
> - if (!lock->reader) {
> - qemu_co_queue_next(&lock->queue);
> - }
> + if (lock->owner == 0) {
> + qemu_co_rwlock_maybe_wake_one(lock);
> + } else {
> + qemu_co_mutex_unlock(&lock->mutex);
> }
> - qemu_co_mutex_unlock(&lock->mutex);
> }
>
> void qemu_co_rwlock_downgrade(CoRwlock *lock)
> {
> - Coroutine *self = qemu_coroutine_self();
> -
> - /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
> - * qemu_co_rwlock_upgrade.
> - */
> - assert(lock->reader == 0);
> - lock->reader++;
> - qemu_co_mutex_unlock(&lock->mutex);
> + qemu_co_mutex_lock(&lock->mutex);
> + assert(lock->owner == -1);
> + lock->owner = 1;
>
> - /* The rest of the read-side critical section is run without the mutex. */
> - self->locks_held++;
> + /* Possibly wake another reader, which will wake the next in line. */
> + qemu_co_rwlock_maybe_wake_one(lock);
> }
>
> void qemu_co_rwlock_wrlock(CoRwlock *lock)
> {
> + Coroutine *self = qemu_coroutine_self();
> +
> qemu_co_mutex_lock(&lock->mutex);
> - lock->pending_writer++;
> - while (lock->reader) {
> - qemu_co_queue_wait(&lock->queue, &lock->mutex);
> + if (lock->owner == 0) {
> + lock->owner = -1;
> + qemu_co_mutex_unlock(&lock->mutex);
> + } else {
> + qemu_co_rwlock_sleep(false, lock);
> }
> - lock->pending_writer--;
>
> - /* The rest of the write-side critical section is run with
> - * the mutex taken, so that lock->reader remains zero.
> - * There is no need to update self->locks_held.
> - */
> + self->locks_held++;
> }
>
> void qemu_co_rwlock_upgrade(CoRwlock *lock)
> {
> - Coroutine *self = qemu_coroutine_self();
> -
> qemu_co_mutex_lock(&lock->mutex);
> - assert(lock->reader > 0);
> - lock->reader--;
> - lock->pending_writer++;
> - while (lock->reader) {
> - qemu_co_queue_wait(&lock->queue, &lock->mutex);
> + assert(lock->owner > 0);
> + /* For fairness, wait if a writer is in line. */
> + if (lock->owner == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
> + lock->owner = -1;
> + qemu_co_mutex_unlock(&lock->mutex);
> + } else {
> + lock->owner--;
> + qemu_co_rwlock_sleep(false, lock);
Doesn't this need something for the case where lock->owner hits 0?
If not, how is two readers both attempting to upgrade ever resolved?
It feels like it should jump into the second half of
qemu_co_rwlock_wrlock().
> }
> - lock->pending_writer--;
>
> - /* The rest of the write-side critical section is run with
> - * the mutex taken, similar to qemu_co_rwlock_wrlock. Do
> - * not account for the lock twice in self->locks_held.
> - */
> - self->locks_held--;
> }
> --
> 2.29.2
dme.
--
And you're standing here beside me, I love the passing of time.
On 17/03/21 11:40, David Edmondson wrote:
> Isn't this...
>
> * ... Also, @qemu_co_rwlock_upgrade
> * only overrides CoRwlock fairness if there are no concurrent readers, so
> * another writer might run while @qemu_co_rwlock_upgrade blocks.
>
> ...now incorrect?
Maybe, but for sure the comment was too hard to parse.
>>
>> + if (lock->owner == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
>> + lock->owner = -1;
>> + qemu_co_mutex_unlock(&lock->mutex);
>> + } else {
>> + lock->owner--;
>> + qemu_co_rwlock_sleep(false, lock);
>
> Doesn't this need something for the case where lock->owner hits 0?
You're right, we need to call qemu_co_rwlock_maybe_wake_one if lock goes
to 0. The "else" branch would have to be
if (--lock->owner == 0) {
qemu_co_rwlock_maybe_wake_one(lock);
qemu_co_mutex_lock(&lock->mutex);
}
qemu_co_rwlock_sleep(false, lock);
In the end it's actually simpler to just inline qemu_co_rwlock_sleep,
which also leads to the following slightly more optimized code for the
"else" branch:
CoRwTicket my_ticket = { false, qemu_coroutine_self() };
lock->owner--;
QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
qemu_co_rwlock_maybe_wake_one(lock);
qemu_coroutine_yield();
assert(lock->owner == -1);
I'll add a testcase, too. You don't even need two upgraders, for example:
rdlock
yield
wrlock
upgrade
<queued> <dequeued>
unlock
<dequeued>
unlock
In fact even for this simple case, the old implementation got it wrong!
(The new one also did, but the fix is easy).
There are a couple other improvements that can be made:
qemu_co_rwlock_unlock can also call qemu_co_rwlock_maybe_wake_one
unconditionally, the "if" around the call is unnecessary; and I'll
rename "owner" to "owners".
Anyway, there is nothing that really made you scream, so that's good.
Paolo
© 2016 - 2026 Red Hat, Inc.