The condition for waiting on the s->free_sema queue depends on
both s->in_flight and s->state. The latter is currently using
atomics, but this is quite dubious and probably wrong.
Because s->state is written in the main thread too, for example by
the yank callback, it cannot be protected by a CoMutex. Introduce a
separate lock that can be used by nbd_co_send_request(); later on this
lock will also be used for s->state. There will not be any contention
on the lock unless there is a yank or reconnect, so this is not
performance sensitive.
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
1 file changed, 27 insertions(+), 19 deletions(-)
diff --git a/block/nbd.c b/block/nbd.c
index 62dd338ef3..a2414566d1 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
QIOChannel *ioc; /* The current I/O channel */
NBDExportInfo info;
- CoMutex send_mutex;
+ /*
+ * Protects free_sema, in_flight, requests[].coroutine,
+ * reconnect_delay_timer.
+ */
+ QemuMutex requests_lock;
CoQueue free_sema;
-
- CoMutex receive_mutex;
int in_flight;
+ NBDClientRequest requests[MAX_NBD_REQUESTS];
+ QEMUTimer *reconnect_delay_timer;
+
+ CoMutex send_mutex;
+ CoMutex receive_mutex;
NBDClientState state;
- QEMUTimer *reconnect_delay_timer;
QEMUTimer *open_timer;
- NBDClientRequest requests[MAX_NBD_REQUESTS];
NBDReply reply;
BlockDriverState *bs;
@@ -350,7 +355,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
return 0;
}
-/* called under s->send_mutex */
+/* Called with s->requests_lock taken. */
static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
{
bool blocking = nbd_client_connecting_wait(s);
@@ -382,9 +387,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
s->ioc = NULL;
}
- qemu_co_mutex_unlock(&s->send_mutex);
+ qemu_mutex_unlock(&s->requests_lock);
nbd_co_do_establish_connection(s->bs, blocking, NULL);
- qemu_co_mutex_lock(&s->send_mutex);
+ qemu_mutex_lock(&s->requests_lock);
/*
* The reconnect attempt is done (maybe successfully, maybe not), so
@@ -466,11 +471,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
int rc, i = -1;
- qemu_co_mutex_lock(&s->send_mutex);
-
+ qemu_mutex_lock(&s->requests_lock);
while (s->in_flight == MAX_NBD_REQUESTS ||
(!nbd_client_connected(s) && s->in_flight > 0)) {
- qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
+ qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
}
s->in_flight++;
@@ -491,13 +495,13 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
}
}
- g_assert(qemu_in_coroutine());
assert(i < MAX_NBD_REQUESTS);
-
s->requests[i].coroutine = qemu_coroutine_self();
s->requests[i].offset = request->from;
s->requests[i].receiving = false;
+ qemu_mutex_unlock(&s->requests_lock);
+ qemu_co_mutex_lock(&s->send_mutex);
request->handle = INDEX_TO_HANDLE(s, i);
assert(s->ioc);
@@ -517,17 +521,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
} else {
rc = nbd_send_request(s->ioc, request);
}
+ qemu_co_mutex_unlock(&s->send_mutex);
-err:
if (rc < 0) {
+ qemu_mutex_lock(&s->requests_lock);
+err:
nbd_channel_error(s, rc);
if (i != -1) {
s->requests[i].coroutine = NULL;
}
s->in_flight--;
qemu_co_queue_next(&s->free_sema);
+ qemu_mutex_unlock(&s->requests_lock);
}
- qemu_co_mutex_unlock(&s->send_mutex);
return rc;
}
@@ -1017,12 +1023,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
return true;
break_loop:
+ qemu_mutex_lock(&s->requests_lock);
s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
-
- qemu_co_mutex_lock(&s->send_mutex);
s->in_flight--;
qemu_co_queue_next(&s->free_sema);
- qemu_co_mutex_unlock(&s->send_mutex);
+ qemu_mutex_unlock(&s->requests_lock);
return false;
}
@@ -1855,8 +1860,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
s->bs = bs;
- qemu_co_mutex_init(&s->send_mutex);
+ qemu_mutex_init(&s->requests_lock);
qemu_co_queue_init(&s->free_sema);
+ qemu_co_mutex_init(&s->send_mutex);
qemu_co_mutex_init(&s->receive_mutex);
if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
@@ -2044,9 +2050,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
reconnect_delay_timer_del(s);
+ qemu_mutex_lock(&s->requests_lock);
if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
s->state = NBD_CLIENT_CONNECTING_NOWAIT;
}
+ qemu_mutex_unlock(&s->requests_lock);
nbd_co_establish_connection_cancel(s->conn);
}
--
2.35.1
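
For readers less familiar with QEMU's locking primitives, the key constraint in the commit message is that a CoMutex can only be acquired from coroutine context, whereas the yank callback runs in the main thread. The sketch below is a minimal illustration of the resulting pattern, not code from the patch: the Demo type and demo_* names are hypothetical, and only QEMU's qemu/thread.h and qemu/coroutine.h primitives are assumed. Note that qemu_co_queue_wait() accepts a QemuMutex as well as a CoMutex via QemuLockable, which is what lets the free_sema wait loop survive the lock-type switch.

/*
 * Schematic sketch (hypothetical names): a plain QemuMutex can be
 * taken from any thread, so both the coroutine wait loop and the
 * main-thread yank callback can use the same lock.
 */
#include "qemu/osdep.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"

#define DEMO_MAX_REQUESTS 16

typedef struct Demo {
    QemuMutex lock;     /* like s->requests_lock: any thread may take it */
    CoQueue waiters;    /* like s->free_sema */
    int in_flight;
    bool connected;
} Demo;

static void demo_init(Demo *d)
{
    qemu_mutex_init(&d->lock);
    qemu_co_queue_init(&d->waiters);
    d->in_flight = 0;
    d->connected = true;
}

/* Coroutine context: wait for a free slot under the thread mutex. */
static coroutine_fn void demo_get_slot(Demo *d)
{
    qemu_mutex_lock(&d->lock);
    while (d->in_flight == DEMO_MAX_REQUESTS ||
           (!d->connected && d->in_flight > 0)) {
        /* Drops d->lock while suspended, retakes it before returning. */
        qemu_co_queue_wait(&d->waiters, &d->lock);
    }
    d->in_flight++;
    qemu_mutex_unlock(&d->lock);
}

/*
 * Main-thread context, e.g. a yank callback: a CoMutex could not be
 * locked here, but a QemuMutex can.
 */
static void demo_yank(Demo *d)
{
    qemu_mutex_lock(&d->lock);
    d->connected = false;
    qemu_mutex_unlock(&d->lock);
}
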
14.04.2022 20:57, Paolo Bonzini wrote:
> The condition for waiting on the s->free_sema queue depends on
> both s->in_flight and s->state. The latter is currently using
> atomics, but this is quite dubious and probably wrong.
>
> Because s->state is written in the main thread too, for example by
> the yank callback, it cannot be protected by a CoMutex. Introduce a
> separate lock that can be used by nbd_co_send_request(); later on this
> lock will also be used for s->state. There will not be any contention
> on the lock unless there is a yank or reconnect, so this is not
> performance sensitive.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
> block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
> 1 file changed, 27 insertions(+), 19 deletions(-)
>
> diff --git a/block/nbd.c b/block/nbd.c
> index 62dd338ef3..a2414566d1 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
> QIOChannel *ioc; /* The current I/O channel */
> NBDExportInfo info;
>
> - CoMutex send_mutex;
> + /*
> + * Protects free_sema, in_flight, requests[].coroutine,
> + * reconnect_delay_timer.
> + */
requests[].coroutine is read without the mutex in nbd_receive_replies().
reconnect_delay_timer_del() is called without the mutex in nbd_cancel_in_flight() and in reconnect_delay_timer_cb().
Is that OK? Worth improving the comment?
> + QemuMutex requests_lock;
> CoQueue free_sema;
> -
> - CoMutex receive_mutex;
> int in_flight;
> + NBDClientRequest requests[MAX_NBD_REQUESTS];
> + QEMUTimer *reconnect_delay_timer;
> +
> + CoMutex send_mutex;
> + CoMutex receive_mutex;
> NBDClientState state;
>
> - QEMUTimer *reconnect_delay_timer;
> QEMUTimer *open_timer;
>
> - NBDClientRequest requests[MAX_NBD_REQUESTS];
> NBDReply reply;
> BlockDriverState *bs;
>
> @@ -350,7 +355,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
> return 0;
> }
>
> -/* called under s->send_mutex */
> +/* Called with s->requests_lock taken. */
> static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
> {
> bool blocking = nbd_client_connecting_wait(s);
> @@ -382,9 +387,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
> s->ioc = NULL;
> }
>
> - qemu_co_mutex_unlock(&s->send_mutex);
> + qemu_mutex_unlock(&s->requests_lock);
> nbd_co_do_establish_connection(s->bs, blocking, NULL);
> - qemu_co_mutex_lock(&s->send_mutex);
> + qemu_mutex_lock(&s->requests_lock);
>
> /*
> * The reconnect attempt is done (maybe successfully, maybe not), so
> @@ -466,11 +471,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
> BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
> int rc, i = -1;
>
> - qemu_co_mutex_lock(&s->send_mutex);
> -
> + qemu_mutex_lock(&s->requests_lock);
> while (s->in_flight == MAX_NBD_REQUESTS ||
> (!nbd_client_connected(s) && s->in_flight > 0)) {
> - qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
> + qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
> }
>
> s->in_flight++;
> @@ -491,13 +495,13 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
Prior to this patch, if sending a request fails, we don't send further requests. After the patch, other coroutines can send more requests once send_mutex is unlocked, even after a failure.
Maybe that's not bad.
What's wrong with keeping the send_mutex critical section as is and just taking requests_lock additionally inside the send_mutex critical section?
> }
> }
>
> - g_assert(qemu_in_coroutine());
> assert(i < MAX_NBD_REQUESTS);
> -
> s->requests[i].coroutine = qemu_coroutine_self();
> s->requests[i].offset = request->from;
> s->requests[i].receiving = false;
> + qemu_mutex_unlock(&s->requests_lock);
>
> + qemu_co_mutex_lock(&s->send_mutex);
> request->handle = INDEX_TO_HANDLE(s, i);
>
> assert(s->ioc);
> @@ -517,17 +521,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
> } else {
> rc = nbd_send_request(s->ioc, request);
> }
> + qemu_co_mutex_unlock(&s->send_mutex);
>
> -err:
> if (rc < 0) {
> + qemu_mutex_lock(&s->requests_lock);
> +err:
> nbd_channel_error(s, rc);
> if (i != -1) {
> s->requests[i].coroutine = NULL;
> }
> s->in_flight--;
> qemu_co_queue_next(&s->free_sema);
> + qemu_mutex_unlock(&s->requests_lock);
> }
> - qemu_co_mutex_unlock(&s->send_mutex);
> return rc;
> }
>
> @@ -1017,12 +1023,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
> return true;
>
> break_loop:
> + qemu_mutex_lock(&s->requests_lock);
> s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
> -
> - qemu_co_mutex_lock(&s->send_mutex);
> s->in_flight--;
> qemu_co_queue_next(&s->free_sema);
> - qemu_co_mutex_unlock(&s->send_mutex);
> + qemu_mutex_unlock(&s->requests_lock);
>
> return false;
> }
> @@ -1855,8 +1860,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
> BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
>
> s->bs = bs;
> - qemu_co_mutex_init(&s->send_mutex);
> + qemu_mutex_init(&s->requests_lock);
> qemu_co_queue_init(&s->free_sema);
> + qemu_co_mutex_init(&s->send_mutex);
> qemu_co_mutex_init(&s->receive_mutex);
>
> if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
> @@ -2044,9 +2050,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
>
> reconnect_delay_timer_del(s);
>
> + qemu_mutex_lock(&s->requests_lock);
> if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
> s->state = NBD_CLIENT_CONNECTING_NOWAIT;
> }
> + qemu_mutex_unlock(&s->requests_lock);
>
> nbd_co_establish_connection_cancel(s->conn);
> }
--
Best regards,
Vladimir
On 4/16/22 14:18, Vladimir Sementsov-Ogievskiy wrote:
> 14.04.2022 20:57, Paolo Bonzini wrote:
>> The condition for waiting on the s->free_sema queue depends on
>> both s->in_flight and s->state. The latter is currently using
>> atomics, but this is quite dubious and probably wrong.
>>
>> Because s->state is written in the main thread too, for example by
>> the yank callback, it cannot be protected by a CoMutex. Introduce a
>> separate lock that can be used by nbd_co_send_request(); later on this
>> lock will also be used for s->state. There will not be any contention
>> on the lock unless there is a yank or reconnect, so this is not
>> performance sensitive.
>>
>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
>> ---
>> block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
>> 1 file changed, 27 insertions(+), 19 deletions(-)
>>
>> diff --git a/block/nbd.c b/block/nbd.c
>> index 62dd338ef3..a2414566d1 100644
>> --- a/block/nbd.c
>> +++ b/block/nbd.c
>> @@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
>> QIOChannel *ioc; /* The current I/O channel */
>> NBDExportInfo info;
>> - CoMutex send_mutex;
>> + /*
>> + * Protects free_sema, in_flight, requests[].coroutine,
>> + * reconnect_delay_timer.
>> + */
>
> requests[].coroutine is read without the mutex in nbd_receive_replies().
>
> reconnect_delay_timer_del() is called without the mutex in
> nbd_cancel_in_flight() and in reconnect_delay_timer_cb().
>
> Is that OK? Worth improving the comment?
Yeah, let me check the reconnect_delay_timer idea you had in the
previous patch, too.
> Prior to this patch, if sending a request fails, we don't send further
> requests. After the patch, other coroutines can send more requests once
> send_mutex is unlocked, even after a failure.
>
> What's wrong with keeping the send_mutex critical section as is and just
> taking requests_lock additionally inside the send_mutex critical section?
I wanted to keep send_mutex just for the socket, basically. The case you
point out exists, but it is harmless.
Paolo
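
To make that split concrete, after the patch the send path has roughly the shape below. This is a condensed sketch with hypothetical DemoSend and demo_* names, not the actual block/nbd.c code; the comment marks the window Vladimir pointed out, which Paolo considers harmless.

/*
 * Condensed sketch of the post-patch send path (hypothetical names):
 * requests_lock guards the bookkeeping, send_mutex serializes only
 * the socket write.
 */
#include "qemu/osdep.h"
#include "qemu/thread.h"
#include "qemu/coroutine.h"

#define DEMO_MAX_REQUESTS 16

typedef struct DemoSend {
    QemuMutex requests_lock;   /* bookkeeping: in_flight, slots */
    CoQueue free_sema;
    CoMutex send_mutex;        /* serializes the socket write only */
    int in_flight;
} DemoSend;

/* Hypothetical stand-in; the real code calls nbd_send_request(). */
static coroutine_fn int demo_write_to_socket(DemoSend *d, const void *req);

static coroutine_fn int demo_send_request(DemoSend *d, const void *req)
{
    int rc;

    /* Claim a slot under the thread mutex. */
    qemu_mutex_lock(&d->requests_lock);
    while (d->in_flight == DEMO_MAX_REQUESTS) {
        qemu_co_queue_wait(&d->free_sema, &d->requests_lock);
    }
    d->in_flight++;
    qemu_mutex_unlock(&d->requests_lock);

    /*
     * The window Vladimir points out: between the unlock above and
     * the lock below, another coroutine may send a request even if
     * this one is about to fail.
     */
    qemu_co_mutex_lock(&d->send_mutex);
    rc = demo_write_to_socket(d, req);
    qemu_co_mutex_unlock(&d->send_mutex);

    if (rc < 0) {
        /* Undo the bookkeeping so waiters can make progress. */
        qemu_mutex_lock(&d->requests_lock);
        d->in_flight--;
        qemu_co_queue_next(&d->free_sema);
        qemu_mutex_unlock(&d->requests_lock);
    }
    return rc;
}
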
On Thu, Apr 14, 2022 at 07:57:52PM +0200, Paolo Bonzini wrote:
> The condition for waiting on the s->free_sema queue depends on
> both s->in_flight and s->state. The latter is currently using
> atomics, but this is quite dubious and probably wrong.
>
> Because s->state is written in the main thread too, for example by
> the yank callback, it cannot be protected by a CoMutex. Introduce a
> separate lock that can be used by nbd_co_send_request(); later on this
> lock will also be used for s->state. There will not be any contention
> on the lock unless there is a yank or reconnect, so this is not
> performance sensitive.
>
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
> block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
> 1 file changed, 27 insertions(+), 19 deletions(-)

Reviewed-by: Eric Blake <eblake@redhat.com>

--
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3266
Virtualization: qemu.org | libvirt.org