[PATCH V4 10/27] ublk: handle UBLK_U_IO_PREP_IO_CMDS

Posted by Ming Lei 1 week, 3 days ago
This commit implements handling of the UBLK_U_IO_PREP_IO_CMDS command,
which allows userspace to prepare a batch of I/O requests in a single
uring_cmd.

The core of this change is the `ublk_walk_cmd_buf` function, which
iterates over the elements in the uring_cmd's user buffer. For each
element, it parses the I/O details, finds the corresponding `ublk_io`
structure, and prepares it for future dispatch.
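
Each element begins with a struct ublk_elem_header carrying the tag and
buf_index; when UBLK_BATCH_F_HAS_BUF_ADDR is set, a __u64 buffer address
immediately follows the header, which is what ublk_batch_buf_addr()
reads back. Roughly (an illustrative layout only; the wrapper struct
name below is made up, the fields come from the code):

	struct prep_elem_with_buf_addr {	/* hypothetical name */
		struct ublk_elem_header hdr;	/* tag, buf_index */
		__u64 buf_addr;		/* read by ublk_batch_buf_addr() */
	};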

Add a per-io lock to protect concurrent delivery and commit.
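
For example, userspace could submit one such batch roughly as below (an
illustrative sketch only, not part of this patch: it assumes liburing,
the ublk_cmd.h additions from this series, and that the batch header
lives in the SQE's cmd area while sqe->addr points at the element
array, matching the kernel side):

	#include <errno.h>
	#include <liburing.h>
	#include <linux/ublk_cmd.h>

	/* caller has filled elems[i].tag (and buf_index, if used) */
	static int queue_prep_io_cmds(struct io_uring *ring, int ublk_ch_fd,
				      __u16 q_id,
				      const struct ublk_elem_header *elems,
				      __u16 nr_elem)
	{
		struct io_uring_sqe *sqe = io_uring_get_sqe(ring);
		struct ublk_batch_io *uc;

		if (!sqe)
			return -EAGAIN;

		io_uring_prep_rw(IORING_OP_URING_CMD, sqe, ublk_ch_fd,
				 NULL, 0, 0);
		sqe->cmd_op = UBLK_U_IO_PREP_IO_CMDS;
		/* the kernel reads this buffer via copy_from_user() */
		sqe->addr = (unsigned long)elems;

		uc = (struct ublk_batch_io *)sqe->cmd;
		uc->q_id = q_id;
		uc->flags = 0;			/* plain header-only elements */
		uc->nr_elem = nr_elem;
		uc->elem_bytes = sizeof(*elems);

		return io_uring_submit(ring);
	}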

Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/ublk_drv.c      | 193 +++++++++++++++++++++++++++++++++-
 include/uapi/linux/ublk_cmd.h |   5 +
 2 files changed, 197 insertions(+), 1 deletion(-)

diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
index 21890947ceec..66c77daae955 100644
--- a/drivers/block/ublk_drv.c
+++ b/drivers/block/ublk_drv.c
@@ -117,6 +117,7 @@ struct ublk_batch_io_data {
 	struct ublk_device *ub;
 	struct io_uring_cmd *cmd;
 	struct ublk_batch_io header;
+	unsigned int issue_flags;
 };
 
 /*
@@ -201,6 +202,7 @@ struct ublk_io {
 	unsigned task_registered_buffers;
 
 	void *buf_ctx_handle;
+	spinlock_t lock;
 } ____cacheline_aligned_in_smp;
 
 struct ublk_queue {
@@ -270,6 +272,16 @@ static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
 	return false;
 }
 
+static inline void ublk_io_lock(struct ublk_io *io)
+{
+	spin_lock(&io->lock);
+}
+
+static inline void ublk_io_unlock(struct ublk_io *io)
+{
+	spin_unlock(&io->lock);
+}
+
 static inline struct ublksrv_io_desc *
 ublk_get_iod(const struct ublk_queue *ubq, unsigned tag)
 {
@@ -2531,6 +2543,171 @@ static int ublk_ch_uring_cmd(struct io_uring_cmd *cmd, unsigned int issue_flags)
 	return ublk_ch_uring_cmd_local(cmd, issue_flags);
 }
 
+static inline __u64 ublk_batch_buf_addr(const struct ublk_batch_io *uc,
+					const struct ublk_elem_header *elem)
+{
+	const void *buf = elem;
+
+	if (uc->flags & UBLK_BATCH_F_HAS_BUF_ADDR)
+		return *(__u64 *)(buf + sizeof(*elem));
+	return 0;
+}
+
+static struct ublk_auto_buf_reg
+ublk_batch_auto_buf_reg(const struct ublk_batch_io *uc,
+			const struct ublk_elem_header *elem)
+{
+	struct ublk_auto_buf_reg reg = {
+		.index = elem->buf_index,
+		.flags = (uc->flags & UBLK_BATCH_F_AUTO_BUF_REG_FALLBACK) ?
+			UBLK_AUTO_BUF_REG_FALLBACK : 0,
+	};
+
+	return reg;
+}
+
+/*
+ * 48 bytes can hold any buffer element type (8, 16 or 24 bytes) because
+ * it is the least common multiple (LCM) of 8, 16 and 24.
+ */
+#define UBLK_CMD_BATCH_TMP_BUF_SZ  (48 * 10)
+struct ublk_batch_io_iter {
+	void __user *uaddr;
+	unsigned done, total;
+	unsigned char elem_bytes;
+	/* copy to this buffer from user space */
+	unsigned char buf[UBLK_CMD_BATCH_TMP_BUF_SZ];
+};
+
+static inline int
+__ublk_walk_cmd_buf(struct ublk_queue *ubq,
+		    struct ublk_batch_io_iter *iter,
+		    const struct ublk_batch_io_data *data,
+		    unsigned bytes,
+		    int (*cb)(struct ublk_queue *q,
+			    const struct ublk_batch_io_data *data,
+			    const struct ublk_elem_header *elem))
+{
+	unsigned int i;
+	int ret = 0;
+
+	for (i = 0; i < bytes; i += iter->elem_bytes) {
+		const struct ublk_elem_header *elem =
+			(const struct ublk_elem_header *)&iter->buf[i];
+
+		if (unlikely(elem->tag >= data->ub->dev_info.queue_depth)) {
+			ret = -EINVAL;
+			break;
+		}
+
+		ret = cb(ubq, data, elem);
+		if (unlikely(ret))
+			break;
+	}
+
+	iter->done += i;
+	return ret;
+}
+
+static int ublk_walk_cmd_buf(struct ublk_batch_io_iter *iter,
+			     const struct ublk_batch_io_data *data,
+			     int (*cb)(struct ublk_queue *q,
+				     const struct ublk_batch_io_data *data,
+				     const struct ublk_elem_header *elem))
+{
+	struct ublk_queue *ubq = ublk_get_queue(data->ub, data->header.q_id);
+	int ret = 0;
+
+	while (iter->done < iter->total) {
+		unsigned int len = min(sizeof(iter->buf), iter->total - iter->done);
+
+		if (copy_from_user(iter->buf, iter->uaddr + iter->done, len)) {
+			pr_warn("ublk%d: read batch cmd buffer failed\n",
+					data->ub->dev_info.dev_id);
+			return -EFAULT;
+		}
+
+		ret = __ublk_walk_cmd_buf(ubq, iter, data, len, cb);
+		if (ret)
+			return ret;
+	}
+	return 0;
+}
+
+static int ublk_batch_unprep_io(struct ublk_queue *ubq,
+				const struct ublk_batch_io_data *data,
+				const struct ublk_elem_header *elem)
+{
+	struct ublk_io *io = &ubq->ios[elem->tag];
+
+	data->ub->nr_io_ready--;
+	ublk_io_lock(io);
+	io->flags = 0;
+	ublk_io_unlock(io);
+	return 0;
+}
+
+static void ublk_batch_revert_prep_cmd(struct ublk_batch_io_iter *iter,
+				       const struct ublk_batch_io_data *data)
+{
+	int ret;
+
+	/* Re-process only what we've already processed, starting from the beginning */
+	iter->total = iter->done;
+	iter->done = 0;
+
+	ret = ublk_walk_cmd_buf(iter, data, ublk_batch_unprep_io);
+	WARN_ON_ONCE(ret);
+}
+
+static int ublk_batch_prep_io(struct ublk_queue *ubq,
+			      const struct ublk_batch_io_data *data,
+			      const struct ublk_elem_header *elem)
+{
+	struct ublk_io *io = &ubq->ios[elem->tag];
+	const struct ublk_batch_io *uc = &data->header;
+	union ublk_io_buf buf = { 0 };
+	int ret;
+
+	if (ublk_dev_support_auto_buf_reg(data->ub))
+		buf.auto_reg = ublk_batch_auto_buf_reg(uc, elem);
+	else if (ublk_dev_need_map_io(data->ub)) {
+		buf.addr = ublk_batch_buf_addr(uc, elem);
+
+		ret = ublk_check_fetch_buf(data->ub, buf.addr);
+		if (ret)
+			return ret;
+	}
+
+	ublk_io_lock(io);
+	ret = __ublk_fetch(data->cmd, data->ub, io);
+	if (!ret)
+		io->buf = buf;
+	ublk_io_unlock(io);
+
+	return ret;
+}
+
+static int ublk_handle_batch_prep_cmd(const struct ublk_batch_io_data *data)
+{
+	const struct ublk_batch_io *uc = &data->header;
+	struct io_uring_cmd *cmd = data->cmd;
+	struct ublk_batch_io_iter iter = {
+		.uaddr = u64_to_user_ptr(READ_ONCE(cmd->sqe->addr)),
+		.total = uc->nr_elem * uc->elem_bytes,
+		.elem_bytes = uc->elem_bytes,
+	};
+	int ret;
+
+	mutex_lock(&data->ub->mutex);
+	ret = ublk_walk_cmd_buf(&iter, data, ublk_batch_prep_io);
+
+	if (ret && iter.done)
+		ublk_batch_revert_prep_cmd(&iter, data);
+	mutex_unlock(&data->ub->mutex);
+	return ret;
+}
+
 static int ublk_check_batch_cmd_flags(const struct ublk_batch_io *uc)
 {
 	unsigned elem_bytes = sizeof(struct ublk_elem_header);
@@ -2587,6 +2764,7 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
 			.nr_elem = READ_ONCE(uc->nr_elem),
 			.elem_bytes = READ_ONCE(uc->elem_bytes),
 		},
+		.issue_flags = issue_flags,
 	};
 	u32 cmd_op = cmd->cmd_op;
 	int ret = -EINVAL;
@@ -2596,6 +2774,11 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
 
 	switch (cmd_op) {
 	case UBLK_U_IO_PREP_IO_CMDS:
+		ret = ublk_check_batch_cmd(&data);
+		if (ret)
+			goto out;
+		ret = ublk_handle_batch_prep_cmd(&data);
+		break;
 	case UBLK_U_IO_COMMIT_IO_CMDS:
 		ret = ublk_check_batch_cmd(&data);
 		if (ret)
@@ -2770,7 +2953,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
 	struct ublk_queue *ubq;
 	struct page *page;
 	int numa_node;
-	int size;
+	int size, i;
 
 	/* Determine NUMA node based on queue's CPU affinity */
 	numa_node = ublk_get_queue_numa_node(ub, q_id);
@@ -2795,6 +2978,9 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
 	}
 	ubq->io_cmd_buf = page_address(page);
 
+	for (i = 0; i < ubq->q_depth; i++)
+		spin_lock_init(&ubq->ios[i].lock);
+
 	ub->queues[q_id] = ubq;
 	ubq->dev = ub;
 	return 0;
@@ -3021,6 +3207,11 @@ static int ublk_ctrl_start_dev(struct ublk_device *ub,
 		return -EINVAL;
 
 	mutex_lock(&ub->mutex);
+	/* the device may no longer be ready in case of F_BATCH */
+	if (!ublk_dev_ready(ub)) {
+		ret = -EINVAL;
+		goto out_unlock;
+	}
 	if (ub->dev_info.state == UBLK_S_DEV_LIVE ||
 	    test_bit(UB_STATE_USED, &ub->state)) {
 		ret = -EEXIST;
diff --git a/include/uapi/linux/ublk_cmd.h b/include/uapi/linux/ublk_cmd.h
index 2ce5a496b622..c96c299057c3 100644
--- a/include/uapi/linux/ublk_cmd.h
+++ b/include/uapi/linux/ublk_cmd.h
@@ -102,6 +102,11 @@
 	_IOWR('u', 0x23, struct ublksrv_io_cmd)
 #define	UBLK_U_IO_UNREGISTER_IO_BUF	\
 	_IOWR('u', 0x24, struct ublksrv_io_cmd)
+
+/*
+ * Return 0 if the command runs successfully, otherwise a failure
+ * code is returned
+ */
 #define	UBLK_U_IO_PREP_IO_CMDS	\
 	_IOWR('u', 0x25, struct ublk_batch_io)
 #define	UBLK_U_IO_COMMIT_IO_CMDS	\
-- 
2.47.0
Re: [PATCH V4 10/27] ublk: handle UBLK_U_IO_PREP_IO_CMDS
Posted by Caleb Sander Mateos 1 day, 6 hours ago
On Thu, Nov 20, 2025 at 5:59 PM Ming Lei <ming.lei@redhat.com> wrote:
>
> [...]
> +static inline __u64 ublk_batch_buf_addr(const struct ublk_batch_io *uc,
> +                                       const struct ublk_elem_header *elem)
> +{
> +       const void *buf = elem;
> +
> +       if (uc->flags & UBLK_BATCH_F_HAS_BUF_ADDR)
> +               return *(__u64 *)(buf + sizeof(*elem));

Sorry, one more minor suggestion: cast to a const pointer?
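
presumably something like:

	return *(const __u64 *)(buf + sizeof(*elem));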

Best,
Caleb

Re: [PATCH V4 10/27] ublk: handle UBLK_U_IO_PREP_IO_CMDS
Posted by Caleb Sander Mateos 2 days, 6 hours ago
On Thu, Nov 20, 2025 at 5:59 PM Ming Lei <ming.lei@redhat.com> wrote:
>
> [...]
> @@ -117,6 +117,7 @@ struct ublk_batch_io_data {
>         struct ublk_device *ub;
>         struct io_uring_cmd *cmd;
>         struct ublk_batch_io header;
> +       unsigned int issue_flags;

This looks unused in this commit. Move it to the previous commit
introducing struct ublk_batch_io_data, or the next commit that uses
issue_flags?

Other than that,
Reviewed-by: Caleb Sander Mateos <csander@purestorage.com>
