[PATCH V4 12/27] ublk: add io events fifo structure

Posted by Ming Lei 1 week, 3 days ago
Add a ublk io events fifo structure and prepare for supporting command
batching, which will use io_uring multishot uring_cmd to fetch one
batch of io commands at a time.

One nice feature of kfifo is that it allows multiple producers with a
single consumer: only the producer side needs locking, while the single
consumer can stay lockless.

The producers are ublk_queue_rq() and ublk_queue_rqs(), so lock
contention can be eased by configuring a suitable blk-mq nr_queues.
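
As a minimal sketch of the locking pattern this enables (the helper
names below are illustrative only, not part of this patch):

	/* multiple producers: serialize kfifo_in() with evts_lock */
	static void ublk_evts_produce(struct ublk_queue *q, unsigned short tag)
	{
		spin_lock(&q->evts_lock);
		/*
		 * The fifo is sized to the queue depth and inflight tags
		 * are unique, so storing a tag cannot overflow here.
		 */
		kfifo_in(&q->evts_fifo, &tag, 1);
		spin_unlock(&q->evts_lock);
	}

	/*
	 * single consumer: kfifo_out() needs no locking as long as
	 * there is only one concurrent reader
	 */
	static unsigned int ublk_evts_consume(struct ublk_queue *q,
					      unsigned short *tags,
					      unsigned int n)
	{
		return kfifo_out(&q->evts_fifo, tags, n);
	}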

Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/ublk_drv.c | 65 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 60 insertions(+), 5 deletions(-)

diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
index ea992366af5b..6ff284243630 100644
--- a/drivers/block/ublk_drv.c
+++ b/drivers/block/ublk_drv.c
@@ -44,6 +44,7 @@
 #include <linux/task_work.h>
 #include <linux/namei.h>
 #include <linux/kref.h>
+#include <linux/kfifo.h>
 #include <uapi/linux/ublk_cmd.h>
 
 #define UBLK_MINORS		(1U << MINORBITS)
@@ -217,6 +218,22 @@ struct ublk_queue {
 	bool fail_io; /* copy of dev->state == UBLK_S_DEV_FAIL_IO */
 	spinlock_t		cancel_lock;
 	struct ublk_device *dev;
+
+	/*
+	 * Inflight ublk request tags are saved in this fifo
+	 *
+	 * There are multiple writers, ublk_queue_rq() and ublk_queue_rqs(),
+	 * so a lock is required for storing request tags into the fifo.
+	 *
+	 * There is just one reader, which fetches requests from the task
+	 * work function to the ublk server, so no lock is needed on the
+	 * reader side.
+	 */
+	struct {
+		DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
+		spinlock_t evts_lock;
+	} ____cacheline_aligned_in_smp;
+
 	struct ublk_io ios[] __counted_by(q_depth);
 };
 
@@ -282,6 +299,32 @@ static inline void ublk_io_unlock(struct ublk_io *io)
 	spin_unlock(&io->lock);
 }
 
+/* Initialize the queue */
+static inline int ublk_io_evts_init(struct ublk_queue *q, unsigned int size,
+				    int numa_node)
+{
+	spin_lock_init(&q->evts_lock);
+	return kfifo_alloc_node(&q->evts_fifo, size, GFP_KERNEL, numa_node);
+}
+
+/* Check if queue is empty */
+static inline bool ublk_io_evts_empty(const struct ublk_queue *q)
+{
+	return kfifo_is_empty(&q->evts_fifo);
+}
+
+/* Check if queue is full */
+static inline bool ublk_io_evts_full(const struct ublk_queue *q)
+{
+	return kfifo_is_full(&q->evts_fifo);
+}
+
+static inline void ublk_io_evts_deinit(struct ublk_queue *q)
+{
+	WARN_ON_ONCE(!kfifo_is_empty(&q->evts_fifo));
+	kfifo_free(&q->evts_fifo);
+}
+
 static inline struct ublksrv_io_desc *
 ublk_get_iod(const struct ublk_queue *ubq, unsigned tag)
 {
@@ -3038,6 +3081,9 @@ static void ublk_deinit_queue(struct ublk_device *ub, int q_id)
 	if (ubq->io_cmd_buf)
 		free_pages((unsigned long)ubq->io_cmd_buf, get_order(size));
 
+	if (ublk_dev_support_batch_io(ub))
+		ublk_io_evts_deinit(ubq);
+
 	kvfree(ubq);
 	ub->queues[q_id] = NULL;
 }
@@ -3062,7 +3108,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
 	struct ublk_queue *ubq;
 	struct page *page;
 	int numa_node;
-	int size, i;
+	int size, i, ret = -ENOMEM;
 
 	/* Determine NUMA node based on queue's CPU affinity */
 	numa_node = ublk_get_queue_numa_node(ub, q_id);
@@ -3081,18 +3127,27 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
 
 	/* Allocate I/O command buffer on local NUMA node */
 	page = alloc_pages_node(numa_node, gfp_flags, get_order(size));
-	if (!page) {
-		kvfree(ubq);
-		return -ENOMEM;
-	}
+	if (!page)
+		goto fail_nomem;
 	ubq->io_cmd_buf = page_address(page);
 
 	for (i = 0; i < ubq->q_depth; i++)
 		spin_lock_init(&ubq->ios[i].lock);
 
+	if (ublk_dev_support_batch_io(ub)) {
+		ret = ublk_io_evts_init(ubq, ubq->q_depth, numa_node);
+		if (ret)
+			goto fail;
+	}
 	ub->queues[q_id] = ubq;
 	ubq->dev = ub;
+
 	return 0;
+fail:
+	ublk_deinit_queue(ub, q_id);
+fail_nomem:
+	kvfree(ubq);
+	return ret;
 }
 
 static void ublk_deinit_queues(struct ublk_device *ub)
-- 
2.47.0
Re: [PATCH V4 12/27] ublk: add io events fifo structure
Posted by Caleb Sander Mateos 1 day, 8 hours ago
On Thu, Nov 20, 2025 at 5:59 PM Ming Lei <ming.lei@redhat.com> wrote:
>
> Add a ublk io events fifo structure and prepare for supporting command
> batching, which will use io_uring multishot uring_cmd to fetch one
> batch of io commands at a time.
>
> One nice feature of kfifo is that it allows multiple producers with a
> single consumer: only the producer side needs locking, while the single
> consumer can stay lockless.
>
> The producers are ublk_queue_rq() and ublk_queue_rqs(), so lock
> contention can be eased by configuring a suitable blk-mq nr_queues.
>
> Signed-off-by: Ming Lei <ming.lei@redhat.com>
> ---
>  drivers/block/ublk_drv.c | 65 ++++++++++++++++++++++++++++++++++++----
>  1 file changed, 60 insertions(+), 5 deletions(-)
>
> diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> index ea992366af5b..6ff284243630 100644
> --- a/drivers/block/ublk_drv.c
> +++ b/drivers/block/ublk_drv.c
> @@ -44,6 +44,7 @@
>  #include <linux/task_work.h>
>  #include <linux/namei.h>
>  #include <linux/kref.h>
> +#include <linux/kfifo.h>
>  #include <uapi/linux/ublk_cmd.h>
>
>  #define UBLK_MINORS            (1U << MINORBITS)
> @@ -217,6 +218,22 @@ struct ublk_queue {
>         bool fail_io; /* copy of dev->state == UBLK_S_DEV_FAIL_IO */
>         spinlock_t              cancel_lock;
>         struct ublk_device *dev;
> +
> +       /*
> +        * Inflight ublk request tags are saved in this fifo
> +        *
> +        * There are multiple writers, ublk_queue_rq() and ublk_queue_rqs(),
> +        * so a lock is required for storing request tags into the fifo.
> +        *
> +        * There is just one reader, which fetches requests from the task
> +        * work function to the ublk server, so no lock is needed on the
> +        * reader side.

Can you clarify that this is only used for batch mode?

> +        */
> +       struct {
> +               DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> +               spinlock_t evts_lock;
> +       } ____cacheline_aligned_in_smp;
> +
>         struct ublk_io ios[] __counted_by(q_depth);
>  };
>
> @@ -282,6 +299,32 @@ static inline void ublk_io_unlock(struct ublk_io *io)
>         spin_unlock(&io->lock);
>  }
>
> +/* Initialize the queue */

"queue" -> "events queue"? Otherwise, it sounds like it's referring to
struct ublk_queue.

> +static inline int ublk_io_evts_init(struct ublk_queue *q, unsigned int size,
> +                                   int numa_node)
> +{
> +       spin_lock_init(&q->evts_lock);
> +       return kfifo_alloc_node(&q->evts_fifo, size, GFP_KERNEL, numa_node);
> +}
> +
> +/* Check if queue is empty */
> +static inline bool ublk_io_evts_empty(const struct ublk_queue *q)
> +{
> +       return kfifo_is_empty(&q->evts_fifo);
> +}
> +
> +/* Check if queue is full */
> +static inline bool ublk_io_evts_full(const struct ublk_queue *q)

Function is unused?

> +{
> +       return kfifo_is_full(&q->evts_fifo);
> +}
> +
> +static inline void ublk_io_evts_deinit(struct ublk_queue *q)
> +{
> +       WARN_ON_ONCE(!kfifo_is_empty(&q->evts_fifo));
> +       kfifo_free(&q->evts_fifo);
> +}
> +
>  static inline struct ublksrv_io_desc *
>  ublk_get_iod(const struct ublk_queue *ubq, unsigned tag)
>  {
> @@ -3038,6 +3081,9 @@ static void ublk_deinit_queue(struct ublk_device *ub, int q_id)
>         if (ubq->io_cmd_buf)
>                 free_pages((unsigned long)ubq->io_cmd_buf, get_order(size));
>
> +       if (ublk_dev_support_batch_io(ub))
> +               ublk_io_evts_deinit(ubq);
> +
>         kvfree(ubq);
>         ub->queues[q_id] = NULL;
>  }
> @@ -3062,7 +3108,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
>         struct ublk_queue *ubq;
>         struct page *page;
>         int numa_node;
> -       int size, i;
> +       int size, i, ret = -ENOMEM;
>
>         /* Determine NUMA node based on queue's CPU affinity */
>         numa_node = ublk_get_queue_numa_node(ub, q_id);
> @@ -3081,18 +3127,27 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
>
>         /* Allocate I/O command buffer on local NUMA node */
>         page = alloc_pages_node(numa_node, gfp_flags, get_order(size));
> -       if (!page) {
> -               kvfree(ubq);
> -               return -ENOMEM;
> -       }
> +       if (!page)
> +               goto fail_nomem;
>         ubq->io_cmd_buf = page_address(page);
>
>         for (i = 0; i < ubq->q_depth; i++)
>                 spin_lock_init(&ubq->ios[i].lock);
>
> +       if (ublk_dev_support_batch_io(ub)) {
> +               ret = ublk_io_evts_init(ubq, ubq->q_depth, numa_node);
> +               if (ret)
> +                       goto fail;
> +       }
>         ub->queues[q_id] = ubq;
>         ubq->dev = ub;
> +
>         return 0;
> +fail:
> +       ublk_deinit_queue(ub, q_id);

This is a no-op since ub->queues[q_id] hasn't been assigned yet?

Best,
Caleb

> +fail_nomem:
> +       kvfree(ubq);
> +       return ret;
>  }
>
>  static void ublk_deinit_queues(struct ublk_device *ub)
> --
> 2.47.0
>
Re: [PATCH V4 12/27] ublk: add io events fifo structure
Posted by Ming Lei 22 hours ago
On Sun, Nov 30, 2025 at 08:53:03AM -0800, Caleb Sander Mateos wrote:
> On Thu, Nov 20, 2025 at 5:59 PM Ming Lei <ming.lei@redhat.com> wrote:
> >
> > Add a ublk io events fifo structure and prepare for supporting command
> > batching, which will use io_uring multishot uring_cmd to fetch one
> > batch of io commands at a time.
> >
> > One nice feature of kfifo is that it allows multiple producers with a
> > single consumer: only the producer side needs locking, while the single
> > consumer can stay lockless.
> >
> > The producers are ublk_queue_rq() and ublk_queue_rqs(), so lock
> > contention can be eased by configuring a suitable blk-mq nr_queues.
> >
> > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > ---
> >  drivers/block/ublk_drv.c | 65 ++++++++++++++++++++++++++++++++++++----
> >  1 file changed, 60 insertions(+), 5 deletions(-)
> >
> > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > index ea992366af5b..6ff284243630 100644
> > --- a/drivers/block/ublk_drv.c
> > +++ b/drivers/block/ublk_drv.c
> > @@ -44,6 +44,7 @@
> >  #include <linux/task_work.h>
> >  #include <linux/namei.h>
> >  #include <linux/kref.h>
> > +#include <linux/kfifo.h>
> >  #include <uapi/linux/ublk_cmd.h>
> >
> >  #define UBLK_MINORS            (1U << MINORBITS)
> > @@ -217,6 +218,22 @@ struct ublk_queue {
> >         bool fail_io; /* copy of dev->state == UBLK_S_DEV_FAIL_IO */
> >         spinlock_t              cancel_lock;
> >         struct ublk_device *dev;
> > +
> > +       /*
> > +        * Inflight ublk request tags are saved in this fifo
> > +        *
> > +        * There are multiple writers, ublk_queue_rq() and ublk_queue_rqs(),
> > +        * so a lock is required for storing request tags into the fifo.
> > +        *
> > +        * There is just one reader, which fetches requests from the task
> > +        * work function to the ublk server, so no lock is needed on the
> > +        * reader side.
> 
> Can you clarify that this is only used for batch mode?

Yes.

> 
> > +        */
> > +       struct {
> > +               DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> > +               spinlock_t evts_lock;
> > +               } ____cacheline_aligned_in_smp;
> > +
> >         struct ublk_io ios[] __counted_by(q_depth);
> >  };
> >
> > @@ -282,6 +299,32 @@ static inline void ublk_io_unlock(struct ublk_io *io)
> >         spin_unlock(&io->lock);
> >  }
> >
> > +/* Initialize the queue */
> 
> "queue" -> "events queue"? Otherwise, it sounds like it's referring to
> struct ublk_queue.

OK.

> 
> > +static inline int ublk_io_evts_init(struct ublk_queue *q, unsigned int size,
> > +                                   int numa_node)
> > +{
> > +       spin_lock_init(&q->evts_lock);
> > +       return kfifo_alloc_node(&q->evts_fifo, size, GFP_KERNEL, numa_node);
> > +}
> > +
> > +/* Check if queue is empty */
> > +static inline bool ublk_io_evts_empty(const struct ublk_queue *q)
> > +{
> > +       return kfifo_is_empty(&q->evts_fifo);
> > +}
> > +
> > +/* Check if queue is full */
> > +static inline bool ublk_io_evts_full(const struct ublk_queue *q)
> 
> Function is unused?

Yes, will remove it.

> 
> > +{
> > +       return kfifo_is_full(&q->evts_fifo);
> > +}
> > +
> > +static inline void ublk_io_evts_deinit(struct ublk_queue *q)
> > +{
> > +       WARN_ON_ONCE(!kfifo_is_empty(&q->evts_fifo));
> > +       kfifo_free(&q->evts_fifo);
> > +}
> > +
> >  static inline struct ublksrv_io_desc *
> >  ublk_get_iod(const struct ublk_queue *ubq, unsigned tag)
> >  {
> > @@ -3038,6 +3081,9 @@ static void ublk_deinit_queue(struct ublk_device *ub, int q_id)
> >         if (ubq->io_cmd_buf)
> >                 free_pages((unsigned long)ubq->io_cmd_buf, get_order(size));
> >
> > +       if (ublk_dev_support_batch_io(ub))
> > +               ublk_io_evts_deinit(ubq);
> > +
> >         kvfree(ubq);
> >         ub->queues[q_id] = NULL;
> >  }
> > @@ -3062,7 +3108,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
> >         struct ublk_queue *ubq;
> >         struct page *page;
> >         int numa_node;
> > -       int size, i;
> > +       int size, i, ret = -ENOMEM;
> >
> >         /* Determine NUMA node based on queue's CPU affinity */
> >         numa_node = ublk_get_queue_numa_node(ub, q_id);
> > @@ -3081,18 +3127,27 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
> >
> >         /* Allocate I/O command buffer on local NUMA node */
> >         page = alloc_pages_node(numa_node, gfp_flags, get_order(size));
> > -       if (!page) {
> > -               kvfree(ubq);
> > -               return -ENOMEM;
> > -       }
> > +       if (!page)
> > +               goto fail_nomem;
> >         ubq->io_cmd_buf = page_address(page);
> >
> >         for (i = 0; i < ubq->q_depth; i++)
> >                 spin_lock_init(&ubq->ios[i].lock);
> >
> > +       if (ublk_dev_support_batch_io(ub)) {
> > +               ret = ublk_io_evts_init(ubq, ubq->q_depth, numa_node);
> > +               if (ret)
> > +                       goto fail;
> > +       }
> >         ub->queues[q_id] = ubq;
> >         ubq->dev = ub;
> > +
> >         return 0;
> > +fail:
> > +       ublk_deinit_queue(ub, q_id);
> 
> This is a no-op since ub->queues[q_id] hasn't been assigned yet?

Good catch. A __ublk_deinit_queue(ub, ubq) helper can be added to handle
the failure cleanup.
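
Something like the following untested sketch; ublk_queue_cmd_buf_size()
is only a stand-in for however the command buffer size is actually
computed, since that part is outside the quoted hunks:

	static void __ublk_deinit_queue(struct ublk_device *ub,
					struct ublk_queue *ubq)
	{
		/* release everything ublk_init_queue() allocated so far */
		if (ublk_dev_support_batch_io(ub))
			ublk_io_evts_deinit(ubq);
		if (ubq->io_cmd_buf)
			free_pages((unsigned long)ubq->io_cmd_buf,
				   get_order(ublk_queue_cmd_buf_size(ub)));
		kvfree(ubq);
	}

Then both ublk_deinit_queue() and the init failure path can call it, so
the fail: label no longer depends on ub->queues[q_id] being assigned.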


Thanks,
Ming