An invariant of the current rwlock is that if multiple coroutines hold a
reader lock, all must be runnable. The unlock implementation relies on
this, choosing to wake a single coroutine when the final read lock
holder exits the critical section, assuming that it will wake a
coroutine attempting to acquire a write lock.
The downgrade implementation violates this assumption by creating a
read lock owning coroutine that is exclusively runnable - any other
coroutines that are waiting to acquire a read lock are *not* made
runnable when the write lock holder converts its ownership to read
only.
More generally, the old implementation had lots of other fairness bugs.
The root cause of the bugs was that CoQueue would wake up readers even
if there were pending writers, and would wake up writers even if there
were readers. In that case, the coroutine would go back to sleep *at
the end* of the CoQueue, losing its place at the head of the line.
To fix this, keep the queue of waiters explicitly in the CoRwlock
instead of using CoQueue, and store for each waiter whether it is a
potential reader or a writer. This way, downgrade can look at the
first queued coroutine and wake it only if it is a reader, causing
all other readers in line to be released in turn.
Reported-by: David Edmondson <david.edmondson@oracle.com>
Reviewed-by: David Edmondson <david.edmondson@oracle.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
v3->v4: clean up the code and fix upgrade logic. Fix upgrade comment too.
include/qemu/coroutine.h | 17 +++--
util/qemu-coroutine-lock.c | 148 ++++++++++++++++++++++++-------------
2 files changed, 106 insertions(+), 59 deletions(-)
diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 84eab6e3bf..7919d3bb62 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -237,11 +237,15 @@ bool qemu_co_enter_next_impl(CoQueue *queue, QemuLockable *lock);
bool qemu_co_queue_empty(CoQueue *queue);
+typedef struct CoRwTicket CoRwTicket;
typedef struct CoRwlock {
- int pending_writer;
- int reader;
CoMutex mutex;
- CoQueue queue;
+
+ /* Number of readers, or -1 if owned for writing. */
+ int owners;
+
+ /* Waiting coroutines. */
+ QSIMPLEQ_HEAD(, CoRwTicket) tickets;
} CoRwlock;
/**
@@ -260,10 +264,9 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock);
/**
* Write Locks the CoRwlock from a reader. This is a bit more efficient than
* @qemu_co_rwlock_unlock followed by a separate @qemu_co_rwlock_wrlock.
- * However, if the lock cannot be upgraded immediately, control is transferred
- * to the caller of the current coroutine. Also, @qemu_co_rwlock_upgrade
- * only overrides CoRwlock fairness if there are no concurrent readers, so
- * another writer might run while @qemu_co_rwlock_upgrade blocks.
+ * Note that if the lock cannot be upgraded immediately, control is transferred
+ * to the caller of the current coroutine; another writer might run while
+ * @qemu_co_rwlock_upgrade blocks.
*/
void qemu_co_rwlock_upgrade(CoRwlock *lock);
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index eb73cf11dc..2669403839 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -327,11 +327,51 @@ void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
trace_qemu_co_mutex_unlock_return(mutex, self);
}
+struct CoRwTicket {
+ bool read;
+ Coroutine *co;
+ QSIMPLEQ_ENTRY(CoRwTicket) next;
+};
+
void qemu_co_rwlock_init(CoRwlock *lock)
{
- memset(lock, 0, sizeof(*lock));
- qemu_co_queue_init(&lock->queue);
qemu_co_mutex_init(&lock->mutex);
+ lock->owners = 0;
+ QSIMPLEQ_INIT(&lock->tickets);
+}
+
+/* Releases the internal CoMutex. */
+static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
+{
+ CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
+ Coroutine *co = NULL;
+
+ /*
+ * Setting lock->owners here prevents rdlock and wrlock from
+ * sneaking in between unlock and wake.
+ */
+
+ if (tkt) {
+ if (tkt->read) {
+ if (lock->owners >= 0) {
+ lock->owners++;
+ co = tkt->co;
+ }
+ } else {
+ if (lock->owners == 0) {
+ lock->owners = -1;
+ co = tkt->co;
+ }
+ }
+ }
+
+ if (co) {
+ QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
+ qemu_co_mutex_unlock(&lock->mutex);
+ aio_co_wake(co);
+ } else {
+ qemu_co_mutex_unlock(&lock->mutex);
+ }
}
void qemu_co_rwlock_rdlock(CoRwlock *lock)
@@ -340,13 +380,22 @@ void qemu_co_rwlock_rdlock(CoRwlock *lock)
qemu_co_mutex_lock(&lock->mutex);
/* For fairness, wait if a writer is in line. */
- while (lock->pending_writer) {
- qemu_co_queue_wait(&lock->queue, &lock->mutex);
+ if (lock->owners == 0 || (lock->owners > 0 && QSIMPLEQ_EMPTY(&lock->tickets))) {
+ lock->owners++;
+ qemu_co_mutex_unlock(&lock->mutex);
+ } else {
+ CoRwTicket my_ticket = { true, self };
+
+ QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+ qemu_co_mutex_unlock(&lock->mutex);
+ qemu_coroutine_yield();
+ assert(lock->owners >= 1);
+
+ /* Possibly wake another reader, which will wake the next in line. */
+ qemu_co_mutex_lock(&lock->mutex);
+ qemu_co_rwlock_maybe_wake_one(lock);
}
- lock->reader++;
- qemu_co_mutex_unlock(&lock->mutex);
- /* The rest of the read-side critical section is run without the mutex. */
self->locks_held++;
}
@@ -355,69 +404,64 @@ void qemu_co_rwlock_unlock(CoRwlock *lock)
Coroutine *self = qemu_coroutine_self();
assert(qemu_in_coroutine());
- if (!lock->reader) {
- /* The critical section started in qemu_co_rwlock_wrlock. */
- qemu_co_queue_restart_all(&lock->queue);
- } else {
- self->locks_held--;
+ self->locks_held--;
- qemu_co_mutex_lock(&lock->mutex);
- lock->reader--;
- assert(lock->reader >= 0);
- /* Wakeup only one waiting writer */
- if (!lock->reader) {
- qemu_co_queue_next(&lock->queue);
- }
+ qemu_co_mutex_lock(&lock->mutex);
+ if (lock->owners > 0) {
+ lock->owners--;
+ } else {
+ assert(lock->owners == -1);
+ lock->owners = 0;
}
- qemu_co_mutex_unlock(&lock->mutex);
+
+ qemu_co_rwlock_maybe_wake_one(lock);
}
void qemu_co_rwlock_downgrade(CoRwlock *lock)
{
- Coroutine *self = qemu_coroutine_self();
-
- /* lock->mutex critical section started in qemu_co_rwlock_wrlock or
- * qemu_co_rwlock_upgrade.
- */
- assert(lock->reader == 0);
- lock->reader++;
- qemu_co_mutex_unlock(&lock->mutex);
+ qemu_co_mutex_lock(&lock->mutex);
+ assert(lock->owners == -1);
+ lock->owners = 1;
- /* The rest of the read-side critical section is run without the mutex. */
- self->locks_held++;
+ /* Possibly wake another reader, which will wake the next in line. */
+ qemu_co_rwlock_maybe_wake_one(lock);
}
void qemu_co_rwlock_wrlock(CoRwlock *lock)
{
+ Coroutine *self = qemu_coroutine_self();
+
qemu_co_mutex_lock(&lock->mutex);
- lock->pending_writer++;
- while (lock->reader) {
- qemu_co_queue_wait(&lock->queue, &lock->mutex);
+ if (lock->owners == 0) {
+ lock->owners = -1;
+ qemu_co_mutex_unlock(&lock->mutex);
+ } else {
+ CoRwTicket my_ticket = { false, qemu_coroutine_self() };
+
+ QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+ qemu_co_mutex_unlock(&lock->mutex);
+ qemu_coroutine_yield();
+ assert(lock->owners == -1);
}
- lock->pending_writer--;
- /* The rest of the write-side critical section is run with
- * the mutex taken, so that lock->reader remains zero.
- * There is no need to update self->locks_held.
- */
+ self->locks_held++;
}
void qemu_co_rwlock_upgrade(CoRwlock *lock)
{
- Coroutine *self = qemu_coroutine_self();
-
qemu_co_mutex_lock(&lock->mutex);
- assert(lock->reader > 0);
- lock->reader--;
- lock->pending_writer++;
- while (lock->reader) {
- qemu_co_queue_wait(&lock->queue, &lock->mutex);
- }
- lock->pending_writer--;
+ assert(lock->owners > 0);
+ /* For fairness, wait if a writer is in line. */
+ if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
+ lock->owners = -1;
+ qemu_co_mutex_unlock(&lock->mutex);
+ } else {
+ CoRwTicket my_ticket = { false, qemu_coroutine_self() };
- /* The rest of the write-side critical section is run with
- * the mutex taken, similar to qemu_co_rwlock_wrlock. Do
- * not account for the lock twice in self->locks_held.
- */
- self->locks_held--;
+ lock->owners--;
+ QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
+ qemu_co_rwlock_maybe_wake_one(lock);
+ qemu_coroutine_yield();
+ assert(lock->owners == -1);
+ }
}
--
2.29.2
On Wed, Mar 17, 2021 at 07:00:11PM +0100, Paolo Bonzini wrote:
> +static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
> +{
> + CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
> + Coroutine *co = NULL;
> +
> + /*
> + * Setting lock->owners here prevents rdlock and wrlock from
> + * sneaking in between unlock and wake.
> + */
> +
> + if (tkt) {
> + if (tkt->read) {
> + if (lock->owners >= 0) {
> + lock->owners++;
> + co = tkt->co;
> + }
> + } else {
> + if (lock->owners == 0) {
> + lock->owners = -1;
> + co = tkt->co;
> + }
> + }
> + }
> +
> + if (co) {
> + QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
> + qemu_co_mutex_unlock(&lock->mutex);
> + aio_co_wake(co);
I find it hard to reason about QSIMPLEQ_EMPTY(&lock->tickets) callers
that execute before co is entered. They see an empty queue even though a
coroutine is about to run. Updating owners above ensures that the code
correctly tracks the state of the rwlock, but I'm not 100% confident
about this aspect of the code.
> void qemu_co_rwlock_upgrade(CoRwlock *lock)
> {
> - Coroutine *self = qemu_coroutine_self();
> -
> qemu_co_mutex_lock(&lock->mutex);
> - assert(lock->reader > 0);
> - lock->reader--;
> - lock->pending_writer++;
> - while (lock->reader) {
> - qemu_co_queue_wait(&lock->queue, &lock->mutex);
> - }
> - lock->pending_writer--;
> + assert(lock->owners > 0);
> + /* For fairness, wait if a writer is in line. */
> + if (lock->owners == 1 && QSIMPLEQ_EMPTY(&lock->tickets)) {
> + lock->owners = -1;
> + qemu_co_mutex_unlock(&lock->mutex);
> + } else {
> + CoRwTicket my_ticket = { false, qemu_coroutine_self() };
>
> - /* The rest of the write-side critical section is run with
> - * the mutex taken, similar to qemu_co_rwlock_wrlock. Do
> - * not account for the lock twice in self->locks_held.
> - */
> - self->locks_held--;
> + lock->owners--;
> + QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
> + qemu_co_rwlock_maybe_wake_one(lock);
> + qemu_coroutine_yield();
> + assert(lock->owners == -1);
lock->owners is read outside lock->mutex here. Not sure if this can
cause problems.
> + }
> }
locks_held is kept unchanged across qemu_coroutine_yield() even though
the read lock has been released. rdlock() and wrlock() only increment
locks_held after acquiring the rwlock.
In practice I don't think it matters, but it seems inconsistent. If
locks_held is supposed to track tickets (not just coroutines currently
holding a lock), then rdlock() and wrlock() should increment before
yielding.
On 24/03/21 17:15, Stefan Hajnoczi wrote:
> On Wed, Mar 17, 2021 at 07:00:11PM +0100, Paolo Bonzini wrote:
>> +static void qemu_co_rwlock_maybe_wake_one(CoRwlock *lock)
>> +{
>> + CoRwTicket *tkt = QSIMPLEQ_FIRST(&lock->tickets);
>> + Coroutine *co = NULL;
>> +
>> + /*
>> + * Setting lock->owners here prevents rdlock and wrlock from
>> + * sneaking in between unlock and wake.
>> + */
>> +
>> + if (tkt) {
>> + if (tkt->read) {
>> + if (lock->owners >= 0) {
>> + lock->owners++;
>> + co = tkt->co;
>> + }
>> + } else {
>> + if (lock->owners == 0) {
>> + lock->owners = -1;
>> + co = tkt->co;
>> + }
>> + }
>> + }
>> +
>> + if (co) {
>> + QSIMPLEQ_REMOVE_HEAD(&lock->tickets, next);
>> + qemu_co_mutex_unlock(&lock->mutex);
>> + aio_co_wake(co);
>
> I find it hard to reason about QSIMPLEQ_EMPTY(&lock->tickets) callers
> that execute before co is entered. They see an empty queue even though a
> coroutine is about to run. Updating owners above ensures that the code
> correctly tracks the state of the rwlock, but I'm not 100% confident
> about this aspect of the code.
Good point. The invariant when lock->mutex is released should be
clarified; a better way to phrase the comment above "if (tkt)" is:
The invariant when lock->mutex is released is that every ticket is
tracked in either lock->owners or lock->tickets. By updating
lock->owners here, rdlock/wrlock/upgrade will block even if they execute
between qemu_co_mutex_unlock and aio_co_wake.
>> - self->locks_held--;
>> + lock->owners--;
>> + QSIMPLEQ_INSERT_TAIL(&lock->tickets, &my_ticket, next);
>> + qemu_co_rwlock_maybe_wake_one(lock);
>> + qemu_coroutine_yield();
>> + assert(lock->owners == -1);
>
> lock->owners is read outside lock->mutex here. Not sure if this can
> cause problems.
True. It is okay though because lock->owners cannot change until this
coroutine unlocks. A worse occurrence of the issue is in rdlock:
assert(lock->owners >= 1);
/* Possibly wake another reader, which will wake the next in line. */
qemu_co_mutex_lock(&lock->mutex);
where the assert should be moved after taking the lock, or possibly
changed to use qatomic_read. (I prefer the former).
> locks_held is kept unchanged across qemu_coroutine_yield() even though
> the read lock has been released. rdlock() and wrlock() only increment
> locks_held after acquiring the rwlock.
>
> In practice I don't think it matters, but it seems inconsistent. If
> locks_held is supposed to track tickets (not just coroutines currently
> holding a lock), then rdlock() and wrlock() should increment before
> yielding.
locks_held (unlike owners) is not part of the lock, it's part of the
Coroutine and only used for debugging (asserting that terminating
coroutines are not holding any lock).
Paolo
© 2016 - 2025 Red Hat, Inc.