[RFC 09/11] aio-posix: add aio_add_sqe() API for user-defined io_uring requests

Stefan Hajnoczi posted 11 patches 5 months, 2 weeks ago
Maintainers: Kevin Wolf <kwolf@redhat.com>, Hanna Reitz <hreitz@redhat.com>, Aarushi Mehta <mehta.aaru20@gmail.com>, Julia Suvorova <jusual@redhat.com>, Stefan Hajnoczi <stefanha@redhat.com>, Stefano Garzarella <sgarzare@redhat.com>, Fam Zheng <fam@euphon.net>, Paolo Bonzini <pbonzini@redhat.com>, "Marc-André Lureau" <marcandre.lureau@redhat.com>, "Daniel P. Berrangé" <berrange@redhat.com>, "Philippe Mathieu-Daudé" <philmd@linaro.org>, Stefan Weil <sw@weilnetz.de>
There is a newer version of this series
[RFC 09/11] aio-posix: add aio_add_sqe() API for user-defined io_uring requests
Posted by Stefan Hajnoczi 5 months, 2 weeks ago
Introduce the aio_add_sqe() API for submitting io_uring requests in the
current AioContext. This allows other components in QEMU, like the block
layer, to take advantage of io_uring features without creating their own
io_uring context.

This API supports nested event loops just like file descriptor
monitoring and BHs do. This comes at a complexity cost: a BH is required
to dispatch CQE callbacks and they are placed on a list so that a nested
event loop can invoke its parent's pending CQE callbacks. If you're
wondering why CqeHandler exists instead of just a callback function
pointer, this is why.

Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
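For illustration, a hypothetical caller could look roughly like this (the
struct, function names, and the read request below are invented for this
sketch; they are not part of the series). The caller embeds a CqeHandler in
its own request struct so that ->cb() can recover its state with
container_of(), and the CqeHandler stays alive from aio_add_sqe() until
->cb() is invoked:

typedef struct {
    CqeHandler cqe_handler;
    int fd;
    void *buf;
    size_t len;
    uint64_t offset;
} ExampleReadRequest;

static void example_prep_sqe(struct io_uring_sqe *sqe, void *opaque)
{
    ExampleReadRequest *req = opaque;

    /* Fill in the sqe; do not call io_uring_sqe_set_data() */
    io_uring_prep_read(sqe, req->fd, req->buf, req->len, req->offset);
}

static void example_cqe_cb(CqeHandler *cqe_handler)
{
    ExampleReadRequest *req = container_of(cqe_handler, ExampleReadRequest,
                                           cqe_handler);

    /* cqe_handler->cqe was filled in before ->cb() was invoked */
    if (cqe_handler->cqe.res < 0) {
        error_report("example read failed: %s",
                     strerror(-cqe_handler->cqe.res));
    }

    g_free(req->buf);
    g_free(req);
}

static void example_submit(ExampleReadRequest *req)
{
    /* Only valid when aio_has_io_uring() returns true */
    assert(aio_has_io_uring());

    req->cqe_handler.cb = example_cqe_cb;
    aio_add_sqe(example_prep_sqe, req, &req->cqe_handler);
}
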
 include/block/aio.h   |  82 ++++++++++++++++++++++++
 util/aio-posix.h      |   1 +
 util/aio-posix.c      |   9 +++
 util/fdmon-io_uring.c | 145 +++++++++++++++++++++++++++++++-----------
 4 files changed, 200 insertions(+), 37 deletions(-)

diff --git a/include/block/aio.h b/include/block/aio.h
index d919d7c8f4..95beef28c3 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -61,6 +61,27 @@ typedef struct LuringState LuringState;
 /* Is polling disabled? */
 bool aio_poll_disabled(AioContext *ctx);
 
+#ifdef CONFIG_LINUX_IO_URING
+/*
+ * Each io_uring request must have a unique CqeHandler that processes the cqe.
+ * The lifetime of a CqeHandler must be at least from aio_add_sqe() until
+ * ->cb() invocation.
+ */
+typedef struct CqeHandler CqeHandler;
+struct CqeHandler {
+    /* Called by the AioContext when the request has completed */
+    void (*cb)(CqeHandler *handler);
+
+    /* Used internally, do not access this */
+    QSIMPLEQ_ENTRY(CqeHandler) next;
+
+    /* This field is filled in before ->cb() is called */
+    struct io_uring_cqe cqe;
+};
+
+typedef QSIMPLEQ_HEAD(, CqeHandler) CqeHandlerSimpleQ;
+#endif /* CONFIG_LINUX_IO_URING */
+
 /* Callbacks for file descriptor monitoring implementations */
 typedef struct {
     /*
@@ -138,6 +159,27 @@ typedef struct {
      * Called with list_lock incremented.
      */
     void (*gsource_dispatch)(AioContext *ctx, AioHandlerList *ready_list);
+
+#ifdef CONFIG_LINUX_IO_URING
+    /**
+     * aio_add_sqe: Add an io_uring sqe for submission.
+     * @prep_sqe: invoked with an sqe that should be prepared for submission
+     * @opaque: user-defined argument to @prep_sqe()
+     * @cqe_handler: the unique cqe handler associated with this request
+     *
+     * The caller's @prep_sqe() function is invoked to fill in the details of
+     * the sqe. Do not call io_uring_sqe_set_data() on this sqe.
+     *
+     * The kernel may see the sqe as soon as @pre_sqe() returns or it may take
+     * until the next event loop iteration.
+     *
+     * This function is called from the current AioContext and is not
+     * thread-safe.
+     */
+    void (*add_sqe)(AioContext *ctx,
+                    void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+                    void *opaque, CqeHandler *cqe_handler);
+#endif /* CONFIG_LINUX_IO_URING */
 } FDMonOps;
 
 /*
@@ -255,6 +297,10 @@ struct AioContext {
     struct io_uring fdmon_io_uring;
     AioHandlerSList submit_list;
     gpointer io_uring_fd_tag;
+
+    /* Pending callback state for cqe handlers */
+    CqeHandlerSimpleQ cqe_handler_ready_list;
+    QEMUBH *cqe_handler_bh;
 #endif
 
     /* TimerLists for calling timers - one per clock type.  Has its own
@@ -761,4 +807,40 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch);
  */
 void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
                                         int64_t max, Error **errp);
+
+#ifdef CONFIG_LINUX_IO_URING
+/**
+ * aio_has_io_uring: Return whether io_uring is available.
+ *
+ * io_uring is either available in all AioContexts or in none, so this only
+ * needs to be called once from within any thread's AioContext.
+ */
+static inline bool aio_has_io_uring(void)
+{
+    AioContext *ctx = qemu_get_current_aio_context();
+    return ctx->fdmon_ops->add_sqe;
+}
+
+/**
+ * aio_add_sqe: Add an io_uring sqe for submission.
+ * @prep_sqe: invoked with an sqe that should be prepared for submission
+ * @opaque: user-defined argument to @prep_sqe()
+ * @cqe_handler: the unique cqe handler associated with this request
+ *
+ * The caller's @prep_sqe() function is invoked to fill in the details of the
+ * sqe. Do not call io_uring_sqe_set_data() on this sqe.
+ *
+ * The sqe is submitted by the current AioContext. The kernel may see the sqe
+ * as soon as @pre_sqe() returns or it may take until the next event loop
+ * iteration.
+ *
+ * When the AioContext is destroyed, pending sqes are ignored and their
+ * CqeHandlers are not invoked.
+ *
+ * This function must be called only when aio_has_io_uring() returns true.
+ */
+void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+                 void *opaque, CqeHandler *cqe_handler);
+#endif /* CONFIG_LINUX_IO_URING */
+
 #endif
diff --git a/util/aio-posix.h b/util/aio-posix.h
index 6f9d97d866..57ef801a5f 100644
--- a/util/aio-posix.h
+++ b/util/aio-posix.h
@@ -36,6 +36,7 @@ struct AioHandler {
 #ifdef CONFIG_LINUX_IO_URING
     QSLIST_ENTRY(AioHandler) node_submitted;
     unsigned flags; /* see fdmon-io_uring.c */
+    CqeHandler cqe_handler;
 #endif
     int64_t poll_idle_timeout; /* when to stop userspace polling */
     bool poll_ready; /* has polling detected an event? */
diff --git a/util/aio-posix.c b/util/aio-posix.c
index 44b3df61f9..89bb215a2f 100644
--- a/util/aio-posix.c
+++ b/util/aio-posix.c
@@ -790,3 +790,12 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
 
     aio_notify(ctx);
 }
+
+#ifdef CONFIG_LINUX_IO_URING
+void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+                 void *opaque, CqeHandler *cqe_handler)
+{
+    AioContext *ctx = qemu_get_current_aio_context();
+    ctx->fdmon_ops->add_sqe(ctx, prep_sqe, opaque, cqe_handler);
+}
+#endif /* CONFIG_LINUX_IO_URING */
diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c
index ef1a866a03..3a49d6a20a 100644
--- a/util/fdmon-io_uring.c
+++ b/util/fdmon-io_uring.c
@@ -75,8 +75,8 @@ static inline int pfd_events_from_poll(int poll_events)
 }
 
 /*
- * Returns an sqe for submitting a request.  Only be called within
- * fdmon_io_uring_wait().
+ * Returns an sqe for submitting a request. Only called from the AioContext
+ * thread.
  */
 static struct io_uring_sqe *get_sqe(AioContext *ctx)
 {
@@ -166,23 +166,43 @@ static void fdmon_io_uring_update(AioContext *ctx,
     }
 }
 
+static void fdmon_io_uring_add_sqe(AioContext *ctx,
+        void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
+        void *opaque, CqeHandler *cqe_handler)
+{
+    struct io_uring_sqe *sqe = get_sqe(ctx);
+
+    prep_sqe(sqe, opaque);
+    io_uring_sqe_set_data(sqe, cqe_handler);
+}
+
+static void fdmon_special_cqe_handler(CqeHandler *cqe_handler)
+{
+    /*
+     * This is an empty function that is never called. It is used as a function
+     * pointer to distinguish it from ordinary cqe handlers.
+     */
+}
+
 static void add_poll_multishot_sqe(AioContext *ctx, AioHandler *node)
 {
     struct io_uring_sqe *sqe = get_sqe(ctx);
     int events = poll_events_from_pfd(node->pfd.events);
 
     io_uring_prep_poll_multishot(sqe, node->pfd.fd, events);
-    io_uring_sqe_set_data(sqe, node);
+    node->cqe_handler.cb = fdmon_special_cqe_handler;
+    io_uring_sqe_set_data(sqe, &node->cqe_handler);
 }
 
 static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
 {
     struct io_uring_sqe *sqe = get_sqe(ctx);
+    CqeHandler *cqe_handler = &node->cqe_handler;
 
 #ifdef LIBURING_HAVE_DATA64
-    io_uring_prep_poll_remove(sqe, (uintptr_t)node);
+    io_uring_prep_poll_remove(sqe, (uintptr_t)cqe_handler);
 #else
-    io_uring_prep_poll_remove(sqe, node);
+    io_uring_prep_poll_remove(sqe, cqe_handler);
 #endif
     io_uring_sqe_set_data(sqe, NULL);
 }
@@ -221,20 +241,12 @@ static void fill_sq_ring(AioContext *ctx)
     }
 }
 
-/* Returns true if a handler became ready */
-static bool process_cqe(AioContext *ctx,
-                        AioHandlerList *ready_list,
-                        struct io_uring_cqe *cqe)
+static bool process_cqe_aio_handler(AioContext *ctx,
+                                    AioHandlerList *ready_list,
+                                    AioHandler *node,
+                                    struct io_uring_cqe *cqe)
 {
-    AioHandler *node = io_uring_cqe_get_data(cqe);
-    unsigned flags;
-
-    /* poll_timeout and poll_remove have a zero user_data field */
-    if (!node) {
-        return false;
-    }
-
-    flags = qatomic_read(&node->flags);
+    unsigned flags = qatomic_read(&node->flags);
 
     /*
      * poll_multishot cancelled by poll_remove? Or completed early because fd
@@ -261,6 +273,56 @@ static bool process_cqe(AioContext *ctx,
     return true;
 }
 
+/* Process CqeHandlers from the ready list */
+static void cqe_handler_bh(void *opaque)
+{
+    AioContext *ctx = opaque;
+    CqeHandlerSimpleQ *ready_list = &ctx->cqe_handler_ready_list;
+
+    /*
+     * If cqe_handler->cb() calls aio_poll() it must continue processing
+     * ready_list. Schedule a BH so the inner event loop calls us again.
+     */
+    qemu_bh_schedule(ctx->cqe_handler_bh);
+
+    while (!QSIMPLEQ_EMPTY(ready_list)) {
+        CqeHandler *cqe_handler = QSIMPLEQ_FIRST(ready_list);
+
+        QSIMPLEQ_REMOVE_HEAD(ready_list, next);
+
+        cqe_handler->cb(cqe_handler);
+    }
+
+    qemu_bh_cancel(ctx->cqe_handler_bh);
+}
+
+/* Returns true if a handler became ready */
+static bool process_cqe(AioContext *ctx,
+                        AioHandlerList *ready_list,
+                        struct io_uring_cqe *cqe)
+{
+    CqeHandler *cqe_handler = io_uring_cqe_get_data(cqe);
+
+    /* poll_timeout and poll_remove have a zero user_data field */
+    if (!cqe_handler) {
+        return false;
+    }
+
+    /*
+     * Special handling for AioHandler cqes. They need ready_list and have a
+     * return value.
+     */
+    if (cqe_handler->cb == fdmon_special_cqe_handler) {
+        AioHandler *node = container_of(cqe_handler, AioHandler, cqe_handler);
+        return process_cqe_aio_handler(ctx, ready_list, node, cqe);
+    }
+
+    cqe_handler->cqe = *cqe;
+    QSIMPLEQ_INSERT_TAIL(&ctx->cqe_handler_ready_list, cqe_handler, next);
+    qemu_bh_schedule(ctx->cqe_handler_bh);
+    return false;
+}
+
 static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
 {
     struct io_uring *ring = &ctx->fdmon_io_uring;
@@ -360,6 +422,7 @@ static const FDMonOps fdmon_io_uring_ops = {
     .gsource_prepare = fdmon_io_uring_gsource_prepare,
     .gsource_check = fdmon_io_uring_gsource_check,
     .gsource_dispatch = fdmon_io_uring_gsource_dispatch,
+    .add_sqe = fdmon_io_uring_add_sqe,
 };
 
 void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
@@ -375,6 +438,8 @@ void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
     }
 
     QSLIST_INIT(&ctx->submit_list);
+    QSIMPLEQ_INIT(&ctx->cqe_handler_ready_list);
+    ctx->cqe_handler_bh = aio_bh_new(ctx, cqe_handler_bh, ctx);
     ctx->fdmon_ops = &fdmon_io_uring_ops;
     ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
             ctx->fdmon_io_uring.ring_fd, G_IO_IN);
@@ -382,30 +447,36 @@ void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
 
 void fdmon_io_uring_destroy(AioContext *ctx)
 {
-    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
-        AioHandler *node;
+    AioHandler *node;
 
-        io_uring_queue_exit(&ctx->fdmon_io_uring);
+    if (ctx->fdmon_ops != &fdmon_io_uring_ops) {
+        return;
+    }
 
-        /* Move handlers due to be removed onto the deleted list */
-        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
-            unsigned flags = qatomic_fetch_and(&node->flags,
-                    ~(FDMON_IO_URING_PENDING |
-                      FDMON_IO_URING_ADD |
-                      FDMON_IO_URING_REMOVE));
+    io_uring_queue_exit(&ctx->fdmon_io_uring);
 
-            if (flags & FDMON_IO_URING_REMOVE) {
-                QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
-            }
+    /* Move handlers due to be removed onto the deleted list */
+    while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
+        unsigned flags = qatomic_fetch_and(&node->flags,
+                ~(FDMON_IO_URING_PENDING |
+                  FDMON_IO_URING_ADD |
+                  FDMON_IO_URING_REMOVE));
 
-            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
+        if (flags & FDMON_IO_URING_REMOVE) {
+            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
+                                  node, node_deleted);
         }
 
-        g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
-        ctx->io_uring_fd_tag = NULL;
-
-        qemu_lockcnt_lock(&ctx->list_lock);
-        fdmon_poll_downgrade(ctx);
-        qemu_lockcnt_unlock(&ctx->list_lock);
+        QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
     }
+
+    g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
+    ctx->io_uring_fd_tag = NULL;
+
+    assert(QSIMPLEQ_EMPTY(&ctx->cqe_handler_ready_list));
+    qemu_bh_delete(ctx->cqe_handler_bh);
+
+    qemu_lockcnt_lock(&ctx->list_lock);
+    fdmon_poll_downgrade(ctx);
+    qemu_lockcnt_unlock(&ctx->list_lock);
 }
-- 
2.49.0
Re: [RFC 09/11] aio-posix: add aio_add_sqe() API for user-defined io_uring requests
Posted by Eric Blake 5 months, 2 weeks ago
On Wed, May 28, 2025 at 03:09:14PM -0400, Stefan Hajnoczi wrote:
> Introduce the aio_add_sqe() API for submitting io_uring requests in the
> current AioContext. This allows other components in QEMU, like the block
> layer, to take advantage of io_uring features without creating their own
> io_uring context.
> 
> This API supports nested event loops just like file descriptor
> monitoring and BHs do. This comes at a complexity cost: a BH is required
> to dispatch CQE callbacks and they are placed on a list so that a nested
> event loop can invoke its parent's pending CQE callbacks. If you're
> wondering why CqeHandler exists instead of just a callback function
> pointer, this is why.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---

Large patch.  I found a couple of nits, but the overall design looks
sound.

Reviewed-by: Eric Blake <eblake@redhat.com>

>  include/block/aio.h   |  82 ++++++++++++++++++++++++
>  util/aio-posix.h      |   1 +
>  util/aio-posix.c      |   9 +++
>  util/fdmon-io_uring.c | 145 +++++++++++++++++++++++++++++++-----------
>  4 files changed, 200 insertions(+), 37 deletions(-)
> 
> diff --git a/include/block/aio.h b/include/block/aio.h
> index d919d7c8f4..95beef28c3 100644
> --- a/include/block/aio.h
> +++ b/include/block/aio.h
> @@ -61,6 +61,27 @@ typedef struct LuringState LuringState;
>  /* Is polling disabled? */
>  bool aio_poll_disabled(AioContext *ctx);
>  
> +#ifdef CONFIG_LINUX_IO_URING
> +/*
> + * Each io_uring request must have a unique CqeHandler that processes the cqe.
> + * The lifetime of a CqeHandler must be at least from aio_add_sqe() until
> + * ->cb() invocation.
> + */
> +typedef struct CqeHandler CqeHandler;
> +struct CqeHandler {
> +    /* Called by the AioContext when the request has completed */
> +    void (*cb)(CqeHandler *handler);

I see an opaque callback pointer in prep_sqe below, but not one here.
Is that because callers can write their own struct that includes a
CqeHandler as its first member, if more state is needed?
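
The patch itself relies on that pattern for AioHandler, where cqe_handler is
not even the first member and container_of() recovers the enclosing struct.
A hypothetical caller-side sketch:

typedef struct {
    int ret;                /* caller state */
    CqeHandler cqe_handler; /* need not be the first member */
} ExampleCompletion;        /* hypothetical caller-defined struct */

static void example_cb(CqeHandler *h)
{
    ExampleCompletion *c = container_of(h, ExampleCompletion, cqe_handler);

    c->ret = h->cqe.res;
}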

> +
> +    /* Used internally, do not access this */
> +    QSIMPLEQ_ENTRY(CqeHandler) next;
> +
> +    /* This field is filled in before ->cb() is called */
> +    struct io_uring_cqe cqe;
> +};
> +
> +typedef QSIMPLEQ_HEAD(, CqeHandler) CqeHandlerSimpleQ;
> +#endif /* CONFIG_LINUX_IO_URING */
> +
>  /* Callbacks for file descriptor monitoring implementations */
>  typedef struct {
>      /*
> @@ -138,6 +159,27 @@ typedef struct {
>       * Called with list_lock incremented.
>       */
>      void (*gsource_dispatch)(AioContext *ctx, AioHandlerList *ready_list);
> +
> +#ifdef CONFIG_LINUX_IO_URING
> +    /**
> +     * aio_add_sqe: Add an io_uring sqe for submission.
> +     * @prep_sqe: invoked with an sqe that should be prepared for submission
> +     * @opaque: user-defined argument to @prep_sqe()
> +     * @cqe_handler: the unique cqe handler associated with this request
> +     *
> +     * The caller's @prep_sqe() function is invoked to fill in the details of
> +     * the sqe. Do not call io_uring_sqe_set_data() on this sqe.
> +     *
> +     * The kernel may see the sqe as soon as @pre_sqe() returns or it may take

prep_sqe

> +     * until the next event loop iteration.
> +     *
> +     * This function is called from the current AioContext and is not
> +     * thread-safe.
> +     */
> +    void (*add_sqe)(AioContext *ctx,
> +                    void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
> +                    void *opaque, CqeHandler *cqe_handler);
> +#endif /* CONFIG_LINUX_IO_URING */
>  } FDMonOps;
>  
>  /*
> @@ -255,6 +297,10 @@ struct AioContext {
>      struct io_uring fdmon_io_uring;
>      AioHandlerSList submit_list;
>      gpointer io_uring_fd_tag;
> +
> +    /* Pending callback state for cqe handlers */
> +    CqeHandlerSimpleQ cqe_handler_ready_list;
> +    QEMUBH *cqe_handler_bh;
>  #endif

While here, is it worth adding a comment to state which matching #if
it ends (similar to what you did above in FDMonOps add_sqe)?
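
i.e. something like:

    #endif /* CONFIG_LINUX_IO_URING */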

>  
>      /* TimerLists for calling timers - one per clock type.  Has its own
> @@ -761,4 +807,40 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch);
>   */
>  void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
>                                          int64_t max, Error **errp);
> +
> +#ifdef CONFIG_LINUX_IO_URING
> +/**
> + * aio_has_io_uring: Return whether io_uring is available.
> + *
> + * io_uring is either available in all AioContexts or in none, so this only
> + * needs to be called once from within any thread's AioContext.
> + */
> +static inline bool aio_has_io_uring(void)
> +{
> +    AioContext *ctx = qemu_get_current_aio_context();
> +    return ctx->fdmon_ops->add_sqe;
> +}
> +
> +/**
> + * aio_add_sqe: Add an io_uring sqe for submission.
> + * @prep_sqe: invoked with an sqe that should be prepared for submission
> + * @opaque: user-defined argument to @prep_sqe()
> + * @cqe_handler: the unique cqe handler associated with this request
> + *
> + * The caller's @prep_sqe() function is invoked to fill in the details of the
> + * sqe. Do not call io_uring_sqe_set_data() on this sqe.
> + *
> + * The sqe is submitted by the current AioContext. The kernel may see the sqe
> + * as soon as @pre_sqe() returns or it may take until the next event loop

prep_sqe

> + * iteration.
> + *
> + * When the AioContext is destroyed, pending sqes are ignored and their
> + * CqeHandlers are not invoked.
> + *
> + * This function must be called only when aio_has_io_uring() returns true.
> + */
> +void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
> +                 void *opaque, CqeHandler *cqe_handler);
> +#endif /* CONFIG_LINUX_IO_URING */
> +
>  #endif
> diff --git a/util/aio-posix.h b/util/aio-posix.h
> index 6f9d97d866..57ef801a5f 100644
> --- a/util/aio-posix.h
> +++ b/util/aio-posix.h
> @@ -36,6 +36,7 @@ struct AioHandler {
>  #ifdef CONFIG_LINUX_IO_URING
>      QSLIST_ENTRY(AioHandler) node_submitted;
>      unsigned flags; /* see fdmon-io_uring.c */
> +    CqeHandler cqe_handler;
>  #endif
>      int64_t poll_idle_timeout; /* when to stop userspace polling */
>      bool poll_ready; /* has polling detected an event? */
> diff --git a/util/aio-posix.c b/util/aio-posix.c
> index 44b3df61f9..89bb215a2f 100644
> --- a/util/aio-posix.c
> +++ b/util/aio-posix.c
> @@ -790,3 +790,12 @@ void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch)
>  
>      aio_notify(ctx);
>  }
> +
> +#ifdef CONFIG_LINUX_IO_URING
> +void aio_add_sqe(void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
> +                 void *opaque, CqeHandler *cqe_handler)
> +{
> +    AioContext *ctx = qemu_get_current_aio_context();
> +    ctx->fdmon_ops->add_sqe(ctx, prep_sqe, opaque, cqe_handler);
> +}
> +#endif /* CONFIG_LINUX_IO_URING */
> diff --git a/util/fdmon-io_uring.c b/util/fdmon-io_uring.c
> index ef1a866a03..3a49d6a20a 100644
> --- a/util/fdmon-io_uring.c
> +++ b/util/fdmon-io_uring.c
> @@ -75,8 +75,8 @@ static inline int pfd_events_from_poll(int poll_events)
>  }
>  
>  /*
> - * Returns an sqe for submitting a request.  Only be called within
> - * fdmon_io_uring_wait().
> + * Returns an sqe for submitting a request. Only called from the AioContext
> + * thread.
>   */
>  static struct io_uring_sqe *get_sqe(AioContext *ctx)
>  {
> @@ -166,23 +166,43 @@ static void fdmon_io_uring_update(AioContext *ctx,
>      }
>  }
>  
> +static void fdmon_io_uring_add_sqe(AioContext *ctx,
> +        void (*prep_sqe)(struct io_uring_sqe *sqe, void *opaque),
> +        void *opaque, CqeHandler *cqe_handler)
> +{
> +    struct io_uring_sqe *sqe = get_sqe(ctx);
> +
> +    prep_sqe(sqe, opaque);
> +    io_uring_sqe_set_data(sqe, cqe_handler);
> +}
> +
> +static void fdmon_special_cqe_handler(CqeHandler *cqe_handler)
> +{
> +    /*
> +     * This is an empty function that is never called. It is used as a function
> +     * pointer to distinguish it from ordinary cqe handlers.
> +     */
> +}
> +
>  static void add_poll_multishot_sqe(AioContext *ctx, AioHandler *node)
>  {
>      struct io_uring_sqe *sqe = get_sqe(ctx);
>      int events = poll_events_from_pfd(node->pfd.events);
>  
>      io_uring_prep_poll_multishot(sqe, node->pfd.fd, events);
> -    io_uring_sqe_set_data(sqe, node);
> +    node->cqe_handler.cb = fdmon_special_cqe_handler;
> +    io_uring_sqe_set_data(sqe, &node->cqe_handler);
>  }
>  
>  static void add_poll_remove_sqe(AioContext *ctx, AioHandler *node)
>  {
>      struct io_uring_sqe *sqe = get_sqe(ctx);
> +    CqeHandler *cqe_handler = &node->cqe_handler;
>  
>  #ifdef LIBURING_HAVE_DATA64
> -    io_uring_prep_poll_remove(sqe, (uintptr_t)node);
> +    io_uring_prep_poll_remove(sqe, (uintptr_t)cqe_handler);
>  #else
> -    io_uring_prep_poll_remove(sqe, node);
> +    io_uring_prep_poll_remove(sqe, cqe_handler);
>  #endif
>      io_uring_sqe_set_data(sqe, NULL);
>  }
> @@ -221,20 +241,12 @@ static void fill_sq_ring(AioContext *ctx)
>      }
>  }
>  
> -/* Returns true if a handler became ready */
> -static bool process_cqe(AioContext *ctx,
> -                        AioHandlerList *ready_list,
> -                        struct io_uring_cqe *cqe)
> +static bool process_cqe_aio_handler(AioContext *ctx,
> +                                    AioHandlerList *ready_list,
> +                                    AioHandler *node,
> +                                    struct io_uring_cqe *cqe)
>  {
> -    AioHandler *node = io_uring_cqe_get_data(cqe);
> -    unsigned flags;
> -
> -    /* poll_timeout and poll_remove have a zero user_data field */
> -    if (!node) {
> -        return false;
> -    }
> -
> -    flags = qatomic_read(&node->flags);
> +    unsigned flags = qatomic_read(&node->flags);
>  
>      /*
>       * poll_multishot cancelled by poll_remove? Or completed early because fd
> @@ -261,6 +273,56 @@ static bool process_cqe(AioContext *ctx,
>      return true;
>  }
>  
> +/* Process CqeHandlers from the ready list */
> +static void cqe_handler_bh(void *opaque)
> +{
> +    AioContext *ctx = opaque;
> +    CqeHandlerSimpleQ *ready_list = &ctx->cqe_handler_ready_list;
> +
> +    /*
> +     * If cqe_handler->cb() calls aio_poll() it must continue processing
> +     * ready_list. Schedule a BH so the inner event loop calls us again.
> +     */
> +    qemu_bh_schedule(ctx->cqe_handler_bh);
> +
> +    while (!QSIMPLEQ_EMPTY(ready_list)) {
> +        CqeHandler *cqe_handler = QSIMPLEQ_FIRST(ready_list);
> +
> +        QSIMPLEQ_REMOVE_HEAD(ready_list, next);
> +
> +        cqe_handler->cb(cqe_handler);
> +    }
> +
> +    qemu_bh_cancel(ctx->cqe_handler_bh);
> +}
> +
> +/* Returns true if a handler became ready */
> +static bool process_cqe(AioContext *ctx,
> +                        AioHandlerList *ready_list,
> +                        struct io_uring_cqe *cqe)
> +{
> +    CqeHandler *cqe_handler = io_uring_cqe_get_data(cqe);
> +
> +    /* poll_timeout and poll_remove have a zero user_data field */
> +    if (!cqe_handler) {
> +        return false;
> +    }
> +
> +    /*
> +     * Special handling for AioHandler cqes. They need ready_list and have a
> +     * return value.
> +     */
> +    if (cqe_handler->cb == fdmon_special_cqe_handler) {
> +        AioHandler *node = container_of(cqe_handler, AioHandler, cqe_handler);
> +        return process_cqe_aio_handler(ctx, ready_list, node, cqe);
> +    }
> +
> +    cqe_handler->cqe = *cqe;
> +    QSIMPLEQ_INSERT_TAIL(&ctx->cqe_handler_ready_list, cqe_handler, next);
> +    qemu_bh_schedule(ctx->cqe_handler_bh);
> +    return false;
> +}
> +
>  static int process_cq_ring(AioContext *ctx, AioHandlerList *ready_list)
>  {
>      struct io_uring *ring = &ctx->fdmon_io_uring;
> @@ -360,6 +422,7 @@ static const FDMonOps fdmon_io_uring_ops = {
>      .gsource_prepare = fdmon_io_uring_gsource_prepare,
>      .gsource_check = fdmon_io_uring_gsource_check,
>      .gsource_dispatch = fdmon_io_uring_gsource_dispatch,
> +    .add_sqe = fdmon_io_uring_add_sqe,
>  };
>  
>  void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
> @@ -375,6 +438,8 @@ void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
>      }
>  
>      QSLIST_INIT(&ctx->submit_list);
> +    QSIMPLEQ_INIT(&ctx->cqe_handler_ready_list);
> +    ctx->cqe_handler_bh = aio_bh_new(ctx, cqe_handler_bh, ctx);
>      ctx->fdmon_ops = &fdmon_io_uring_ops;
>      ctx->io_uring_fd_tag = g_source_add_unix_fd(&ctx->source,
>              ctx->fdmon_io_uring.ring_fd, G_IO_IN);
> @@ -382,30 +447,36 @@ void fdmon_io_uring_setup(AioContext *ctx, Error **errp)
>  
>  void fdmon_io_uring_destroy(AioContext *ctx)
>  {
> -    if (ctx->fdmon_ops == &fdmon_io_uring_ops) {
> -        AioHandler *node;
> +    AioHandler *node;
>  
> -        io_uring_queue_exit(&ctx->fdmon_io_uring);
> +    if (ctx->fdmon_ops != &fdmon_io_uring_ops) {
> +        return;
> +    }
>  
> -        /* Move handlers due to be removed onto the deleted list */
> -        while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
> -            unsigned flags = qatomic_fetch_and(&node->flags,
> -                    ~(FDMON_IO_URING_PENDING |
> -                      FDMON_IO_URING_ADD |
> -                      FDMON_IO_URING_REMOVE));
> +    io_uring_queue_exit(&ctx->fdmon_io_uring);
>  
> -            if (flags & FDMON_IO_URING_REMOVE) {
> -                QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers, node, node_deleted);
> -            }
> +    /* Move handlers due to be removed onto the deleted list */
> +    while ((node = QSLIST_FIRST_RCU(&ctx->submit_list))) {
> +        unsigned flags = qatomic_fetch_and(&node->flags,
> +                ~(FDMON_IO_URING_PENDING |
> +                  FDMON_IO_URING_ADD |
> +                  FDMON_IO_URING_REMOVE));
>  
> -            QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
> +        if (flags & FDMON_IO_URING_REMOVE) {
> +            QLIST_INSERT_HEAD_RCU(&ctx->deleted_aio_handlers,
> +                                  node, node_deleted);
>          }
>  
> -        g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
> -        ctx->io_uring_fd_tag = NULL;
> -
> -        qemu_lockcnt_lock(&ctx->list_lock);
> -        fdmon_poll_downgrade(ctx);
> -        qemu_lockcnt_unlock(&ctx->list_lock);
> +        QSLIST_REMOVE_HEAD_RCU(&ctx->submit_list, node_submitted);
>      }
> +
> +    g_source_remove_unix_fd(&ctx->source, ctx->io_uring_fd_tag);
> +    ctx->io_uring_fd_tag = NULL;
> +
> +    assert(QSIMPLEQ_EMPTY(&ctx->cqe_handler_ready_list));
> +    qemu_bh_delete(ctx->cqe_handler_bh);
> +
> +    qemu_lockcnt_lock(&ctx->list_lock);
> +    fdmon_poll_downgrade(ctx);
> +    qemu_lockcnt_unlock(&ctx->list_lock);
>  }
> -- 
> 2.49.0
> 
> 

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.
Virtualization:  qemu.org | libguestfs.org
Re: [RFC 09/11] aio-posix: add aio_add_sqe() API for user-defined io_uring requests
Posted by Eric Blake 5 months, 2 weeks ago
On Wed, May 28, 2025 at 03:09:14PM -0400, Stefan Hajnoczi wrote:
> Introduce the aio_add_sqe() API for submitting io_uring requests in the
> current AioContext. This allows other components in QEMU, like the block
> layer, to take advantage of io_uring features without creating their own
> io_uring context.

Interesting that 8/11 refers to the function that we are just adding
now; but since the mention is only in the comments, I'm okay with
introducing the comment early and avoiding churn.

> 
> This API supports nested event loops just like file descriptor
> monitoring and BHs do. This comes at a complexity cost: a BH is required
> to dispatch CQE callbacks and they are placed on a list so that a nested
> event loop can invoke its parent's pending CQE callbacks. If you're
> wondering why CqeHandler exists instead of just a callback function
> pointer, this is why.
> 
> Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
>  include/block/aio.h   |  82 ++++++++++++++++++++++++
>  util/aio-posix.h      |   1 +
>  util/aio-posix.c      |   9 +++
>  util/fdmon-io_uring.c | 145 +++++++++++++++++++++++++++++++-----------
>  4 files changed, 200 insertions(+), 37 deletions(-)

I'll have to resume my review tomorrow.  (I'm having fun learning
about io_uring while reviewing this.)

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.
Virtualization:  qemu.org | libguestfs.org