14.04.2022 20:57, Paolo Bonzini wrote:
> requests[].receiving is set by nbd_receive_replies() under the receive_mutex;
> Read it under the same mutex as well. Waking up receivers on errors happens
> after each reply finishes processing, in nbd_co_receive_one_chunk().
> If there is no currently-active reply, there are two cases:
>
> * either there is no active request at all, in which case no
> element of requests[] can have .receiving = true
>
> * or nbd_receive_replies() must be running and owns receive_mutex;
> in that case it will get back to nbd_co_receive_one_chunk() because
> the socket has been shut down, and all waiting coroutines will wake up
> in turn.
>
It seems that removing "nbd_recv_coroutines_wake(s, true);" from nbd_channel_error_locked() could be split out as a separate preparatory patch. It is a logically separate change: there is no point in waking all receiving coroutines on error if they will wake each other in a chain anyway.
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@openvz.org>
> ---
> block/nbd.c | 15 +++++++--------
> 1 file changed, 7 insertions(+), 8 deletions(-)
>
> diff --git a/block/nbd.c b/block/nbd.c
> index b5aac2548c..31c684772e 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -131,6 +131,7 @@ static void nbd_clear_bdrvstate(BlockDriverState *bs)
> s->x_dirty_bitmap = NULL;
> }
>
> +/* Called with s->receive_mutex taken. */
> static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
> {
> if (req->receiving) {
> @@ -142,12 +143,13 @@ static bool coroutine_fn nbd_recv_coroutine_wake_one(NBDClientRequest *req)
> return false;
> }
>
> -static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s, bool all)
> +static void coroutine_fn nbd_recv_coroutines_wake(BDRVNBDState *s)
> {
> int i;
>
> + QEMU_LOCK_GUARD(&s->receive_mutex);
> for (i = 0; i < MAX_NBD_REQUESTS; i++) {
> - if (nbd_recv_coroutine_wake_one(&s->requests[i]) && !all) {
> + if (nbd_recv_coroutine_wake_one(&s->requests[i])) {
> return;
> }
> }
> @@ -168,8 +170,6 @@ static void coroutine_fn nbd_channel_error_locked(BDRVNBDState *s, int ret)
> } else {
> s->state = NBD_CLIENT_QUIT;
> }
> -
> - nbd_recv_coroutines_wake(s, true);
> }
>
> static void coroutine_fn nbd_channel_error(BDRVNBDState *s, int ret)
> @@ -432,11 +432,10 @@ static coroutine_fn int nbd_receive_replies(BDRVNBDState *s, uint64_t handle)
>
> qemu_coroutine_yield();
> /*
> - * We may be woken for 3 reasons:
> + * We may be woken for 2 reasons:
> * 1. From this function, executing in parallel coroutine, when our
> * handle is received.
> - * 2. From nbd_channel_error(), when connection is lost.
> - * 3. From nbd_co_receive_one_chunk(), when previous request is
> + * 2. From nbd_co_receive_one_chunk(), when previous request is
> * finished and s->reply.handle set to 0.
> * Anyway, it's OK to lock the mutex and go to the next iteration.
> */
> @@ -928,7 +927,7 @@ static coroutine_fn int nbd_co_receive_one_chunk(
> }
> s->reply.handle = 0;
>
> - nbd_recv_coroutines_wake(s, false);
> + nbd_recv_coroutines_wake(s);
>
> return ret;
> }
--
Best regards,
Vladimir