Add the basic implementation for receiving vfio-user messages from the
control socket.
Originally-by: John Johnson <john.g.johnson@oracle.com>
Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
Signed-off-by: John Levon <john.levon@nutanix.com>
---
meson.build | 1 +
hw/vfio-user/protocol.h | 53 +++++
hw/vfio-user/proxy.h | 11 +
hw/vfio-user/trace.h | 1 +
hw/vfio-user/pci.c | 11 +
hw/vfio-user/proxy.c | 409 ++++++++++++++++++++++++++++++++++++++
hw/vfio-user/trace-events | 6 +
7 files changed, 492 insertions(+)
create mode 100644 hw/vfio-user/protocol.h
create mode 100644 hw/vfio-user/trace.h
create mode 100644 hw/vfio-user/trace-events
diff --git a/meson.build b/meson.build
index 34729c2a3d..3d2d8c97dc 100644
--- a/meson.build
+++ b/meson.build
@@ -3686,6 +3686,7 @@ if have_system
'hw/ufs',
'hw/usb',
'hw/vfio',
+ 'hw/vfio-user',
'hw/virtio',
'hw/vmapple',
'hw/watchdog',
diff --git a/hw/vfio-user/protocol.h b/hw/vfio-user/protocol.h
new file mode 100644
index 0000000000..4ddfb5f222
--- /dev/null
+++ b/hw/vfio-user/protocol.h
@@ -0,0 +1,53 @@
+#ifndef VFIO_USER_PROTOCOL_H
+#define VFIO_USER_PROTOCOL_H
+
+/*
+ * vfio protocol over a UNIX socket.
+ *
+ * Copyright © 2018, 2021 Oracle and/or its affiliates.
+ *
+ * Each message has a standard header that describes the command
+ * being sent, which is almost always a VFIO ioctl().
+ *
+ * The header may be followed by command-specific data, such as the
+ * region and offset info for read and write commands.
+ *
+ * SPDX-License-Identifier: GPL-2.0-or-later
+ */
+
+typedef struct {
+ uint16_t id;
+ uint16_t command;
+ uint32_t size;
+ uint32_t flags;
+ uint32_t error_reply;
+} VFIOUserHdr;
+
+/* VFIOUserHdr commands */
+enum vfio_user_command {
+ VFIO_USER_VERSION = 1,
+ VFIO_USER_DMA_MAP = 2,
+ VFIO_USER_DMA_UNMAP = 3,
+ VFIO_USER_DEVICE_GET_INFO = 4,
+ VFIO_USER_DEVICE_GET_REGION_INFO = 5,
+ VFIO_USER_DEVICE_GET_REGION_IO_FDS = 6,
+ VFIO_USER_DEVICE_GET_IRQ_INFO = 7,
+ VFIO_USER_DEVICE_SET_IRQS = 8,
+ VFIO_USER_REGION_READ = 9,
+ VFIO_USER_REGION_WRITE = 10,
+ VFIO_USER_DMA_READ = 11,
+ VFIO_USER_DMA_WRITE = 12,
+ VFIO_USER_DEVICE_RESET = 13,
+ VFIO_USER_DIRTY_PAGES = 14,
+ VFIO_USER_MAX,
+};
+
+/* VFIOUserHdr flags */
+#define VFIO_USER_REQUEST 0x0
+#define VFIO_USER_REPLY 0x1
+#define VFIO_USER_TYPE 0xF
+
+#define VFIO_USER_NO_REPLY 0x10
+#define VFIO_USER_ERROR 0x20
+
+#endif /* VFIO_USER_PROTOCOL_H */
diff --git a/hw/vfio-user/proxy.h b/hw/vfio-user/proxy.h
index a9bce82239..ff553cad9d 100644
--- a/hw/vfio-user/proxy.h
+++ b/hw/vfio-user/proxy.h
@@ -12,6 +12,9 @@
#include "io/channel.h"
#include "io/channel-socket.h"
+#include "qemu/sockets.h"
+#include "hw/vfio-user/protocol.h"
+
typedef struct {
int send_fds;
int recv_fds;
@@ -28,6 +31,7 @@ enum msg_type {
typedef struct VFIOUserMsg {
QTAILQ_ENTRY(VFIOUserMsg) next;
+ VFIOUserHdr *hdr;
VFIOUserFDs *fds;
uint32_t rsize;
uint32_t id;
@@ -67,13 +71,20 @@ typedef struct VFIOUserProxy {
VFIOUserMsgQ incoming;
VFIOUserMsgQ outgoing;
VFIOUserMsg *last_nowait;
+ VFIOUserMsg *part_recv;
+ size_t recv_left;
enum proxy_state state;
} VFIOUserProxy;
/* VFIOProxy flags */
#define VFIO_PROXY_CLIENT 0x1
+typedef struct VFIODevice VFIODevice;
+
VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
void vfio_user_disconnect(VFIOUserProxy *proxy);
+void vfio_user_set_handler(VFIODevice *vbasedev,
+ void (*handler)(void *opaque, VFIOUserMsg *msg),
+ void *reqarg);
#endif /* VFIO_USER_PROXY_H */
diff --git a/hw/vfio-user/trace.h b/hw/vfio-user/trace.h
new file mode 100644
index 0000000000..574b59aa89
--- /dev/null
+++ b/hw/vfio-user/trace.h
@@ -0,0 +1 @@
+#include "trace/trace-hw_vfio_user.h"
diff --git a/hw/vfio-user/pci.c b/hw/vfio-user/pci.c
index 642421e791..bad2829f5c 100644
--- a/hw/vfio-user/pci.c
+++ b/hw/vfio-user/pci.c
@@ -22,6 +22,16 @@ struct VFIOUserPCIDevice {
SocketAddress *socket;
};
+/*
+ * Incoming request message callback.
+ *
+ * Runs off main loop, so BQL held.
+ */
+static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
+{
+
+}
+
/*
* Emulated devices don't use host hot reset
*/
@@ -80,6 +90,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev, Error **errp)
return;
}
vbasedev->proxy = proxy;
+ vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
/*
* vfio-user devices are effectively mdevs (don't use a host iommu).
diff --git a/hw/vfio-user/proxy.c b/hw/vfio-user/proxy.c
index bb436c9db9..31e08cbad3 100644
--- a/hw/vfio-user/proxy.c
+++ b/hw/vfio-user/proxy.c
@@ -11,15 +11,32 @@
#include "hw/vfio/vfio-device.h"
#include "hw/vfio-user/proxy.h"
+#include "hw/vfio-user/trace.h"
#include "qapi/error.h"
#include "qemu/error-report.h"
#include "qemu/lockable.h"
+#include "qemu/main-loop.h"
#include "system/iothread.h"
static IOThread *vfio_user_iothread;
static void vfio_user_shutdown(VFIOUserProxy *proxy);
+static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
+ VFIOUserFDs *fds);
+static VFIOUserFDs *vfio_user_getfds(int numfds);
+static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg);
+static void vfio_user_recv(void *opaque);
+static int vfio_user_recv_one(VFIOUserProxy *proxy);
+static void vfio_user_cb(void *opaque);
+
+static void vfio_user_request(void *opaque);
+
+static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
+{
+ hdr->flags |= VFIO_USER_ERROR;
+ hdr->error_reply = err;
+}
/*
* Functions called by main, CPU, or iothread threads
@@ -32,10 +49,340 @@ static void vfio_user_shutdown(VFIOUserProxy *proxy)
proxy->ctx, NULL, NULL);
}
+static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
+ VFIOUserFDs *fds)
+{
+ VFIOUserMsg *msg;
+
+ msg = QTAILQ_FIRST(&proxy->free);
+ if (msg != NULL) {
+ QTAILQ_REMOVE(&proxy->free, msg, next);
+ } else {
+ msg = g_malloc0(sizeof(*msg));
+ qemu_cond_init(&msg->cv);
+ }
+
+ msg->hdr = hdr;
+ msg->fds = fds;
+ return msg;
+}
+
+/*
+ * Recycle a message list entry to the free list.
+ */
+static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg)
+{
+ if (msg->type == VFIO_MSG_NONE) {
+ error_printf("vfio_user_recycle - freeing free msg\n");
+ return;
+ }
+
+ /* free msg buffer if no one is waiting to consume the reply */
+ if (msg->type == VFIO_MSG_NOWAIT || msg->type == VFIO_MSG_ASYNC) {
+ g_free(msg->hdr);
+ if (msg->fds != NULL) {
+ g_free(msg->fds);
+ }
+ }
+
+ msg->type = VFIO_MSG_NONE;
+ msg->hdr = NULL;
+ msg->fds = NULL;
+ msg->complete = false;
+ QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
+}
+
+static VFIOUserFDs *vfio_user_getfds(int numfds)
+{
+ VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + (numfds * sizeof(int)));
+
+ fds->fds = (int *)((char *)fds + sizeof(*fds));
+
+ return fds;
+}
+
/*
* Functions only called by iothread
*/
+/*
+ * Process a received message.
+ */
+static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
+ bool isreply)
+{
+
+ /*
+ * Replies signal a waiter; if none, just check for errors
+ * and free the message buffer.
+ *
+ * Requests get queued for the BH.
+ */
+ if (isreply) {
+ msg->complete = true;
+ if (msg->type == VFIO_MSG_WAIT) {
+ qemu_cond_signal(&msg->cv);
+ } else {
+ if (msg->hdr->flags & VFIO_USER_ERROR) {
+ error_printf("vfio_user_process: error reply on async ");
+ error_printf("request command %x error %s\n",
+ msg->hdr->command,
+ strerror(msg->hdr->error_reply));
+ }
+ /* youngest nowait msg has been ack'd */
+ if (proxy->last_nowait == msg) {
+ proxy->last_nowait = NULL;
+ }
+ vfio_user_recycle(proxy, msg);
+ }
+ } else {
+ QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
+ qemu_bh_schedule(proxy->req_bh);
+ }
+}
+
+/*
+ * Complete a partial message read
+ */
+static int vfio_user_complete(VFIOUserProxy *proxy, Error **errp)
+{
+ VFIOUserMsg *msg = proxy->part_recv;
+ size_t msgleft = proxy->recv_left;
+ bool isreply;
+ char *data;
+ int ret;
+
+ data = (char *)msg->hdr + (msg->hdr->size - msgleft);
+ while (msgleft > 0) {
+ ret = qio_channel_read(proxy->ioc, data, msgleft, errp);
+
+ /* error or would block */
+ if (ret <= 0) {
+ /* try for the rest on next iteration */
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
+ proxy->recv_left = msgleft;
+ }
+ return ret;
+ }
+ trace_vfio_user_recv_read(msg->hdr->id, ret);
+
+ msgleft -= ret;
+ data += ret;
+ }
+
+ /*
+ * Read complete message, process it.
+ */
+ proxy->part_recv = NULL;
+ proxy->recv_left = 0;
+ isreply = (msg->hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REPLY;
+ vfio_user_process(proxy, msg, isreply);
+
+ /* return positive value */
+ return 1;
+}
+
+static void vfio_user_recv(void *opaque)
+{
+ VFIOUserProxy *proxy = opaque;
+
+ QEMU_LOCK_GUARD(&proxy->lock);
+
+ if (proxy->state == VFIO_PROXY_CONNECTED) {
+ while (vfio_user_recv_one(proxy) == 0) {
+ ;
+ }
+ }
+}
+
+/*
+ * Receive and process one incoming message.
+ *
+ * For replies, find matching outgoing request and wake any waiters.
+ * For requests, queue in incoming list and run request BH.
+ */
+static int vfio_user_recv_one(VFIOUserProxy *proxy)
+{
+ VFIOUserMsg *msg = NULL;
+ g_autofree int *fdp = NULL;
+ VFIOUserFDs *reqfds;
+ VFIOUserHdr hdr;
+ struct iovec iov = {
+ .iov_base = &hdr,
+ .iov_len = sizeof(hdr),
+ };
+ bool isreply = false;
+ int i, ret;
+ size_t msgleft, numfds = 0;
+ char *data = NULL;
+ char *buf = NULL;
+ Error *local_err = NULL;
+
+ /*
+ * Complete any partial reads
+ */
+ if (proxy->part_recv != NULL) {
+ ret = vfio_user_complete(proxy, &local_err);
+
+ /* still not complete, try later */
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
+ return ret;
+ }
+
+ if (ret <= 0) {
+ goto fatal;
+ }
+ /* else fall into reading another msg */
+ }
+
+ /*
+ * Read header
+ */
+ ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds, 0,
+ &local_err);
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
+ return ret;
+ }
+
+ /* read error or other side closed connection */
+ if (ret <= 0) {
+ goto fatal;
+ }
+
+ if (ret < sizeof(hdr)) {
+ error_setg(&local_err, "short read of header");
+ goto fatal;
+ }
+
+ /*
+ * Validate header
+ */
+ if (hdr.size < sizeof(VFIOUserHdr)) {
+ error_setg(&local_err, "bad header size");
+ goto fatal;
+ }
+ switch (hdr.flags & VFIO_USER_TYPE) {
+ case VFIO_USER_REQUEST:
+ isreply = false;
+ break;
+ case VFIO_USER_REPLY:
+ isreply = true;
+ break;
+ default:
+ error_setg(&local_err, "unknown message type");
+ goto fatal;
+ }
+ trace_vfio_user_recv_hdr(proxy->sockname, hdr.id, hdr.command, hdr.size,
+ hdr.flags);
+
+ /*
+ * For replies, find the matching pending request.
+ * For requests, reap incoming FDs.
+ */
+ if (isreply) {
+ QTAILQ_FOREACH(msg, &proxy->pending, next) {
+ if (hdr.id == msg->id) {
+ break;
+ }
+ }
+ if (msg == NULL) {
+ error_setg(&local_err, "unexpected reply");
+ goto err;
+ }
+ QTAILQ_REMOVE(&proxy->pending, msg, next);
+
+ /*
+ * Process any received FDs
+ */
+ if (numfds != 0) {
+ if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
+ error_setg(&local_err, "unexpected FDs");
+ goto err;
+ }
+ msg->fds->recv_fds = numfds;
+ memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
+ }
+ } else {
+ if (numfds != 0) {
+ reqfds = vfio_user_getfds(numfds);
+ memcpy(reqfds->fds, fdp, numfds * sizeof(int));
+ } else {
+ reqfds = NULL;
+ }
+ }
+
+ /*
+ * Put the whole message into a single buffer.
+ */
+ if (isreply) {
+ if (hdr.size > msg->rsize) {
+ error_setg(&local_err, "reply larger than recv buffer");
+ goto err;
+ }
+ *msg->hdr = hdr;
+ data = (char *)msg->hdr + sizeof(hdr);
+ } else {
+ buf = g_malloc0(hdr.size);
+ memcpy(buf, &hdr, sizeof(hdr));
+ data = buf + sizeof(hdr);
+ msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
+ msg->type = VFIO_MSG_REQ;
+ }
+
+ /*
+ * Read rest of message.
+ */
+ msgleft = hdr.size - sizeof(hdr);
+ while (msgleft > 0) {
+ ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
+
+ /* prepare to complete read on next iteration */
+ if (ret == QIO_CHANNEL_ERR_BLOCK) {
+ proxy->part_recv = msg;
+ proxy->recv_left = msgleft;
+ return ret;
+ }
+
+ if (ret <= 0) {
+ goto fatal;
+ }
+ trace_vfio_user_recv_read(hdr.id, ret);
+
+ msgleft -= ret;
+ data += ret;
+ }
+
+ vfio_user_process(proxy, msg, isreply);
+ return 0;
+
+ /*
+ * fatal means the other side closed or we don't trust the stream
+ * err means this message is corrupt
+ */
+fatal:
+ vfio_user_shutdown(proxy);
+ proxy->state = VFIO_PROXY_ERROR;
+
+ /* set error if server side closed */
+ if (ret == 0) {
+ error_setg(&local_err, "server closed socket");
+ }
+
+err:
+ for (i = 0; i < numfds; i++) {
+ close(fdp[i]);
+ }
+ if (isreply && msg != NULL) {
+ /* force an error to keep sending thread from hanging */
+ vfio_user_set_error(msg->hdr, EINVAL);
+ msg->complete = true;
+ qemu_cond_signal(&msg->cv);
+ }
+ error_prepend(&local_err, "vfio_user_recv_one: ");
+ error_report_err(local_err);
+ return -1;
+}
+
static void vfio_user_cb(void *opaque)
{
VFIOUserProxy *proxy = opaque;
@@ -51,6 +398,53 @@ static void vfio_user_cb(void *opaque)
* Functions called by main or CPU threads
*/
+/*
+ * Process incoming requests.
+ *
+ * The bus-specific callback has the form:
+ * request(opaque, msg)
+ * where 'opaque' was specified in vfio_user_set_handler
+ * and 'msg' is the inbound message.
+ *
+ * The callback is responsible for disposing of the message buffer,
+ * usually by re-using it when calling vfio_send_reply or vfio_send_error,
+ * both of which free their message buffer when the reply is sent.
+ *
+ * If the callback uses a new buffer, it needs to free the old one.
+ */
+static void vfio_user_request(void *opaque)
+{
+ VFIOUserProxy *proxy = opaque;
+ VFIOUserMsgQ new, free;
+ VFIOUserMsg *msg, *m1;
+
+ /* reap all incoming */
+ QTAILQ_INIT(&new);
+ WITH_QEMU_LOCK_GUARD(&proxy->lock) {
+ QTAILQ_FOREACH_SAFE(msg, &proxy->incoming, next, m1) {
+ QTAILQ_REMOVE(&proxy->incoming, msg, next);
+ QTAILQ_INSERT_TAIL(&new, msg, next);
+ }
+ }
+
+ /* process list */
+ QTAILQ_INIT(&free);
+ QTAILQ_FOREACH_SAFE(msg, &new, next, m1) {
+ QTAILQ_REMOVE(&new, msg, next);
+ trace_vfio_user_recv_request(msg->hdr->command);
+ proxy->request(proxy->req_arg, msg);
+ QTAILQ_INSERT_HEAD(&free, msg, next);
+ }
+
+ /* free list */
+ WITH_QEMU_LOCK_GUARD(&proxy->lock) {
+ QTAILQ_FOREACH_SAFE(msg, &free, next, m1) {
+ vfio_user_recycle(proxy, msg);
+ }
+ }
+}
+
+
static QLIST_HEAD(, VFIOUserProxy) vfio_user_sockets =
QLIST_HEAD_INITIALIZER(vfio_user_sockets);
@@ -89,6 +483,7 @@ VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
}
proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
+ proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
QTAILQ_INIT(&proxy->outgoing);
QTAILQ_INIT(&proxy->incoming);
@@ -99,6 +494,18 @@ VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp)
return proxy;
}
+void vfio_user_set_handler(VFIODevice *vbasedev,
+ void (*handler)(void *opaque, VFIOUserMsg *msg),
+ void *req_arg)
+{
+ VFIOUserProxy *proxy = vbasedev->proxy;
+
+ proxy->request = handler;
+ proxy->req_arg = req_arg;
+ qio_channel_set_aio_fd_handler(proxy->ioc, proxy->ctx,
+ vfio_user_recv, NULL, NULL, proxy);
+}
+
void vfio_user_disconnect(VFIOUserProxy *proxy)
{
VFIOUserMsg *r1, *r2;
@@ -114,6 +521,8 @@ void vfio_user_disconnect(VFIOUserProxy *proxy)
}
object_unref(OBJECT(proxy->ioc));
proxy->ioc = NULL;
+ qemu_bh_delete(proxy->req_bh);
+ proxy->req_bh = NULL;
proxy->state = VFIO_PROXY_CLOSING;
QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
diff --git a/hw/vfio-user/trace-events b/hw/vfio-user/trace-events
new file mode 100644
index 0000000000..89d6c11c4c
--- /dev/null
+++ b/hw/vfio-user/trace-events
@@ -0,0 +1,6 @@
+# See docs/devel/tracing.rst for syntax documentation.
+
+# common.c
+vfio_user_recv_hdr(const char *name, uint16_t id, uint16_t cmd, uint32_t size, uint32_t flags) " (%s) id 0x%x cmd 0x%x size 0x%x flags 0x%x"
+vfio_user_recv_read(uint16_t id, int read) " id 0x%x read 0x%x"
+vfio_user_recv_request(uint16_t cmd) " command 0x%x"
--
2.43.0
On 6/19/25 15:31, John Levon wrote:
> Add the basic implementation for receiving vfio-user messages from the
> control socket.
>
> +/*
> + * Process a received message.
> + */
> +static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
> + bool isreply)
This routine could have an 'Error **errp' parameter and avoid the
error_printf() below. Can you fix that please ?
> +{
> +
> + /*
> + * Replies signal a waiter, if none just check for errors
> + * and free the message buffer.
> + *
> + * Requests get queued for the BH.
> + */
> + if (isreply) {
> + msg->complete = true;
> + if (msg->type == VFIO_MSG_WAIT) {
> + qemu_cond_signal(&msg->cv);
> + } else {
> + if (msg->hdr->flags & VFIO_USER_ERROR) {
> + error_printf("vfio_user_process: error reply on async ");
> + error_printf("request command %x error %s\n",
> + msg->hdr->command,
> + strerror(msg->hdr->error_reply));
> + }
> + /* youngest nowait msg has been ack'd */
> + if (proxy->last_nowait == msg) {
> + proxy->last_nowait = NULL;
> + }
> + vfio_user_recycle(proxy, msg);
> + }
> + } else {
> + QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
> + qemu_bh_schedule(proxy->req_bh);
> + }
> +}
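For illustration only, an untested sketch of such a signature, with
error_setg() replacing the error_printf() calls above and the caller
deciding whether and how to report:

static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
                              bool isreply, Error **errp)
{
    /*
     * Replies signal a waiter; if none, just check for errors
     * and free the message buffer.
     *
     * Requests get queued for the BH.
     */
    if (isreply) {
        msg->complete = true;
        if (msg->type == VFIO_MSG_WAIT) {
            qemu_cond_signal(&msg->cv);
        } else {
            if (msg->hdr->flags & VFIO_USER_ERROR) {
                /* hand the error to the caller instead of printing it */
                error_setg(errp, "error reply on async request: "
                           "command %x error %s", msg->hdr->command,
                           strerror(msg->hdr->error_reply));
            }
            /* youngest nowait msg has been ack'd */
            if (proxy->last_nowait == msg) {
                proxy->last_nowait = NULL;
            }
            vfio_user_recycle(proxy, msg);
        }
    } else {
        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
        qemu_bh_schedule(proxy->req_bh);
    }
}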
> +/*
> + * Receive and process one incoming message.
> + *
> + * For replies, find matching outgoing request and wake any waiters.
> + * For requests, queue in incoming list and run request BH.
> + */
> +static int vfio_user_recv_one(VFIOUserProxy *proxy)
I would add an 'Error **' parameter too ...
> +err:
> + for (i = 0; i < numfds; i++) {
> + close(fdp[i]);
> + }
> + if (isreply && msg != NULL) {
> + /* force an error to keep sending thread from hanging */
> + vfio_user_set_error(msg->hdr, EINVAL);
> + msg->complete = true;
> + qemu_cond_signal(&msg->cv);
> + }
> + error_prepend(&local_err, "vfio_user_recv_one: ");
> + error_report_err(local_err);
... and let the caller vfio_user_recv() do the error reporting. Minor.
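For illustration only, an untested sketch of that arrangement:
vfio_user_recv_one() would take an 'Error **errp', its fatal/err paths
would set the error instead of reporting it, and the caller would do
the reporting:

static void vfio_user_recv(void *opaque)
{
    VFIOUserProxy *proxy = opaque;

    QEMU_LOCK_GUARD(&proxy->lock);

    if (proxy->state == VFIO_PROXY_CONNECTED) {
        Error *local_err = NULL;

        while (vfio_user_recv_one(proxy, &local_err) == 0) {
            ;
        }

        /*
         * QIO_CHANNEL_ERR_BLOCK just means there is nothing more to
         * read right now; only report when an error was actually set.
         */
        if (local_err != NULL) {
            error_report_err(local_err);
        }
    }
}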
Thanks,
C.
On Wed, Jun 25, 2025 at 10:02:50AM +0200, Cédric Le Goater wrote:
> > +/*
> > + * Process a received message.
> > + */
> > +static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
> > + bool isreply)
>
> This routine could have an 'Error **errp' parameter and avoid the
> error_printf() below. Can you fix that please ?

I could, but I think the current code makes more sense actually.
vfio_user_process() handles a single message. If it's a reply with an
error, we want to report it, but we don't want to tear down the device
altogether. So it wouldn't make much sense for this particular routine
to pass the error back to its caller?

>
> > + * For replies, find matching outgoing request and wake any waiters.
> > + * For requests, queue in incoming list and run request BH.
> > + */
> > +static int vfio_user_recv_one(VFIOUserProxy *proxy)
>
> I would add an 'Error **' parameter too ...

This one I can do though

regards
john