From nobody Mon Nov 25 05:09:59 2024
Subject: [PATCH 3/6] io/channel-rdma: support working in coroutine
Date: Tue, 4 Jun 2024 20:14:09 +0800
Message-ID: <1717503252-51884-4-git-send-email-arei.gonglei@huawei.com>
X-Mailer: git-send-email 2.8.2.windows.1
In-Reply-To: <1717503252-51884-1-git-send-email-arei.gonglei@huawei.com>
References: <1717503252-51884-1-git-send-email-arei.gonglei@huawei.com>
From: Gonglei

From: Jialin Wang

It is not feasible to obtain RDMA completion queue notifications through
poll/ppoll on the rsocket fd. Therefore, we create a dedicated rpoller
thread and, for each rsocket fd, two eventfds: pollin_eventfd and
pollout_eventfd.

When io_create_watch or io_set_aio_fd_handler waits for POLLIN or POLLOUT
events, QEMU actually poll/ppoll()s on the pollin_eventfd and
pollout_eventfd instead of the rsocket fd.

The rpoller rpoll()s on the rsocket fd to receive POLLIN and POLLOUT
events. When a POLLIN event occurs, the rpoller writes to the
pollin_eventfd, so that poll/ppoll reports POLLIN. When a POLLOUT event
occurs, the rpoller reads from the pollout_eventfd, so that poll/ppoll
reports POLLOUT.

For a non-blocking rsocket fd, if rread/rwrite returns EAGAIN, we read
from the pollin_eventfd / write to the pollout_eventfd, so that
poll/ppoll stops reporting POLLIN/POLLOUT until the rsocket fd becomes
ready again.

Known limitations:

For a blocking rsocket fd watched via io_create_watch, we cannot tell
when the fd stops being ready to read/write, as we can with non-blocking
fds. Once an event has been reported it therefore keeps being reported,
which can leave QEMU hanging, so io_create_watch must be used with
caution on blocking rsocket fds.

Luckily, channel-rdma works well in coroutines :)
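To make the coroutine case concrete, a caller is expected to look roughly
like the sketch below. This is illustration only and not part of the
patch: the helper name example_read_in_coroutine() and its flat buffer
arguments are made up for the example; it relies only on the existing
qio_channel_read(), QIO_CHANNEL_ERR_BLOCK and qio_channel_yield() APIs.

#include "qemu/osdep.h"
#include "qemu/coroutine.h"
#include "io/channel.h"

/*
 * Read from a QIOChannelRDMA inside a coroutine.  When rreadv() hits
 * EAGAIN, qio_channel_rdma_readv() returns QIO_CHANNEL_ERR_BLOCK after
 * draining pollin_eventfd and re-arming the rpoller for POLLIN;
 * yielding here suspends the coroutine until the rpoller sees POLLIN
 * on the rsocket fd and makes pollin_eventfd readable again.
 */
static ssize_t coroutine_fn example_read_in_coroutine(QIOChannel *ioc,
                                                      char *buf, size_t len,
                                                      Error **errp)
{
    for (;;) {
        ssize_t ret = qio_channel_read(ioc, buf, len, errp);

        if (ret == QIO_CHANNEL_ERR_BLOCK) {
            /* not readable yet: wait for the pollin_eventfd to fire */
            qio_channel_yield(ioc, G_IO_IN);
            continue;
        }
        return ret; /* > 0: data, 0: EOF, < 0: error with errp set */
    }
}

Outside coroutine context a caller would instead block in
qio_channel_wait() or rely on io_create_watch/io_set_aio_fd_handler,
which is exactly where the blocking-fd limitation above applies.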
Signed-off-by: Jialin Wang
Signed-off-by: Gonglei
---
 include/io/channel-rdma.h |  15 +-
 io/channel-rdma.c         | 363 +++++++++++++++++++++++++++++++++++++-
 2 files changed, 376 insertions(+), 2 deletions(-)

diff --git a/include/io/channel-rdma.h b/include/io/channel-rdma.h
index 8cab2459e5..cb56127d76 100644
--- a/include/io/channel-rdma.h
+++ b/include/io/channel-rdma.h
@@ -47,6 +47,18 @@ struct QIOChannelRDMA {
     socklen_t localAddrLen;
     struct sockaddr_storage remoteAddr;
     socklen_t remoteAddrLen;
+
+    /* private */
+
+    /* qemu g_poll/ppoll() POLLIN event on it */
+    int pollin_eventfd;
+    /* qemu g_poll/ppoll() POLLOUT event on it */
+    int pollout_eventfd;
+
+    /* the index in the rpoller's fds array */
+    int index;
+    /* rpoller will rpoll() rpoll_events on the rsocket fd */
+    short int rpoll_events;
 };
 
 /**
@@ -147,6 +159,7 @@ void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr,
  *
  * Returns: the new client channel, or NULL on error
  */
-QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *ioc, Error **errp);
+QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *ioc,
+                                                           Error **errp);
 
 #endif /* QIO_CHANNEL_RDMA_H */
diff --git a/io/channel-rdma.c b/io/channel-rdma.c
index 92c362df52..9792add5cf 100644
--- a/io/channel-rdma.c
+++ b/io/channel-rdma.c
@@ -23,10 +23,15 @@
 
 #include "qemu/osdep.h"
 #include "io/channel-rdma.h"
+#include "io/channel-util.h"
+#include "io/channel-watch.h"
 #include "io/channel.h"
 #include "qapi/clone-visitor.h"
 #include "qapi/error.h"
 #include "qapi/qapi-visit-sockets.h"
+#include "qemu/atomic.h"
+#include "qemu/error-report.h"
+#include "qemu/thread.h"
 #include "trace.h"
 #include
 #include
@@ -39,11 +44,274 @@
 #include
 #include
 
+typedef enum {
+    CLEAR_POLLIN,
+    CLEAR_POLLOUT,
+    SET_POLLIN,
+    SET_POLLOUT,
+} UpdateEvent;
+
+typedef enum {
+    RP_CMD_ADD_IOC,
+    RP_CMD_DEL_IOC,
+    RP_CMD_UPDATE,
+} RpollerCMD;
+
+typedef struct {
+    RpollerCMD cmd;
+    QIOChannelRDMA *rioc;
+} RpollerMsg;
+
+/*
+ * rpoll() on the rsocket fd with rpoll_events, when POLLIN/POLLOUT event
+ * occurs, it will write/read the pollin_eventfd/pollout_eventfd to allow
+ * qemu g_poll/ppoll() get the POLLIN/POLLOUT event
+ */
+static struct Rpoller {
+    QemuThread thread;
+    bool is_running;
+    int sock[2];
+    int count; /* the number of rsocket fds being rpoll() */
+    int size; /* the size of fds/riocs */
+    struct pollfd *fds;
+    QIOChannelRDMA **riocs;
+} rpoller;
+
+static void qio_channel_rdma_notify_rpoller(QIOChannelRDMA *rioc,
+                                            RpollerCMD cmd)
+{
+    RpollerMsg msg;
+    int ret;
+
+    msg.cmd = cmd;
+    msg.rioc = rioc;
+
+    ret = RETRY_ON_EINTR(write(rpoller.sock[0], &msg, sizeof msg));
+    if (ret != sizeof msg) {
+        error_report("%s: failed to send msg, errno: %d", __func__, errno);
+    }
+}
+
+static void qio_channel_rdma_update_poll_event(QIOChannelRDMA *rioc,
+                                               UpdateEvent action,
+                                               bool notify_rpoller)
+{
+    /* An eventfd with the value of ULLONG_MAX - 1 is readable but unwritable */
+    unsigned long long buf = ULLONG_MAX - 1;
+
+    switch (action) {
+    /* only rpoller do SET_* action, to allow qemu ppoll() get the event */
+    case SET_POLLIN:
+        RETRY_ON_EINTR(write(rioc->pollin_eventfd, &buf, sizeof buf));
+        rioc->rpoll_events &= ~POLLIN;
+        break;
+    case SET_POLLOUT:
+        RETRY_ON_EINTR(read(rioc->pollout_eventfd, &buf, sizeof buf));
+        rioc->rpoll_events &= ~POLLOUT;
+        break;
+
+    /* the rsocket fd is not ready to rread/rwrite */
+    case CLEAR_POLLIN:
+        RETRY_ON_EINTR(read(rioc->pollin_eventfd, &buf, sizeof buf));
+        rioc->rpoll_events |= POLLIN;
+        break;
+    case CLEAR_POLLOUT:
+        RETRY_ON_EINTR(write(rioc->pollout_eventfd, &buf, sizeof buf));
+        rioc->rpoll_events |= POLLOUT;
+        break;
+    default:
+        break;
+    }
+
+    /* notify rpoller to rpoll() POLLIN/POLLOUT events */
+    if (notify_rpoller) {
+        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_UPDATE);
+    }
+}
+
+static void qio_channel_rdma_rpoller_add_rioc(QIOChannelRDMA *rioc)
+{
+    if (rioc->index != -1) {
+        error_report("%s: rioc already exsits", __func__);
+        return;
+    }
+
+    rioc->index = ++rpoller.count;
+
+    if (rpoller.count + 1 > rpoller.size) {
+        rpoller.size *= 2;
+        rpoller.fds = g_renew(struct pollfd, rpoller.fds, rpoller.size);
+        rpoller.riocs = g_renew(QIOChannelRDMA *, rpoller.riocs, rpoller.size);
+    }
+
+    rpoller.fds[rioc->index].fd = rioc->fd;
+    rpoller.fds[rioc->index].events = rioc->rpoll_events;
+    rpoller.riocs[rioc->index] = rioc;
+}
+
+static void qio_channel_rdma_rpoller_del_rioc(QIOChannelRDMA *rioc)
+{
+    if (rioc->index == -1) {
+        error_report("%s: rioc not exsits", __func__);
+        return;
+    }
+
+    rpoller.fds[rioc->index] = rpoller.fds[rpoller.count];
+    rpoller.riocs[rioc->index] = rpoller.riocs[rpoller.count];
+    rpoller.riocs[rioc->index]->index = rioc->index;
+    rpoller.count--;
+
+    close(rioc->pollin_eventfd);
+    close(rioc->pollout_eventfd);
+    rioc->index = -1;
+    rioc->rpoll_events = 0;
+}
+
+static void qio_channel_rdma_rpoller_update_ioc(QIOChannelRDMA *rioc)
+{
+    if (rioc->index == -1) {
+        error_report("%s: rioc not exsits", __func__);
+        return;
+    }
+
+    rpoller.fds[rioc->index].fd = rioc->fd;
+    rpoller.fds[rioc->index].events = rioc->rpoll_events;
+}
+
+static void qio_channel_rdma_rpoller_process_msg(void)
+{
+    RpollerMsg msg;
+    int ret;
+
+    ret = RETRY_ON_EINTR(read(rpoller.sock[1], &msg, sizeof msg));
+    if (ret != sizeof msg) {
+        error_report("%s: rpoller failed to recv msg: %s", __func__,
+                     strerror(errno));
+        return;
+    }
+
+    switch (msg.cmd) {
+    case RP_CMD_ADD_IOC:
+        qio_channel_rdma_rpoller_add_rioc(msg.rioc);
+        break;
+    case RP_CMD_DEL_IOC:
+        qio_channel_rdma_rpoller_del_rioc(msg.rioc);
+        break;
+    case RP_CMD_UPDATE:
+        qio_channel_rdma_rpoller_update_ioc(msg.rioc);
+        break;
+    default:
+        break;
+    }
+}
+
+static void qio_channel_rdma_rpoller_cleanup(void)
+{
+    close(rpoller.sock[0]);
+    close(rpoller.sock[1]);
+    rpoller.sock[0] = -1;
+    rpoller.sock[1] = -1;
+    g_free(rpoller.fds);
+    g_free(rpoller.riocs);
+    rpoller.fds = NULL;
+    rpoller.riocs = NULL;
+    rpoller.count = 0;
+    rpoller.size = 0;
+    rpoller.is_running = false;
+}
+
+static void *qio_channel_rdma_rpoller_thread(void *opaque)
+{
+    int i, ret, error_events = POLLERR | POLLHUP | POLLNVAL;
+
+    do {
+        ret = rpoll(rpoller.fds, rpoller.count + 1, -1);
+        if (ret < 0 && errno != -EINTR) {
+            error_report("%s: rpoll() error: %s", __func__, strerror(errno));
+            break;
+        }
+
+        for (i = 1; i <= rpoller.count; i++) {
+            if (rpoller.fds[i].revents & (POLLIN | error_events)) {
+                qio_channel_rdma_update_poll_event(rpoller.riocs[i], SET_POLLIN,
+                                                   false);
+                rpoller.fds[i].events &= ~POLLIN;
+            }
+            if (rpoller.fds[i].revents & (POLLOUT | error_events)) {
+                qio_channel_rdma_update_poll_event(rpoller.riocs[i],
+                                                   SET_POLLOUT, false);
+                rpoller.fds[i].events &= ~POLLOUT;
+            }
+            /* ignore this fd */
+            if (rpoller.fds[i].revents & (error_events)) {
+                rpoller.fds[i].fd = -1;
+            }
+        }
+
+        if (rpoller.fds[0].revents) {
+            qio_channel_rdma_rpoller_process_msg();
+        }
+    } while (rpoller.count >= 1);
+
+    qio_channel_rdma_rpoller_cleanup();
+
+    return NULL;
+}
+
+static void qio_channel_rdma_rpoller_start(void)
+{
+    if (qatomic_xchg(&rpoller.is_running, true)) {
+        return;
+    }
+
+    if (qemu_socketpair(AF_UNIX, SOCK_STREAM, 0, rpoller.sock)) {
+        rpoller.is_running = false;
+        error_report("%s: failed to create socketpair %s", __func__,
+                     strerror(errno));
+        return;
+    }
+
+    rpoller.count = 0;
+    rpoller.size = 4;
+    rpoller.fds = g_malloc0_n(rpoller.size, sizeof(struct pollfd));
+    rpoller.riocs = g_malloc0_n(rpoller.size, sizeof(QIOChannelRDMA *));
+    rpoller.fds[0].fd = rpoller.sock[1];
+    rpoller.fds[0].events = POLLIN;
+
+    qemu_thread_create(&rpoller.thread, "qio-channel-rdma-rpoller",
+                       qio_channel_rdma_rpoller_thread, NULL,
+                       QEMU_THREAD_JOINABLE);
+}
+
+static void qio_channel_rdma_add_rioc_to_rpoller(QIOChannelRDMA *rioc)
+{
+    int flags = EFD_CLOEXEC | EFD_NONBLOCK;
+
+    /*
+     * A single eventfd is either readable or writable. A single eventfd cannot
+     * represent a state where it is neither readable nor writable. so use two
+     * eventfds here.
+     */
+    rioc->pollin_eventfd = eventfd(0, flags);
+    rioc->pollout_eventfd = eventfd(0, flags);
+    /* pollout_eventfd with the value 0, means writable, make it unwritable */
+    qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, false);
+
+    /* tell the rpoller to rpoll() events on rioc->socketfd */
+    rioc->rpoll_events = POLLIN | POLLOUT;
+    qio_channel_rdma_notify_rpoller(rioc, RP_CMD_ADD_IOC);
+}
+
 QIOChannelRDMA *qio_channel_rdma_new(void)
 {
     QIOChannelRDMA *rioc;
     QIOChannel *ioc;
 
+    qio_channel_rdma_rpoller_start();
+    if (!rpoller.is_running) {
+        return NULL;
+    }
+
     rioc = QIO_CHANNEL_RDMA(object_new(TYPE_QIO_CHANNEL_RDMA));
     ioc = QIO_CHANNEL(rioc);
     qio_channel_set_feature(ioc, QIO_CHANNEL_FEATURE_SHUTDOWN);
@@ -125,6 +393,8 @@ retry:
         goto out;
     }
 
+    qio_channel_rdma_add_rioc_to_rpoller(rioc);
+
 out:
     if (ret) {
         trace_qio_channel_rdma_connect_fail(rioc);
@@ -211,6 +481,8 @@ int qio_channel_rdma_listen_sync(QIOChannelRDMA *rioc, InetSocketAddress *addr,
     qio_channel_set_feature(QIO_CHANNEL(rioc), QIO_CHANNEL_FEATURE_LISTEN);
     trace_qio_channel_rdma_listen_complete(rioc, fd);
 
+    qio_channel_rdma_add_rioc_to_rpoller(rioc);
+
 out:
     if (ret) {
         trace_qio_channel_rdma_listen_fail(rioc);
@@ -267,8 +539,10 @@ void qio_channel_rdma_listen_async(QIOChannelRDMA *ioc, InetSocketAddress *addr,
                            qio_channel_listen_worker_free, context);
 }
 
-QIOChannelRDMA *qio_channel_rdma_accept(QIOChannelRDMA *rioc, Error **errp)
+QIOChannelRDMA *coroutine_mixed_fn qio_channel_rdma_accept(QIOChannelRDMA *rioc,
+                                                           Error **errp)
 {
+    QIOChannel *ioc = QIO_CHANNEL(rioc);
     QIOChannelRDMA *cioc;
 
     cioc = qio_channel_rdma_new();
@@ -283,6 +557,17 @@ retry:
         if (errno == EINTR) {
            goto retry;
        }
+        if (errno == EAGAIN) {
+            if (!(rioc->rpoll_events & POLLIN)) {
+                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLIN, true);
+            }
+            if (qemu_in_coroutine()) {
+                qio_channel_yield(ioc, G_IO_IN);
+            } else {
+                qio_channel_wait(ioc, G_IO_IN);
+            }
+            goto retry;
+        }
         error_setg_errno(errp, errno, "Unable to accept connection");
         goto error;
     }
@@ -294,6 +579,8 @@ retry:
         goto error;
     }
 
+    qio_channel_rdma_add_rioc_to_rpoller(cioc);
+
     trace_qio_channel_rdma_accept_complete(rioc, cioc, cioc->fd);
     return cioc;
 
@@ -307,6 +594,10 @@ static void qio_channel_rdma_init(Object *obj)
 {
     QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
     ioc->fd = -1;
+    ioc->pollin_eventfd = -1;
+    ioc->pollout_eventfd = -1;
+    ioc->index = -1;
+    ioc->rpoll_events = 0;
 }
 
 static void qio_channel_rdma_finalize(Object *obj)
@@ -314,6 +605,7 @@ static void qio_channel_rdma_finalize(Object *obj)
     QIOChannelRDMA *ioc = QIO_CHANNEL_RDMA(obj);
 
     if (ioc->fd != -1) {
+        qio_channel_rdma_notify_rpoller(ioc, RP_CMD_DEL_IOC);
         rclose(ioc->fd);
         ioc->fd = -1;
     }
@@ -330,6 +622,12 @@ static ssize_t qio_channel_rdma_readv(QIOChannel *ioc, const struct iovec *iov,
 retry:
     ret = rreadv(rioc->fd, iov, niov);
     if (ret < 0) {
+        if (errno == EAGAIN) {
+            if (!(rioc->rpoll_events & POLLIN)) {
+                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLIN, true);
+            }
+            return QIO_CHANNEL_ERR_BLOCK;
+        }
         if (errno == EINTR) {
             goto retry;
         }
@@ -351,6 +649,12 @@ static ssize_t qio_channel_rdma_writev(QIOChannel *ioc, const struct iovec *iov,
 retry:
     ret = rwritev(rioc->fd, iov, niov);
     if (ret <= 0) {
+        if (errno == EAGAIN) {
+            if (!(rioc->rpoll_events & POLLOUT)) {
+                qio_channel_rdma_update_poll_event(rioc, CLEAR_POLLOUT, true);
+            }
+            return QIO_CHANNEL_ERR_BLOCK;
+        }
         if (errno == EINTR) {
             goto retry;
         }
@@ -361,6 +665,28 @@ retry:
     return ret;
 }
 
+static int qio_channel_rdma_set_blocking(QIOChannel *ioc, bool enabled,
+                                         Error **errp G_GNUC_UNUSED)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+    int flags, ret;
+
+    flags = rfcntl(rioc->fd, F_GETFL);
+    if (enabled) {
+        flags &= ~O_NONBLOCK;
+    } else {
+        flags |= O_NONBLOCK;
+    }
+
+    ret = rfcntl(rioc->fd, F_SETFL, flags);
+    if (ret) {
+        error_setg_errno(errp, errno,
+                         "Unable to rfcntl rsocket fd with flags %d", flags);
+    }
+
+    return ret;
+}
+
 static void qio_channel_rdma_set_delay(QIOChannel *ioc, bool enabled)
 {
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
@@ -374,6 +700,7 @@ static int qio_channel_rdma_close(QIOChannel *ioc, Error **errp)
     QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
 
     if (rioc->fd != -1) {
+        qio_channel_rdma_notify_rpoller(rioc, RP_CMD_DEL_IOC);
         rclose(rioc->fd);
         rioc->fd = -1;
     }
@@ -408,6 +735,37 @@ static int qio_channel_rdma_shutdown(QIOChannel *ioc, QIOChannelShutdown how,
     return 0;
 }
 
+static void
+qio_channel_rdma_set_aio_fd_handler(QIOChannel *ioc, AioContext *read_ctx,
+                                    IOHandler *io_read, AioContext *write_ctx,
+                                    IOHandler *io_write, void *opaque)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+
+    qio_channel_util_set_aio_fd_handler(rioc->pollin_eventfd, read_ctx, io_read,
+                                        rioc->pollout_eventfd, write_ctx,
+                                        io_write, opaque);
+}
+
+static GSource *qio_channel_rdma_create_watch(QIOChannel *ioc,
+                                              GIOCondition condition)
+{
+    QIOChannelRDMA *rioc = QIO_CHANNEL_RDMA(ioc);
+
+    switch (condition) {
+    case G_IO_IN:
+        return qio_channel_create_fd_watch(ioc, rioc->pollin_eventfd,
+                                           condition);
+    case G_IO_OUT:
+        return qio_channel_create_fd_watch(ioc, rioc->pollout_eventfd,
+                                           condition);
+    default:
+        error_report("%s: do not support watch 0x%x event", __func__,
+                     condition);
+        return NULL;
+    }
+}
+
 static void qio_channel_rdma_class_init(ObjectClass *klass,
                                         void *class_data G_GNUC_UNUSED)
 {
@@ -415,9 +773,12 @@ static void qio_channel_rdma_class_init(ObjectClass *klass,
 
     ioc_klass->io_writev = qio_channel_rdma_writev;
     ioc_klass->io_readv = qio_channel_rdma_readv;
+    ioc_klass->io_set_blocking = qio_channel_rdma_set_blocking;
     ioc_klass->io_close = qio_channel_rdma_close;
     ioc_klass->io_shutdown = qio_channel_rdma_shutdown;
     ioc_klass->io_set_delay = qio_channel_rdma_set_delay;
+    ioc_klass->io_create_watch = qio_channel_rdma_create_watch;
+    ioc_klass->io_set_aio_fd_handler = qio_channel_rdma_set_aio_fd_handler;
 }
 
 static const TypeInfo qio_channel_rdma_info = {
-- 
2.43.0