[PATCH V4 13/27] ublk: add batch I/O dispatch infrastructure

Posted by Ming Lei 1 week, 3 days ago
Add infrastructure for delivering I/O commands to the ublk server in batches,
preparing for the upcoming UBLK_U_IO_FETCH_IO_CMDS feature.

Key components:

- struct ublk_batch_fcmd: Represents a batch fetch uring_cmd that will
  receive multiple I/O tags in a single operation, using io_uring's
  multishot command for efficient ublk IO delivery.

- ublk_batch_dispatch(): Batch version of ublk_dispatch_req() that:
  * Pulls multiple request tags from the events FIFO (lock-free reader)
  * Prepares each I/O for delivery (including auto buffer registration)
  * Delivers tags to userspace via single uring_cmd notification
  * Handles partial failures by restoring undelivered tags to FIFO

The batch approach significantly reduces notification overhead by aggregating
multiple I/O completions into a single uring_cmd, while maintaining the same
I/O processing semantics as individual operations.

Error handling ensures consistency: if buffer selection or CQE posting
fails, undelivered tags are restored to the FIFO for retry, and the
affected per-IO state is rolled back as well.

Batch dispatching runs in task work context, scheduled via
io_uring_cmd_complete_in_task() or called directly from ->uring_cmd(),
enabling efficient batch processing without blocking the I/O submission path.
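
For illustration, a rough sketch of how a ublk server might consume one
batch fetch CQE (helper names and the provided-buffer setup are assumptions;
only the convention that the CQE result carries the byte count of the u16
tags copied into the selected buffer comes from this patch):

	#include <liburing.h>

	/* hypothetical handler; handle_io_tag() is a placeholder for reading
	 * one tag's I/O descriptor and issuing the backend I/O */
	static void handle_fetch_cqe(const struct io_uring_cqe *cqe,
				     const unsigned short *tag_buf)
	{
		int i, nr_tags;

		if (cqe->res <= 0)
			return;		/* error or nothing delivered */

		nr_tags = cqe->res / sizeof(unsigned short);
		for (i = 0; i < nr_tags; i++)
			handle_io_tag(tag_buf[i]);
	}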

Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/ublk_drv.c | 189 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 189 insertions(+)

diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
index 6ff284243630..cc9c92d97349 100644
--- a/drivers/block/ublk_drv.c
+++ b/drivers/block/ublk_drv.c
@@ -91,6 +91,12 @@
 	 UBLK_BATCH_F_HAS_BUF_ADDR | \
 	 UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK)
 
+/* ublk batch fetch uring_cmd */
+struct ublk_batch_fcmd {
+	struct io_uring_cmd *cmd;
+	unsigned short buf_group;
+};
+
 struct ublk_uring_cmd_pdu {
 	/*
 	 * Store requests in same batch temporarily for queuing them to
@@ -168,6 +174,9 @@ struct ublk_batch_io_data {
  */
 #define UBLK_REFCOUNT_INIT (REFCOUNT_MAX / 2)
 
+/* used for UBLK_F_BATCH_IO only */
+#define UBLK_BATCH_IO_UNUSED_TAG	((unsigned short)-1)
+
 union ublk_io_buf {
 	__u64	addr;
 	struct ublk_auto_buf_reg auto_reg;
@@ -616,6 +625,32 @@ static wait_queue_head_t ublk_idr_wq;	/* wait until one idr is freed */
 static DEFINE_MUTEX(ublk_ctl_mutex);
 
 
+static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
+					struct ublk_batch_fcmd *fcmd,
+					int res)
+{
+	io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
+	fcmd->cmd = NULL;
+}
+
+static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
+				     struct io_br_sel *sel,
+				     unsigned int issue_flags)
+{
+	if (io_uring_mshot_cmd_post_cqe(fcmd->cmd, sel, issue_flags))
+		return -ENOBUFS;
+	return 0;
+}
+
+static ssize_t ublk_batch_copy_io_tags(struct ublk_batch_fcmd *fcmd,
+				       void __user *buf, const u16 *tag_buf,
+				       unsigned int len)
+{
+	if (copy_to_user(buf, tag_buf, len))
+		return -EFAULT;
+	return len;
+}
+
 #define UBLK_MAX_UBLKS UBLK_MINORS
 
 /*
@@ -1378,6 +1413,160 @@ static void ublk_dispatch_req(struct ublk_queue *ubq,
 	}
 }
 
+static bool __ublk_batch_prep_dispatch(struct ublk_queue *ubq,
+				       const struct ublk_batch_io_data *data,
+				       unsigned short tag)
+{
+	struct ublk_device *ub = data->ub;
+	struct ublk_io *io = &ubq->ios[tag];
+	struct request *req = blk_mq_tag_to_rq(ub->tag_set.tags[ubq->q_id], tag);
+	enum auto_buf_reg_res res = AUTO_BUF_REG_FALLBACK;
+	struct io_uring_cmd *cmd = data->cmd;
+
+	if (!ublk_start_io(ubq, req, io))
+		return false;
+
+	if (ublk_support_auto_buf_reg(ubq) && ublk_rq_has_data(req))
+		res = __ublk_do_auto_buf_reg(ubq, req, io, cmd,
+				data->issue_flags);
+
+	if (res == AUTO_BUF_REG_FAIL)
+		return false;
+
+	ublk_io_lock(io);
+	ublk_prep_auto_buf_reg_io(ubq, req, io, cmd, res);
+	ublk_io_unlock(io);
+
+	return true;
+}
+
+static bool ublk_batch_prep_dispatch(struct ublk_queue *ubq,
+				     const struct ublk_batch_io_data *data,
+				     unsigned short *tag_buf,
+				     unsigned int len)
+{
+	bool has_unused = false;
+	int i;
+
+	for (i = 0; i < len; i += 1) {
+		unsigned short tag = tag_buf[i];
+
+		if (!__ublk_batch_prep_dispatch(ubq, data, tag)) {
+			tag_buf[i] = UBLK_BATCH_IO_UNUSED_TAG;
+			has_unused = true;
+		}
+	}
+
+	return has_unused;
+}
+
+/*
+ * Filter out UBLK_BATCH_IO_UNUSED_TAG entries from tag_buf.
+ * Returns the new length after filtering.
+ */
+static unsigned int ublk_filter_unused_tags(unsigned short *tag_buf,
+					    unsigned int len)
+{
+	unsigned int i, j;
+
+	for (i = 0, j = 0; i < len; i++) {
+		if (tag_buf[i] != UBLK_BATCH_IO_UNUSED_TAG) {
+			if (i != j)
+				tag_buf[j] = tag_buf[i];
+			j++;
+		}
+	}
+
+	return j;
+}
+
+#define MAX_NR_TAG 128
+static int __ublk_batch_dispatch(struct ublk_queue *ubq,
+				 const struct ublk_batch_io_data *data,
+				 struct ublk_batch_fcmd *fcmd)
+{
+	unsigned short tag_buf[MAX_NR_TAG];
+	struct io_br_sel sel;
+	size_t len = 0;
+	bool needs_filter;
+	int ret;
+
+	sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
+					 data->issue_flags);
+	if (sel.val < 0)
+		return sel.val;
+	if (!sel.addr)
+		return -ENOBUFS;
+
+	/* single reader needn't lock and sizeof(kfifo element) is 2 bytes */
+	len = min(len, sizeof(tag_buf)) / 2;
+	len = kfifo_out(&ubq->evts_fifo, tag_buf, len);
+
+	needs_filter = ublk_batch_prep_dispatch(ubq, data, tag_buf, len);
+	/* Filter out unused tags before posting to userspace */
+	if (unlikely(needs_filter)) {
+		int new_len = ublk_filter_unused_tags(tag_buf, len);
+
+		if (!new_len)
+			return len;
+		len = new_len;
+	}
+
+	sel.val = ublk_batch_copy_io_tags(fcmd, sel.addr, tag_buf, len * 2);
+	ret = ublk_batch_fetch_post_cqe(fcmd, &sel, data->issue_flags);
+	if (unlikely(ret < 0)) {
+		int i, res;
+
+		/*
+		 * Undo prep state for all IOs since userspace never received them.
+		 * This restores IOs to pre-prepared state so they can be cleanly
+		 * re-prepared when tags are pulled from FIFO again.
+		 */
+		for (i = 0; i < len; i++) {
+			struct ublk_io *io = &ubq->ios[tag_buf[i]];
+			int index = -1;
+
+			ublk_io_lock(io);
+			if (io->flags & UBLK_IO_FLAG_AUTO_BUF_REG)
+				index = io->buf.auto_reg.index;
+			io->flags &= ~(UBLK_IO_FLAG_OWNED_BY_SRV | UBLK_IO_FLAG_AUTO_BUF_REG);
+			io->flags |= UBLK_IO_FLAG_ACTIVE;
+			ublk_io_unlock(io);
+
+			if (index != -1)
+				io_buffer_unregister_bvec(data->cmd, index,
+						data->issue_flags);
+		}
+
+		res = kfifo_in_spinlocked_noirqsave(&ubq->evts_fifo,
+			tag_buf, len, &ubq->evts_lock);
+
+		pr_warn("%s: copy tags or post CQE failure, move back "
+				"tags(%d %zu) ret %d\n", __func__, res, len,
+				ret);
+	}
+	return ret;
+}
+
+static __maybe_unused int
+ublk_batch_dispatch(struct ublk_queue *ubq,
+		    const struct ublk_batch_io_data *data,
+		    struct ublk_batch_fcmd *fcmd)
+{
+	int ret = 0;
+
+	while (!ublk_io_evts_empty(ubq)) {
+		ret = __ublk_batch_dispatch(ubq, data, fcmd);
+		if (ret <= 0)
+			break;
+	}
+
+	if (ret < 0)
+		ublk_batch_deinit_fetch_buf(data, fcmd, ret);
+
+	return ret;
+}
+
 static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
 			   unsigned int issue_flags)
 {
-- 
2.47.0
Re: [PATCH V4 13/27] ublk: add batch I/O dispatch infrastructure
Posted by Caleb Sander Mateos 1 day, 6 hours ago
On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
>
> Add infrastructure for delivering I/O commands to ublk server in batches,
> preparing for the upcoming UBLK_U_IO_FETCH_IO_CMDS feature.
>
> Key components:
>
> - struct ublk_batch_fcmd: Represents a batch fetch uring_cmd that will
>   receive multiple I/O tags in a single operation, using io_uring's
>   multishot command for efficient ublk IO delivery.
>
> - ublk_batch_dispatch(): Batch version of ublk_dispatch_req() that:
>   * Pulls multiple request tags from the events FIFO (lock-free reader)
>   * Prepares each I/O for delivery (including auto buffer registration)
>   * Delivers tags to userspace via single uring_cmd notification
>   * Handles partial failures by restoring undelivered tags to FIFO
>
> The batch approach significantly reduces notification overhead by aggregating
> multiple I/O completions into single uring_cmd, while maintaining the same
> I/O processing semantics as individual operations.
>
> Error handling ensures system consistency: if buffer selection or CQE
> posting fails, undelivered tags are restored to the FIFO for retry,
> meantime IO state has to be restored.
>
> This runs in task work context, scheduled via io_uring_cmd_complete_in_task()
> or called directly from ->uring_cmd(), enabling efficient batch processing
> without blocking the I/O submission path.
>
> Signed-off-by: Ming Lei <ming.lei@redhat.com>
> ---
>  drivers/block/ublk_drv.c | 189 +++++++++++++++++++++++++++++++++++++++
>  1 file changed, 189 insertions(+)
>
> diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> index 6ff284243630..cc9c92d97349 100644
> --- a/drivers/block/ublk_drv.c
> +++ b/drivers/block/ublk_drv.c
> @@ -91,6 +91,12 @@
>          UBLK_BATCH_F_HAS_BUF_ADDR | \
>          UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK)
>
> +/* ublk batch fetch uring_cmd */
> +struct ublk_batch_fcmd {

I would prefer "fetch_cmd" instead of "fcmd" for clarity

> +       struct io_uring_cmd *cmd;
> +       unsigned short buf_group;
> +};
> +
>  struct ublk_uring_cmd_pdu {
>         /*
>          * Store requests in same batch temporarily for queuing them to
> @@ -168,6 +174,9 @@ struct ublk_batch_io_data {
>   */
>  #define UBLK_REFCOUNT_INIT (REFCOUNT_MAX / 2)
>
> +/* used for UBLK_F_BATCH_IO only */
> +#define UBLK_BATCH_IO_UNUSED_TAG       ((unsigned short)-1)
> +
>  union ublk_io_buf {
>         __u64   addr;
>         struct ublk_auto_buf_reg auto_reg;
> @@ -616,6 +625,32 @@ static wait_queue_head_t ublk_idr_wq;      /* wait until one idr is freed */
>  static DEFINE_MUTEX(ublk_ctl_mutex);
>
>
> +static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> +                                       struct ublk_batch_fcmd *fcmd,
> +                                       int res)
> +{
> +       io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> +       fcmd->cmd = NULL;
> +}
> +
> +static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> +                                    struct io_br_sel *sel,
> +                                    unsigned int issue_flags)
> +{
> +       if (io_uring_mshot_cmd_post_cqe(fcmd->cmd, sel, issue_flags))
> +               return -ENOBUFS;
> +       return 0;
> +}
> +
> +static ssize_t ublk_batch_copy_io_tags(struct ublk_batch_fcmd *fcmd,
> +                                      void __user *buf, const u16 *tag_buf,
> +                                      unsigned int len)
> +{
> +       if (copy_to_user(buf, tag_buf, len))
> +               return -EFAULT;
> +       return len;
> +}
> +
>  #define UBLK_MAX_UBLKS UBLK_MINORS
>
>  /*
> @@ -1378,6 +1413,160 @@ static void ublk_dispatch_req(struct ublk_queue *ubq,
>         }
>  }
>
> +static bool __ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> +                                      const struct ublk_batch_io_data *data,
> +                                      unsigned short tag)
> +{
> +       struct ublk_device *ub = data->ub;
> +       struct ublk_io *io = &ubq->ios[tag];
> +       struct request *req = blk_mq_tag_to_rq(ub->tag_set.tags[ubq->q_id], tag);
> +       enum auto_buf_reg_res res = AUTO_BUF_REG_FALLBACK;
> +       struct io_uring_cmd *cmd = data->cmd;
> +
> +       if (!ublk_start_io(ubq, req, io))

This doesn't look correct for UBLK_F_NEED_GET_DATA. If that's not
supported in batch mode, then it should probably be disallowed when
creating a batch-mode ublk device. The ublk_need_get_data() check in
ublk_batch_commit_io_check() could also be dropped.
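
Something like this at device-creation time would cover it (a sketch only;
the exact validation spot is an assumption, the flag names are from this
series):

	/* batch mode delivers tags directly; the GET_DATA round trip
	 * doesn't fit the batch protocol, so reject the combination */
	if ((info->flags & UBLK_F_BATCH_IO) &&
	    (info->flags & UBLK_F_NEED_GET_DATA))
		return -EINVAL;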

> +               return false;
> +
> +       if (ublk_support_auto_buf_reg(ubq) && ublk_rq_has_data(req))
> +               res = __ublk_do_auto_buf_reg(ubq, req, io, cmd,
> +                               data->issue_flags);

__ublk_do_auto_buf_reg() reads io->buf.auto_reg. That seems racy
without holding the io spinlock.

> +
> +       if (res == AUTO_BUF_REG_FAIL)
> +               return false;

Could be moved into the if (ublk_support_auto_buf_reg(ubq) &&
ublk_rq_has_data(req)) statement since it won't be true otherwise?
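
I.e. something like (just restating that against the code above):

	if (ublk_support_auto_buf_reg(ubq) && ublk_rq_has_data(req)) {
		res = __ublk_do_auto_buf_reg(ubq, req, io, cmd,
					     data->issue_flags);
		if (res == AUTO_BUF_REG_FAIL)
			return false;
	}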

> +
> +       ublk_io_lock(io);
> +       ublk_prep_auto_buf_reg_io(ubq, req, io, cmd, res);
> +       ublk_io_unlock(io);
> +
> +       return true;
> +}
> +
> +static bool ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> +                                    const struct ublk_batch_io_data *data,
> +                                    unsigned short *tag_buf,
> +                                    unsigned int len)
> +{
> +       bool has_unused = false;
> +       int i;

unsigned?

> +
> +       for (i = 0; i < len; i += 1) {

i++?

> +               unsigned short tag = tag_buf[i];
> +
> +               if (!__ublk_batch_prep_dispatch(ubq, data, tag)) {
> +                       tag_buf[i] = UBLK_BATCH_IO_UNUSED_TAG;
> +                       has_unused = true;
> +               }
> +       }
> +
> +       return has_unused;
> +}
> +
> +/*
> + * Filter out UBLK_BATCH_IO_UNUSED_TAG entries from tag_buf.
> + * Returns the new length after filtering.
> + */
> +static unsigned int ublk_filter_unused_tags(unsigned short *tag_buf,
> +                                           unsigned int len)
> +{
> +       unsigned int i, j;
> +
> +       for (i = 0, j = 0; i < len; i++) {
> +               if (tag_buf[i] != UBLK_BATCH_IO_UNUSED_TAG) {
> +                       if (i != j)
> +                               tag_buf[j] = tag_buf[i];
> +                       j++;
> +               }
> +       }
> +
> +       return j;
> +}
> +
> +#define MAX_NR_TAG 128
> +static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> +                                const struct ublk_batch_io_data *data,
> +                                struct ublk_batch_fcmd *fcmd)
> +{
> +       unsigned short tag_buf[MAX_NR_TAG];
> +       struct io_br_sel sel;
> +       size_t len = 0;
> +       bool needs_filter;
> +       int ret;
> +
> +       sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> +                                        data->issue_flags);
> +       if (sel.val < 0)
> +               return sel.val;
> +       if (!sel.addr)
> +               return -ENOBUFS;
> +
> +       /* single reader needn't lock and sizeof(kfifo element) is 2 bytes */
> +       len = min(len, sizeof(tag_buf)) / 2;

sizeof(unsigned short) instead of 2?

> +       len = kfifo_out(&ubq->evts_fifo, tag_buf, len);
> +
> +       needs_filter = ublk_batch_prep_dispatch(ubq, data, tag_buf, len);
> +       /* Filter out unused tags before posting to userspace */
> +       if (unlikely(needs_filter)) {
> +               int new_len = ublk_filter_unused_tags(tag_buf, len);
> +
> +               if (!new_len)
> +                       return len;

Is the purpose of this return value just to make ublk_batch_dispatch()
retry __ublk_batch_dispatch()? Otherwise, it seems like a strange
value to return.

Also, shouldn't this path release the selected buffer to avoid leaking it?

> +               len = new_len;
> +       }
> +
> +       sel.val = ublk_batch_copy_io_tags(fcmd, sel.addr, tag_buf, len * 2);

sizeof(unsigned short)?

> +       ret = ublk_batch_fetch_post_cqe(fcmd, &sel, data->issue_flags);
> +       if (unlikely(ret < 0)) {
> +               int i, res;
> +
> +               /*
> +                * Undo prep state for all IOs since userspace never received them.
> +                * This restores IOs to pre-prepared state so they can be cleanly
> +                * re-prepared when tags are pulled from FIFO again.
> +                */
> +               for (i = 0; i < len; i++) {
> +                       struct ublk_io *io = &ubq->ios[tag_buf[i]];
> +                       int index = -1;
> +
> +                       ublk_io_lock(io);
> +                       if (io->flags & UBLK_IO_FLAG_AUTO_BUF_REG)
> +                               index = io->buf.auto_reg.index;

This is missing the io->buf_ctx_handle == io_uring_cmd_ctx_handle(cmd)
check from ublk_handle_auto_buf_reg().

> +                       io->flags &= ~(UBLK_IO_FLAG_OWNED_BY_SRV | UBLK_IO_FLAG_AUTO_BUF_REG);
> +                       io->flags |= UBLK_IO_FLAG_ACTIVE;
> +                       ublk_io_unlock(io);
> +
> +                       if (index != -1)
> +                               io_buffer_unregister_bvec(data->cmd, index,
> +                                               data->issue_flags);
> +               }
> +
> +               res = kfifo_in_spinlocked_noirqsave(&ubq->evts_fifo,
> +                       tag_buf, len, &ubq->evts_lock);
> +
> +               pr_warn("%s: copy tags or post CQE failure, move back "
> +                               "tags(%d %zu) ret %d\n", __func__, res, len,
> +                               ret);
> +       }
> +       return ret;
> +}
> +
> +static __maybe_unused int

The return value looks completely unused. Just return void instead?

Best,
Caleb

> +ublk_batch_dispatch(struct ublk_queue *ubq,
> +                   const struct ublk_batch_io_data *data,
> +                   struct ublk_batch_fcmd *fcmd)
> +{
> +       int ret = 0;
> +
> +       while (!ublk_io_evts_empty(ubq)) {
> +               ret = __ublk_batch_dispatch(ubq, data, fcmd);
> +               if (ret <= 0)
> +                       break;
> +       }
> +
> +       if (ret < 0)
> +               ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> +
> +       return ret;
> +}
> +
>  static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
>                            unsigned int issue_flags)
>  {
> --
> 2.47.0
>
Re: [PATCH V4 13/27] ublk: add batch I/O dispatch infrastructure
Posted by Ming Lei 23 hours ago
On Sun, Nov 30, 2025 at 11:24:12AM -0800, Caleb Sander Mateos wrote:
> On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> >
> > Add infrastructure for delivering I/O commands to ublk server in batches,
> > preparing for the upcoming UBLK_U_IO_FETCH_IO_CMDS feature.
> >
> > Key components:
> >
> > - struct ublk_batch_fcmd: Represents a batch fetch uring_cmd that will
> >   receive multiple I/O tags in a single operation, using io_uring's
> >   multishot command for efficient ublk IO delivery.
> >
> > - ublk_batch_dispatch(): Batch version of ublk_dispatch_req() that:
> >   * Pulls multiple request tags from the events FIFO (lock-free reader)
> >   * Prepares each I/O for delivery (including auto buffer registration)
> >   * Delivers tags to userspace via single uring_cmd notification
> >   * Handles partial failures by restoring undelivered tags to FIFO
> >
> > The batch approach significantly reduces notification overhead by aggregating
> > multiple I/O completions into single uring_cmd, while maintaining the same
> > I/O processing semantics as individual operations.
> >
> > Error handling ensures system consistency: if buffer selection or CQE
> > posting fails, undelivered tags are restored to the FIFO for retry,
> > meantime IO state has to be restored.
> >
> > This runs in task work context, scheduled via io_uring_cmd_complete_in_task()
> > or called directly from ->uring_cmd(), enabling efficient batch processing
> > without blocking the I/O submission path.
> >
> > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > ---
> >  drivers/block/ublk_drv.c | 189 +++++++++++++++++++++++++++++++++++++++
> >  1 file changed, 189 insertions(+)
> >
> > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > index 6ff284243630..cc9c92d97349 100644
> > --- a/drivers/block/ublk_drv.c
> > +++ b/drivers/block/ublk_drv.c
> > @@ -91,6 +91,12 @@
> >          UBLK_BATCH_F_HAS_BUF_ADDR | \
> >          UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK)
> >
> > +/* ublk batch fetch uring_cmd */
> > +struct ublk_batch_fcmd {
> 
> I would prefer "fetch_cmd" instead of "fcmd" for clarity
> 
> > +       struct io_uring_cmd *cmd;
> > +       unsigned short buf_group;
> > +};
> > +
> >  struct ublk_uring_cmd_pdu {
> >         /*
> >          * Store requests in same batch temporarily for queuing them to
> > @@ -168,6 +174,9 @@ struct ublk_batch_io_data {
> >   */
> >  #define UBLK_REFCOUNT_INIT (REFCOUNT_MAX / 2)
> >
> > +/* used for UBLK_F_BATCH_IO only */
> > +#define UBLK_BATCH_IO_UNUSED_TAG       ((unsigned short)-1)
> > +
> >  union ublk_io_buf {
> >         __u64   addr;
> >         struct ublk_auto_buf_reg auto_reg;
> > @@ -616,6 +625,32 @@ static wait_queue_head_t ublk_idr_wq;      /* wait until one idr is freed */
> >  static DEFINE_MUTEX(ublk_ctl_mutex);
> >
> >
> > +static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > +                                       struct ublk_batch_fcmd *fcmd,
> > +                                       int res)
> > +{
> > +       io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > +       fcmd->cmd = NULL;
> > +}
> > +
> > +static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > +                                    struct io_br_sel *sel,
> > +                                    unsigned int issue_flags)
> > +{
> > +       if (io_uring_mshot_cmd_post_cqe(fcmd->cmd, sel, issue_flags))
> > +               return -ENOBUFS;
> > +       return 0;
> > +}
> > +
> > +static ssize_t ublk_batch_copy_io_tags(struct ublk_batch_fcmd *fcmd,
> > +                                      void __user *buf, const u16 *tag_buf,
> > +                                      unsigned int len)
> > +{
> > +       if (copy_to_user(buf, tag_buf, len))
> > +               return -EFAULT;
> > +       return len;
> > +}
> > +
> >  #define UBLK_MAX_UBLKS UBLK_MINORS
> >
> >  /*
> > @@ -1378,6 +1413,160 @@ static void ublk_dispatch_req(struct ublk_queue *ubq,
> >         }
> >  }
> >
> > +static bool __ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> > +                                      const struct ublk_batch_io_data *data,
> > +                                      unsigned short tag)
> > +{
> > +       struct ublk_device *ub = data->ub;
> > +       struct ublk_io *io = &ubq->ios[tag];
> > +       struct request *req = blk_mq_tag_to_rq(ub->tag_set.tags[ubq->q_id], tag);
> > +       enum auto_buf_reg_res res = AUTO_BUF_REG_FALLBACK;
> > +       struct io_uring_cmd *cmd = data->cmd;
> > +
> > +       if (!ublk_start_io(ubq, req, io))
> 
> This doesn't look correct for UBLK_F_NEED_GET_DATA. If that's not
> supported in batch mode, then it should probably be disallowed when
> creating a batch-mode ublk device. The ublk_need_get_data() check in
> ublk_batch_commit_io_check() could also be dropped.

OK.

BTW UBLK_F_NEED_GET_DATA isn't necessary any more since user copy was added.

It is only for handling the WRITE io command, and the ublk server can copy
data to a new buffer via user copy.

> 
> > +               return false;
> > +
> > +       if (ublk_support_auto_buf_reg(ubq) && ublk_rq_has_data(req))
> > +               res = __ublk_do_auto_buf_reg(ubq, req, io, cmd,
> > +                               data->issue_flags);
> 
> __ublk_do_auto_buf_reg() reads io->buf.auto_reg. That seems racy
> without holding the io spinlock.

The io lock isn't needed.  Now the io state is guaranteed to be ACTIVE,
so UBLK_U_IO_COMMIT_IO_CMDS can't commit anything for this io.
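
That reasoning may be worth keeping as a comment above the
__ublk_do_auto_buf_reg() call, e.g. (wording is mine):

	/*
	 * io is still ACTIVE here, so UBLK_U_IO_COMMIT_IO_CMDS cannot touch
	 * it and io->buf.auto_reg can be read without ublk_io_lock().
	 */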

> 
> > +
> > +       if (res == AUTO_BUF_REG_FAIL)
> > +               return false;
> 
> Could be moved into the if (ublk_support_auto_buf_reg(ubq) &&
> ublk_rq_has_data(req)) statement since it won't be true otherwise?

OK.

> 
> > +
> > +       ublk_io_lock(io);
> > +       ublk_prep_auto_buf_reg_io(ubq, req, io, cmd, res);
> > +       ublk_io_unlock(io);
> > +
> > +       return true;
> > +}
> > +
> > +static bool ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> > +                                    const struct ublk_batch_io_data *data,
> > +                                    unsigned short *tag_buf,
> > +                                    unsigned int len)
> > +{
> > +       bool has_unused = false;
> > +       int i;
> 
> unsigned?
> 
> > +
> > +       for (i = 0; i < len; i += 1) {
> 
> i++?
> 
> > +               unsigned short tag = tag_buf[i];
> > +
> > +               if (!__ublk_batch_prep_dispatch(ubq, data, tag)) {
> > +                       tag_buf[i] = UBLK_BATCH_IO_UNUSED_TAG;
> > +                       has_unused = true;
> > +               }
> > +       }
> > +
> > +       return has_unused;
> > +}
> > +
> > +/*
> > + * Filter out UBLK_BATCH_IO_UNUSED_TAG entries from tag_buf.
> > + * Returns the new length after filtering.
> > + */
> > +static unsigned int ublk_filter_unused_tags(unsigned short *tag_buf,
> > +                                           unsigned int len)
> > +{
> > +       unsigned int i, j;
> > +
> > +       for (i = 0, j = 0; i < len; i++) {
> > +               if (tag_buf[i] != UBLK_BATCH_IO_UNUSED_TAG) {
> > +                       if (i != j)
> > +                               tag_buf[j] = tag_buf[i];
> > +                       j++;
> > +               }
> > +       }
> > +
> > +       return j;
> > +}
> > +
> > +#define MAX_NR_TAG 128
> > +static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > +                                const struct ublk_batch_io_data *data,
> > +                                struct ublk_batch_fcmd *fcmd)
> > +{
> > +       unsigned short tag_buf[MAX_NR_TAG];
> > +       struct io_br_sel sel;
> > +       size_t len = 0;
> > +       bool needs_filter;
> > +       int ret;
> > +
> > +       sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > +                                        data->issue_flags);
> > +       if (sel.val < 0)
> > +               return sel.val;
> > +       if (!sel.addr)
> > +               return -ENOBUFS;
> > +
> > +       /* single reader needn't lock and sizeof(kfifo element) is 2 bytes */
> > +       len = min(len, sizeof(tag_buf)) / 2;
> 
> sizeof(unsigned short) instead of 2?

OK

> 
> > +       len = kfifo_out(&ubq->evts_fifo, tag_buf, len);
> > +
> > +       needs_filter = ublk_batch_prep_dispatch(ubq, data, tag_buf, len);
> > +       /* Filter out unused tags before posting to userspace */
> > +       if (unlikely(needs_filter)) {
> > +               int new_len = ublk_filter_unused_tags(tag_buf, len);
> > +
> > +               if (!new_len)
> > +                       return len;
> 
> Is the purpose of this return value just to make ublk_batch_dispatch()
> retry __ublk_batch_dispatch()? Otherwise, it seems like a strange
> value to return.

If `new_len` becomes zero, it means all these requests have already been
handled (failed or requeued), so return `len` to tell the caller to move on.
I can add a comment for this behavior.
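
Something like (wording is mine):

	/*
	 * All pulled requests were already handled (failed or requeued)
	 * during prep, so there is nothing to post to userspace; returning
	 * the consumed length tells the caller to keep draining the FIFO.
	 */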

> 
> Also, shouldn't this path release the selected buffer to avoid leaking it?

Good catch, but io_kbuf_recycle() isn't exported; we may have to call
io_uring_mshot_cmd_post_cqe() with sel->val zeroed.
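
A rough sketch of that approach (assuming a zeroed sel.val makes
io_uring_mshot_cmd_post_cqe() hand the selected buffer back instead of
posting a CQE, as described above):

	if (!new_len) {
		sel.val = 0;
		io_uring_mshot_cmd_post_cqe(fcmd->cmd, &sel,
					    data->issue_flags);
		return len;
	}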

> 
> > +               len = new_len;
> > +       }
> > +
> > +       sel.val = ublk_batch_copy_io_tags(fcmd, sel.addr, tag_buf, len * 2);
> 
> sizeof(unsigned short)?

OK

> 
> > +       ret = ublk_batch_fetch_post_cqe(fcmd, &sel, data->issue_flags);
> > +       if (unlikely(ret < 0)) {
> > +               int i, res;
> > +
> > +               /*
> > +                * Undo prep state for all IOs since userspace never received them.
> > +                * This restores IOs to pre-prepared state so they can be cleanly
> > +                * re-prepared when tags are pulled from FIFO again.
> > +                */
> > +               for (i = 0; i < len; i++) {
> > +                       struct ublk_io *io = &ubq->ios[tag_buf[i]];
> > +                       int index = -1;
> > +
> > +                       ublk_io_lock(io);
> > +                       if (io->flags & UBLK_IO_FLAG_AUTO_BUF_REG)
> > +                               index = io->buf.auto_reg.index;
> 
> This is missing the io->buf_ctx_handle == io_uring_cmd_ctx_handle(cmd)
> check from ublk_handle_auto_buf_reg().

As you replied, it isn't needed because it is the same multishot command
for registering bvec buf.

> 
> > +                       io->flags &= ~(UBLK_IO_FLAG_OWNED_BY_SRV | UBLK_IO_FLAG_AUTO_BUF_REG);
> > +                       io->flags |= UBLK_IO_FLAG_ACTIVE;
> > +                       ublk_io_unlock(io);
> > +
> > +                       if (index != -1)
> > +                               io_buffer_unregister_bvec(data->cmd, index,
> > +                                               data->issue_flags);
> > +               }
> > +
> > +               res = kfifo_in_spinlocked_noirqsave(&ubq->evts_fifo,
> > +                       tag_buf, len, &ubq->evts_lock);
> > +
> > +               pr_warn("%s: copy tags or post CQE failure, move back "
> > +                               "tags(%d %zu) ret %d\n", __func__, res, len,
> > +                               ret);
> > +       }
> > +       return ret;
> > +}
> > +
> > +static __maybe_unused int
> 
> The return value looks completely unused. Just return void instead?

Yes, it looks like it is removed in a following patch.


Thanks,
Ming

Re: [PATCH V4 13/27] ublk: add batch I/O dispatch infrastructure
Posted by Caleb Sander Mateos 8 hours ago
On Sun, Nov 30, 2025 at 6:32 PM Ming Lei <ming.lei@redhat.com> wrote:
>
> On Sun, Nov 30, 2025 at 11:24:12AM -0800, Caleb Sander Mateos wrote:
> > On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> > >
> > > Add infrastructure for delivering I/O commands to ublk server in batches,
> > > preparing for the upcoming UBLK_U_IO_FETCH_IO_CMDS feature.
> > >
> > > Key components:
> > >
> > > - struct ublk_batch_fcmd: Represents a batch fetch uring_cmd that will
> > >   receive multiple I/O tags in a single operation, using io_uring's
> > >   multishot command for efficient ublk IO delivery.
> > >
> > > - ublk_batch_dispatch(): Batch version of ublk_dispatch_req() that:
> > >   * Pulls multiple request tags from the events FIFO (lock-free reader)
> > >   * Prepares each I/O for delivery (including auto buffer registration)
> > >   * Delivers tags to userspace via single uring_cmd notification
> > >   * Handles partial failures by restoring undelivered tags to FIFO
> > >
> > > The batch approach significantly reduces notification overhead by aggregating
> > > multiple I/O completions into single uring_cmd, while maintaining the same
> > > I/O processing semantics as individual operations.
> > >
> > > Error handling ensures system consistency: if buffer selection or CQE
> > > posting fails, undelivered tags are restored to the FIFO for retry,
> > > meantime IO state has to be restored.
> > >
> > > This runs in task work context, scheduled via io_uring_cmd_complete_in_task()
> > > or called directly from ->uring_cmd(), enabling efficient batch processing
> > > without blocking the I/O submission path.
> > >
> > > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > > ---
> > >  drivers/block/ublk_drv.c | 189 +++++++++++++++++++++++++++++++++++++++
> > >  1 file changed, 189 insertions(+)
> > >
> > > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > > index 6ff284243630..cc9c92d97349 100644
> > > --- a/drivers/block/ublk_drv.c
> > > +++ b/drivers/block/ublk_drv.c
> > > @@ -91,6 +91,12 @@
> > >          UBLK_BATCH_F_HAS_BUF_ADDR | \
> > >          UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK)
> > >
> > > +/* ublk batch fetch uring_cmd */
> > > +struct ublk_batch_fcmd {
> >
> > I would prefer "fetch_cmd" instead of "fcmd" for clarity
> >
> > > +       struct io_uring_cmd *cmd;
> > > +       unsigned short buf_group;
> > > +};
> > > +
> > >  struct ublk_uring_cmd_pdu {
> > >         /*
> > >          * Store requests in same batch temporarily for queuing them to
> > > @@ -168,6 +174,9 @@ struct ublk_batch_io_data {
> > >   */
> > >  #define UBLK_REFCOUNT_INIT (REFCOUNT_MAX / 2)
> > >
> > > +/* used for UBLK_F_BATCH_IO only */
> > > +#define UBLK_BATCH_IO_UNUSED_TAG       ((unsigned short)-1)
> > > +
> > >  union ublk_io_buf {
> > >         __u64   addr;
> > >         struct ublk_auto_buf_reg auto_reg;
> > > @@ -616,6 +625,32 @@ static wait_queue_head_t ublk_idr_wq;      /* wait until one idr is freed */
> > >  static DEFINE_MUTEX(ublk_ctl_mutex);
> > >
> > >
> > > +static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > > +                                       struct ublk_batch_fcmd *fcmd,
> > > +                                       int res)
> > > +{
> > > +       io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > > +       fcmd->cmd = NULL;
> > > +}
> > > +
> > > +static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > > +                                    struct io_br_sel *sel,
> > > +                                    unsigned int issue_flags)
> > > +{
> > > +       if (io_uring_mshot_cmd_post_cqe(fcmd->cmd, sel, issue_flags))
> > > +               return -ENOBUFS;
> > > +       return 0;
> > > +}
> > > +
> > > +static ssize_t ublk_batch_copy_io_tags(struct ublk_batch_fcmd *fcmd,
> > > +                                      void __user *buf, const u16 *tag_buf,
> > > +                                      unsigned int len)
> > > +{
> > > +       if (copy_to_user(buf, tag_buf, len))
> > > +               return -EFAULT;
> > > +       return len;
> > > +}
> > > +
> > >  #define UBLK_MAX_UBLKS UBLK_MINORS
> > >
> > >  /*
> > > @@ -1378,6 +1413,160 @@ static void ublk_dispatch_req(struct ublk_queue *ubq,
> > >         }
> > >  }
> > >
> > > +static bool __ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> > > +                                      const struct ublk_batch_io_data *data,
> > > +                                      unsigned short tag)
> > > +{
> > > +       struct ublk_device *ub = data->ub;
> > > +       struct ublk_io *io = &ubq->ios[tag];
> > > +       struct request *req = blk_mq_tag_to_rq(ub->tag_set.tags[ubq->q_id], tag);
> > > +       enum auto_buf_reg_res res = AUTO_BUF_REG_FALLBACK;
> > > +       struct io_uring_cmd *cmd = data->cmd;
> > > +
> > > +       if (!ublk_start_io(ubq, req, io))
> >
> > This doesn't look correct for UBLK_F_NEED_GET_DATA. If that's not
> > supported in batch mode, then it should probably be disallowed when
> > creating a batch-mode ublk device. The ublk_need_get_data() check in
> > ublk_batch_commit_io_check() could also be dropped.
>
> OK.
>
> BTW UBLK_F_NEED_GET_DATA isn't necessary any more since user copy.
>
> It is only for handling WRITE io command, and ublk server can copy data to
> new buffer by user copy.
>
> >
> > > +               return false;
> > > +
> > > +       if (ublk_support_auto_buf_reg(ubq) && ublk_rq_has_data(req))
> > > +               res = __ublk_do_auto_buf_reg(ubq, req, io, cmd,
> > > +                               data->issue_flags);
> >
> > __ublk_do_auto_buf_reg() reads io->buf.auto_reg. That seems racy
> > without holding the io spinlock.
>
> The io lock isn't needed.  Now the io state is guaranteed to be ACTIVE,
> so UBLK_U_IO_COMMIT_IO_CMDS can't commit anything for this io.

Makes sense.

Thanks,
Caleb

>
> >
> > > +
> > > +       if (res == AUTO_BUF_REG_FAIL)
> > > +               return false;
> >
> > Could be moved into the if (ublk_support_auto_buf_reg(ubq) &&
> > ublk_rq_has_data(req)) statement since it won't be true otherwise?
>
> OK.
>
> >
> > > +
> > > +       ublk_io_lock(io);
> > > +       ublk_prep_auto_buf_reg_io(ubq, req, io, cmd, res);
> > > +       ublk_io_unlock(io);
> > > +
> > > +       return true;
> > > +}
> > > +
> > > +static bool ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> > > +                                    const struct ublk_batch_io_data *data,
> > > +                                    unsigned short *tag_buf,
> > > +                                    unsigned int len)
> > > +{
> > > +       bool has_unused = false;
> > > +       int i;
> >
> > unsigned?
> >
> > > +
> > > +       for (i = 0; i < len; i += 1) {
> >
> > i++?
> >
> > > +               unsigned short tag = tag_buf[i];
> > > +
> > > +               if (!__ublk_batch_prep_dispatch(ubq, data, tag)) {
> > > +                       tag_buf[i] = UBLK_BATCH_IO_UNUSED_TAG;
> > > +                       has_unused = true;
> > > +               }
> > > +       }
> > > +
> > > +       return has_unused;
> > > +}
> > > +
> > > +/*
> > > + * Filter out UBLK_BATCH_IO_UNUSED_TAG entries from tag_buf.
> > > + * Returns the new length after filtering.
> > > + */
> > > +static unsigned int ublk_filter_unused_tags(unsigned short *tag_buf,
> > > +                                           unsigned int len)
> > > +{
> > > +       unsigned int i, j;
> > > +
> > > +       for (i = 0, j = 0; i < len; i++) {
> > > +               if (tag_buf[i] != UBLK_BATCH_IO_UNUSED_TAG) {
> > > +                       if (i != j)
> > > +                               tag_buf[j] = tag_buf[i];
> > > +                       j++;
> > > +               }
> > > +       }
> > > +
> > > +       return j;
> > > +}
> > > +
> > > +#define MAX_NR_TAG 128
> > > +static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > +                                const struct ublk_batch_io_data *data,
> > > +                                struct ublk_batch_fcmd *fcmd)
> > > +{
> > > +       unsigned short tag_buf[MAX_NR_TAG];
> > > +       struct io_br_sel sel;
> > > +       size_t len = 0;
> > > +       bool needs_filter;
> > > +       int ret;
> > > +
> > > +       sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > > +                                        data->issue_flags);
> > > +       if (sel.val < 0)
> > > +               return sel.val;
> > > +       if (!sel.addr)
> > > +               return -ENOBUFS;
> > > +
> > > +       /* single reader needn't lock and sizeof(kfifo element) is 2 bytes */
> > > +       len = min(len, sizeof(tag_buf)) / 2;
> >
> > sizeof(unsigned short) instead of 2?
>
> OK
>
> >
> > > +       len = kfifo_out(&ubq->evts_fifo, tag_buf, len);
> > > +
> > > +       needs_filter = ublk_batch_prep_dispatch(ubq, data, tag_buf, len);
> > > +       /* Filter out unused tags before posting to userspace */
> > > +       if (unlikely(needs_filter)) {
> > > +               int new_len = ublk_filter_unused_tags(tag_buf, len);
> > > +
> > > +               if (!new_len)
> > > +                       return len;
> >
> > Is the purpose of this return value just to make ublk_batch_dispatch()
> > retry __ublk_batch_dispatch()? Otherwise, it seems like a strange
> > value to return.
>
> If `new_len` becomes zero, it means all these requests are handled already,
> either fail or requeue, so return `len` to tell the caller to move on. I
> can comment this behavior.
>
> >
> > Also, shouldn't this path release the selected buffer to avoid leaking it?
>
> Good catch, but io_kbuf_recycle() isn't exported, we may have to call
> io_uring_mshot_cmd_post_cqe() by zeroing sel->val.
>
> >
> > > +               len = new_len;
> > > +       }
> > > +
> > > +       sel.val = ublk_batch_copy_io_tags(fcmd, sel.addr, tag_buf, len * 2);
> >
> > sizeof(unsigned short)?
>
> OK
>
> >
> > > +       ret = ublk_batch_fetch_post_cqe(fcmd, &sel, data->issue_flags);
> > > +       if (unlikely(ret < 0)) {
> > > +               int i, res;
> > > +
> > > +               /*
> > > +                * Undo prep state for all IOs since userspace never received them.
> > > +                * This restores IOs to pre-prepared state so they can be cleanly
> > > +                * re-prepared when tags are pulled from FIFO again.
> > > +                */
> > > +               for (i = 0; i < len; i++) {
> > > +                       struct ublk_io *io = &ubq->ios[tag_buf[i]];
> > > +                       int index = -1;
> > > +
> > > +                       ublk_io_lock(io);
> > > +                       if (io->flags & UBLK_IO_FLAG_AUTO_BUF_REG)
> > > +                               index = io->buf.auto_reg.index;
> >
> > This is missing the io->buf_ctx_handle == io_uring_cmd_ctx_handle(cmd)
> > check from ublk_handle_auto_buf_reg().
>
> As you replied, it isn't needed because it is the same multishot command
> for registering bvec buf.
>
> >
> > > +                       io->flags &= ~(UBLK_IO_FLAG_OWNED_BY_SRV | UBLK_IO_FLAG_AUTO_BUF_REG);
> > > +                       io->flags |= UBLK_IO_FLAG_ACTIVE;
> > > +                       ublk_io_unlock(io);
> > > +
> > > +                       if (index != -1)
> > > +                               io_buffer_unregister_bvec(data->cmd, index,
> > > +                                               data->issue_flags);
> > > +               }
> > > +
> > > +               res = kfifo_in_spinlocked_noirqsave(&ubq->evts_fifo,
> > > +                       tag_buf, len, &ubq->evts_lock);
> > > +
> > > +               pr_warn("%s: copy tags or post CQE failure, move back "
> > > +                               "tags(%d %zu) ret %d\n", __func__, res, len,
> > > +                               ret);
> > > +       }
> > > +       return ret;
> > > +}
> > > +
> > > +static __maybe_unused int
> >
> > The return value looks completely unused. Just return void instead?
>
> Yes, looks it is removed in following patch.
>
>
> Thanks,
> Ming
>
Re: [PATCH V4 13/27] ublk: add batch I/O dispatch infrastructure
Posted by Caleb Sander Mateos 1 day, 4 hours ago
On Sun, Nov 30, 2025 at 11:24 AM Caleb Sander Mateos
<csander@purestorage.com> wrote:
>
> On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> >
> > Add infrastructure for delivering I/O commands to ublk server in batches,
> > preparing for the upcoming UBLK_U_IO_FETCH_IO_CMDS feature.
> >
> > Key components:
> >
> > - struct ublk_batch_fcmd: Represents a batch fetch uring_cmd that will
> >   receive multiple I/O tags in a single operation, using io_uring's
> >   multishot command for efficient ublk IO delivery.
> >
> > - ublk_batch_dispatch(): Batch version of ublk_dispatch_req() that:
> >   * Pulls multiple request tags from the events FIFO (lock-free reader)
> >   * Prepares each I/O for delivery (including auto buffer registration)
> >   * Delivers tags to userspace via single uring_cmd notification
> >   * Handles partial failures by restoring undelivered tags to FIFO
> >
> > The batch approach significantly reduces notification overhead by aggregating
> > multiple I/O completions into single uring_cmd, while maintaining the same
> > I/O processing semantics as individual operations.
> >
> > Error handling ensures system consistency: if buffer selection or CQE
> > posting fails, undelivered tags are restored to the FIFO for retry,
> > meantime IO state has to be restored.
> >
> > This runs in task work context, scheduled via io_uring_cmd_complete_in_task()
> > or called directly from ->uring_cmd(), enabling efficient batch processing
> > without blocking the I/O submission path.
> >
> > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > ---
> >  drivers/block/ublk_drv.c | 189 +++++++++++++++++++++++++++++++++++++++
> >  1 file changed, 189 insertions(+)
> >
> > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > index 6ff284243630..cc9c92d97349 100644
> > --- a/drivers/block/ublk_drv.c
> > +++ b/drivers/block/ublk_drv.c
> > @@ -91,6 +91,12 @@
> >          UBLK_BATCH_F_HAS_BUF_ADDR | \
> >          UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK)
> >
> > +/* ublk batch fetch uring_cmd */
> > +struct ublk_batch_fcmd {
>
> I would prefer "fetch_cmd" instead of "fcmd" for clarity
>
> > +       struct io_uring_cmd *cmd;
> > +       unsigned short buf_group;
> > +};
> > +
> >  struct ublk_uring_cmd_pdu {
> >         /*
> >          * Store requests in same batch temporarily for queuing them to
> > @@ -168,6 +174,9 @@ struct ublk_batch_io_data {
> >   */
> >  #define UBLK_REFCOUNT_INIT (REFCOUNT_MAX / 2)
> >
> > +/* used for UBLK_F_BATCH_IO only */
> > +#define UBLK_BATCH_IO_UNUSED_TAG       ((unsigned short)-1)
> > +
> >  union ublk_io_buf {
> >         __u64   addr;
> >         struct ublk_auto_buf_reg auto_reg;
> > @@ -616,6 +625,32 @@ static wait_queue_head_t ublk_idr_wq;      /* wait until one idr is freed */
> >  static DEFINE_MUTEX(ublk_ctl_mutex);
> >
> >
> > +static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > +                                       struct ublk_batch_fcmd *fcmd,
> > +                                       int res)
> > +{
> > +       io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > +       fcmd->cmd = NULL;
> > +}
> > +
> > +static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > +                                    struct io_br_sel *sel,
> > +                                    unsigned int issue_flags)
> > +{
> > +       if (io_uring_mshot_cmd_post_cqe(fcmd->cmd, sel, issue_flags))
> > +               return -ENOBUFS;
> > +       return 0;
> > +}
> > +
> > +static ssize_t ublk_batch_copy_io_tags(struct ublk_batch_fcmd *fcmd,
> > +                                      void __user *buf, const u16 *tag_buf,
> > +                                      unsigned int len)
> > +{
> > +       if (copy_to_user(buf, tag_buf, len))
> > +               return -EFAULT;
> > +       return len;
> > +}
> > +
> >  #define UBLK_MAX_UBLKS UBLK_MINORS
> >
> >  /*
> > @@ -1378,6 +1413,160 @@ static void ublk_dispatch_req(struct ublk_queue *ubq,
> >         }
> >  }
> >
> > +static bool __ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> > +                                      const struct ublk_batch_io_data *data,
> > +                                      unsigned short tag)
> > +{
> > +       struct ublk_device *ub = data->ub;
> > +       struct ublk_io *io = &ubq->ios[tag];
> > +       struct request *req = blk_mq_tag_to_rq(ub->tag_set.tags[ubq->q_id], tag);
> > +       enum auto_buf_reg_res res = AUTO_BUF_REG_FALLBACK;
> > +       struct io_uring_cmd *cmd = data->cmd;
> > +
> > +       if (!ublk_start_io(ubq, req, io))
>
> This doesn't look correct for UBLK_F_NEED_GET_DATA. If that's not
> supported in batch mode, then it should probably be disallowed when
> creating a batch-mode ublk device. The ublk_need_get_data() check in
> ublk_batch_commit_io_check() could also be dropped.
>
> > +               return false;
> > +
> > +       if (ublk_support_auto_buf_reg(ubq) && ublk_rq_has_data(req))
> > +               res = __ublk_do_auto_buf_reg(ubq, req, io, cmd,
> > +                               data->issue_flags);
>
> __ublk_do_auto_buf_reg() reads io->buf.auto_reg. That seems racy
> without holding the io spinlock.
>
> > +
> > +       if (res == AUTO_BUF_REG_FAIL)
> > +               return false;
>
> Could be moved into the if (ublk_support_auto_buf_reg(ubq) &&
> ublk_rq_has_data(req)) statement since it won't be true otherwise?
>
> > +
> > +       ublk_io_lock(io);
> > +       ublk_prep_auto_buf_reg_io(ubq, req, io, cmd, res);
> > +       ublk_io_unlock(io);
> > +
> > +       return true;
> > +}
> > +
> > +static bool ublk_batch_prep_dispatch(struct ublk_queue *ubq,
> > +                                    const struct ublk_batch_io_data *data,
> > +                                    unsigned short *tag_buf,
> > +                                    unsigned int len)
> > +{
> > +       bool has_unused = false;
> > +       int i;
>
> unsigned?
>
> > +
> > +       for (i = 0; i < len; i += 1) {
>
> i++?
>
> > +               unsigned short tag = tag_buf[i];
> > +
> > +               if (!__ublk_batch_prep_dispatch(ubq, data, tag)) {
> > +                       tag_buf[i] = UBLK_BATCH_IO_UNUSED_TAG;
> > +                       has_unused = true;
> > +               }
> > +       }
> > +
> > +       return has_unused;
> > +}
> > +
> > +/*
> > + * Filter out UBLK_BATCH_IO_UNUSED_TAG entries from tag_buf.
> > + * Returns the new length after filtering.
> > + */
> > +static unsigned int ublk_filter_unused_tags(unsigned short *tag_buf,
> > +                                           unsigned int len)
> > +{
> > +       unsigned int i, j;
> > +
> > +       for (i = 0, j = 0; i < len; i++) {
> > +               if (tag_buf[i] != UBLK_BATCH_IO_UNUSED_TAG) {
> > +                       if (i != j)
> > +                               tag_buf[j] = tag_buf[i];
> > +                       j++;
> > +               }
> > +       }
> > +
> > +       return j;
> > +}
> > +
> > +#define MAX_NR_TAG 128
> > +static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > +                                const struct ublk_batch_io_data *data,
> > +                                struct ublk_batch_fcmd *fcmd)
> > +{
> > +       unsigned short tag_buf[MAX_NR_TAG];
> > +       struct io_br_sel sel;
> > +       size_t len = 0;
> > +       bool needs_filter;
> > +       int ret;
> > +
> > +       sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > +                                        data->issue_flags);
> > +       if (sel.val < 0)
> > +               return sel.val;
> > +       if (!sel.addr)
> > +               return -ENOBUFS;
> > +
> > +       /* single reader needn't lock and sizeof(kfifo element) is 2 bytes */
> > +       len = min(len, sizeof(tag_buf)) / 2;
>
> sizeof(unsigned short) instead of 2?
>
> > +       len = kfifo_out(&ubq->evts_fifo, tag_buf, len);
> > +
> > +       needs_filter = ublk_batch_prep_dispatch(ubq, data, tag_buf, len);
> > +       /* Filter out unused tags before posting to userspace */
> > +       if (unlikely(needs_filter)) {
> > +               int new_len = ublk_filter_unused_tags(tag_buf, len);
> > +
> > +               if (!new_len)
> > +                       return len;
>
> Is the purpose of this return value just to make ublk_batch_dispatch()
> retry __ublk_batch_dispatch()? Otherwise, it seems like a strange
> value to return.
>
> Also, shouldn't this path release the selected buffer to avoid leaking it?
>
> > +               len = new_len;
> > +       }
> > +
> > +       sel.val = ublk_batch_copy_io_tags(fcmd, sel.addr, tag_buf, len * 2);
>
> sizeof(unsigned short)?
>
> > +       ret = ublk_batch_fetch_post_cqe(fcmd, &sel, data->issue_flags);
> > +       if (unlikely(ret < 0)) {
> > +               int i, res;
> > +
> > +               /*
> > +                * Undo prep state for all IOs since userspace never received them.
> > +                * This restores IOs to pre-prepared state so they can be cleanly
> > +                * re-prepared when tags are pulled from FIFO again.
> > +                */
> > +               for (i = 0; i < len; i++) {
> > +                       struct ublk_io *io = &ubq->ios[tag_buf[i]];
> > +                       int index = -1;
> > +
> > +                       ublk_io_lock(io);
> > +                       if (io->flags & UBLK_IO_FLAG_AUTO_BUF_REG)
> > +                               index = io->buf.auto_reg.index;
>
> This is missing the io->buf_ctx_handle == io_uring_cmd_ctx_handle(cmd)
> check from ublk_handle_auto_buf_reg().

Never mind, I guess that's okay because both the register and unregister
are using data->cmd as the io_uring_cmd.

>
> > +                       io->flags &= ~(UBLK_IO_FLAG_OWNED_BY_SRV | UBLK_IO_FLAG_AUTO_BUF_REG);
> > +                       io->flags |= UBLK_IO_FLAG_ACTIVE;
> > +                       ublk_io_unlock(io);
> > +
> > +                       if (index != -1)
> > +                               io_buffer_unregister_bvec(data->cmd, index,
> > +                                               data->issue_flags);
> > +               }
> > +
> > +               res = kfifo_in_spinlocked_noirqsave(&ubq->evts_fifo,
> > +                       tag_buf, len, &ubq->evts_lock);
> > +
> > +               pr_warn("%s: copy tags or post CQE failure, move back "
> > +                               "tags(%d %zu) ret %d\n", __func__, res, len,
> > +                               ret);
> > +       }
> > +       return ret;
> > +}
> > +
> > +static __maybe_unused int
>
> The return value looks completely unused. Just return void instead?
>
> Best,
> Caleb
>
> > +ublk_batch_dispatch(struct ublk_queue *ubq,
> > +                   const struct ublk_batch_io_data *data,
> > +                   struct ublk_batch_fcmd *fcmd)
> > +{
> > +       int ret = 0;
> > +
> > +       while (!ublk_io_evts_empty(ubq)) {
> > +               ret = __ublk_batch_dispatch(ubq, data, fcmd);
> > +               if (ret <= 0)
> > +                       break;
> > +       }
> > +
> > +       if (ret < 0)
> > +               ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> > +
> > +       return ret;
> > +}
> > +
> >  static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> >                            unsigned int issue_flags)
> >  {
> > --
> > 2.47.0
> >