From: Stefan Hajnoczi <stefanha@redhat.com>
To: <qemu-devel@nongnu.org>
Date: Tue, 21 Feb 2017 11:56:27 +0000
Message-Id: <20170221115644.28264-8-stefanha@redhat.com>
In-Reply-To: <20170221115644.28264-1-stefanha@redhat.com>
References: <20170221115644.28264-1-stefanha@redhat.com>
Subject: [Qemu-devel] [PULL v2 07/24] nbd: convert to use qio_channel_yield

From: Paolo Bonzini <pbonzini@redhat.com>

In the client, read the reply headers from a coroutine, switching the
read side between the "read header" coroutine and the I/O coroutine that
reads the body of the reply.
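
In outline, the client-side loop looks like this (a sketch condensed from
nbd_read_reply_entry in the diff below; error handling and connection
teardown are elided):

    for (;;) {
        ret = nbd_receive_reply(s->ioc, &s->reply);      /* one header */
        i = HANDLE_TO_INDEX(s, s->reply.handle);
        aio_co_wake(s->recv_coroutine[i]);   /* hand off the reply body */
        qemu_coroutine_yield();      /* sleep until the body is consumed */
    }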

In the server, if the server can read more requests, it creates a new
"read request" coroutine as soon as a request has been read.  Otherwise,
the new coroutine is created in nbd_request_put.
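
The server-side check, condensed from nbd_client_receive_next_request in
the diff below:

    /* Spawn a request coroutine only if none is reading a request
     * already and another request can still be accepted.
     */
    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
        nbd_client_get(client);
        client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
        aio_co_schedule(client->exp->ctx, client->recv_coroutine);
    }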

Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Reviewed-by: Daniel P. Berrange <berrange@redhat.com>
Message-id: 20170213135235.12274-8-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 block/nbd-client.h |   2 +-
 block/nbd-client.c | 117 ++++++++++++++++++++++++-----------------------------
 nbd/client.c       |   2 +-
 nbd/common.c       |   9 +----
 nbd/server.c       |  94 +++++++++++++-----------------------------
 5 files changed, 83 insertions(+), 141 deletions(-)

diff --git a/block/nbd-client.h b/block/nbd-client.h
index f8d6006..8cdfc92 100644
--- a/block/nbd-client.h
+++ b/block/nbd-client.h
@@ -25,7 +25,7 @@ typedef struct NBDClientSession {
 
     CoMutex send_mutex;
     CoQueue free_sema;
-    Coroutine *send_coroutine;
+    Coroutine *read_reply_co;
     int in_flight;
 
     Coroutine *recv_coroutine[MAX_NBD_REQUESTS];
diff --git a/block/nbd-client.c b/block/nbd-client.c
index 06f1532..10fcc9e 100644
--- a/block/nbd-client.c
+++ b/block/nbd-client.c
@@ -33,8 +33,9 @@
 #define HANDLE_TO_INDEX(bs, handle) ((handle) ^ ((uint64_t)(intptr_t)bs))
 #define INDEX_TO_HANDLE(bs, index)  ((index)  ^ ((uint64_t)(intptr_t)bs))
 
-static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
+static void nbd_recv_coroutines_enter_all(BlockDriverState *bs)
 {
+    NBDClientSession *s = nbd_get_client_session(bs);
     int i;
 
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
@@ -42,6 +43,7 @@ static void nbd_recv_coroutines_enter_all(NBDClientSession *s)
             qemu_coroutine_enter(s->recv_coroutine[i]);
         }
     }
+    BDRV_POLL_WHILE(bs, s->read_reply_co);
 }
 
 static void nbd_teardown_connection(BlockDriverState *bs)
@@ -56,7 +58,7 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     qio_channel_shutdown(client->ioc,
                          QIO_CHANNEL_SHUTDOWN_BOTH,
                          NULL);
-    nbd_recv_coroutines_enter_all(client);
+    nbd_recv_coroutines_enter_all(bs);
 
     nbd_client_detach_aio_context(bs);
     object_unref(OBJECT(client->sioc));
@@ -65,54 +67,43 @@ static void nbd_teardown_connection(BlockDriverState *bs)
     client->ioc = NULL;
 }
 
-static void nbd_reply_ready(void *opaque)
+static coroutine_fn void nbd_read_reply_entry(void *opaque)
 {
-    BlockDriverState *bs = opaque;
-    NBDClientSession *s = nbd_get_client_session(bs);
+    NBDClientSession *s = opaque;
     uint64_t i;
     int ret;
 
-    if (!s->ioc) { /* Already closed */
-        return;
-    }
-
-    if (s->reply.handle == 0) {
-        /* No reply already in flight.  Fetch a header.  It is possible
-         * that another thread has done the same thing in parallel, so
-         * the socket is not readable anymore.
-         */
+    for (;;) {
+        assert(s->reply.handle == 0);
         ret = nbd_receive_reply(s->ioc, &s->reply);
-        if (ret == -EAGAIN) {
-            return;
-        }
         if (ret < 0) {
-            s->reply.handle = 0;
-            goto fail;
+            break;
         }
-    }
 
-    /* There's no need for a mutex on the receive side, because the
-     * handler acts as a synchronization point and ensures that only
-     * one coroutine is called until the reply finishes.  */
-    i = HANDLE_TO_INDEX(s, s->reply.handle);
-    if (i >= MAX_NBD_REQUESTS) {
-        goto fail;
-    }
+        /* There's no need for a mutex on the receive side, because the
+         * handler acts as a synchronization point and ensures that only
+         * one coroutine is called until the reply finishes.
+         */
+        i = HANDLE_TO_INDEX(s, s->reply.handle);
+        if (i >= MAX_NBD_REQUESTS || !s->recv_coroutine[i]) {
+            break;
+        }
 
-    if (s->recv_coroutine[i]) {
-        qemu_coroutine_enter(s->recv_coroutine[i]);
-        return;
+        /* We're woken up by the recv_coroutine itself.  Note that there
+         * is no race between yielding and reentering read_reply_co.  This
+         * is because:
+         *
+         * - if recv_coroutine[i] runs on the same AioContext, it is only
+         *   entered after we yield
+         *
+         * - if recv_coroutine[i] runs on a different AioContext, reentering
+         *   read_reply_co happens through a bottom half, which can only
+         *   run after we yield.
+         */
+        aio_co_wake(s->recv_coroutine[i]);
+        qemu_coroutine_yield();
     }
-
-fail:
-    nbd_teardown_connection(bs);
-}
-
-static void nbd_restart_write(void *opaque)
-{
-    BlockDriverState *bs = opaque;
-
-    qemu_coroutine_enter(nbd_get_client_session(bs)->send_coroutine);
+    s->read_reply_co = NULL;
 }
 
 static int nbd_co_send_request(BlockDriverState *bs,
@@ -120,7 +111,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
                                QEMUIOVector *qiov)
 {
     NBDClientSession *s = nbd_get_client_session(bs);
-    AioContext *aio_context;
     int rc, ret, i;
 
     qemu_co_mutex_lock(&s->send_mutex);
@@ -141,11 +131,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
         return -EPIPE;
     }
 
-    s->send_coroutine = qemu_coroutine_self();
-    aio_context = bdrv_get_aio_context(bs);
-
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, nbd_restart_write, NULL, bs);
     if (qiov) {
         qio_channel_set_cork(s->ioc, true);
         rc = nbd_send_request(s->ioc, request);
@@ -160,9 +145,6 @@ static int nbd_co_send_request(BlockDriverState *bs,
     } else {
         rc = nbd_send_request(s->ioc, request);
     }
-    aio_set_fd_handler(aio_context, s->sioc->fd, false,
-                       nbd_reply_ready, NULL, NULL, bs);
-    s->send_coroutine = NULL;
     qemu_co_mutex_unlock(&s->send_mutex);
     return rc;
 }
@@ -174,8 +156,7 @@ static void nbd_co_receive_reply(NBDClientSession *s,
 {
     int ret;
 
-    /* Wait until we're woken up by the read handler.  TODO: perhaps
-     * peek at the next reply and avoid yielding if it's ours?  */
+    /* Wait until we're woken up by nbd_read_reply_entry.  */
     qemu_coroutine_yield();
     *reply = s->reply;
    if (reply->handle != request->handle ||
@@ -209,13 +190,19 @@ static void nbd_coroutine_start(NBDClientSession *s,
     /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
 }
 
-static void nbd_coroutine_end(NBDClientSession *s,
+static void nbd_coroutine_end(BlockDriverState *bs,
                               NBDRequest *request)
 {
+    NBDClientSession *s = nbd_get_client_session(bs);
     int i = HANDLE_TO_INDEX(s, request->handle);
+
     s->recv_coroutine[i] = NULL;
-    if (s->in_flight-- == MAX_NBD_REQUESTS) {
-        qemu_co_queue_next(&s->free_sema);
+    s->in_flight--;
+    qemu_co_queue_next(&s->free_sema);
+
+    /* Kick the read_reply_co to get the next reply.  */
+    if (s->read_reply_co) {
+        aio_co_wake(s->read_reply_co);
     }
 }
 
@@ -241,7 +228,7 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, qiov);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -271,7 +258,7 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -306,7 +293,7 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -331,7 +318,7 @@ int nbd_client_co_flush(BlockDriverState *bs)
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 }
 
@@ -357,23 +344,23 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
     } else {
         nbd_co_receive_reply(client, &request, &reply, NULL);
     }
-    nbd_coroutine_end(client, &request);
+    nbd_coroutine_end(bs, &request);
     return -reply.error;
 
 }
 
 void nbd_client_detach_aio_context(BlockDriverState *bs)
 {
-    aio_set_fd_handler(bdrv_get_aio_context(bs),
-                       nbd_get_client_session(bs)->sioc->fd,
-                       false, NULL, NULL, NULL, NULL);
+    NBDClientSession *client = nbd_get_client_session(bs);
+    qio_channel_detach_aio_context(QIO_CHANNEL(client->sioc));
 }
 
 void nbd_client_attach_aio_context(BlockDriverState *bs,
                                    AioContext *new_context)
 {
-    aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
-                       false, nbd_reply_ready, NULL, NULL, bs);
+    NBDClientSession *client = nbd_get_client_session(bs);
+    qio_channel_attach_aio_context(QIO_CHANNEL(client->sioc), new_context);
+    aio_co_schedule(new_context, client->read_reply_co);
 }
 
 void nbd_client_close(BlockDriverState *bs)
@@ -434,7 +421,7 @@ int nbd_client_init(BlockDriverState *bs,
     /* Now that we're connected, set the socket to be non-blocking and
      * kick the reply mechanism.  */
     qio_channel_set_blocking(QIO_CHANNEL(sioc), false, NULL);
-
+    client->read_reply_co = qemu_coroutine_create(nbd_read_reply_entry, client);
     nbd_client_attach_aio_context(bs, bdrv_get_aio_context(bs));
 
     logout("Established connection with NBD server\n");
diff --git a/nbd/client.c b/nbd/client.c
index ffb0743..5c9dee3 100644
--- a/nbd/client.c
+++ b/nbd/client.c
@@ -778,7 +778,7 @@ ssize_t nbd_receive_reply(QIOChannel *ioc, NBDReply *reply)
     ssize_t ret;
 
     ret = read_sync(ioc, buf, sizeof(buf));
-    if (ret < 0) {
+    if (ret <= 0) {
         return ret;
     }
 
diff --git a/nbd/common.c b/nbd/common.c
index a5f39ea..dccbb8e 100644
--- a/nbd/common.c
+++ b/nbd/common.c
@@ -43,14 +43,7 @@ ssize_t nbd_wr_syncv(QIOChannel *ioc,
         }
         if (len == QIO_CHANNEL_ERR_BLOCK) {
             if (qemu_in_coroutine()) {
-                /* XXX figure out if we can create a variant on
-                 * qio_channel_yield() that works with AIO contexts
-                 * and consider using that in this branch */
-                qemu_coroutine_yield();
-            } else if (done) {
-                /* XXX this is needed by nbd_reply_ready.  */
-                qio_channel_wait(ioc,
-                                 do_read ? G_IO_IN : G_IO_OUT);
+                qio_channel_yield(ioc, do_read ? G_IO_IN : G_IO_OUT);
             } else {
                 return -EAGAIN;
             }
diff --git a/nbd/server.c b/nbd/server.c
index efe5cb8..ac92fa0 100644
--- a/nbd/server.c
+++ b/nbd/server.c
@@ -95,8 +95,6 @@ struct NBDClient {
     CoMutex send_lock;
     Coroutine *send_coroutine;
 
-    bool can_read;
-
     QTAILQ_ENTRY(NBDClient) next;
     int nb_requests;
     bool closing;
@@ -104,9 +102,7 @@ struct NBDClient {
 
 /* That's all folks */
 
-static void nbd_set_handlers(NBDClient *client);
-static void nbd_unset_handlers(NBDClient *client);
-static void nbd_update_can_read(NBDClient *client);
+static void nbd_client_receive_next_request(NBDClient *client);
 
 static gboolean nbd_negotiate_continue(QIOChannel *ioc,
                                        GIOCondition condition,
@@ -785,7 +781,7 @@ void nbd_client_put(NBDClient *client)
          */
         assert(client->closing);
 
-        nbd_unset_handlers(client);
+        qio_channel_detach_aio_context(client->ioc);
         object_unref(OBJECT(client->sioc));
         object_unref(OBJECT(client->ioc));
         if (client->tlscreds) {
@@ -826,7 +822,6 @@ static NBDRequestData *nbd_request_get(NBDClient *client)
 
     assert(client->nb_requests <= MAX_NBD_REQUESTS - 1);
     client->nb_requests++;
-    nbd_update_can_read(client);
 
     req = g_new0(NBDRequestData, 1);
     nbd_client_get(client);
@@ -844,7 +839,8 @@ static void nbd_request_put(NBDRequestData *req)
     g_free(req);
 
     client->nb_requests--;
-    nbd_update_can_read(client);
+    nbd_client_receive_next_request(client);
+
     nbd_client_put(client);
 }
 
@@ -858,7 +854,13 @@ static void blk_aio_attached(AioContext *ctx, void *opaque)
     exp->ctx = ctx;
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        nbd_set_handlers(client);
+        qio_channel_attach_aio_context(client->ioc, ctx);
+        if (client->recv_coroutine) {
+            aio_co_schedule(ctx, client->recv_coroutine);
+        }
+        if (client->send_coroutine) {
+            aio_co_schedule(ctx, client->send_coroutine);
+        }
     }
 }
 
@@ -870,7 +872,7 @@ static void blk_aio_detach(void *opaque)
     TRACE("Export %s: Detaching clients from AIO context %p\n", exp->name, exp->ctx);
 
     QTAILQ_FOREACH(client, &exp->clients, next) {
-        nbd_unset_handlers(client);
+        qio_channel_detach_aio_context(client->ioc);
     }
=20
     exp->ctx = NULL;
@@ -1045,7 +1047,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
     g_assert(qemu_in_coroutine());
     qemu_co_mutex_lock(&client->send_lock);
     client->send_coroutine = qemu_coroutine_self();
-    nbd_set_handlers(client);
 
     if (!len) {
         rc = nbd_send_reply(client->ioc, reply);
@@ -1062,7 +1063,6 @@ static ssize_t nbd_co_send_reply(NBDRequestData *req, NBDReply *reply,
     }
 
     client->send_coroutine = NULL;
-    nbd_set_handlers(client);
     qemu_co_mutex_unlock(&client->send_lock);
     return rc;
 }
@@ -1079,9 +1079,7 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
     ssize_t rc;
 
     g_assert(qemu_in_coroutine());
-    client->recv_coroutine = qemu_coroutine_self();
-    nbd_update_can_read(client);
-
+    assert(client->recv_coroutine == qemu_coroutine_self());
     rc = nbd_receive_request(client->ioc, request);
     if (rc < 0) {
         if (rc != -EAGAIN) {
@@ -1163,23 +1161,25 @@ static ssize_t nbd_co_receive_request(NBDRequestData *req,
 
 out:
     client->recv_coroutine = NULL;
-    nbd_update_can_read(client);
+    nbd_client_receive_next_request(client);
 
     return rc;
 }
 
-static void nbd_trip(void *opaque)
+/* Owns a reference to the NBDClient passed as opaque.  */
+static coroutine_fn void nbd_trip(void *opaque)
 {
     NBDClient *client = opaque;
     NBDExport *exp = client->exp;
     NBDRequestData *req;
-    NBDRequest request;
+    NBDRequest request = { 0 };    /* GCC thinks it can be used uninitialized */
     NBDReply reply;
     ssize_t ret;
     int flags;
 
     TRACE("Reading request.");
     if (client->closing) {
+        nbd_client_put(client);
         return;
     }
 
@@ -1338,60 +1338,21 @@ static void nbd_trip(void *opaque)
 
 done:
     nbd_request_put(req);
+    nbd_client_put(client);
     return;
 
 out:
     nbd_request_put(req);
     client_close(client);
+    nbd_client_put(client);
 }
 
-static void nbd_read(void *opaque)
+static void nbd_client_receive_next_request(NBDClient *client)
 {
-    NBDClient *client = opaque;
-
-    if (client->recv_coroutine) {
-        qemu_coroutine_enter(client->recv_coroutine);
-    } else {
-        qemu_coroutine_enter(qemu_coroutine_create(nbd_trip, client));
-    }
-}
-
-static void nbd_restart_write(void *opaque)
-{
-    NBDClient *client = opaque;
-
-    qemu_coroutine_enter(client->send_coroutine);
-}
-
-static void nbd_set_handlers(NBDClient *client)
-{
-    if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true,
-                           client->can_read ? nbd_read : NULL,
-                           client->send_coroutine ? nbd_restart_write : NULL,
-                           NULL, client);
-    }
-}
-
-static void nbd_unset_handlers(NBDClient *client)
-{
-    if (client->exp && client->exp->ctx) {
-        aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL,
-                           NULL, NULL, NULL);
-    }
-}
-
-static void nbd_update_can_read(NBDClient *client)
-{
-    bool can_read = client->recv_coroutine ||
-                    client->nb_requests < MAX_NBD_REQUESTS;
-
-    if (can_read != client->can_read) {
-        client->can_read = can_read;
-        nbd_set_handlers(client);
-
-        /* There is no need to invoke aio_notify(), since aio_set_fd_handler()
-         * in nbd_set_handlers() will have taken care of that */
+    if (!client->recv_coroutine && client->nb_requests < MAX_NBD_REQUESTS) {
+        nbd_client_get(client);
+        client->recv_coroutine = qemu_coroutine_create(nbd_trip, client);
+        aio_co_schedule(client->exp->ctx, client->recv_coroutine);
     }
 }
 
@@ -1409,11 +1370,13 @@ static coroutine_fn void nbd_co_client_start(void *opaque)
         goto out;
     }
     qemu_co_mutex_init(&client->send_lock);
-    nbd_set_handlers(client);
 
     if (exp) {
         QTAILQ_INSERT_TAIL(&exp->clients, client, next);
     }
+
+    nbd_client_receive_next_request(client);
+
 out:
     g_free(data);
 }
@@ -1439,7 +1402,6 @@ void nbd_client_new(NBDExport *exp,
     object_ref(OBJECT(client->sioc));
     client->ioc = QIO_CHANNEL(sioc);
     object_ref(OBJECT(client->ioc));
-    client->can_read = true;
     client->close = close_fn;
 
     data->client = client;
-- 
2.9.3