[PATCH V4 14/27] ublk: add UBLK_U_IO_FETCH_IO_CMDS for batch I/O processing

Ming Lei posted 27 patches 1 week, 3 days ago
[PATCH V4 14/27] ublk: add UBLK_U_IO_FETCH_IO_CMDS for batch I/O processing
Posted by Ming Lei 1 week, 3 days ago
Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
of I/O requests. This multishot uring_cmd allows the ublk server to fetch
multiple I/O commands in a single operation, significantly reducing
submission overhead compared to individual FETCH_REQ* commands.

Key Design Features:

1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
   commands, with the batch size limited by the provided buffer length.

2. Dynamic Load Balancing: Multiple fetch commands can be submitted
   simultaneously, but only one is active at any time. This enables
   efficient load distribution across multiple server task contexts.

3. Implicit State Management: The implementation uses three key variables
   to track state:
   - evts_fifo: Queue of request tags awaiting processing
   - fcmd_head: List of available fetch commands
   - active_fcmd: Currently active fetch command (NULL = none active)

   States are derived implicitly:
   - IDLE: No fetch commands available
   - READY: Fetch commands available, none active
   - ACTIVE: One fetch command processing events

4. Lockless Reader Optimization: The active fetch command can read from
   evts_fifo without locking (single reader guarantee), while writers
   (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
   barrier pairing plays key role for the single lockless reader
   optimization.

Implementation Details:

- ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
- __ublk_pick_active_fcmd() selects an available fetch command when
  events arrive and no command is currently active
- ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
  buffer and posts completion via io_uring_mshot_cmd_post_cqe()
- State transitions are coordinated via evts_lock to maintain consistency

Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
 drivers/block/ublk_drv.c      | 412 +++++++++++++++++++++++++++++++---
 include/uapi/linux/ublk_cmd.h |   7 +
 2 files changed, 388 insertions(+), 31 deletions(-)

diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
index cc9c92d97349..2e5e392c939e 100644
--- a/drivers/block/ublk_drv.c
+++ b/drivers/block/ublk_drv.c
@@ -93,6 +93,7 @@
 
 /* ublk batch fetch uring_cmd */
 struct ublk_batch_fcmd {
+	struct list_head node;
 	struct io_uring_cmd *cmd;
 	unsigned short buf_group;
 };
@@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
 	 */
 	struct ublk_queue *ubq;
 
-	u16 tag;
+	union {
+		u16 tag;
+		struct ublk_batch_fcmd *fcmd; /* batch io only */
+	};
 };
 
 struct ublk_batch_io_data {
@@ -229,18 +233,36 @@ struct ublk_queue {
 	struct ublk_device *dev;
 
 	/*
-	 * Inflight ublk request tag is saved in this fifo
+	 * Batch I/O State Management:
+	 *
+	 * The batch I/O system uses implicit state management based on the
+	 * combination of three key variables below.
+	 *
+	 * - IDLE: list_empty(&fcmd_head) && !active_fcmd
+	 *   No fetch commands available, events queue in evts_fifo
+	 *
+	 * - READY: !list_empty(&fcmd_head) && !active_fcmd
+	 *   Fetch commands available but none processing events
 	 *
-	 * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
-	 * so lock is required for storing request tag to fifo
+	 * - ACTIVE: active_fcmd
+	 *   One fetch command actively processing events from evts_fifo
 	 *
-	 * Make sure just one reader for fetching request from task work
-	 * function to ublk server, so no need to grab the lock in reader
-	 * side.
+	 * Key Invariants:
+	 * - At most one active_fcmd at any time (single reader)
+	 * - active_fcmd is always from fcmd_head list when non-NULL
+	 * - evts_fifo can be read locklessly by the single active reader
+	 * - All state transitions require evts_lock protection
+	 * - Multiple writers to evts_fifo require lock protection
 	 */
 	struct {
 		DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
 		spinlock_t evts_lock;
+
+		/* List of fetch commands available to process events */
+		struct list_head fcmd_head;
+
+		/* Currently active fetch command (NULL = none active) */
+		struct ublk_batch_fcmd  *active_fcmd;
 	}____cacheline_aligned_in_smp;
 
 	struct ublk_io ios[] __counted_by(q_depth);
@@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
 static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
 		u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
 static inline unsigned int ublk_req_build_flags(struct request *req);
+static void ublk_batch_dispatch(struct ublk_queue *ubq,
+				struct ublk_batch_io_data *data,
+				struct ublk_batch_fcmd *fcmd);
 
 static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
 {
 	return false;
 }
 
+static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
+{
+	return false;
+}
+
 static inline void ublk_io_lock(struct ublk_io *io)
 {
 	spin_lock(&io->lock);
@@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq;	/* wait until one idr is freed */
 
 static DEFINE_MUTEX(ublk_ctl_mutex);
 
+static struct ublk_batch_fcmd *
+ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
+{
+	struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
+
+	if (fcmd) {
+		fcmd->cmd = cmd;
+		fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
+	}
+	return fcmd;
+}
+
+static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
+{
+	kfree(fcmd);
+}
+
+static void __ublk_release_fcmd(struct ublk_queue *ubq)
+{
+	WRITE_ONCE(ubq->active_fcmd, NULL);
+}
 
-static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
+/*
+ * Nothing can move on, so clear ->active_fcmd, and the caller should stop
+ * dispatching
+ */
+static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
+					const struct ublk_batch_io_data *data,
 					struct ublk_batch_fcmd *fcmd,
 					int res)
 {
+	spin_lock(&ubq->evts_lock);
+	list_del(&fcmd->node);
+	WARN_ON_ONCE(fcmd != ubq->active_fcmd);
+	__ublk_release_fcmd(ubq);
+	spin_unlock(&ubq->evts_lock);
+
 	io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
-	fcmd->cmd = NULL;
+	ublk_batch_free_fcmd(fcmd);
 }
 
 static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
@@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
 	bool needs_filter;
 	int ret;
 
+	WARN_ON_ONCE(data->cmd != fcmd->cmd);
+
 	sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
 					 data->issue_flags);
 	if (sel.val < 0)
@@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
 	return ret;
 }
 
-static __maybe_unused int
-ublk_batch_dispatch(struct ublk_queue *ubq,
-		    const struct ublk_batch_io_data *data,
-		    struct ublk_batch_fcmd *fcmd)
+static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
+		struct ublk_queue *ubq)
+{
+	struct ublk_batch_fcmd *fcmd;
+
+	lockdep_assert_held(&ubq->evts_lock);
+
+	/*
+	 * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
+	 *
+	 * The pair is the smp_mb() in ublk_batch_dispatch().
+	 *
+	 * If ubq->active_fcmd is observed as non-NULL, the new added tags
+	 * can be visisible in ublk_batch_dispatch() with the barrier pairing.
+	 */
+	smp_mb();
+	if (READ_ONCE(ubq->active_fcmd)) {
+		fcmd = NULL;
+	} else {
+		fcmd = list_first_entry_or_null(&ubq->fcmd_head,
+				struct ublk_batch_fcmd, node);
+		WRITE_ONCE(ubq->active_fcmd, fcmd);
+	}
+	return fcmd;
+}
+
+static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
+			   unsigned int issue_flags)
+{
+	struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+	struct ublk_batch_fcmd *fcmd = pdu->fcmd;
+	struct ublk_batch_io_data data = {
+		.ub = pdu->ubq->dev,
+		.cmd = fcmd->cmd,
+		.issue_flags = issue_flags,
+	};
+
+	WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
+
+	ublk_batch_dispatch(pdu->ubq, &data, fcmd);
+}
+
+static void ublk_batch_dispatch(struct ublk_queue *ubq,
+				struct ublk_batch_io_data *data,
+				struct ublk_batch_fcmd *fcmd)
 {
+	struct ublk_batch_fcmd *new_fcmd;
+	void *handle;
+	bool empty;
 	int ret = 0;
 
+again:
 	while (!ublk_io_evts_empty(ubq)) {
 		ret = __ublk_batch_dispatch(ubq, data, fcmd);
 		if (ret <= 0)
 			break;
 	}
 
-	if (ret < 0)
-		ublk_batch_deinit_fetch_buf(data, fcmd, ret);
+	if (ret < 0) {
+		ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
+		return;
+	}
 
-	return ret;
+	handle = io_uring_cmd_ctx_handle(fcmd->cmd);
+	__ublk_release_fcmd(ubq);
+	/*
+	 * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
+	 * checking ubq->evts_fifo.
+	 *
+	 * The pair is the smp_mb() in __ublk_acquire_fcmd().
+	 */
+	smp_mb();
+	empty = ublk_io_evts_empty(ubq);
+	if (likely(empty))
+		return;
+
+	spin_lock(&ubq->evts_lock);
+	new_fcmd = __ublk_acquire_fcmd(ubq);
+	spin_unlock(&ubq->evts_lock);
+
+	if (!new_fcmd)
+		return;
+	if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
+		data->cmd = new_fcmd->cmd;
+		fcmd = new_fcmd;
+		goto again;
+	}
+	io_uring_cmd_complete_in_task(new_fcmd->cmd, ublk_batch_tw_cb);
 }
 
 static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
@@ -1576,13 +1711,27 @@ static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
 	ublk_dispatch_req(ubq, pdu->req, issue_flags);
 }
 
-static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq)
+static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq, bool last)
 {
-	struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
-	struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+	if (ublk_support_batch_io(ubq)) {
+		unsigned short tag = rq->tag;
+		struct ublk_batch_fcmd *fcmd = NULL;
 
-	pdu->req = rq;
-	io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
+		spin_lock(&ubq->evts_lock);
+		kfifo_put(&ubq->evts_fifo, tag);
+		if (last)
+			fcmd = __ublk_acquire_fcmd(ubq);
+		spin_unlock(&ubq->evts_lock);
+
+		if (fcmd)
+			io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
+	} else {
+		struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
+		struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+
+		pdu->req = rq;
+		io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
+	}
 }
 
 static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
@@ -1600,14 +1749,44 @@ static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
 	} while (rq);
 }
 
-static void ublk_queue_cmd_list(struct ublk_io *io, struct rq_list *l)
+static void ublk_batch_queue_cmd_list(struct ublk_queue *ubq, struct rq_list *l)
 {
-	struct io_uring_cmd *cmd = io->cmd;
-	struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+	unsigned short tags[MAX_NR_TAG];
+	struct ublk_batch_fcmd *fcmd;
+	struct request *rq;
+	unsigned cnt = 0;
+
+	spin_lock(&ubq->evts_lock);
+	rq_list_for_each(l, rq) {
+		tags[cnt++] = (unsigned short)rq->tag;
+		if (cnt >= MAX_NR_TAG) {
+			kfifo_in(&ubq->evts_fifo, tags, cnt);
+			cnt = 0;
+		}
+	}
+	if (cnt)
+		kfifo_in(&ubq->evts_fifo, tags, cnt);
+	fcmd = __ublk_acquire_fcmd(ubq);
+	spin_unlock(&ubq->evts_lock);
 
-	pdu->req_list = rq_list_peek(l);
 	rq_list_init(l);
-	io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
+	if (fcmd)
+		io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
+}
+
+static void ublk_queue_cmd_list(struct ublk_queue *ubq, struct ublk_io *io,
+				struct rq_list *l, bool batch)
+{
+	if (batch) {
+		ublk_batch_queue_cmd_list(ubq, l);
+	} else {
+		struct io_uring_cmd *cmd = io->cmd;
+		struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+
+		pdu->req_list = rq_list_peek(l);
+		rq_list_init(l);
+		io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
+	}
 }
 
 static enum blk_eh_timer_return ublk_timeout(struct request *rq)
@@ -1686,7 +1865,7 @@ static blk_status_t ublk_queue_rq(struct blk_mq_hw_ctx *hctx,
 		return BLK_STS_OK;
 	}
 
-	ublk_queue_cmd(ubq, rq);
+	ublk_queue_cmd(ubq, rq, bd->last);
 	return BLK_STS_OK;
 }
 
@@ -1698,11 +1877,25 @@ static inline bool ublk_belong_to_same_batch(const struct ublk_io *io,
 		(io->task == io2->task);
 }
 
-static void ublk_queue_rqs(struct rq_list *rqlist)
+static void ublk_commit_rqs(struct blk_mq_hw_ctx *hctx)
+{
+	struct ublk_queue *ubq = hctx->driver_data;
+	struct ublk_batch_fcmd *fcmd;
+
+	spin_lock(&ubq->evts_lock);
+	fcmd = __ublk_acquire_fcmd(ubq);
+	spin_unlock(&ubq->evts_lock);
+
+	if (fcmd)
+		io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
+}
+
+static void __ublk_queue_rqs(struct rq_list *rqlist, bool batch)
 {
 	struct rq_list requeue_list = { };
 	struct rq_list submit_list = { };
 	struct ublk_io *io = NULL;
+	struct ublk_queue *ubq = NULL;
 	struct request *req;
 
 	while ((req = rq_list_pop(rqlist))) {
@@ -1716,16 +1909,27 @@ static void ublk_queue_rqs(struct rq_list *rqlist)
 
 		if (io && !ublk_belong_to_same_batch(io, this_io) &&
 				!rq_list_empty(&submit_list))
-			ublk_queue_cmd_list(io, &submit_list);
+			ublk_queue_cmd_list(ubq, io, &submit_list, batch);
 		io = this_io;
+		ubq = this_q;
 		rq_list_add_tail(&submit_list, req);
 	}
 
 	if (!rq_list_empty(&submit_list))
-		ublk_queue_cmd_list(io, &submit_list);
+		ublk_queue_cmd_list(ubq, io, &submit_list, batch);
 	*rqlist = requeue_list;
 }
 
+static void ublk_queue_rqs(struct rq_list *rqlist)
+{
+	__ublk_queue_rqs(rqlist, false);
+}
+
+static void ublk_batch_queue_rqs(struct rq_list *rqlist)
+{
+	__ublk_queue_rqs(rqlist, true);
+}
+
 static int ublk_init_hctx(struct blk_mq_hw_ctx *hctx, void *driver_data,
 		unsigned int hctx_idx)
 {
@@ -1743,6 +1947,14 @@ static const struct blk_mq_ops ublk_mq_ops = {
 	.timeout	= ublk_timeout,
 };
 
+static const struct blk_mq_ops ublk_batch_mq_ops = {
+	.commit_rqs	= ublk_commit_rqs,
+	.queue_rq       = ublk_queue_rq,
+	.queue_rqs      = ublk_batch_queue_rqs,
+	.init_hctx	= ublk_init_hctx,
+	.timeout	= ublk_timeout,
+};
+
 static void ublk_queue_reinit(struct ublk_device *ub, struct ublk_queue *ubq)
 {
 	int i;
@@ -2120,6 +2332,56 @@ static void ublk_cancel_cmd(struct ublk_queue *ubq, unsigned tag,
 		io_uring_cmd_done(io->cmd, UBLK_IO_RES_ABORT, issue_flags);
 }
 
+static void ublk_batch_cancel_cmd(struct ublk_queue *ubq,
+				  struct ublk_batch_fcmd *fcmd,
+				  unsigned int issue_flags)
+{
+	bool done;
+
+	spin_lock(&ubq->evts_lock);
+	done = (ubq->active_fcmd != fcmd);
+	if (done)
+		list_del(&fcmd->node);
+	spin_unlock(&ubq->evts_lock);
+
+	if (done) {
+		io_uring_cmd_done(fcmd->cmd, UBLK_IO_RES_ABORT, issue_flags);
+		ublk_batch_free_fcmd(fcmd);
+	}
+}
+
+static void ublk_batch_cancel_queue(struct ublk_queue *ubq)
+{
+	LIST_HEAD(fcmd_list);
+
+	spin_lock(&ubq->evts_lock);
+	ubq->force_abort = true;
+	list_splice_init(&ubq->fcmd_head, &fcmd_list);
+	if (ubq->active_fcmd)
+		list_move(&ubq->active_fcmd->node, &ubq->fcmd_head);
+	spin_unlock(&ubq->evts_lock);
+
+	while (!list_empty(&fcmd_list)) {
+		struct ublk_batch_fcmd *fcmd = list_first_entry(&fcmd_list,
+				struct ublk_batch_fcmd, node);
+
+		ublk_batch_cancel_cmd(ubq, fcmd, IO_URING_F_UNLOCKED);
+	}
+}
+
+static void ublk_batch_cancel_fn(struct io_uring_cmd *cmd,
+				 unsigned int issue_flags)
+{
+	struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+	struct ublk_batch_fcmd *fcmd = pdu->fcmd;
+	struct ublk_queue *ubq = pdu->ubq;
+
+	if (!ubq->canceling)
+		ublk_start_cancel(ubq->dev);
+
+	ublk_batch_cancel_cmd(ubq, fcmd, issue_flags);
+}
+
 /*
  * The ublk char device won't be closed when calling cancel fn, so both
  * ublk device and queue are guaranteed to be live
@@ -2171,6 +2433,11 @@ static void ublk_cancel_queue(struct ublk_queue *ubq)
 {
 	int i;
 
+	if (ublk_support_batch_io(ubq)) {
+		ublk_batch_cancel_queue(ubq);
+		return;
+	}
+
 	for (i = 0; i < ubq->q_depth; i++)
 		ublk_cancel_cmd(ubq, i, IO_URING_F_UNLOCKED);
 }
@@ -3091,6 +3358,74 @@ static int ublk_check_batch_cmd(const struct ublk_batch_io_data *data)
 	return ublk_check_batch_cmd_flags(uc);
 }
 
+static int ublk_batch_attach(struct ublk_queue *ubq,
+			     struct ublk_batch_io_data *data,
+			     struct ublk_batch_fcmd *fcmd)
+{
+	struct ublk_batch_fcmd *new_fcmd = NULL;
+	bool free = false;
+
+	spin_lock(&ubq->evts_lock);
+	if (unlikely(ubq->force_abort || ubq->canceling)) {
+		free = true;
+	} else {
+		list_add_tail(&fcmd->node, &ubq->fcmd_head);
+		new_fcmd = __ublk_acquire_fcmd(ubq);
+	}
+	spin_unlock(&ubq->evts_lock);
+
+	/*
+	 * If the two fetch commands are originated from same io_ring_ctx,
+	 * run batch dispatch directly. Otherwise, schedule task work for
+	 * doing it.
+	 */
+	if (new_fcmd && io_uring_cmd_ctx_handle(new_fcmd->cmd) ==
+			io_uring_cmd_ctx_handle(fcmd->cmd)) {
+		data->cmd = new_fcmd->cmd;
+		ublk_batch_dispatch(ubq, data, new_fcmd);
+	} else if (new_fcmd) {
+		io_uring_cmd_complete_in_task(new_fcmd->cmd,
+				ublk_batch_tw_cb);
+	}
+
+	if (free) {
+		ublk_batch_free_fcmd(fcmd);
+		return -ENODEV;
+	}
+	return -EIOCBQUEUED;
+}
+
+static int ublk_handle_batch_fetch_cmd(struct ublk_batch_io_data *data)
+{
+	struct ublk_queue *ubq = ublk_get_queue(data->ub, data->header.q_id);
+	struct ublk_batch_fcmd *fcmd = ublk_batch_alloc_fcmd(data->cmd);
+	struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(data->cmd);
+
+	if (!fcmd)
+		return -ENOMEM;
+
+	pdu->ubq = ubq;
+	pdu->fcmd = fcmd;
+	io_uring_cmd_mark_cancelable(data->cmd, data->issue_flags);
+
+	return ublk_batch_attach(ubq, data, fcmd);
+}
+
+static int ublk_validate_batch_fetch_cmd(struct ublk_batch_io_data *data,
+					 const struct ublk_batch_io *uc)
+{
+	if (!(data->cmd->flags & IORING_URING_CMD_MULTISHOT))
+		return -EINVAL;
+
+	if (uc->elem_bytes != sizeof(__u16))
+		return -EINVAL;
+
+	if (uc->flags != 0)
+		return -E2BIG;
+
+	return 0;
+}
+
 static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
 				       unsigned int issue_flags)
 {
@@ -3113,6 +3448,11 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
 	if (data.header.q_id >= ub->dev_info.nr_hw_queues)
 		goto out;
 
+	if (unlikely(issue_flags & IO_URING_F_CANCEL)) {
+		ublk_batch_cancel_fn(cmd, issue_flags);
+		return 0;
+	}
+
 	switch (cmd_op) {
 	case UBLK_U_IO_PREP_IO_CMDS:
 		ret = ublk_check_batch_cmd(&data);
@@ -3126,6 +3466,12 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
 			goto out;
 		ret = ublk_handle_batch_commit_cmd(&data);
 		break;
+	case UBLK_U_IO_FETCH_IO_CMDS:
+		ret = ublk_validate_batch_fetch_cmd(&data, uc);
+		if (ret)
+			goto out;
+		ret = ublk_handle_batch_fetch_cmd(&data);
+		break;
 	default:
 		ret = -EOPNOTSUPP;
 	}
@@ -3327,6 +3673,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
 		ret = ublk_io_evts_init(ubq, ubq->q_depth, numa_node);
 		if (ret)
 			goto fail;
+		INIT_LIST_HEAD(&ubq->fcmd_head);
 	}
 	ub->queues[q_id] = ubq;
 	ubq->dev = ub;
@@ -3451,7 +3798,10 @@ static void ublk_align_max_io_size(struct ublk_device *ub)
 
 static int ublk_add_tag_set(struct ublk_device *ub)
 {
-	ub->tag_set.ops = &ublk_mq_ops;
+	if (ublk_dev_support_batch_io(ub))
+		ub->tag_set.ops = &ublk_batch_mq_ops;
+	else
+		ub->tag_set.ops = &ublk_mq_ops;
 	ub->tag_set.nr_hw_queues = ub->dev_info.nr_hw_queues;
 	ub->tag_set.queue_depth = ub->dev_info.queue_depth;
 	ub->tag_set.numa_node = NUMA_NO_NODE;
diff --git a/include/uapi/linux/ublk_cmd.h b/include/uapi/linux/ublk_cmd.h
index 295ec8f34173..cd894c1d188e 100644
--- a/include/uapi/linux/ublk_cmd.h
+++ b/include/uapi/linux/ublk_cmd.h
@@ -120,6 +120,13 @@
 #define	UBLK_U_IO_COMMIT_IO_CMDS	\
 	_IOWR('u', 0x26, struct ublk_batch_io)
 
+/*
+ * Fetch io commands to provided buffer in multishot style,
+ * `IORING_URING_CMD_MULTISHOT` is required for this command.
+ */
+#define	UBLK_U_IO_FETCH_IO_CMDS 	\
+	_IOWR('u', 0x27, struct ublk_batch_io)
+
 /* only ABORT means that no re-fetch */
 #define UBLK_IO_RES_OK			0
 #define UBLK_IO_RES_NEED_GET_DATA	1
-- 
2.47.0
Re: [PATCH V4 14/27] ublk: add UBLK_U_IO_FETCH_IO_CMDS for batch I/O processing
Posted by Caleb Sander Mateos 19 hours ago
On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
>
> Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> multiple I/O commands in a single operation, significantly reducing
> submission overhead compared to individual FETCH_REQ* commands.
>
> Key Design Features:
>
> 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
>    commands, with the batch size limited by the provided buffer length.
>
> 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
>    simultaneously, but only one is active at any time. This enables
>    efficient load distribution across multiple server task contexts.
>
> 3. Implicit State Management: The implementation uses three key variables
>    to track state:
>    - evts_fifo: Queue of request tags awaiting processing
>    - fcmd_head: List of available fetch commands
>    - active_fcmd: Currently active fetch command (NULL = none active)
>
>    States are derived implicitly:
>    - IDLE: No fetch commands available
>    - READY: Fetch commands available, none active
>    - ACTIVE: One fetch command processing events
>
> 4. Lockless Reader Optimization: The active fetch command can read from
>    evts_fifo without locking (single reader guarantee), while writers
>    (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
>    barrier pairing plays key role for the single lockless reader
>    optimization.
>
> Implementation Details:
>
> - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> - __ublk_pick_active_fcmd() selects an available fetch command when
>   events arrive and no command is currently active

What is __ublk_pick_active_fcmd()? I don't see a function with that name.

> - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
>   buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> - State transitions are coordinated via evts_lock to maintain consistency
>
> Signed-off-by: Ming Lei <ming.lei@redhat.com>
> ---
>  drivers/block/ublk_drv.c      | 412 +++++++++++++++++++++++++++++++---
>  include/uapi/linux/ublk_cmd.h |   7 +
>  2 files changed, 388 insertions(+), 31 deletions(-)
>
> diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> index cc9c92d97349..2e5e392c939e 100644
> --- a/drivers/block/ublk_drv.c
> +++ b/drivers/block/ublk_drv.c
> @@ -93,6 +93,7 @@
>
>  /* ublk batch fetch uring_cmd */
>  struct ublk_batch_fcmd {
> +       struct list_head node;
>         struct io_uring_cmd *cmd;
>         unsigned short buf_group;
>  };
> @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
>          */
>         struct ublk_queue *ubq;
>
> -       u16 tag;
> +       union {
> +               u16 tag;
> +               struct ublk_batch_fcmd *fcmd; /* batch io only */
> +       };
>  };
>
>  struct ublk_batch_io_data {
> @@ -229,18 +233,36 @@ struct ublk_queue {
>         struct ublk_device *dev;
>
>         /*
> -        * Inflight ublk request tag is saved in this fifo
> +        * Batch I/O State Management:
> +        *
> +        * The batch I/O system uses implicit state management based on the
> +        * combination of three key variables below.
> +        *
> +        * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> +        *   No fetch commands available, events queue in evts_fifo
> +        *
> +        * - READY: !list_empty(&fcmd_head) && !active_fcmd
> +        *   Fetch commands available but none processing events
>          *
> -        * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> -        * so lock is required for storing request tag to fifo
> +        * - ACTIVE: active_fcmd
> +        *   One fetch command actively processing events from evts_fifo
>          *
> -        * Make sure just one reader for fetching request from task work
> -        * function to ublk server, so no need to grab the lock in reader
> -        * side.
> +        * Key Invariants:
> +        * - At most one active_fcmd at any time (single reader)
> +        * - active_fcmd is always from fcmd_head list when non-NULL
> +        * - evts_fifo can be read locklessly by the single active reader
> +        * - All state transitions require evts_lock protection
> +        * - Multiple writers to evts_fifo require lock protection
>          */
>         struct {
>                 DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
>                 spinlock_t evts_lock;
> +
> +               /* List of fetch commands available to process events */
> +               struct list_head fcmd_head;
> +
> +               /* Currently active fetch command (NULL = none active) */
> +               struct ublk_batch_fcmd  *active_fcmd;
>         }____cacheline_aligned_in_smp;
>
>         struct ublk_io ios[] __counted_by(q_depth);
> @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
>  static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
>                 u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
>  static inline unsigned int ublk_req_build_flags(struct request *req);
> +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> +                               struct ublk_batch_io_data *data,
> +                               struct ublk_batch_fcmd *fcmd);
>
>  static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
>  {
>         return false;
>  }
>
> +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> +{
> +       return false;
> +}
> +
>  static inline void ublk_io_lock(struct ublk_io *io)
>  {
>         spin_lock(&io->lock);
> @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq;     /* wait until one idr is freed */
>
>  static DEFINE_MUTEX(ublk_ctl_mutex);
>
> +static struct ublk_batch_fcmd *
> +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> +{
> +       struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);

An allocation in the I/O path seems unfortunate. Is there not room to
store the struct ublk_batch_fcmd in the io_uring_cmd pdu?
> +
> +       if (fcmd) {
> +               fcmd->cmd = cmd;
> +               fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);

Is it necessary to store sample this here just to pass it back to the
io_uring layer? Wouldn't the io_uring layer already have access to it
in struct io_kiocb's buf_index field?

> +       }
> +       return fcmd;
> +}
> +
> +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> +{
> +       kfree(fcmd);
> +}
> +
> +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> +{
> +       WRITE_ONCE(ubq->active_fcmd, NULL);
> +}
>
> -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> +/*
> + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> + * dispatching
> + */
> +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> +                                       const struct ublk_batch_io_data *data,
>                                         struct ublk_batch_fcmd *fcmd,
>                                         int res)
>  {
> +       spin_lock(&ubq->evts_lock);
> +       list_del(&fcmd->node);
> +       WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> +       __ublk_release_fcmd(ubq);
> +       spin_unlock(&ubq->evts_lock);
> +
>         io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> -       fcmd->cmd = NULL;
> +       ublk_batch_free_fcmd(fcmd);
>  }
>
>  static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
>         bool needs_filter;
>         int ret;
>
> +       WARN_ON_ONCE(data->cmd != fcmd->cmd);
> +
>         sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
>                                          data->issue_flags);
>         if (sel.val < 0)
> @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
>         return ret;
>  }
>
> -static __maybe_unused int
> -ublk_batch_dispatch(struct ublk_queue *ubq,
> -                   const struct ublk_batch_io_data *data,
> -                   struct ublk_batch_fcmd *fcmd)
> +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> +               struct ublk_queue *ubq)
> +{
> +       struct ublk_batch_fcmd *fcmd;
> +
> +       lockdep_assert_held(&ubq->evts_lock);
> +
> +       /*
> +        * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> +        *
> +        * The pair is the smp_mb() in ublk_batch_dispatch().
> +        *
> +        * If ubq->active_fcmd is observed as non-NULL, the new added tags
> +        * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> +        */
> +       smp_mb();
> +       if (READ_ONCE(ubq->active_fcmd)) {
> +               fcmd = NULL;
> +       } else {
> +               fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> +                               struct ublk_batch_fcmd, node);
> +               WRITE_ONCE(ubq->active_fcmd, fcmd);
> +       }
> +       return fcmd;
> +}
> +
> +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> +                          unsigned int issue_flags)
> +{
> +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> +       struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> +       struct ublk_batch_io_data data = {
> +               .ub = pdu->ubq->dev,
> +               .cmd = fcmd->cmd,
> +               .issue_flags = issue_flags,
> +       };
> +
> +       WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> +
> +       ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> +}
> +
> +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> +                               struct ublk_batch_io_data *data,
> +                               struct ublk_batch_fcmd *fcmd)
>  {
> +       struct ublk_batch_fcmd *new_fcmd;

Is the new_fcmd variable necessary? Can fcmd be reused instead?

> +       void *handle;
> +       bool empty;
>         int ret = 0;
>
> +again:
>         while (!ublk_io_evts_empty(ubq)) {
>                 ret = __ublk_batch_dispatch(ubq, data, fcmd);
>                 if (ret <= 0)
>                         break;
>         }
>
> -       if (ret < 0)
> -               ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> +       if (ret < 0) {
> +               ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> +               return;
> +       }
>
> -       return ret;
> +       handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> +       __ublk_release_fcmd(ubq);
> +       /*
> +        * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> +        * checking ubq->evts_fifo.
> +        *
> +        * The pair is the smp_mb() in __ublk_acquire_fcmd().
> +        */
> +       smp_mb();
> +       empty = ublk_io_evts_empty(ubq);
> +       if (likely(empty))

nit: empty variable seems unnecessary

> +               return;
> +
> +       spin_lock(&ubq->evts_lock);
> +       new_fcmd = __ublk_acquire_fcmd(ubq);
> +       spin_unlock(&ubq->evts_lock);
> +
> +       if (!new_fcmd)
> +               return;
> +       if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {

This check seems to be meant to decide whether the new and old
UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?
But belonging to the same io_uring context doesn't necessarily mean
that the same task issued them. It seems like it would be safer to
always dispatch new_fcmd->cmd to task work.

> +               data->cmd = new_fcmd->cmd;
> +               fcmd = new_fcmd;
> +               goto again;
> +       }
> +       io_uring_cmd_complete_in_task(new_fcmd->cmd, ublk_batch_tw_cb);
>  }
>
>  static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> @@ -1576,13 +1711,27 @@ static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
>         ublk_dispatch_req(ubq, pdu->req, issue_flags);
>  }
>
> -static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq)
> +static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq, bool last)
>  {
> -       struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> -       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> +       if (ublk_support_batch_io(ubq)) {
> +               unsigned short tag = rq->tag;
> +               struct ublk_batch_fcmd *fcmd = NULL;
>
> -       pdu->req = rq;
> -       io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> +               spin_lock(&ubq->evts_lock);
> +               kfifo_put(&ubq->evts_fifo, tag);
> +               if (last)
> +                       fcmd = __ublk_acquire_fcmd(ubq);
> +               spin_unlock(&ubq->evts_lock);
> +
> +               if (fcmd)
> +                       io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> +       } else {
> +               struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> +               struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> +
> +               pdu->req = rq;
> +               io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> +       }
>  }
>
>  static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> @@ -1600,14 +1749,44 @@ static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
>         } while (rq);
>  }
>
> -static void ublk_queue_cmd_list(struct ublk_io *io, struct rq_list *l)
> +static void ublk_batch_queue_cmd_list(struct ublk_queue *ubq, struct rq_list *l)
>  {
> -       struct io_uring_cmd *cmd = io->cmd;
> -       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> +       unsigned short tags[MAX_NR_TAG];
> +       struct ublk_batch_fcmd *fcmd;
> +       struct request *rq;
> +       unsigned cnt = 0;
> +
> +       spin_lock(&ubq->evts_lock);
> +       rq_list_for_each(l, rq) {
> +               tags[cnt++] = (unsigned short)rq->tag;
> +               if (cnt >= MAX_NR_TAG) {
> +                       kfifo_in(&ubq->evts_fifo, tags, cnt);
> +                       cnt = 0;
> +               }
> +       }
> +       if (cnt)
> +               kfifo_in(&ubq->evts_fifo, tags, cnt);
> +       fcmd = __ublk_acquire_fcmd(ubq);
> +       spin_unlock(&ubq->evts_lock);
>
> -       pdu->req_list = rq_list_peek(l);
>         rq_list_init(l);
> -       io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> +       if (fcmd)
> +               io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> +}
> +
> +static void ublk_queue_cmd_list(struct ublk_queue *ubq, struct ublk_io *io,
> +                               struct rq_list *l, bool batch)
> +{
> +       if (batch) {
> +               ublk_batch_queue_cmd_list(ubq, l);
> +       } else {
> +               struct io_uring_cmd *cmd = io->cmd;
> +               struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> +
> +               pdu->req_list = rq_list_peek(l);
> +               rq_list_init(l);
> +               io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> +       }
>  }
>
>  static enum blk_eh_timer_return ublk_timeout(struct request *rq)
> @@ -1686,7 +1865,7 @@ static blk_status_t ublk_queue_rq(struct blk_mq_hw_ctx *hctx,
>                 return BLK_STS_OK;
>         }
>
> -       ublk_queue_cmd(ubq, rq);
> +       ublk_queue_cmd(ubq, rq, bd->last);
>         return BLK_STS_OK;
>  }
>
> @@ -1698,11 +1877,25 @@ static inline bool ublk_belong_to_same_batch(const struct ublk_io *io,
>                 (io->task == io2->task);
>  }
>
> -static void ublk_queue_rqs(struct rq_list *rqlist)
> +static void ublk_commit_rqs(struct blk_mq_hw_ctx *hctx)
> +{
> +       struct ublk_queue *ubq = hctx->driver_data;
> +       struct ublk_batch_fcmd *fcmd;
> +
> +       spin_lock(&ubq->evts_lock);
> +       fcmd = __ublk_acquire_fcmd(ubq);
> +       spin_unlock(&ubq->evts_lock);
> +
> +       if (fcmd)
> +               io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> +}
> +
> +static void __ublk_queue_rqs(struct rq_list *rqlist, bool batch)
>  {
>         struct rq_list requeue_list = { };
>         struct rq_list submit_list = { };
>         struct ublk_io *io = NULL;
> +       struct ublk_queue *ubq = NULL;
>         struct request *req;
>
>         while ((req = rq_list_pop(rqlist))) {
> @@ -1716,16 +1909,27 @@ static void ublk_queue_rqs(struct rq_list *rqlist)
>
>                 if (io && !ublk_belong_to_same_batch(io, this_io) &&
>                                 !rq_list_empty(&submit_list))
> -                       ublk_queue_cmd_list(io, &submit_list);
> +                       ublk_queue_cmd_list(ubq, io, &submit_list, batch);

This seems to assume that all the requests belong to the same
ublk_queue, which isn't required

>                 io = this_io;
> +               ubq = this_q;
>                 rq_list_add_tail(&submit_list, req);
>         }
>
>         if (!rq_list_empty(&submit_list))
> -               ublk_queue_cmd_list(io, &submit_list);
> +               ublk_queue_cmd_list(ubq, io, &submit_list, batch);

Same here

>         *rqlist = requeue_list;
>  }
>
> +static void ublk_queue_rqs(struct rq_list *rqlist)
> +{
> +       __ublk_queue_rqs(rqlist, false);
> +}
> +
> +static void ublk_batch_queue_rqs(struct rq_list *rqlist)
> +{
> +       __ublk_queue_rqs(rqlist, true);
> +}
> +
>  static int ublk_init_hctx(struct blk_mq_hw_ctx *hctx, void *driver_data,
>                 unsigned int hctx_idx)
>  {
> @@ -1743,6 +1947,14 @@ static const struct blk_mq_ops ublk_mq_ops = {
>         .timeout        = ublk_timeout,
>  };
>
> +static const struct blk_mq_ops ublk_batch_mq_ops = {
> +       .commit_rqs     = ublk_commit_rqs,
> +       .queue_rq       = ublk_queue_rq,
> +       .queue_rqs      = ublk_batch_queue_rqs,
> +       .init_hctx      = ublk_init_hctx,
> +       .timeout        = ublk_timeout,
> +};
> +
>  static void ublk_queue_reinit(struct ublk_device *ub, struct ublk_queue *ubq)
>  {
>         int i;
> @@ -2120,6 +2332,56 @@ static void ublk_cancel_cmd(struct ublk_queue *ubq, unsigned tag,
>                 io_uring_cmd_done(io->cmd, UBLK_IO_RES_ABORT, issue_flags);
>  }
>
> +static void ublk_batch_cancel_cmd(struct ublk_queue *ubq,
> +                                 struct ublk_batch_fcmd *fcmd,
> +                                 unsigned int issue_flags)
> +{
> +       bool done;
> +
> +       spin_lock(&ubq->evts_lock);
> +       done = (ubq->active_fcmd != fcmd);

Needs to use READ_ONCE() since __ublk_release_fcmd() can be called
without holding evts_lock?

> +       if (done)
> +               list_del(&fcmd->node);
> +       spin_unlock(&ubq->evts_lock);
> +
> +       if (done) {
> +               io_uring_cmd_done(fcmd->cmd, UBLK_IO_RES_ABORT, issue_flags);
> +               ublk_batch_free_fcmd(fcmd);
> +       }
> +}
> +
> +static void ublk_batch_cancel_queue(struct ublk_queue *ubq)
> +{
> +       LIST_HEAD(fcmd_list);
> +
> +       spin_lock(&ubq->evts_lock);
> +       ubq->force_abort = true;
> +       list_splice_init(&ubq->fcmd_head, &fcmd_list);
> +       if (ubq->active_fcmd)
> +               list_move(&ubq->active_fcmd->node, &ubq->fcmd_head);

Similarly, needs READ_ONCE()?

> +       spin_unlock(&ubq->evts_lock);
> +
> +       while (!list_empty(&fcmd_list)) {
> +               struct ublk_batch_fcmd *fcmd = list_first_entry(&fcmd_list,
> +                               struct ublk_batch_fcmd, node);
> +
> +               ublk_batch_cancel_cmd(ubq, fcmd, IO_URING_F_UNLOCKED);
> +       }
> +}
> +
> +static void ublk_batch_cancel_fn(struct io_uring_cmd *cmd,
> +                                unsigned int issue_flags)
> +{
> +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> +       struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> +       struct ublk_queue *ubq = pdu->ubq;
> +
> +       if (!ubq->canceling)

Is it not racy to access ubq->canceling without any lock held?

> +               ublk_start_cancel(ubq->dev);
> +
> +       ublk_batch_cancel_cmd(ubq, fcmd, issue_flags);
> +}
> +
>  /*
>   * The ublk char device won't be closed when calling cancel fn, so both
>   * ublk device and queue are guaranteed to be live
> @@ -2171,6 +2433,11 @@ static void ublk_cancel_queue(struct ublk_queue *ubq)
>  {
>         int i;
>
> +       if (ublk_support_batch_io(ubq)) {
> +               ublk_batch_cancel_queue(ubq);
> +               return;
> +       }
> +
>         for (i = 0; i < ubq->q_depth; i++)
>                 ublk_cancel_cmd(ubq, i, IO_URING_F_UNLOCKED);
>  }
> @@ -3091,6 +3358,74 @@ static int ublk_check_batch_cmd(const struct ublk_batch_io_data *data)
>         return ublk_check_batch_cmd_flags(uc);
>  }
>
> +static int ublk_batch_attach(struct ublk_queue *ubq,
> +                            struct ublk_batch_io_data *data,
> +                            struct ublk_batch_fcmd *fcmd)
> +{
> +       struct ublk_batch_fcmd *new_fcmd = NULL;
> +       bool free = false;
> +
> +       spin_lock(&ubq->evts_lock);
> +       if (unlikely(ubq->force_abort || ubq->canceling)) {
> +               free = true;
> +       } else {
> +               list_add_tail(&fcmd->node, &ubq->fcmd_head);
> +               new_fcmd = __ublk_acquire_fcmd(ubq);
> +       }
> +       spin_unlock(&ubq->evts_lock);
> +
> +       /*
> +        * If the two fetch commands are originated from same io_ring_ctx,
> +        * run batch dispatch directly. Otherwise, schedule task work for
> +        * doing it.
> +        */
> +       if (new_fcmd && io_uring_cmd_ctx_handle(new_fcmd->cmd) ==
> +                       io_uring_cmd_ctx_handle(fcmd->cmd)) {
> +               data->cmd = new_fcmd->cmd;
> +               ublk_batch_dispatch(ubq, data, new_fcmd);
> +       } else if (new_fcmd) {
> +               io_uring_cmd_complete_in_task(new_fcmd->cmd,
> +                               ublk_batch_tw_cb);
> +       }

Return early if (!new_fcmd) to reduce indentation?

> +
> +       if (free) {
> +               ublk_batch_free_fcmd(fcmd);
> +               return -ENODEV;
> +       }

Move the if (free) check directly after spin_unlock(&ubq->evts_lock)?

> +       return -EIOCBQUEUED;

> +}
> +
> +static int ublk_handle_batch_fetch_cmd(struct ublk_batch_io_data *data)
> +{
> +       struct ublk_queue *ubq = ublk_get_queue(data->ub, data->header.q_id);
> +       struct ublk_batch_fcmd *fcmd = ublk_batch_alloc_fcmd(data->cmd);
> +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(data->cmd);
> +
> +       if (!fcmd)
> +               return -ENOMEM;
> +
> +       pdu->ubq = ubq;
> +       pdu->fcmd = fcmd;
> +       io_uring_cmd_mark_cancelable(data->cmd, data->issue_flags);
> +
> +       return ublk_batch_attach(ubq, data, fcmd);
> +}
> +
> +static int ublk_validate_batch_fetch_cmd(struct ublk_batch_io_data *data,
> +                                        const struct ublk_batch_io *uc)
> +{
> +       if (!(data->cmd->flags & IORING_URING_CMD_MULTISHOT))
> +               return -EINVAL;
> +
> +       if (uc->elem_bytes != sizeof(__u16))
> +               return -EINVAL;
> +
> +       if (uc->flags != 0)
> +               return -E2BIG;
> +
> +       return 0;
> +}
> +
>  static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
>                                        unsigned int issue_flags)
>  {
> @@ -3113,6 +3448,11 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
>         if (data.header.q_id >= ub->dev_info.nr_hw_queues)
>                 goto out;
>
> +       if (unlikely(issue_flags & IO_URING_F_CANCEL)) {
> +               ublk_batch_cancel_fn(cmd, issue_flags);
> +               return 0;
> +       }

Move this to the top of the function before the other logic that's not
necessary in the cancel case?

Best,
Caleb

> +
>         switch (cmd_op) {
>         case UBLK_U_IO_PREP_IO_CMDS:
>                 ret = ublk_check_batch_cmd(&data);
> @@ -3126,6 +3466,12 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
>                         goto out;
>                 ret = ublk_handle_batch_commit_cmd(&data);
>                 break;
> +       case UBLK_U_IO_FETCH_IO_CMDS:
> +               ret = ublk_validate_batch_fetch_cmd(&data, uc);
> +               if (ret)
> +                       goto out;
> +               ret = ublk_handle_batch_fetch_cmd(&data);
> +               break;
>         default:
>                 ret = -EOPNOTSUPP;
>         }
> @@ -3327,6 +3673,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
>                 ret = ublk_io_evts_init(ubq, ubq->q_depth, numa_node);
>                 if (ret)
>                         goto fail;
> +               INIT_LIST_HEAD(&ubq->fcmd_head);
>         }
>         ub->queues[q_id] = ubq;
>         ubq->dev = ub;
> @@ -3451,7 +3798,10 @@ static void ublk_align_max_io_size(struct ublk_device *ub)
>
>  static int ublk_add_tag_set(struct ublk_device *ub)
>  {
> -       ub->tag_set.ops = &ublk_mq_ops;
> +       if (ublk_dev_support_batch_io(ub))
> +               ub->tag_set.ops = &ublk_batch_mq_ops;
> +       else
> +               ub->tag_set.ops = &ublk_mq_ops;
>         ub->tag_set.nr_hw_queues = ub->dev_info.nr_hw_queues;
>         ub->tag_set.queue_depth = ub->dev_info.queue_depth;
>         ub->tag_set.numa_node = NUMA_NO_NODE;
> diff --git a/include/uapi/linux/ublk_cmd.h b/include/uapi/linux/ublk_cmd.h
> index 295ec8f34173..cd894c1d188e 100644
> --- a/include/uapi/linux/ublk_cmd.h
> +++ b/include/uapi/linux/ublk_cmd.h
> @@ -120,6 +120,13 @@
>  #define        UBLK_U_IO_COMMIT_IO_CMDS        \
>         _IOWR('u', 0x26, struct ublk_batch_io)
>
> +/*
> + * Fetch io commands to provided buffer in multishot style,
> + * `IORING_URING_CMD_MULTISHOT` is required for this command.
> + */
> +#define        UBLK_U_IO_FETCH_IO_CMDS         \
> +       _IOWR('u', 0x27, struct ublk_batch_io)
> +
>  /* only ABORT means that no re-fetch */
>  #define UBLK_IO_RES_OK                 0
>  #define UBLK_IO_RES_NEED_GET_DATA      1
> --
> 2.47.0
>
Re: [PATCH V4 14/27] ublk: add UBLK_U_IO_FETCH_IO_CMDS for batch I/O processing
Posted by Ming Lei 16 hours ago
On Sun, Nov 30, 2025 at 09:55:47PM -0800, Caleb Sander Mateos wrote:
> On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> >
> > Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> > of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> > multiple I/O commands in a single operation, significantly reducing
> > submission overhead compared to individual FETCH_REQ* commands.
> >
> > Key Design Features:
> >
> > 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
> >    commands, with the batch size limited by the provided buffer length.
> >
> > 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
> >    simultaneously, but only one is active at any time. This enables
> >    efficient load distribution across multiple server task contexts.
> >
> > 3. Implicit State Management: The implementation uses three key variables
> >    to track state:
> >    - evts_fifo: Queue of request tags awaiting processing
> >    - fcmd_head: List of available fetch commands
> >    - active_fcmd: Currently active fetch command (NULL = none active)
> >
> >    States are derived implicitly:
> >    - IDLE: No fetch commands available
> >    - READY: Fetch commands available, none active
> >    - ACTIVE: One fetch command processing events
> >
> > 4. Lockless Reader Optimization: The active fetch command can read from
> >    evts_fifo without locking (single reader guarantee), while writers
> >    (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
> >    barrier pairing plays key role for the single lockless reader
> >    optimization.
> >
> > Implementation Details:
> >
> > - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> > - __ublk_pick_active_fcmd() selects an available fetch command when
> >   events arrive and no command is currently active
> 
> What is __ublk_pick_active_fcmd()? I don't see a function with that name.

It is renamed as __ublk_acquire_fcmd(), and its counter pair is
__ublk_release_fcmd().

> 
> > - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
> >   buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> > - State transitions are coordinated via evts_lock to maintain consistency
> >
> > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > ---
> >  drivers/block/ublk_drv.c      | 412 +++++++++++++++++++++++++++++++---
> >  include/uapi/linux/ublk_cmd.h |   7 +
> >  2 files changed, 388 insertions(+), 31 deletions(-)
> >
> > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > index cc9c92d97349..2e5e392c939e 100644
> > --- a/drivers/block/ublk_drv.c
> > +++ b/drivers/block/ublk_drv.c
> > @@ -93,6 +93,7 @@
> >
> >  /* ublk batch fetch uring_cmd */
> >  struct ublk_batch_fcmd {
> > +       struct list_head node;
> >         struct io_uring_cmd *cmd;
> >         unsigned short buf_group;
> >  };
> > @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
> >          */
> >         struct ublk_queue *ubq;
> >
> > -       u16 tag;
> > +       union {
> > +               u16 tag;
> > +               struct ublk_batch_fcmd *fcmd; /* batch io only */
> > +       };
> >  };
> >
> >  struct ublk_batch_io_data {
> > @@ -229,18 +233,36 @@ struct ublk_queue {
> >         struct ublk_device *dev;
> >
> >         /*
> > -        * Inflight ublk request tag is saved in this fifo
> > +        * Batch I/O State Management:
> > +        *
> > +        * The batch I/O system uses implicit state management based on the
> > +        * combination of three key variables below.
> > +        *
> > +        * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> > +        *   No fetch commands available, events queue in evts_fifo
> > +        *
> > +        * - READY: !list_empty(&fcmd_head) && !active_fcmd
> > +        *   Fetch commands available but none processing events
> >          *
> > -        * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> > -        * so lock is required for storing request tag to fifo
> > +        * - ACTIVE: active_fcmd
> > +        *   One fetch command actively processing events from evts_fifo
> >          *
> > -        * Make sure just one reader for fetching request from task work
> > -        * function to ublk server, so no need to grab the lock in reader
> > -        * side.
> > +        * Key Invariants:
> > +        * - At most one active_fcmd at any time (single reader)
> > +        * - active_fcmd is always from fcmd_head list when non-NULL
> > +        * - evts_fifo can be read locklessly by the single active reader
> > +        * - All state transitions require evts_lock protection
> > +        * - Multiple writers to evts_fifo require lock protection
> >          */
> >         struct {
> >                 DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> >                 spinlock_t evts_lock;
> > +
> > +               /* List of fetch commands available to process events */
> > +               struct list_head fcmd_head;
> > +
> > +               /* Currently active fetch command (NULL = none active) */
> > +               struct ublk_batch_fcmd  *active_fcmd;
> >         }____cacheline_aligned_in_smp;
> >
> >         struct ublk_io ios[] __counted_by(q_depth);
> > @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
> >  static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
> >                 u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
> >  static inline unsigned int ublk_req_build_flags(struct request *req);
> > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > +                               struct ublk_batch_io_data *data,
> > +                               struct ublk_batch_fcmd *fcmd);
> >
> >  static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
> >  {
> >         return false;
> >  }
> >
> > +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> > +{
> > +       return false;
> > +}
> > +
> >  static inline void ublk_io_lock(struct ublk_io *io)
> >  {
> >         spin_lock(&io->lock);
> > @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq;     /* wait until one idr is freed */
> >
> >  static DEFINE_MUTEX(ublk_ctl_mutex);
> >
> > +static struct ublk_batch_fcmd *
> > +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> > +{
> > +       struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
> 
> An allocation in the I/O path seems unfortunate. Is there not room to
> store the struct ublk_batch_fcmd in the io_uring_cmd pdu?

It is allocated once for one mshot request, which covers many IOs.

It can't be held in uring_cmd pdu, but the allocation can be optimized in
future. Not a big deal in enablement stage.

> > +
> > +       if (fcmd) {
> > +               fcmd->cmd = cmd;
> > +               fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
> 
> Is it necessary to store sample this here just to pass it back to the
> io_uring layer? Wouldn't the io_uring layer already have access to it
> in struct io_kiocb's buf_index field?

->buf_group is used by io_uring_cmd_buffer_select(), and this way also
follows ->buf_index uses in both io_uring/net.c and io_uring/rw.c.

More importantly req->buf_index is used in io_uring/kbuf.c internally, see
io_ring_buffer_select(), so we can't reuse req->buf_index here.

> 
> > +       }
> > +       return fcmd;
> > +}
> > +
> > +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> > +{
> > +       kfree(fcmd);
> > +}
> > +
> > +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> > +{
> > +       WRITE_ONCE(ubq->active_fcmd, NULL);
> > +}
> >
> > -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > +/*
> > + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> > + * dispatching
> > + */
> > +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> > +                                       const struct ublk_batch_io_data *data,
> >                                         struct ublk_batch_fcmd *fcmd,
> >                                         int res)
> >  {
> > +       spin_lock(&ubq->evts_lock);
> > +       list_del(&fcmd->node);
> > +       WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> > +       __ublk_release_fcmd(ubq);
> > +       spin_unlock(&ubq->evts_lock);
> > +
> >         io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > -       fcmd->cmd = NULL;
> > +       ublk_batch_free_fcmd(fcmd);
> >  }
> >
> >  static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> >         bool needs_filter;
> >         int ret;
> >
> > +       WARN_ON_ONCE(data->cmd != fcmd->cmd);
> > +
> >         sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> >                                          data->issue_flags);
> >         if (sel.val < 0)
> > @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> >         return ret;
> >  }
> >
> > -static __maybe_unused int
> > -ublk_batch_dispatch(struct ublk_queue *ubq,
> > -                   const struct ublk_batch_io_data *data,
> > -                   struct ublk_batch_fcmd *fcmd)
> > +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> > +               struct ublk_queue *ubq)
> > +{
> > +       struct ublk_batch_fcmd *fcmd;
> > +
> > +       lockdep_assert_held(&ubq->evts_lock);
> > +
> > +       /*
> > +        * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> > +        *
> > +        * The pair is the smp_mb() in ublk_batch_dispatch().
> > +        *
> > +        * If ubq->active_fcmd is observed as non-NULL, the new added tags
> > +        * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> > +        */
> > +       smp_mb();
> > +       if (READ_ONCE(ubq->active_fcmd)) {
> > +               fcmd = NULL;
> > +       } else {
> > +               fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> > +                               struct ublk_batch_fcmd, node);
> > +               WRITE_ONCE(ubq->active_fcmd, fcmd);
> > +       }
> > +       return fcmd;
> > +}
> > +
> > +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> > +                          unsigned int issue_flags)
> > +{
> > +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > +       struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > +       struct ublk_batch_io_data data = {
> > +               .ub = pdu->ubq->dev,
> > +               .cmd = fcmd->cmd,
> > +               .issue_flags = issue_flags,
> > +       };
> > +
> > +       WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> > +
> > +       ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> > +}
> > +
> > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > +                               struct ublk_batch_io_data *data,
> > +                               struct ublk_batch_fcmd *fcmd)
> >  {
> > +       struct ublk_batch_fcmd *new_fcmd;
> 
> Is the new_fcmd variable necessary? Can fcmd be reused instead?
> 
> > +       void *handle;
> > +       bool empty;
> >         int ret = 0;
> >
> > +again:
> >         while (!ublk_io_evts_empty(ubq)) {
> >                 ret = __ublk_batch_dispatch(ubq, data, fcmd);
> >                 if (ret <= 0)
> >                         break;
> >         }
> >
> > -       if (ret < 0)
> > -               ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> > +       if (ret < 0) {
> > +               ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> > +               return;
> > +       }
> >
> > -       return ret;
> > +       handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> > +       __ublk_release_fcmd(ubq);
> > +       /*
> > +        * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> > +        * checking ubq->evts_fifo.
> > +        *
> > +        * The pair is the smp_mb() in __ublk_acquire_fcmd().
> > +        */
> > +       smp_mb();
> > +       empty = ublk_io_evts_empty(ubq);
> > +       if (likely(empty))
> 
> nit: empty variable seems unnecessary
> 
> > +               return;
> > +
> > +       spin_lock(&ubq->evts_lock);
> > +       new_fcmd = __ublk_acquire_fcmd(ubq);
> > +       spin_unlock(&ubq->evts_lock);
> > +
> > +       if (!new_fcmd)
> > +               return;
> > +       if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
> 
> This check seems to be meant to decide whether the new and old
> UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?

Actually not.

> But belonging to the same io_uring context doesn't necessarily mean
> that the same task issued them. It seems like it would be safer to
> always dispatch new_fcmd->cmd to task work.

What matters is just that ctx->uring_lock & issue_flag matches from ublk
viewpoint, so it is safe to do so.

However, given it is hit in slow path, so starting new dispatch
is easier.

> 
> > +               data->cmd = new_fcmd->cmd;
> > +               fcmd = new_fcmd;
> > +               goto again;
> > +       }
> > +       io_uring_cmd_complete_in_task(new_fcmd->cmd, ublk_batch_tw_cb);
> >  }
> >
> >  static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> > @@ -1576,13 +1711,27 @@ static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> >         ublk_dispatch_req(ubq, pdu->req, issue_flags);
> >  }
> >
> > -static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq)
> > +static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq, bool last)
> >  {
> > -       struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> > -       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > +       if (ublk_support_batch_io(ubq)) {
> > +               unsigned short tag = rq->tag;
> > +               struct ublk_batch_fcmd *fcmd = NULL;
> >
> > -       pdu->req = rq;
> > -       io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> > +               spin_lock(&ubq->evts_lock);
> > +               kfifo_put(&ubq->evts_fifo, tag);
> > +               if (last)
> > +                       fcmd = __ublk_acquire_fcmd(ubq);
> > +               spin_unlock(&ubq->evts_lock);
> > +
> > +               if (fcmd)
> > +                       io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > +       } else {
> > +               struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> > +               struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > +
> > +               pdu->req = rq;
> > +               io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> > +       }
> >  }
> >
> >  static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> > @@ -1600,14 +1749,44 @@ static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> >         } while (rq);
> >  }
> >
> > -static void ublk_queue_cmd_list(struct ublk_io *io, struct rq_list *l)
> > +static void ublk_batch_queue_cmd_list(struct ublk_queue *ubq, struct rq_list *l)
> >  {
> > -       struct io_uring_cmd *cmd = io->cmd;
> > -       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > +       unsigned short tags[MAX_NR_TAG];
> > +       struct ublk_batch_fcmd *fcmd;
> > +       struct request *rq;
> > +       unsigned cnt = 0;
> > +
> > +       spin_lock(&ubq->evts_lock);
> > +       rq_list_for_each(l, rq) {
> > +               tags[cnt++] = (unsigned short)rq->tag;
> > +               if (cnt >= MAX_NR_TAG) {
> > +                       kfifo_in(&ubq->evts_fifo, tags, cnt);
> > +                       cnt = 0;
> > +               }
> > +       }
> > +       if (cnt)
> > +               kfifo_in(&ubq->evts_fifo, tags, cnt);
> > +       fcmd = __ublk_acquire_fcmd(ubq);
> > +       spin_unlock(&ubq->evts_lock);
> >
> > -       pdu->req_list = rq_list_peek(l);
> >         rq_list_init(l);
> > -       io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> > +       if (fcmd)
> > +               io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > +}
> > +
> > +static void ublk_queue_cmd_list(struct ublk_queue *ubq, struct ublk_io *io,
> > +                               struct rq_list *l, bool batch)
> > +{
> > +       if (batch) {
> > +               ublk_batch_queue_cmd_list(ubq, l);
> > +       } else {
> > +               struct io_uring_cmd *cmd = io->cmd;
> > +               struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > +
> > +               pdu->req_list = rq_list_peek(l);
> > +               rq_list_init(l);
> > +               io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> > +       }
> >  }
> >
> >  static enum blk_eh_timer_return ublk_timeout(struct request *rq)
> > @@ -1686,7 +1865,7 @@ static blk_status_t ublk_queue_rq(struct blk_mq_hw_ctx *hctx,
> >                 return BLK_STS_OK;
> >         }
> >
> > -       ublk_queue_cmd(ubq, rq);
> > +       ublk_queue_cmd(ubq, rq, bd->last);
> >         return BLK_STS_OK;
> >  }
> >
> > @@ -1698,11 +1877,25 @@ static inline bool ublk_belong_to_same_batch(const struct ublk_io *io,
> >                 (io->task == io2->task);
> >  }
> >
> > -static void ublk_queue_rqs(struct rq_list *rqlist)
> > +static void ublk_commit_rqs(struct blk_mq_hw_ctx *hctx)
> > +{
> > +       struct ublk_queue *ubq = hctx->driver_data;
> > +       struct ublk_batch_fcmd *fcmd;
> > +
> > +       spin_lock(&ubq->evts_lock);
> > +       fcmd = __ublk_acquire_fcmd(ubq);
> > +       spin_unlock(&ubq->evts_lock);
> > +
> > +       if (fcmd)
> > +               io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > +}
> > +
> > +static void __ublk_queue_rqs(struct rq_list *rqlist, bool batch)
> >  {
> >         struct rq_list requeue_list = { };
> >         struct rq_list submit_list = { };
> >         struct ublk_io *io = NULL;
> > +       struct ublk_queue *ubq = NULL;
> >         struct request *req;
> >
> >         while ((req = rq_list_pop(rqlist))) {
> > @@ -1716,16 +1909,27 @@ static void ublk_queue_rqs(struct rq_list *rqlist)
> >
> >                 if (io && !ublk_belong_to_same_batch(io, this_io) &&
> >                                 !rq_list_empty(&submit_list))
> > -                       ublk_queue_cmd_list(io, &submit_list);
> > +                       ublk_queue_cmd_list(ubq, io, &submit_list, batch);
> 
> This seems to assume that all the requests belong to the same
> ublk_queue, which isn't required

Here, it is required for BATCH_IO, which needs new __ublk_queue_rqs()
implementation now.

Nice catch!

> 
> >                 io = this_io;
> > +               ubq = this_q;
> >                 rq_list_add_tail(&submit_list, req);
> >         }
> >
> >         if (!rq_list_empty(&submit_list))
> > -               ublk_queue_cmd_list(io, &submit_list);
> > +               ublk_queue_cmd_list(ubq, io, &submit_list, batch);
> 
> Same here
> 
> >         *rqlist = requeue_list;
> >  }
> >
> > +static void ublk_queue_rqs(struct rq_list *rqlist)
> > +{
> > +       __ublk_queue_rqs(rqlist, false);
> > +}
> > +
> > +static void ublk_batch_queue_rqs(struct rq_list *rqlist)
> > +{
> > +       __ublk_queue_rqs(rqlist, true);
> > +}
> > +
> >  static int ublk_init_hctx(struct blk_mq_hw_ctx *hctx, void *driver_data,
> >                 unsigned int hctx_idx)
> >  {
> > @@ -1743,6 +1947,14 @@ static const struct blk_mq_ops ublk_mq_ops = {
> >         .timeout        = ublk_timeout,
> >  };
> >
> > +static const struct blk_mq_ops ublk_batch_mq_ops = {
> > +       .commit_rqs     = ublk_commit_rqs,
> > +       .queue_rq       = ublk_queue_rq,
> > +       .queue_rqs      = ublk_batch_queue_rqs,
> > +       .init_hctx      = ublk_init_hctx,
> > +       .timeout        = ublk_timeout,
> > +};
> > +
> >  static void ublk_queue_reinit(struct ublk_device *ub, struct ublk_queue *ubq)
> >  {
> >         int i;
> > @@ -2120,6 +2332,56 @@ static void ublk_cancel_cmd(struct ublk_queue *ubq, unsigned tag,
> >                 io_uring_cmd_done(io->cmd, UBLK_IO_RES_ABORT, issue_flags);
> >  }
> >
> > +static void ublk_batch_cancel_cmd(struct ublk_queue *ubq,
> > +                                 struct ublk_batch_fcmd *fcmd,
> > +                                 unsigned int issue_flags)
> > +{
> > +       bool done;
> > +
> > +       spin_lock(&ubq->evts_lock);
> > +       done = (ubq->active_fcmd != fcmd);
> 
> Needs to use READ_ONCE() since __ublk_release_fcmd() can be called
> without holding evts_lock?

OK.

> 
> > +       if (done)
> > +               list_del(&fcmd->node);
> > +       spin_unlock(&ubq->evts_lock);
> > +
> > +       if (done) {
> > +               io_uring_cmd_done(fcmd->cmd, UBLK_IO_RES_ABORT, issue_flags);
> > +               ublk_batch_free_fcmd(fcmd);
> > +       }
> > +}
> > +
> > +static void ublk_batch_cancel_queue(struct ublk_queue *ubq)
> > +{
> > +       LIST_HEAD(fcmd_list);
> > +
> > +       spin_lock(&ubq->evts_lock);
> > +       ubq->force_abort = true;
> > +       list_splice_init(&ubq->fcmd_head, &fcmd_list);
> > +       if (ubq->active_fcmd)
> > +               list_move(&ubq->active_fcmd->node, &ubq->fcmd_head);
> 
> Similarly, needs READ_ONCE()?

OK.

But this one may not be necessary, since now everything is just quiesced,
and the lockless code path won't hit any more.

> 
> > +       spin_unlock(&ubq->evts_lock);
> > +
> > +       while (!list_empty(&fcmd_list)) {
> > +               struct ublk_batch_fcmd *fcmd = list_first_entry(&fcmd_list,
> > +                               struct ublk_batch_fcmd, node);
> > +
> > +               ublk_batch_cancel_cmd(ubq, fcmd, IO_URING_F_UNLOCKED);
> > +       }
> > +}
> > +
> > +static void ublk_batch_cancel_fn(struct io_uring_cmd *cmd,
> > +                                unsigned int issue_flags)
> > +{
> > +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > +       struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > +       struct ublk_queue *ubq = pdu->ubq;
> > +
> > +       if (!ubq->canceling)
> 
> Is it not racy to access ubq->canceling without any lock held?

OK, will switch to call ublk_start_cancel() unconditionally.

> 
> > +               ublk_start_cancel(ubq->dev);
> > +
> > +       ublk_batch_cancel_cmd(ubq, fcmd, issue_flags);
> > +}
> > +
> >  /*
> >   * The ublk char device won't be closed when calling cancel fn, so both
> >   * ublk device and queue are guaranteed to be live
> > @@ -2171,6 +2433,11 @@ static void ublk_cancel_queue(struct ublk_queue *ubq)
> >  {
> >         int i;
> >
> > +       if (ublk_support_batch_io(ubq)) {
> > +               ublk_batch_cancel_queue(ubq);
> > +               return;
> > +       }
> > +
> >         for (i = 0; i < ubq->q_depth; i++)
> >                 ublk_cancel_cmd(ubq, i, IO_URING_F_UNLOCKED);
> >  }
> > @@ -3091,6 +3358,74 @@ static int ublk_check_batch_cmd(const struct ublk_batch_io_data *data)
> >         return ublk_check_batch_cmd_flags(uc);
> >  }
> >
> > +static int ublk_batch_attach(struct ublk_queue *ubq,
> > +                            struct ublk_batch_io_data *data,
> > +                            struct ublk_batch_fcmd *fcmd)
> > +{
> > +       struct ublk_batch_fcmd *new_fcmd = NULL;
> > +       bool free = false;
> > +
> > +       spin_lock(&ubq->evts_lock);
> > +       if (unlikely(ubq->force_abort || ubq->canceling)) {
> > +               free = true;
> > +       } else {
> > +               list_add_tail(&fcmd->node, &ubq->fcmd_head);
> > +               new_fcmd = __ublk_acquire_fcmd(ubq);
> > +       }
> > +       spin_unlock(&ubq->evts_lock);
> > +
> > +       /*
> > +        * If the two fetch commands are originated from same io_ring_ctx,
> > +        * run batch dispatch directly. Otherwise, schedule task work for
> > +        * doing it.
> > +        */
> > +       if (new_fcmd && io_uring_cmd_ctx_handle(new_fcmd->cmd) ==
> > +                       io_uring_cmd_ctx_handle(fcmd->cmd)) {
> > +               data->cmd = new_fcmd->cmd;
> > +               ublk_batch_dispatch(ubq, data, new_fcmd);
> > +       } else if (new_fcmd) {
> > +               io_uring_cmd_complete_in_task(new_fcmd->cmd,
> > +                               ublk_batch_tw_cb);
> > +       }
> 
> Return early if (!new_fcmd) to reduce indentation?
> 
> > +
> > +       if (free) {
> > +               ublk_batch_free_fcmd(fcmd);
> > +               return -ENODEV;
> > +       }
> 
> Move the if (free) check directly after spin_unlock(&ubq->evts_lock)?

Yeah, this is better.

> 
> > +       return -EIOCBQUEUED;
> 
> > +}
> > +
> > +static int ublk_handle_batch_fetch_cmd(struct ublk_batch_io_data *data)
> > +{
> > +       struct ublk_queue *ubq = ublk_get_queue(data->ub, data->header.q_id);
> > +       struct ublk_batch_fcmd *fcmd = ublk_batch_alloc_fcmd(data->cmd);
> > +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(data->cmd);
> > +
> > +       if (!fcmd)
> > +               return -ENOMEM;
> > +
> > +       pdu->ubq = ubq;
> > +       pdu->fcmd = fcmd;
> > +       io_uring_cmd_mark_cancelable(data->cmd, data->issue_flags);
> > +
> > +       return ublk_batch_attach(ubq, data, fcmd);
> > +}
> > +
> > +static int ublk_validate_batch_fetch_cmd(struct ublk_batch_io_data *data,
> > +                                        const struct ublk_batch_io *uc)
> > +{
> > +       if (!(data->cmd->flags & IORING_URING_CMD_MULTISHOT))
> > +               return -EINVAL;
> > +
> > +       if (uc->elem_bytes != sizeof(__u16))
> > +               return -EINVAL;
> > +
> > +       if (uc->flags != 0)
> > +               return -E2BIG;
> > +
> > +       return 0;
> > +}
> > +
> >  static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> >                                        unsigned int issue_flags)
> >  {
> > @@ -3113,6 +3448,11 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> >         if (data.header.q_id >= ub->dev_info.nr_hw_queues)
> >                 goto out;
> >
> > +       if (unlikely(issue_flags & IO_URING_F_CANCEL)) {
> > +               ublk_batch_cancel_fn(cmd, issue_flags);
> > +               return 0;
> > +       }
> 
> Move this to the top of the function before the other logic that's not
> necessary in the cancel case?

Yeah, looks better.


Thanks,
Ming

Re: [PATCH V4 14/27] ublk: add UBLK_U_IO_FETCH_IO_CMDS for batch I/O processing
Posted by Caleb Sander Mateos 7 hours ago
On Mon, Dec 1, 2025 at 1:42 AM Ming Lei <ming.lei@redhat.com> wrote:
>
> On Sun, Nov 30, 2025 at 09:55:47PM -0800, Caleb Sander Mateos wrote:
> > On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> > >
> > > Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> > > of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> > > multiple I/O commands in a single operation, significantly reducing
> > > submission overhead compared to individual FETCH_REQ* commands.
> > >
> > > Key Design Features:
> > >
> > > 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
> > >    commands, with the batch size limited by the provided buffer length.
> > >
> > > 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
> > >    simultaneously, but only one is active at any time. This enables
> > >    efficient load distribution across multiple server task contexts.
> > >
> > > 3. Implicit State Management: The implementation uses three key variables
> > >    to track state:
> > >    - evts_fifo: Queue of request tags awaiting processing
> > >    - fcmd_head: List of available fetch commands
> > >    - active_fcmd: Currently active fetch command (NULL = none active)
> > >
> > >    States are derived implicitly:
> > >    - IDLE: No fetch commands available
> > >    - READY: Fetch commands available, none active
> > >    - ACTIVE: One fetch command processing events
> > >
> > > 4. Lockless Reader Optimization: The active fetch command can read from
> > >    evts_fifo without locking (single reader guarantee), while writers
> > >    (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
> > >    barrier pairing plays key role for the single lockless reader
> > >    optimization.
> > >
> > > Implementation Details:
> > >
> > > - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> > > - __ublk_pick_active_fcmd() selects an available fetch command when
> > >   events arrive and no command is currently active
> >
> > What is __ublk_pick_active_fcmd()? I don't see a function with that name.
>
> It is renamed as __ublk_acquire_fcmd(), and its counter pair is
> __ublk_release_fcmd().

Okay, update the commit message then?

>
> >
> > > - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
> > >   buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> > > - State transitions are coordinated via evts_lock to maintain consistency
> > >
> > > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > > ---
> > >  drivers/block/ublk_drv.c      | 412 +++++++++++++++++++++++++++++++---
> > >  include/uapi/linux/ublk_cmd.h |   7 +
> > >  2 files changed, 388 insertions(+), 31 deletions(-)
> > >
> > > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > > index cc9c92d97349..2e5e392c939e 100644
> > > --- a/drivers/block/ublk_drv.c
> > > +++ b/drivers/block/ublk_drv.c
> > > @@ -93,6 +93,7 @@
> > >
> > >  /* ublk batch fetch uring_cmd */
> > >  struct ublk_batch_fcmd {
> > > +       struct list_head node;
> > >         struct io_uring_cmd *cmd;
> > >         unsigned short buf_group;
> > >  };
> > > @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
> > >          */
> > >         struct ublk_queue *ubq;
> > >
> > > -       u16 tag;
> > > +       union {
> > > +               u16 tag;
> > > +               struct ublk_batch_fcmd *fcmd; /* batch io only */
> > > +       };
> > >  };
> > >
> > >  struct ublk_batch_io_data {
> > > @@ -229,18 +233,36 @@ struct ublk_queue {
> > >         struct ublk_device *dev;
> > >
> > >         /*
> > > -        * Inflight ublk request tag is saved in this fifo
> > > +        * Batch I/O State Management:
> > > +        *
> > > +        * The batch I/O system uses implicit state management based on the
> > > +        * combination of three key variables below.
> > > +        *
> > > +        * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> > > +        *   No fetch commands available, events queue in evts_fifo
> > > +        *
> > > +        * - READY: !list_empty(&fcmd_head) && !active_fcmd
> > > +        *   Fetch commands available but none processing events
> > >          *
> > > -        * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> > > -        * so lock is required for storing request tag to fifo
> > > +        * - ACTIVE: active_fcmd
> > > +        *   One fetch command actively processing events from evts_fifo
> > >          *
> > > -        * Make sure just one reader for fetching request from task work
> > > -        * function to ublk server, so no need to grab the lock in reader
> > > -        * side.
> > > +        * Key Invariants:
> > > +        * - At most one active_fcmd at any time (single reader)
> > > +        * - active_fcmd is always from fcmd_head list when non-NULL
> > > +        * - evts_fifo can be read locklessly by the single active reader
> > > +        * - All state transitions require evts_lock protection
> > > +        * - Multiple writers to evts_fifo require lock protection
> > >          */
> > >         struct {
> > >                 DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> > >                 spinlock_t evts_lock;
> > > +
> > > +               /* List of fetch commands available to process events */
> > > +               struct list_head fcmd_head;
> > > +
> > > +               /* Currently active fetch command (NULL = none active) */
> > > +               struct ublk_batch_fcmd  *active_fcmd;
> > >         }____cacheline_aligned_in_smp;
> > >
> > >         struct ublk_io ios[] __counted_by(q_depth);
> > > @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
> > >  static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
> > >                 u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
> > >  static inline unsigned int ublk_req_build_flags(struct request *req);
> > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > +                               struct ublk_batch_io_data *data,
> > > +                               struct ublk_batch_fcmd *fcmd);
> > >
> > >  static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
> > >  {
> > >         return false;
> > >  }
> > >
> > > +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> > > +{
> > > +       return false;
> > > +}
> > > +
> > >  static inline void ublk_io_lock(struct ublk_io *io)
> > >  {
> > >         spin_lock(&io->lock);
> > > @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq;     /* wait until one idr is freed */
> > >
> > >  static DEFINE_MUTEX(ublk_ctl_mutex);
> > >
> > > +static struct ublk_batch_fcmd *
> > > +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> > > +{
> > > +       struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
> >
> > An allocation in the I/O path seems unfortunate. Is there not room to
> > store the struct ublk_batch_fcmd in the io_uring_cmd pdu?
>
> It is allocated once for one mshot request, which covers many IOs.
>
> It can't be held in uring_cmd pdu, but the allocation can be optimized in
> future. Not a big deal in enablement stage.

Okay, seems fine to optimize it in the future.

>
> > > +
> > > +       if (fcmd) {
> > > +               fcmd->cmd = cmd;
> > > +               fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
> >
> > Is it necessary to store sample this here just to pass it back to the
> > io_uring layer? Wouldn't the io_uring layer already have access to it
> > in struct io_kiocb's buf_index field?
>
> ->buf_group is used by io_uring_cmd_buffer_select(), and this way also
> follows ->buf_index uses in both io_uring/net.c and io_uring/rw.c.
>
>
> io_ring_buffer_select(), so we can't reuse req->buf_index here.

But io_uring/net.c and io_uring/rw.c both retrieve the buf_group value
from req->buf_index instead of the SQE, for example:
if (req->flags & REQ_F_BUFFER_SELECT)
        sr->buf_group = req->buf_index;

Seems like it would make sense to do the same for
UBLK_U_IO_FETCH_IO_CMDS. That also saves one pointer dereference here.

>
> >
> > > +       }
> > > +       return fcmd;
> > > +}
> > > +
> > > +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> > > +{
> > > +       kfree(fcmd);
> > > +}
> > > +
> > > +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> > > +{
> > > +       WRITE_ONCE(ubq->active_fcmd, NULL);
> > > +}
> > >
> > > -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > > +/*
> > > + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> > > + * dispatching
> > > + */
> > > +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> > > +                                       const struct ublk_batch_io_data *data,
> > >                                         struct ublk_batch_fcmd *fcmd,
> > >                                         int res)
> > >  {
> > > +       spin_lock(&ubq->evts_lock);
> > > +       list_del(&fcmd->node);
> > > +       WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> > > +       __ublk_release_fcmd(ubq);
> > > +       spin_unlock(&ubq->evts_lock);
> > > +
> > >         io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > > -       fcmd->cmd = NULL;
> > > +       ublk_batch_free_fcmd(fcmd);
> > >  }
> > >
> > >  static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > > @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > >         bool needs_filter;
> > >         int ret;
> > >
> > > +       WARN_ON_ONCE(data->cmd != fcmd->cmd);
> > > +
> > >         sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > >                                          data->issue_flags);
> > >         if (sel.val < 0)
> > > @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > >         return ret;
> > >  }
> > >
> > > -static __maybe_unused int
> > > -ublk_batch_dispatch(struct ublk_queue *ubq,
> > > -                   const struct ublk_batch_io_data *data,
> > > -                   struct ublk_batch_fcmd *fcmd)
> > > +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> > > +               struct ublk_queue *ubq)
> > > +{
> > > +       struct ublk_batch_fcmd *fcmd;
> > > +
> > > +       lockdep_assert_held(&ubq->evts_lock);
> > > +
> > > +       /*
> > > +        * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> > > +        *
> > > +        * The pair is the smp_mb() in ublk_batch_dispatch().
> > > +        *
> > > +        * If ubq->active_fcmd is observed as non-NULL, the new added tags
> > > +        * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> > > +        */
> > > +       smp_mb();
> > > +       if (READ_ONCE(ubq->active_fcmd)) {
> > > +               fcmd = NULL;
> > > +       } else {
> > > +               fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> > > +                               struct ublk_batch_fcmd, node);
> > > +               WRITE_ONCE(ubq->active_fcmd, fcmd);
> > > +       }
> > > +       return fcmd;
> > > +}
> > > +
> > > +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> > > +                          unsigned int issue_flags)
> > > +{
> > > +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > +       struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > > +       struct ublk_batch_io_data data = {
> > > +               .ub = pdu->ubq->dev,
> > > +               .cmd = fcmd->cmd,
> > > +               .issue_flags = issue_flags,
> > > +       };
> > > +
> > > +       WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> > > +
> > > +       ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> > > +}
> > > +
> > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > +                               struct ublk_batch_io_data *data,
> > > +                               struct ublk_batch_fcmd *fcmd)
> > >  {
> > > +       struct ublk_batch_fcmd *new_fcmd;
> >
> > Is the new_fcmd variable necessary? Can fcmd be reused instead?
> >
> > > +       void *handle;
> > > +       bool empty;
> > >         int ret = 0;
> > >
> > > +again:
> > >         while (!ublk_io_evts_empty(ubq)) {
> > >                 ret = __ublk_batch_dispatch(ubq, data, fcmd);
> > >                 if (ret <= 0)
> > >                         break;
> > >         }
> > >
> > > -       if (ret < 0)
> > > -               ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> > > +       if (ret < 0) {
> > > +               ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> > > +               return;
> > > +       }
> > >
> > > -       return ret;
> > > +       handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> > > +       __ublk_release_fcmd(ubq);
> > > +       /*
> > > +        * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> > > +        * checking ubq->evts_fifo.
> > > +        *
> > > +        * The pair is the smp_mb() in __ublk_acquire_fcmd().
> > > +        */
> > > +       smp_mb();
> > > +       empty = ublk_io_evts_empty(ubq);
> > > +       if (likely(empty))
> >
> > nit: empty variable seems unnecessary
> >
> > > +               return;
> > > +
> > > +       spin_lock(&ubq->evts_lock);
> > > +       new_fcmd = __ublk_acquire_fcmd(ubq);
> > > +       spin_unlock(&ubq->evts_lock);
> > > +
> > > +       if (!new_fcmd)
> > > +               return;
> > > +       if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
> >
> > This check seems to be meant to decide whether the new and old
> > UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?
>
> Actually not.
>
> > But belonging to the same io_uring context doesn't necessarily mean
> > that the same task issued them. It seems like it would be safer to
> > always dispatch new_fcmd->cmd to task work.
>
> What matters is just that ctx->uring_lock & issue_flag matches from ublk
> viewpoint, so it is safe to do so.

Okay, that makes sense.

>
> However, given it is hit in slow path, so starting new dispatch
> is easier.

Yeah, I'd agree it makes sense to keep the unexpected path code
simpler. There may also be fairness concerns from looping indefinitely
here if the evts_fifo continues to be nonempty, so dispatching to task
work seems safer.

>
> >
> > > +               data->cmd = new_fcmd->cmd;
> > > +               fcmd = new_fcmd;
> > > +               goto again;
> > > +       }
> > > +       io_uring_cmd_complete_in_task(new_fcmd->cmd, ublk_batch_tw_cb);
> > >  }
> > >
> > >  static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> > > @@ -1576,13 +1711,27 @@ static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> > >         ublk_dispatch_req(ubq, pdu->req, issue_flags);
> > >  }
> > >
> > > -static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq)
> > > +static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq, bool last)
> > >  {
> > > -       struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> > > -       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > +       if (ublk_support_batch_io(ubq)) {
> > > +               unsigned short tag = rq->tag;
> > > +               struct ublk_batch_fcmd *fcmd = NULL;
> > >
> > > -       pdu->req = rq;
> > > -       io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> > > +               spin_lock(&ubq->evts_lock);
> > > +               kfifo_put(&ubq->evts_fifo, tag);
> > > +               if (last)
> > > +                       fcmd = __ublk_acquire_fcmd(ubq);
> > > +               spin_unlock(&ubq->evts_lock);
> > > +
> > > +               if (fcmd)
> > > +                       io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > > +       } else {
> > > +               struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> > > +               struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > +
> > > +               pdu->req = rq;
> > > +               io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> > > +       }
> > >  }
> > >
> > >  static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> > > @@ -1600,14 +1749,44 @@ static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> > >         } while (rq);
> > >  }
> > >
> > > -static void ublk_queue_cmd_list(struct ublk_io *io, struct rq_list *l)
> > > +static void ublk_batch_queue_cmd_list(struct ublk_queue *ubq, struct rq_list *l)
> > >  {
> > > -       struct io_uring_cmd *cmd = io->cmd;
> > > -       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > +       unsigned short tags[MAX_NR_TAG];
> > > +       struct ublk_batch_fcmd *fcmd;
> > > +       struct request *rq;
> > > +       unsigned cnt = 0;
> > > +
> > > +       spin_lock(&ubq->evts_lock);
> > > +       rq_list_for_each(l, rq) {
> > > +               tags[cnt++] = (unsigned short)rq->tag;
> > > +               if (cnt >= MAX_NR_TAG) {
> > > +                       kfifo_in(&ubq->evts_fifo, tags, cnt);
> > > +                       cnt = 0;
> > > +               }
> > > +       }
> > > +       if (cnt)
> > > +               kfifo_in(&ubq->evts_fifo, tags, cnt);
> > > +       fcmd = __ublk_acquire_fcmd(ubq);
> > > +       spin_unlock(&ubq->evts_lock);
> > >
> > > -       pdu->req_list = rq_list_peek(l);
> > >         rq_list_init(l);
> > > -       io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> > > +       if (fcmd)
> > > +               io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > > +}
> > > +
> > > +static void ublk_queue_cmd_list(struct ublk_queue *ubq, struct ublk_io *io,
> > > +                               struct rq_list *l, bool batch)
> > > +{
> > > +       if (batch) {
> > > +               ublk_batch_queue_cmd_list(ubq, l);
> > > +       } else {
> > > +               struct io_uring_cmd *cmd = io->cmd;
> > > +               struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > +
> > > +               pdu->req_list = rq_list_peek(l);
> > > +               rq_list_init(l);
> > > +               io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> > > +       }
> > >  }
> > >
> > >  static enum blk_eh_timer_return ublk_timeout(struct request *rq)
> > > @@ -1686,7 +1865,7 @@ static blk_status_t ublk_queue_rq(struct blk_mq_hw_ctx *hctx,
> > >                 return BLK_STS_OK;
> > >         }
> > >
> > > -       ublk_queue_cmd(ubq, rq);
> > > +       ublk_queue_cmd(ubq, rq, bd->last);
> > >         return BLK_STS_OK;
> > >  }
> > >
> > > @@ -1698,11 +1877,25 @@ static inline bool ublk_belong_to_same_batch(const struct ublk_io *io,
> > >                 (io->task == io2->task);
> > >  }
> > >
> > > -static void ublk_queue_rqs(struct rq_list *rqlist)
> > > +static void ublk_commit_rqs(struct blk_mq_hw_ctx *hctx)
> > > +{
> > > +       struct ublk_queue *ubq = hctx->driver_data;
> > > +       struct ublk_batch_fcmd *fcmd;
> > > +
> > > +       spin_lock(&ubq->evts_lock);
> > > +       fcmd = __ublk_acquire_fcmd(ubq);
> > > +       spin_unlock(&ubq->evts_lock);
> > > +
> > > +       if (fcmd)
> > > +               io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > > +}
> > > +
> > > +static void __ublk_queue_rqs(struct rq_list *rqlist, bool batch)
> > >  {
> > >         struct rq_list requeue_list = { };
> > >         struct rq_list submit_list = { };
> > >         struct ublk_io *io = NULL;
> > > +       struct ublk_queue *ubq = NULL;
> > >         struct request *req;
> > >
> > >         while ((req = rq_list_pop(rqlist))) {
> > > @@ -1716,16 +1909,27 @@ static void ublk_queue_rqs(struct rq_list *rqlist)
> > >
> > >                 if (io && !ublk_belong_to_same_batch(io, this_io) &&
> > >                                 !rq_list_empty(&submit_list))
> > > -                       ublk_queue_cmd_list(io, &submit_list);
> > > +                       ublk_queue_cmd_list(ubq, io, &submit_list, batch);
> >
> > This seems to assume that all the requests belong to the same
> > ublk_queue, which isn't required
>
> Here, it is required for BATCH_IO, which needs new __ublk_queue_rqs()
> implementation now.
>
> Nice catch!
>
> >
> > >                 io = this_io;
> > > +               ubq = this_q;
> > >                 rq_list_add_tail(&submit_list, req);
> > >         }
> > >
> > >         if (!rq_list_empty(&submit_list))
> > > -               ublk_queue_cmd_list(io, &submit_list);
> > > +               ublk_queue_cmd_list(ubq, io, &submit_list, batch);
> >
> > Same here
> >
> > >         *rqlist = requeue_list;
> > >  }
> > >
> > > +static void ublk_queue_rqs(struct rq_list *rqlist)
> > > +{
> > > +       __ublk_queue_rqs(rqlist, false);
> > > +}
> > > +
> > > +static void ublk_batch_queue_rqs(struct rq_list *rqlist)
> > > +{
> > > +       __ublk_queue_rqs(rqlist, true);
> > > +}
> > > +
> > >  static int ublk_init_hctx(struct blk_mq_hw_ctx *hctx, void *driver_data,
> > >                 unsigned int hctx_idx)
> > >  {
> > > @@ -1743,6 +1947,14 @@ static const struct blk_mq_ops ublk_mq_ops = {
> > >         .timeout        = ublk_timeout,
> > >  };
> > >
> > > +static const struct blk_mq_ops ublk_batch_mq_ops = {
> > > +       .commit_rqs     = ublk_commit_rqs,
> > > +       .queue_rq       = ublk_queue_rq,
> > > +       .queue_rqs      = ublk_batch_queue_rqs,
> > > +       .init_hctx      = ublk_init_hctx,
> > > +       .timeout        = ublk_timeout,
> > > +};
> > > +
> > >  static void ublk_queue_reinit(struct ublk_device *ub, struct ublk_queue *ubq)
> > >  {
> > >         int i;
> > > @@ -2120,6 +2332,56 @@ static void ublk_cancel_cmd(struct ublk_queue *ubq, unsigned tag,
> > >                 io_uring_cmd_done(io->cmd, UBLK_IO_RES_ABORT, issue_flags);
> > >  }
> > >
> > > +static void ublk_batch_cancel_cmd(struct ublk_queue *ubq,
> > > +                                 struct ublk_batch_fcmd *fcmd,
> > > +                                 unsigned int issue_flags)
> > > +{
> > > +       bool done;
> > > +
> > > +       spin_lock(&ubq->evts_lock);
> > > +       done = (ubq->active_fcmd != fcmd);
> >
> > Needs to use READ_ONCE() since __ublk_release_fcmd() can be called
> > without holding evts_lock?
>
> OK.
>
> >
> > > +       if (done)
> > > +               list_del(&fcmd->node);
> > > +       spin_unlock(&ubq->evts_lock);
> > > +
> > > +       if (done) {
> > > +               io_uring_cmd_done(fcmd->cmd, UBLK_IO_RES_ABORT, issue_flags);
> > > +               ublk_batch_free_fcmd(fcmd);
> > > +       }
> > > +}
> > > +
> > > +static void ublk_batch_cancel_queue(struct ublk_queue *ubq)
> > > +{
> > > +       LIST_HEAD(fcmd_list);
> > > +
> > > +       spin_lock(&ubq->evts_lock);
> > > +       ubq->force_abort = true;
> > > +       list_splice_init(&ubq->fcmd_head, &fcmd_list);
> > > +       if (ubq->active_fcmd)
> > > +               list_move(&ubq->active_fcmd->node, &ubq->fcmd_head);
> >
> > Similarly, needs READ_ONCE()?
>
> OK.
>
> But this one may not be necessary, since now everything is just quiesced,
> and the lockless code path won't hit any more.

Good point. I think a comment to that effect would be helpful.

Best,
Caleb

>
> >
> > > +       spin_unlock(&ubq->evts_lock);
> > > +
> > > +       while (!list_empty(&fcmd_list)) {
> > > +               struct ublk_batch_fcmd *fcmd = list_first_entry(&fcmd_list,
> > > +                               struct ublk_batch_fcmd, node);
> > > +
> > > +               ublk_batch_cancel_cmd(ubq, fcmd, IO_URING_F_UNLOCKED);
> > > +       }
> > > +}
> > > +
> > > +static void ublk_batch_cancel_fn(struct io_uring_cmd *cmd,
> > > +                                unsigned int issue_flags)
> > > +{
> > > +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > +       struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > > +       struct ublk_queue *ubq = pdu->ubq;
> > > +
> > > +       if (!ubq->canceling)
> >
> > Is it not racy to access ubq->canceling without any lock held?
>
> OK, will switch to call ublk_start_cancel() unconditionally.
>
> >
> > > +               ublk_start_cancel(ubq->dev);
> > > +
> > > +       ublk_batch_cancel_cmd(ubq, fcmd, issue_flags);
> > > +}
> > > +
> > >  /*
> > >   * The ublk char device won't be closed when calling cancel fn, so both
> > >   * ublk device and queue are guaranteed to be live
> > > @@ -2171,6 +2433,11 @@ static void ublk_cancel_queue(struct ublk_queue *ubq)
> > >  {
> > >         int i;
> > >
> > > +       if (ublk_support_batch_io(ubq)) {
> > > +               ublk_batch_cancel_queue(ubq);
> > > +               return;
> > > +       }
> > > +
> > >         for (i = 0; i < ubq->q_depth; i++)
> > >                 ublk_cancel_cmd(ubq, i, IO_URING_F_UNLOCKED);
> > >  }
> > > @@ -3091,6 +3358,74 @@ static int ublk_check_batch_cmd(const struct ublk_batch_io_data *data)
> > >         return ublk_check_batch_cmd_flags(uc);
> > >  }
> > >
> > > +static int ublk_batch_attach(struct ublk_queue *ubq,
> > > +                            struct ublk_batch_io_data *data,
> > > +                            struct ublk_batch_fcmd *fcmd)
> > > +{
> > > +       struct ublk_batch_fcmd *new_fcmd = NULL;
> > > +       bool free = false;
> > > +
> > > +       spin_lock(&ubq->evts_lock);
> > > +       if (unlikely(ubq->force_abort || ubq->canceling)) {
> > > +               free = true;
> > > +       } else {
> > > +               list_add_tail(&fcmd->node, &ubq->fcmd_head);
> > > +               new_fcmd = __ublk_acquire_fcmd(ubq);
> > > +       }
> > > +       spin_unlock(&ubq->evts_lock);
> > > +
> > > +       /*
> > > +        * If the two fetch commands are originated from same io_ring_ctx,
> > > +        * run batch dispatch directly. Otherwise, schedule task work for
> > > +        * doing it.
> > > +        */
> > > +       if (new_fcmd && io_uring_cmd_ctx_handle(new_fcmd->cmd) ==
> > > +                       io_uring_cmd_ctx_handle(fcmd->cmd)) {
> > > +               data->cmd = new_fcmd->cmd;
> > > +               ublk_batch_dispatch(ubq, data, new_fcmd);
> > > +       } else if (new_fcmd) {
> > > +               io_uring_cmd_complete_in_task(new_fcmd->cmd,
> > > +                               ublk_batch_tw_cb);
> > > +       }
> >
> > Return early if (!new_fcmd) to reduce indentation?
> >
> > > +
> > > +       if (free) {
> > > +               ublk_batch_free_fcmd(fcmd);
> > > +               return -ENODEV;
> > > +       }
> >
> > Move the if (free) check directly after spin_unlock(&ubq->evts_lock)?
>
> Yeah, this is better.
>
> >
> > > +       return -EIOCBQUEUED;
> >
> > > +}
> > > +
> > > +static int ublk_handle_batch_fetch_cmd(struct ublk_batch_io_data *data)
> > > +{
> > > +       struct ublk_queue *ubq = ublk_get_queue(data->ub, data->header.q_id);
> > > +       struct ublk_batch_fcmd *fcmd = ublk_batch_alloc_fcmd(data->cmd);
> > > +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(data->cmd);
> > > +
> > > +       if (!fcmd)
> > > +               return -ENOMEM;
> > > +
> > > +       pdu->ubq = ubq;
> > > +       pdu->fcmd = fcmd;
> > > +       io_uring_cmd_mark_cancelable(data->cmd, data->issue_flags);
> > > +
> > > +       return ublk_batch_attach(ubq, data, fcmd);
> > > +}
> > > +
> > > +static int ublk_validate_batch_fetch_cmd(struct ublk_batch_io_data *data,
> > > +                                        const struct ublk_batch_io *uc)
> > > +{
> > > +       if (!(data->cmd->flags & IORING_URING_CMD_MULTISHOT))
> > > +               return -EINVAL;
> > > +
> > > +       if (uc->elem_bytes != sizeof(__u16))
> > > +               return -EINVAL;
> > > +
> > > +       if (uc->flags != 0)
> > > +               return -E2BIG;
> > > +
> > > +       return 0;
> > > +}
> > > +
> > >  static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> > >                                        unsigned int issue_flags)
> > >  {
> > > @@ -3113,6 +3448,11 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> > >         if (data.header.q_id >= ub->dev_info.nr_hw_queues)
> > >                 goto out;
> > >
> > > +       if (unlikely(issue_flags & IO_URING_F_CANCEL)) {
> > > +               ublk_batch_cancel_fn(cmd, issue_flags);
> > > +               return 0;
> > > +       }
> >
> > Move this to the top of the function before the other logic that's not
> > necessary in the cancel case?
>
> Yeah, looks better.
>
>
> Thanks,
> Ming
>
Re: [PATCH V4 14/27] ublk: add UBLK_U_IO_FETCH_IO_CMDS for batch I/O processing
Posted by Ming Lei 23 minutes ago
On Mon, Dec 01, 2025 at 09:51:59AM -0800, Caleb Sander Mateos wrote:
> On Mon, Dec 1, 2025 at 1:42 AM Ming Lei <ming.lei@redhat.com> wrote:
> >
> > On Sun, Nov 30, 2025 at 09:55:47PM -0800, Caleb Sander Mateos wrote:
> > > On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> > > >
> > > > Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> > > > of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> > > > multiple I/O commands in a single operation, significantly reducing
> > > > submission overhead compared to individual FETCH_REQ* commands.
> > > >
> > > > Key Design Features:
> > > >
> > > > 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
> > > >    commands, with the batch size limited by the provided buffer length.
> > > >
> > > > 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
> > > >    simultaneously, but only one is active at any time. This enables
> > > >    efficient load distribution across multiple server task contexts.
> > > >
> > > > 3. Implicit State Management: The implementation uses three key variables
> > > >    to track state:
> > > >    - evts_fifo: Queue of request tags awaiting processing
> > > >    - fcmd_head: List of available fetch commands
> > > >    - active_fcmd: Currently active fetch command (NULL = none active)
> > > >
> > > >    States are derived implicitly:
> > > >    - IDLE: No fetch commands available
> > > >    - READY: Fetch commands available, none active
> > > >    - ACTIVE: One fetch command processing events
> > > >
> > > > 4. Lockless Reader Optimization: The active fetch command can read from
> > > >    evts_fifo without locking (single reader guarantee), while writers
> > > >    (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
> > > >    barrier pairing plays key role for the single lockless reader
> > > >    optimization.
> > > >
> > > > Implementation Details:
> > > >
> > > > - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> > > > - __ublk_pick_active_fcmd() selects an available fetch command when
> > > >   events arrive and no command is currently active
> > >
> > > What is __ublk_pick_active_fcmd()? I don't see a function with that name.
> >
> > It is renamed as __ublk_acquire_fcmd(), and its counter pair is
> > __ublk_release_fcmd().
> 
> Okay, update the commit message then?
> 
> >
> > >
> > > > - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
> > > >   buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> > > > - State transitions are coordinated via evts_lock to maintain consistency
> > > >
> > > > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > > > ---
> > > >  drivers/block/ublk_drv.c      | 412 +++++++++++++++++++++++++++++++---
> > > >  include/uapi/linux/ublk_cmd.h |   7 +
> > > >  2 files changed, 388 insertions(+), 31 deletions(-)
> > > >
> > > > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > > > index cc9c92d97349..2e5e392c939e 100644
> > > > --- a/drivers/block/ublk_drv.c
> > > > +++ b/drivers/block/ublk_drv.c
> > > > @@ -93,6 +93,7 @@
> > > >
> > > >  /* ublk batch fetch uring_cmd */
> > > >  struct ublk_batch_fcmd {
> > > > +       struct list_head node;
> > > >         struct io_uring_cmd *cmd;
> > > >         unsigned short buf_group;
> > > >  };
> > > > @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
> > > >          */
> > > >         struct ublk_queue *ubq;
> > > >
> > > > -       u16 tag;
> > > > +       union {
> > > > +               u16 tag;
> > > > +               struct ublk_batch_fcmd *fcmd; /* batch io only */
> > > > +       };
> > > >  };
> > > >
> > > >  struct ublk_batch_io_data {
> > > > @@ -229,18 +233,36 @@ struct ublk_queue {
> > > >         struct ublk_device *dev;
> > > >
> > > >         /*
> > > > -        * Inflight ublk request tag is saved in this fifo
> > > > +        * Batch I/O State Management:
> > > > +        *
> > > > +        * The batch I/O system uses implicit state management based on the
> > > > +        * combination of three key variables below.
> > > > +        *
> > > > +        * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> > > > +        *   No fetch commands available, events queue in evts_fifo
> > > > +        *
> > > > +        * - READY: !list_empty(&fcmd_head) && !active_fcmd
> > > > +        *   Fetch commands available but none processing events
> > > >          *
> > > > -        * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> > > > -        * so lock is required for storing request tag to fifo
> > > > +        * - ACTIVE: active_fcmd
> > > > +        *   One fetch command actively processing events from evts_fifo
> > > >          *
> > > > -        * Make sure just one reader for fetching request from task work
> > > > -        * function to ublk server, so no need to grab the lock in reader
> > > > -        * side.
> > > > +        * Key Invariants:
> > > > +        * - At most one active_fcmd at any time (single reader)
> > > > +        * - active_fcmd is always from fcmd_head list when non-NULL
> > > > +        * - evts_fifo can be read locklessly by the single active reader
> > > > +        * - All state transitions require evts_lock protection
> > > > +        * - Multiple writers to evts_fifo require lock protection
> > > >          */
> > > >         struct {
> > > >                 DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> > > >                 spinlock_t evts_lock;
> > > > +
> > > > +               /* List of fetch commands available to process events */
> > > > +               struct list_head fcmd_head;
> > > > +
> > > > +               /* Currently active fetch command (NULL = none active) */
> > > > +               struct ublk_batch_fcmd  *active_fcmd;
> > > >         }____cacheline_aligned_in_smp;
> > > >
> > > >         struct ublk_io ios[] __counted_by(q_depth);
> > > > @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
> > > >  static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
> > > >                 u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
> > > >  static inline unsigned int ublk_req_build_flags(struct request *req);
> > > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > +                               struct ublk_batch_io_data *data,
> > > > +                               struct ublk_batch_fcmd *fcmd);
> > > >
> > > >  static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
> > > >  {
> > > >         return false;
> > > >  }
> > > >
> > > > +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> > > > +{
> > > > +       return false;
> > > > +}
> > > > +
> > > >  static inline void ublk_io_lock(struct ublk_io *io)
> > > >  {
> > > >         spin_lock(&io->lock);
> > > > @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq;     /* wait until one idr is freed */
> > > >
> > > >  static DEFINE_MUTEX(ublk_ctl_mutex);
> > > >
> > > > +static struct ublk_batch_fcmd *
> > > > +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> > > > +{
> > > > +       struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
> > >
> > > An allocation in the I/O path seems unfortunate. Is there not room to
> > > store the struct ublk_batch_fcmd in the io_uring_cmd pdu?
> >
> > It is allocated once for one mshot request, which covers many IOs.
> >
> > It can't be held in uring_cmd pdu, but the allocation can be optimized in
> > future. Not a big deal in enablement stage.
> 
> Okay, seems fine to optimize it in the future.
> 
> >
> > > > +
> > > > +       if (fcmd) {
> > > > +               fcmd->cmd = cmd;
> > > > +               fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
> > >
> > > Is it necessary to store sample this here just to pass it back to the
> > > io_uring layer? Wouldn't the io_uring layer already have access to it
> > > in struct io_kiocb's buf_index field?
> >
> > ->buf_group is used by io_uring_cmd_buffer_select(), and this way also
> > follows ->buf_index uses in both io_uring/net.c and io_uring/rw.c.
> >
> >
> > io_ring_buffer_select(), so we can't reuse req->buf_index here.
> 
> But io_uring/net.c and io_uring/rw.c both retrieve the buf_group value
> from req->buf_index instead of the SQE, for example:
> if (req->flags & REQ_F_BUFFER_SELECT)
>         sr->buf_group = req->buf_index;
> 
> Seems like it would make sense to do the same for
> UBLK_U_IO_FETCH_IO_CMDS. That also saves one pointer dereference here.

IMO we shouldn't encourage driver to access `io_kiocb`, however, cmd->sqe
is exposed to driver explicitly.

> 
> >
> > >
> > > > +       }
> > > > +       return fcmd;
> > > > +}
> > > > +
> > > > +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> > > > +{
> > > > +       kfree(fcmd);
> > > > +}
> > > > +
> > > > +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> > > > +{
> > > > +       WRITE_ONCE(ubq->active_fcmd, NULL);
> > > > +}
> > > >
> > > > -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > > > +/*
> > > > + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> > > > + * dispatching
> > > > + */
> > > > +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> > > > +                                       const struct ublk_batch_io_data *data,
> > > >                                         struct ublk_batch_fcmd *fcmd,
> > > >                                         int res)
> > > >  {
> > > > +       spin_lock(&ubq->evts_lock);
> > > > +       list_del(&fcmd->node);
> > > > +       WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> > > > +       __ublk_release_fcmd(ubq);
> > > > +       spin_unlock(&ubq->evts_lock);
> > > > +
> > > >         io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > > > -       fcmd->cmd = NULL;
> > > > +       ublk_batch_free_fcmd(fcmd);
> > > >  }
> > > >
> > > >  static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > > > @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > >         bool needs_filter;
> > > >         int ret;
> > > >
> > > > +       WARN_ON_ONCE(data->cmd != fcmd->cmd);
> > > > +
> > > >         sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > > >                                          data->issue_flags);
> > > >         if (sel.val < 0)
> > > > @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > >         return ret;
> > > >  }
> > > >
> > > > -static __maybe_unused int
> > > > -ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > -                   const struct ublk_batch_io_data *data,
> > > > -                   struct ublk_batch_fcmd *fcmd)
> > > > +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> > > > +               struct ublk_queue *ubq)
> > > > +{
> > > > +       struct ublk_batch_fcmd *fcmd;
> > > > +
> > > > +       lockdep_assert_held(&ubq->evts_lock);
> > > > +
> > > > +       /*
> > > > +        * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> > > > +        *
> > > > +        * The pair is the smp_mb() in ublk_batch_dispatch().
> > > > +        *
> > > > +        * If ubq->active_fcmd is observed as non-NULL, the new added tags
> > > > +        * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> > > > +        */
> > > > +       smp_mb();
> > > > +       if (READ_ONCE(ubq->active_fcmd)) {
> > > > +               fcmd = NULL;
> > > > +       } else {
> > > > +               fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> > > > +                               struct ublk_batch_fcmd, node);
> > > > +               WRITE_ONCE(ubq->active_fcmd, fcmd);
> > > > +       }
> > > > +       return fcmd;
> > > > +}
> > > > +
> > > > +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> > > > +                          unsigned int issue_flags)
> > > > +{
> > > > +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > > +       struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > > > +       struct ublk_batch_io_data data = {
> > > > +               .ub = pdu->ubq->dev,
> > > > +               .cmd = fcmd->cmd,
> > > > +               .issue_flags = issue_flags,
> > > > +       };
> > > > +
> > > > +       WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> > > > +
> > > > +       ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> > > > +}
> > > > +
> > > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > +                               struct ublk_batch_io_data *data,
> > > > +                               struct ublk_batch_fcmd *fcmd)
> > > >  {
> > > > +       struct ublk_batch_fcmd *new_fcmd;
> > >
> > > Is the new_fcmd variable necessary? Can fcmd be reused instead?
> > >
> > > > +       void *handle;
> > > > +       bool empty;
> > > >         int ret = 0;
> > > >
> > > > +again:
> > > >         while (!ublk_io_evts_empty(ubq)) {
> > > >                 ret = __ublk_batch_dispatch(ubq, data, fcmd);
> > > >                 if (ret <= 0)
> > > >                         break;
> > > >         }
> > > >
> > > > -       if (ret < 0)
> > > > -               ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> > > > +       if (ret < 0) {
> > > > +               ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> > > > +               return;
> > > > +       }
> > > >
> > > > -       return ret;
> > > > +       handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> > > > +       __ublk_release_fcmd(ubq);
> > > > +       /*
> > > > +        * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> > > > +        * checking ubq->evts_fifo.
> > > > +        *
> > > > +        * The pair is the smp_mb() in __ublk_acquire_fcmd().
> > > > +        */
> > > > +       smp_mb();
> > > > +       empty = ublk_io_evts_empty(ubq);
> > > > +       if (likely(empty))
> > >
> > > nit: empty variable seems unnecessary
> > >
> > > > +               return;
> > > > +
> > > > +       spin_lock(&ubq->evts_lock);
> > > > +       new_fcmd = __ublk_acquire_fcmd(ubq);
> > > > +       spin_unlock(&ubq->evts_lock);
> > > > +
> > > > +       if (!new_fcmd)
> > > > +               return;
> > > > +       if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
> > >
> > > This check seems to be meant to decide whether the new and old
> > > UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?
> >
> > Actually not.
> >
> > > But belonging to the same io_uring context doesn't necessarily mean
> > > that the same task issued them. It seems like it would be safer to
> > > always dispatch new_fcmd->cmd to task work.
> >
> > What matters is just that ctx->uring_lock & issue_flag matches from ublk
> > viewpoint, so it is safe to do so.
> 
> Okay, that makes sense.
> 
> >
> > However, given it is hit in slow path, so starting new dispatch
> > is easier.
> 
> Yeah, I'd agree it makes sense to keep the unexpected path code
> simpler. There may also be fairness concerns from looping indefinitely
> here if the evts_fifo continues to be nonempty, so dispatching to task
> work seems safer.

Fair enough.

Thanks,
Ming

Re: [PATCH V4 14/27] ublk: add UBLK_U_IO_FETCH_IO_CMDS for batch I/O processing
Posted by Caleb Sander Mateos 11 minutes ago
On Mon, Dec 1, 2025 at 5:27 PM Ming Lei <ming.lei@redhat.com> wrote:
>
> On Mon, Dec 01, 2025 at 09:51:59AM -0800, Caleb Sander Mateos wrote:
> > On Mon, Dec 1, 2025 at 1:42 AM Ming Lei <ming.lei@redhat.com> wrote:
> > >
> > > On Sun, Nov 30, 2025 at 09:55:47PM -0800, Caleb Sander Mateos wrote:
> > > > On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> > > > >
> > > > > Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> > > > > of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> > > > > multiple I/O commands in a single operation, significantly reducing
> > > > > submission overhead compared to individual FETCH_REQ* commands.
> > > > >
> > > > > Key Design Features:
> > > > >
> > > > > 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
> > > > >    commands, with the batch size limited by the provided buffer length.
> > > > >
> > > > > 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
> > > > >    simultaneously, but only one is active at any time. This enables
> > > > >    efficient load distribution across multiple server task contexts.
> > > > >
> > > > > 3. Implicit State Management: The implementation uses three key variables
> > > > >    to track state:
> > > > >    - evts_fifo: Queue of request tags awaiting processing
> > > > >    - fcmd_head: List of available fetch commands
> > > > >    - active_fcmd: Currently active fetch command (NULL = none active)
> > > > >
> > > > >    States are derived implicitly:
> > > > >    - IDLE: No fetch commands available
> > > > >    - READY: Fetch commands available, none active
> > > > >    - ACTIVE: One fetch command processing events
> > > > >
> > > > > 4. Lockless Reader Optimization: The active fetch command can read from
> > > > >    evts_fifo without locking (single reader guarantee), while writers
> > > > >    (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
> > > > >    barrier pairing plays key role for the single lockless reader
> > > > >    optimization.
> > > > >
> > > > > Implementation Details:
> > > > >
> > > > > - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> > > > > - __ublk_pick_active_fcmd() selects an available fetch command when
> > > > >   events arrive and no command is currently active
> > > >
> > > > What is __ublk_pick_active_fcmd()? I don't see a function with that name.
> > >
> > > It is renamed as __ublk_acquire_fcmd(), and its counter pair is
> > > __ublk_release_fcmd().
> >
> > Okay, update the commit message then?
> >
> > >
> > > >
> > > > > - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
> > > > >   buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> > > > > - State transitions are coordinated via evts_lock to maintain consistency
> > > > >
> > > > > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > > > > ---
> > > > >  drivers/block/ublk_drv.c      | 412 +++++++++++++++++++++++++++++++---
> > > > >  include/uapi/linux/ublk_cmd.h |   7 +
> > > > >  2 files changed, 388 insertions(+), 31 deletions(-)
> > > > >
> > > > > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > > > > index cc9c92d97349..2e5e392c939e 100644
> > > > > --- a/drivers/block/ublk_drv.c
> > > > > +++ b/drivers/block/ublk_drv.c
> > > > > @@ -93,6 +93,7 @@
> > > > >
> > > > >  /* ublk batch fetch uring_cmd */
> > > > >  struct ublk_batch_fcmd {
> > > > > +       struct list_head node;
> > > > >         struct io_uring_cmd *cmd;
> > > > >         unsigned short buf_group;
> > > > >  };
> > > > > @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
> > > > >          */
> > > > >         struct ublk_queue *ubq;
> > > > >
> > > > > -       u16 tag;
> > > > > +       union {
> > > > > +               u16 tag;
> > > > > +               struct ublk_batch_fcmd *fcmd; /* batch io only */
> > > > > +       };
> > > > >  };
> > > > >
> > > > >  struct ublk_batch_io_data {
> > > > > @@ -229,18 +233,36 @@ struct ublk_queue {
> > > > >         struct ublk_device *dev;
> > > > >
> > > > >         /*
> > > > > -        * Inflight ublk request tag is saved in this fifo
> > > > > +        * Batch I/O State Management:
> > > > > +        *
> > > > > +        * The batch I/O system uses implicit state management based on the
> > > > > +        * combination of three key variables below.
> > > > > +        *
> > > > > +        * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> > > > > +        *   No fetch commands available, events queue in evts_fifo
> > > > > +        *
> > > > > +        * - READY: !list_empty(&fcmd_head) && !active_fcmd
> > > > > +        *   Fetch commands available but none processing events
> > > > >          *
> > > > > -        * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> > > > > -        * so lock is required for storing request tag to fifo
> > > > > +        * - ACTIVE: active_fcmd
> > > > > +        *   One fetch command actively processing events from evts_fifo
> > > > >          *
> > > > > -        * Make sure just one reader for fetching request from task work
> > > > > -        * function to ublk server, so no need to grab the lock in reader
> > > > > -        * side.
> > > > > +        * Key Invariants:
> > > > > +        * - At most one active_fcmd at any time (single reader)
> > > > > +        * - active_fcmd is always from fcmd_head list when non-NULL
> > > > > +        * - evts_fifo can be read locklessly by the single active reader
> > > > > +        * - All state transitions require evts_lock protection
> > > > > +        * - Multiple writers to evts_fifo require lock protection
> > > > >          */
> > > > >         struct {
> > > > >                 DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> > > > >                 spinlock_t evts_lock;
> > > > > +
> > > > > +               /* List of fetch commands available to process events */
> > > > > +               struct list_head fcmd_head;
> > > > > +
> > > > > +               /* Currently active fetch command (NULL = none active) */
> > > > > +               struct ublk_batch_fcmd  *active_fcmd;
> > > > >         }____cacheline_aligned_in_smp;
> > > > >
> > > > >         struct ublk_io ios[] __counted_by(q_depth);
> > > > > @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
> > > > >  static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
> > > > >                 u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
> > > > >  static inline unsigned int ublk_req_build_flags(struct request *req);
> > > > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > > +                               struct ublk_batch_io_data *data,
> > > > > +                               struct ublk_batch_fcmd *fcmd);
> > > > >
> > > > >  static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
> > > > >  {
> > > > >         return false;
> > > > >  }
> > > > >
> > > > > +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> > > > > +{
> > > > > +       return false;
> > > > > +}
> > > > > +
> > > > >  static inline void ublk_io_lock(struct ublk_io *io)
> > > > >  {
> > > > >         spin_lock(&io->lock);
> > > > > @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq;     /* wait until one idr is freed */
> > > > >
> > > > >  static DEFINE_MUTEX(ublk_ctl_mutex);
> > > > >
> > > > > +static struct ublk_batch_fcmd *
> > > > > +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> > > > > +{
> > > > > +       struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
> > > >
> > > > An allocation in the I/O path seems unfortunate. Is there not room to
> > > > store the struct ublk_batch_fcmd in the io_uring_cmd pdu?
> > >
> > > It is allocated once for one mshot request, which covers many IOs.
> > >
> > > It can't be held in uring_cmd pdu, but the allocation can be optimized in
> > > future. Not a big deal in enablement stage.
> >
> > Okay, seems fine to optimize it in the future.
> >
> > >
> > > > > +
> > > > > +       if (fcmd) {
> > > > > +               fcmd->cmd = cmd;
> > > > > +               fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
> > > >
> > > > Is it necessary to store sample this here just to pass it back to the
> > > > io_uring layer? Wouldn't the io_uring layer already have access to it
> > > > in struct io_kiocb's buf_index field?
> > >
> > > ->buf_group is used by io_uring_cmd_buffer_select(), and this way also
> > > follows ->buf_index uses in both io_uring/net.c and io_uring/rw.c.
> > >
> > >
> > > io_ring_buffer_select(), so we can't reuse req->buf_index here.
> >
> > But io_uring/net.c and io_uring/rw.c both retrieve the buf_group value
> > from req->buf_index instead of the SQE, for example:
> > if (req->flags & REQ_F_BUFFER_SELECT)
> >         sr->buf_group = req->buf_index;
> >
> > Seems like it would make sense to do the same for
> > UBLK_U_IO_FETCH_IO_CMDS. That also saves one pointer dereference here.
>
> IMO we shouldn't encourage driver to access `io_kiocb`, however, cmd->sqe
> is exposed to driver explicitly.

Right, but we can add a helper in include/linux/io_uring/cmd.h to
encapsulate accessing the io_kiocb field.

Best,
Caleb

>
> >
> > >
> > > >
> > > > > +       }
> > > > > +       return fcmd;
> > > > > +}
> > > > > +
> > > > > +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> > > > > +{
> > > > > +       kfree(fcmd);
> > > > > +}
> > > > > +
> > > > > +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> > > > > +{
> > > > > +       WRITE_ONCE(ubq->active_fcmd, NULL);
> > > > > +}
> > > > >
> > > > > -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > > > > +/*
> > > > > + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> > > > > + * dispatching
> > > > > + */
> > > > > +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> > > > > +                                       const struct ublk_batch_io_data *data,
> > > > >                                         struct ublk_batch_fcmd *fcmd,
> > > > >                                         int res)
> > > > >  {
> > > > > +       spin_lock(&ubq->evts_lock);
> > > > > +       list_del(&fcmd->node);
> > > > > +       WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> > > > > +       __ublk_release_fcmd(ubq);
> > > > > +       spin_unlock(&ubq->evts_lock);
> > > > > +
> > > > >         io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > > > > -       fcmd->cmd = NULL;
> > > > > +       ublk_batch_free_fcmd(fcmd);
> > > > >  }
> > > > >
> > > > >  static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > > > > @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > >         bool needs_filter;
> > > > >         int ret;
> > > > >
> > > > > +       WARN_ON_ONCE(data->cmd != fcmd->cmd);
> > > > > +
> > > > >         sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > > > >                                          data->issue_flags);
> > > > >         if (sel.val < 0)
> > > > > @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > >         return ret;
> > > > >  }
> > > > >
> > > > > -static __maybe_unused int
> > > > > -ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > > -                   const struct ublk_batch_io_data *data,
> > > > > -                   struct ublk_batch_fcmd *fcmd)
> > > > > +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> > > > > +               struct ublk_queue *ubq)
> > > > > +{
> > > > > +       struct ublk_batch_fcmd *fcmd;
> > > > > +
> > > > > +       lockdep_assert_held(&ubq->evts_lock);
> > > > > +
> > > > > +       /*
> > > > > +        * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> > > > > +        *
> > > > > +        * The pair is the smp_mb() in ublk_batch_dispatch().
> > > > > +        *
> > > > > +        * If ubq->active_fcmd is observed as non-NULL, the new added tags
> > > > > +        * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> > > > > +        */
> > > > > +       smp_mb();
> > > > > +       if (READ_ONCE(ubq->active_fcmd)) {
> > > > > +               fcmd = NULL;
> > > > > +       } else {
> > > > > +               fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> > > > > +                               struct ublk_batch_fcmd, node);
> > > > > +               WRITE_ONCE(ubq->active_fcmd, fcmd);
> > > > > +       }
> > > > > +       return fcmd;
> > > > > +}
> > > > > +
> > > > > +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> > > > > +                          unsigned int issue_flags)
> > > > > +{
> > > > > +       struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > > > +       struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > > > > +       struct ublk_batch_io_data data = {
> > > > > +               .ub = pdu->ubq->dev,
> > > > > +               .cmd = fcmd->cmd,
> > > > > +               .issue_flags = issue_flags,
> > > > > +       };
> > > > > +
> > > > > +       WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> > > > > +
> > > > > +       ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> > > > > +}
> > > > > +
> > > > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > > +                               struct ublk_batch_io_data *data,
> > > > > +                               struct ublk_batch_fcmd *fcmd)
> > > > >  {
> > > > > +       struct ublk_batch_fcmd *new_fcmd;
> > > >
> > > > Is the new_fcmd variable necessary? Can fcmd be reused instead?
> > > >
> > > > > +       void *handle;
> > > > > +       bool empty;
> > > > >         int ret = 0;
> > > > >
> > > > > +again:
> > > > >         while (!ublk_io_evts_empty(ubq)) {
> > > > >                 ret = __ublk_batch_dispatch(ubq, data, fcmd);
> > > > >                 if (ret <= 0)
> > > > >                         break;
> > > > >         }
> > > > >
> > > > > -       if (ret < 0)
> > > > > -               ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> > > > > +       if (ret < 0) {
> > > > > +               ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> > > > > +               return;
> > > > > +       }
> > > > >
> > > > > -       return ret;
> > > > > +       handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> > > > > +       __ublk_release_fcmd(ubq);
> > > > > +       /*
> > > > > +        * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> > > > > +        * checking ubq->evts_fifo.
> > > > > +        *
> > > > > +        * The pair is the smp_mb() in __ublk_acquire_fcmd().
> > > > > +        */
> > > > > +       smp_mb();
> > > > > +       empty = ublk_io_evts_empty(ubq);
> > > > > +       if (likely(empty))
> > > >
> > > > nit: empty variable seems unnecessary
> > > >
> > > > > +               return;
> > > > > +
> > > > > +       spin_lock(&ubq->evts_lock);
> > > > > +       new_fcmd = __ublk_acquire_fcmd(ubq);
> > > > > +       spin_unlock(&ubq->evts_lock);
> > > > > +
> > > > > +       if (!new_fcmd)
> > > > > +               return;
> > > > > +       if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
> > > >
> > > > This check seems to be meant to decide whether the new and old
> > > > UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?
> > >
> > > Actually not.
> > >
> > > > But belonging to the same io_uring context doesn't necessarily mean
> > > > that the same task issued them. It seems like it would be safer to
> > > > always dispatch new_fcmd->cmd to task work.
> > >
> > > What matters is just that ctx->uring_lock & issue_flag matches from ublk
> > > viewpoint, so it is safe to do so.
> >
> > Okay, that makes sense.
> >
> > >
> > > However, given it is hit in slow path, so starting new dispatch
> > > is easier.
> >
> > Yeah, I'd agree it makes sense to keep the unexpected path code
> > simpler. There may also be fairness concerns from looping indefinitely
> > here if the evts_fifo continues to be nonempty, so dispatching to task
> > work seems safer.
>
> Fair enough.
>
> Thanks,
> Ming
>