[PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines

Paolo Bonzini posted 9 patches 3 years, 10 months ago
Maintainers: Kevin Wolf <kwolf@redhat.com>, Hanna Reitz <hreitz@redhat.com>, Eric Blake <eblake@redhat.com>, Vladimir Sementsov-Ogievskiy <v.sementsov-og@mail.ru>
[PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines
Posted by Paolo Bonzini 3 years, 10 months ago
The condition for waiting on the s->free_sema queue depends on
both s->in_flight and s->state.  The latter is currently using
atomics, but this is quite dubious and probably wrong.

Because s->state is written in the main thread too, for example by
the yank callback, it cannot be protected by a CoMutex.  Introduce a
separate lock that can be used by nbd_co_send_request(); later on this
lock will also be used for s->state.  There will not be any contention
on the lock unless there is a yank or reconnect, so this is not
performance sensitive.

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
---
 block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
 1 file changed, 27 insertions(+), 19 deletions(-)

diff --git a/block/nbd.c b/block/nbd.c
index 62dd338ef3..a2414566d1 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
     QIOChannel *ioc; /* The current I/O channel */
     NBDExportInfo info;
 
-    CoMutex send_mutex;
+    /*
+     * Protects free_sema, in_flight, requests[].coroutine,
+     * reconnect_delay_timer.
+     */
+    QemuMutex requests_lock;
     CoQueue free_sema;
-
-    CoMutex receive_mutex;
     int in_flight;
+    NBDClientRequest requests[MAX_NBD_REQUESTS];
+    QEMUTimer *reconnect_delay_timer;
+
+    CoMutex send_mutex;
+    CoMutex receive_mutex;
     NBDClientState state;
 
-    QEMUTimer *reconnect_delay_timer;
     QEMUTimer *open_timer;
 
-    NBDClientRequest requests[MAX_NBD_REQUESTS];
     NBDReply reply;
     BlockDriverState *bs;
 
@@ -350,7 +355,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
     return 0;
 }
 
-/* called under s->send_mutex */
+/* Called with s->requests_lock taken.  */
 static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
 {
     bool blocking = nbd_client_connecting_wait(s);
@@ -382,9 +387,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
         s->ioc = NULL;
     }
 
-    qemu_co_mutex_unlock(&s->send_mutex);
+    qemu_mutex_unlock(&s->requests_lock);
     nbd_co_do_establish_connection(s->bs, blocking, NULL);
-    qemu_co_mutex_lock(&s->send_mutex);
+    qemu_mutex_lock(&s->requests_lock);
 
     /*
      * The reconnect attempt is done (maybe successfully, maybe not), so
@@ -466,11 +471,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
     int rc, i = -1;
 
-    qemu_co_mutex_lock(&s->send_mutex);
-
+    qemu_mutex_lock(&s->requests_lock);
     while (s->in_flight == MAX_NBD_REQUESTS ||
            (!nbd_client_connected(s) && s->in_flight > 0)) {
-        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
+        qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
     }
 
     s->in_flight++;
@@ -491,13 +495,13 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
         }
     }
 
-    g_assert(qemu_in_coroutine());
     assert(i < MAX_NBD_REQUESTS);
-
     s->requests[i].coroutine = qemu_coroutine_self();
     s->requests[i].offset = request->from;
     s->requests[i].receiving = false;
+    qemu_mutex_unlock(&s->requests_lock);
 
+    qemu_co_mutex_lock(&s->send_mutex);
     request->handle = INDEX_TO_HANDLE(s, i);
 
     assert(s->ioc);
@@ -517,17 +521,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
+    qemu_co_mutex_unlock(&s->send_mutex);
 
-err:
     if (rc < 0) {
+        qemu_mutex_lock(&s->requests_lock);
+err:
         nbd_channel_error(s, rc);
         if (i != -1) {
             s->requests[i].coroutine = NULL;
         }
         s->in_flight--;
         qemu_co_queue_next(&s->free_sema);
+        qemu_mutex_unlock(&s->requests_lock);
     }
-    qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
 
@@ -1017,12 +1023,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
     return true;
 
 break_loop:
+    qemu_mutex_lock(&s->requests_lock);
     s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
-
-    qemu_co_mutex_lock(&s->send_mutex);
     s->in_flight--;
     qemu_co_queue_next(&s->free_sema);
-    qemu_co_mutex_unlock(&s->send_mutex);
+    qemu_mutex_unlock(&s->requests_lock);
 
     return false;
 }
@@ -1855,8 +1860,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
     BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
 
     s->bs = bs;
-    qemu_co_mutex_init(&s->send_mutex);
+    qemu_mutex_init(&s->requests_lock);
     qemu_co_queue_init(&s->free_sema);
+    qemu_co_mutex_init(&s->send_mutex);
     qemu_co_mutex_init(&s->receive_mutex);
 
     if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
@@ -2044,9 +2050,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
 
     reconnect_delay_timer_del(s);
 
+    qemu_mutex_lock(&s->requests_lock);
     if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
         s->state = NBD_CLIENT_CONNECTING_NOWAIT;
     }
+    qemu_mutex_unlock(&s->requests_lock);
 
     nbd_co_establish_connection_cancel(s->conn);
 }
-- 
2.35.1
Re: [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines
Posted by Vladimir Sementsov-Ogievskiy 3 years, 9 months ago
14.04.2022 20:57, Paolo Bonzini wrote:
> The condition for waiting on the s->free_sema queue depends on
> both s->in_flight and s->state.  The latter is currently using
> atomics, but this is quite dubious and probably wrong.
> 
> Because s->state is written in the main thread too, for example by
> the yank callback, it cannot be protected by a CoMutex.  Introduce a
> separate lock that can be used by nbd_co_send_request(); later on this
> lock will also be used for s->state.  There will not be any contention
> on the lock unless there is a yank or reconnect, so this is not
> performance sensitive.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>   block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
>   1 file changed, 27 insertions(+), 19 deletions(-)
> 
> diff --git a/block/nbd.c b/block/nbd.c
> index 62dd338ef3..a2414566d1 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
>       QIOChannel *ioc; /* The current I/O channel */
>       NBDExportInfo info;
>   
> -    CoMutex send_mutex;
> +    /*
> +     * Protects free_sema, in_flight, requests[].coroutine,
> +     * reconnect_delay_timer.
> +     */

requests[].coroutine is read without the mutex in nbd_receive_replies().

reconnect_delay_timer_del() is called without the mutex in nbd_cancel_in_flight() and in reconnect_delay_timer_cb().

Is it OK? Worth improving the comment?

> +    QemuMutex requests_lock;
>       CoQueue free_sema;
> -
> -    CoMutex receive_mutex;
>       int in_flight;
> +    NBDClientRequest requests[MAX_NBD_REQUESTS];
> +    QEMUTimer *reconnect_delay_timer;
> +
> +    CoMutex send_mutex;
> +    CoMutex receive_mutex;
>       NBDClientState state;
>   
> -    QEMUTimer *reconnect_delay_timer;
>       QEMUTimer *open_timer;
>   
> -    NBDClientRequest requests[MAX_NBD_REQUESTS];
>       NBDReply reply;
>       BlockDriverState *bs;
>   
> @@ -350,7 +355,7 @@ int coroutine_fn nbd_co_do_establish_connection(BlockDriverState *bs,
>       return 0;
>   }
>   
> -/* called under s->send_mutex */
> +/* Called with s->requests_lock taken.  */
>   static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>   {
>       bool blocking = nbd_client_connecting_wait(s);
> @@ -382,9 +387,9 @@ static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>           s->ioc = NULL;
>       }
>   
> -    qemu_co_mutex_unlock(&s->send_mutex);
> +    qemu_mutex_unlock(&s->requests_lock);
>       nbd_co_do_establish_connection(s->bs, blocking, NULL);
> -    qemu_co_mutex_lock(&s->send_mutex);
> +    qemu_mutex_lock(&s->requests_lock);
>   
>       /*
>        * The reconnect attempt is done (maybe successfully, maybe not), so
> @@ -466,11 +471,10 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
>       BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
>       int rc, i = -1;
>   
> -    qemu_co_mutex_lock(&s->send_mutex);
> -
> +    qemu_mutex_lock(&s->requests_lock);
>       while (s->in_flight == MAX_NBD_REQUESTS ||
>              (!nbd_client_connected(s) && s->in_flight > 0)) {
> -        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
> +        qemu_co_queue_wait(&s->free_sema, &s->requests_lock);
>       }
>   
>       s->in_flight++;
> @@ -491,13 +495,13 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,

Prior to this patch, if sending a request fails, we will not send any further requests. After the patch, more requests can be sent after a failure, once send_mutex has been unlocked.

Maybe that's not bad...

What would be wrong with keeping the send_mutex critical section as is and just taking requests_lock additionally inside that critical section?


>           }
>       }
>   
> -    g_assert(qemu_in_coroutine());
>       assert(i < MAX_NBD_REQUESTS);
> -
>       s->requests[i].coroutine = qemu_coroutine_self();
>       s->requests[i].offset = request->from;
>       s->requests[i].receiving = false;
> +    qemu_mutex_unlock(&s->requests_lock);
>   
> +    qemu_co_mutex_lock(&s->send_mutex);
>       request->handle = INDEX_TO_HANDLE(s, i);
>   
>       assert(s->ioc);
> @@ -517,17 +521,19 @@ static int coroutine_fn nbd_co_send_request(BlockDriverState *bs,
>       } else {
>           rc = nbd_send_request(s->ioc, request);
>       }
> +    qemu_co_mutex_unlock(&s->send_mutex);
>   
> -err:
>       if (rc < 0) {
> +        qemu_mutex_lock(&s->requests_lock);
> +err:
>           nbd_channel_error(s, rc);
>           if (i != -1) {
>               s->requests[i].coroutine = NULL;
>           }
>           s->in_flight--;
>           qemu_co_queue_next(&s->free_sema);
> +        qemu_mutex_unlock(&s->requests_lock);
>       }
> -    qemu_co_mutex_unlock(&s->send_mutex);
>       return rc;
>   }
>   
> @@ -1017,12 +1023,11 @@ static bool nbd_reply_chunk_iter_receive(BDRVNBDState *s,
>       return true;
>   
>   break_loop:
> +    qemu_mutex_lock(&s->requests_lock);
>       s->requests[HANDLE_TO_INDEX(s, handle)].coroutine = NULL;
> -
> -    qemu_co_mutex_lock(&s->send_mutex);
>       s->in_flight--;
>       qemu_co_queue_next(&s->free_sema);
> -    qemu_co_mutex_unlock(&s->send_mutex);
> +    qemu_mutex_unlock(&s->requests_lock);
>   
>       return false;
>   }
> @@ -1855,8 +1860,9 @@ static int nbd_open(BlockDriverState *bs, QDict *options, int flags,
>       BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
>   
>       s->bs = bs;
> -    qemu_co_mutex_init(&s->send_mutex);
> +    qemu_mutex_init(&s->requests_lock);
>       qemu_co_queue_init(&s->free_sema);
> +    qemu_co_mutex_init(&s->send_mutex);
>       qemu_co_mutex_init(&s->receive_mutex);
>   
>       if (!yank_register_instance(BLOCKDEV_YANK_INSTANCE(bs->node_name), errp)) {
> @@ -2044,9 +2050,11 @@ static void nbd_cancel_in_flight(BlockDriverState *bs)
>   
>       reconnect_delay_timer_del(s);
>   
> +    qemu_mutex_lock(&s->requests_lock);
>       if (s->state == NBD_CLIENT_CONNECTING_WAIT) {
>           s->state = NBD_CLIENT_CONNECTING_NOWAIT;
>       }
> +    qemu_mutex_unlock(&s->requests_lock);
>   
>       nbd_co_establish_connection_cancel(s->conn);
>   }


-- 
Best regards,
Vladimir
Re: [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines
Posted by Paolo Bonzini 3 years, 9 months ago
On 4/16/22 14:18, Vladimir Sementsov-Ogievskiy wrote:
> 14.04.2022 20:57, Paolo Bonzini wrote:
>> The condition for waiting on the s->free_sema queue depends on
>> both s->in_flight and s->state.  The latter is currently using
>> atomics, but this is quite dubious and probably wrong.
>>
>> Because s->state is written in the main thread too, for example by
>> the yank callback, it cannot be protected by a CoMutex.  Introduce a
>> separate lock that can be used by nbd_co_send_request(); later on this
>> lock will also be used for s->state.  There will not be any contention
>> on the lock unless there is a yank or reconnect, so this is not
>> performance sensitive.
>>
>> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
>> ---
>>   block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
>>   1 file changed, 27 insertions(+), 19 deletions(-)
>>
>> diff --git a/block/nbd.c b/block/nbd.c
>> index 62dd338ef3..a2414566d1 100644
>> --- a/block/nbd.c
>> +++ b/block/nbd.c
>> @@ -71,17 +71,22 @@ typedef struct BDRVNBDState {
>>       QIOChannel *ioc; /* The current I/O channel */
>>       NBDExportInfo info;
>> -    CoMutex send_mutex;
>> +    /*
>> +     * Protects free_sema, in_flight, requests[].coroutine,
>> +     * reconnect_delay_timer.
>> +     */
> 
> requests[].coroutine is read without mutex in nbd_receive_replies
> 
> reconnect_delay_timer_del() is called without mutex in 
> nbd_cancel_in_flight() and in reconnect_delay_timer_cb()..
> 
> Is it OK? Worth improving the comment?

Yeah, let me check the reconnect_delay_timer idea you had in the 
previous patch, too.

> Prior to this patch, if request sending fails, we'll not send further 
> requests. After the patch, we can send more requests after failure on 
> unlocking send_mutex.
>
> What's wrong if we keep send_mutex critical section as is and just lock 
> requests_lock additionally inside send_mutex-critical-section?

I wanted to keep send_mutex just for the socket, basically.  The case you 
point out exists but it is harmless.

Paolo

Re: [PATCH v2 for-7.1 5/9] nbd: use a QemuMutex to synchronize yanking, reconnection and coroutines
Posted by Eric Blake 3 years, 10 months ago
On Thu, Apr 14, 2022 at 07:57:52PM +0200, Paolo Bonzini wrote:
> The condition for waiting on the s->free_sema queue depends on
> both s->in_flight and s->state.  The latter is currently using
> atomics, but this is quite dubious and probably wrong.
> 
> Because s->state is written in the main thread too, for example by
> the yank callback, it cannot be protected by a CoMutex.  Introduce a
> separate lock that can be used by nbd_co_send_request(); later on this
> lock will also be used for s->state.  There will not be any contention
> on the lock unless there is a yank or reconnect, so this is not
> performance sensitive.
> 
> Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
> ---
>  block/nbd.c | 46 +++++++++++++++++++++++++++-------------------
>  1 file changed, 27 insertions(+), 19 deletions(-)

Reviewed-by: Eric Blake <eblake@redhat.com>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3266
Virtualization:  qemu.org | libvirt.org