Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
of I/O requests. This multishot uring_cmd allows the ublk server to fetch
multiple I/O commands in a single operation, significantly reducing
submission overhead compared to individual FETCH_REQ* commands.
Key Design Features:
1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
commands, with the batch size limited by the provided buffer length.
2. Dynamic Load Balancing: Multiple fetch commands can be submitted
simultaneously, but only one is active at any time. This enables
efficient load distribution across multiple server task contexts.
3. Implicit State Management: The implementation uses three key variables
to track state:
- evts_fifo: Queue of request tags awaiting processing
- fcmd_head: List of available fetch commands
- active_fcmd: Currently active fetch command (NULL = none active)
States are derived implicitly:
- IDLE: No fetch commands available
- READY: Fetch commands available, none active
- ACTIVE: One fetch command processing events
4. Lockless Reader Optimization: The active fetch command can read from
evts_fifo without locking (single reader guarantee), while writers
(ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
barrier pairing plays key role for the single lockless reader
optimization.
Implementation Details:
- ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
- __ublk_pick_active_fcmd() selects an available fetch command when
events arrive and no command is currently active
- ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
buffer and posts completion via io_uring_mshot_cmd_post_cqe()
- State transitions are coordinated via evts_lock to maintain consistency
Signed-off-by: Ming Lei <ming.lei@redhat.com>
---
drivers/block/ublk_drv.c | 412 +++++++++++++++++++++++++++++++---
include/uapi/linux/ublk_cmd.h | 7 +
2 files changed, 388 insertions(+), 31 deletions(-)
diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
index cc9c92d97349..2e5e392c939e 100644
--- a/drivers/block/ublk_drv.c
+++ b/drivers/block/ublk_drv.c
@@ -93,6 +93,7 @@
/* ublk batch fetch uring_cmd */
struct ublk_batch_fcmd {
+ struct list_head node;
struct io_uring_cmd *cmd;
unsigned short buf_group;
};
@@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
*/
struct ublk_queue *ubq;
- u16 tag;
+ union {
+ u16 tag;
+ struct ublk_batch_fcmd *fcmd; /* batch io only */
+ };
};
struct ublk_batch_io_data {
@@ -229,18 +233,36 @@ struct ublk_queue {
struct ublk_device *dev;
/*
- * Inflight ublk request tag is saved in this fifo
+ * Batch I/O State Management:
+ *
+ * The batch I/O system uses implicit state management based on the
+ * combination of three key variables below.
+ *
+ * - IDLE: list_empty(&fcmd_head) && !active_fcmd
+ * No fetch commands available, events queue in evts_fifo
+ *
+ * - READY: !list_empty(&fcmd_head) && !active_fcmd
+ * Fetch commands available but none processing events
*
- * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
- * so lock is required for storing request tag to fifo
+ * - ACTIVE: active_fcmd
+ * One fetch command actively processing events from evts_fifo
*
- * Make sure just one reader for fetching request from task work
- * function to ublk server, so no need to grab the lock in reader
- * side.
+ * Key Invariants:
+ * - At most one active_fcmd at any time (single reader)
+ * - active_fcmd is always from fcmd_head list when non-NULL
+ * - evts_fifo can be read locklessly by the single active reader
+ * - All state transitions require evts_lock protection
+ * - Multiple writers to evts_fifo require lock protection
*/
struct {
DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
spinlock_t evts_lock;
+
+ /* List of fetch commands available to process events */
+ struct list_head fcmd_head;
+
+ /* Currently active fetch command (NULL = none active) */
+ struct ublk_batch_fcmd *active_fcmd;
}____cacheline_aligned_in_smp;
struct ublk_io ios[] __counted_by(q_depth);
@@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
static inline unsigned int ublk_req_build_flags(struct request *req);
+static void ublk_batch_dispatch(struct ublk_queue *ubq,
+ struct ublk_batch_io_data *data,
+ struct ublk_batch_fcmd *fcmd);
static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
{
return false;
}
+static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
+{
+ return false;
+}
+
static inline void ublk_io_lock(struct ublk_io *io)
{
spin_lock(&io->lock);
@@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq; /* wait until one idr is freed */
static DEFINE_MUTEX(ublk_ctl_mutex);
+static struct ublk_batch_fcmd *
+ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
+{
+ struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
+
+ if (fcmd) {
+ fcmd->cmd = cmd;
+ fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
+ }
+ return fcmd;
+}
+
+static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
+{
+ kfree(fcmd);
+}
+
+static void __ublk_release_fcmd(struct ublk_queue *ubq)
+{
+ WRITE_ONCE(ubq->active_fcmd, NULL);
+}
-static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
+/*
+ * Nothing can move on, so clear ->active_fcmd, and the caller should stop
+ * dispatching
+ */
+static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
+ const struct ublk_batch_io_data *data,
struct ublk_batch_fcmd *fcmd,
int res)
{
+ spin_lock(&ubq->evts_lock);
+ list_del(&fcmd->node);
+ WARN_ON_ONCE(fcmd != ubq->active_fcmd);
+ __ublk_release_fcmd(ubq);
+ spin_unlock(&ubq->evts_lock);
+
io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
- fcmd->cmd = NULL;
+ ublk_batch_free_fcmd(fcmd);
}
static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
@@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
bool needs_filter;
int ret;
+ WARN_ON_ONCE(data->cmd != fcmd->cmd);
+
sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
data->issue_flags);
if (sel.val < 0)
@@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
return ret;
}
-static __maybe_unused int
-ublk_batch_dispatch(struct ublk_queue *ubq,
- const struct ublk_batch_io_data *data,
- struct ublk_batch_fcmd *fcmd)
+static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
+ struct ublk_queue *ubq)
+{
+ struct ublk_batch_fcmd *fcmd;
+
+ lockdep_assert_held(&ubq->evts_lock);
+
+ /*
+ * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
+ *
+ * The pair is the smp_mb() in ublk_batch_dispatch().
+ *
+ * If ubq->active_fcmd is observed as non-NULL, the new added tags
+ * can be visisible in ublk_batch_dispatch() with the barrier pairing.
+ */
+ smp_mb();
+ if (READ_ONCE(ubq->active_fcmd)) {
+ fcmd = NULL;
+ } else {
+ fcmd = list_first_entry_or_null(&ubq->fcmd_head,
+ struct ublk_batch_fcmd, node);
+ WRITE_ONCE(ubq->active_fcmd, fcmd);
+ }
+ return fcmd;
+}
+
+static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
+ unsigned int issue_flags)
+{
+ struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+ struct ublk_batch_fcmd *fcmd = pdu->fcmd;
+ struct ublk_batch_io_data data = {
+ .ub = pdu->ubq->dev,
+ .cmd = fcmd->cmd,
+ .issue_flags = issue_flags,
+ };
+
+ WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
+
+ ublk_batch_dispatch(pdu->ubq, &data, fcmd);
+}
+
+static void ublk_batch_dispatch(struct ublk_queue *ubq,
+ struct ublk_batch_io_data *data,
+ struct ublk_batch_fcmd *fcmd)
{
+ struct ublk_batch_fcmd *new_fcmd;
+ void *handle;
+ bool empty;
int ret = 0;
+again:
while (!ublk_io_evts_empty(ubq)) {
ret = __ublk_batch_dispatch(ubq, data, fcmd);
if (ret <= 0)
break;
}
- if (ret < 0)
- ublk_batch_deinit_fetch_buf(data, fcmd, ret);
+ if (ret < 0) {
+ ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
+ return;
+ }
- return ret;
+ handle = io_uring_cmd_ctx_handle(fcmd->cmd);
+ __ublk_release_fcmd(ubq);
+ /*
+ * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
+ * checking ubq->evts_fifo.
+ *
+ * The pair is the smp_mb() in __ublk_acquire_fcmd().
+ */
+ smp_mb();
+ empty = ublk_io_evts_empty(ubq);
+ if (likely(empty))
+ return;
+
+ spin_lock(&ubq->evts_lock);
+ new_fcmd = __ublk_acquire_fcmd(ubq);
+ spin_unlock(&ubq->evts_lock);
+
+ if (!new_fcmd)
+ return;
+ if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
+ data->cmd = new_fcmd->cmd;
+ fcmd = new_fcmd;
+ goto again;
+ }
+ io_uring_cmd_complete_in_task(new_fcmd->cmd, ublk_batch_tw_cb);
}
static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
@@ -1576,13 +1711,27 @@ static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
ublk_dispatch_req(ubq, pdu->req, issue_flags);
}
-static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq)
+static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq, bool last)
{
- struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
- struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+ if (ublk_support_batch_io(ubq)) {
+ unsigned short tag = rq->tag;
+ struct ublk_batch_fcmd *fcmd = NULL;
- pdu->req = rq;
- io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
+ spin_lock(&ubq->evts_lock);
+ kfifo_put(&ubq->evts_fifo, tag);
+ if (last)
+ fcmd = __ublk_acquire_fcmd(ubq);
+ spin_unlock(&ubq->evts_lock);
+
+ if (fcmd)
+ io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
+ } else {
+ struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
+ struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+
+ pdu->req = rq;
+ io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
+ }
}
static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
@@ -1600,14 +1749,44 @@ static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
} while (rq);
}
-static void ublk_queue_cmd_list(struct ublk_io *io, struct rq_list *l)
+static void ublk_batch_queue_cmd_list(struct ublk_queue *ubq, struct rq_list *l)
{
- struct io_uring_cmd *cmd = io->cmd;
- struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+ unsigned short tags[MAX_NR_TAG];
+ struct ublk_batch_fcmd *fcmd;
+ struct request *rq;
+ unsigned cnt = 0;
+
+ spin_lock(&ubq->evts_lock);
+ rq_list_for_each(l, rq) {
+ tags[cnt++] = (unsigned short)rq->tag;
+ if (cnt >= MAX_NR_TAG) {
+ kfifo_in(&ubq->evts_fifo, tags, cnt);
+ cnt = 0;
+ }
+ }
+ if (cnt)
+ kfifo_in(&ubq->evts_fifo, tags, cnt);
+ fcmd = __ublk_acquire_fcmd(ubq);
+ spin_unlock(&ubq->evts_lock);
- pdu->req_list = rq_list_peek(l);
rq_list_init(l);
- io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
+ if (fcmd)
+ io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
+}
+
+static void ublk_queue_cmd_list(struct ublk_queue *ubq, struct ublk_io *io,
+ struct rq_list *l, bool batch)
+{
+ if (batch) {
+ ublk_batch_queue_cmd_list(ubq, l);
+ } else {
+ struct io_uring_cmd *cmd = io->cmd;
+ struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+
+ pdu->req_list = rq_list_peek(l);
+ rq_list_init(l);
+ io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
+ }
}
static enum blk_eh_timer_return ublk_timeout(struct request *rq)
@@ -1686,7 +1865,7 @@ static blk_status_t ublk_queue_rq(struct blk_mq_hw_ctx *hctx,
return BLK_STS_OK;
}
- ublk_queue_cmd(ubq, rq);
+ ublk_queue_cmd(ubq, rq, bd->last);
return BLK_STS_OK;
}
@@ -1698,11 +1877,25 @@ static inline bool ublk_belong_to_same_batch(const struct ublk_io *io,
(io->task == io2->task);
}
-static void ublk_queue_rqs(struct rq_list *rqlist)
+static void ublk_commit_rqs(struct blk_mq_hw_ctx *hctx)
+{
+ struct ublk_queue *ubq = hctx->driver_data;
+ struct ublk_batch_fcmd *fcmd;
+
+ spin_lock(&ubq->evts_lock);
+ fcmd = __ublk_acquire_fcmd(ubq);
+ spin_unlock(&ubq->evts_lock);
+
+ if (fcmd)
+ io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
+}
+
+static void __ublk_queue_rqs(struct rq_list *rqlist, bool batch)
{
struct rq_list requeue_list = { };
struct rq_list submit_list = { };
struct ublk_io *io = NULL;
+ struct ublk_queue *ubq = NULL;
struct request *req;
while ((req = rq_list_pop(rqlist))) {
@@ -1716,16 +1909,27 @@ static void ublk_queue_rqs(struct rq_list *rqlist)
if (io && !ublk_belong_to_same_batch(io, this_io) &&
!rq_list_empty(&submit_list))
- ublk_queue_cmd_list(io, &submit_list);
+ ublk_queue_cmd_list(ubq, io, &submit_list, batch);
io = this_io;
+ ubq = this_q;
rq_list_add_tail(&submit_list, req);
}
if (!rq_list_empty(&submit_list))
- ublk_queue_cmd_list(io, &submit_list);
+ ublk_queue_cmd_list(ubq, io, &submit_list, batch);
*rqlist = requeue_list;
}
+static void ublk_queue_rqs(struct rq_list *rqlist)
+{
+ __ublk_queue_rqs(rqlist, false);
+}
+
+static void ublk_batch_queue_rqs(struct rq_list *rqlist)
+{
+ __ublk_queue_rqs(rqlist, true);
+}
+
static int ublk_init_hctx(struct blk_mq_hw_ctx *hctx, void *driver_data,
unsigned int hctx_idx)
{
@@ -1743,6 +1947,14 @@ static const struct blk_mq_ops ublk_mq_ops = {
.timeout = ublk_timeout,
};
+static const struct blk_mq_ops ublk_batch_mq_ops = {
+ .commit_rqs = ublk_commit_rqs,
+ .queue_rq = ublk_queue_rq,
+ .queue_rqs = ublk_batch_queue_rqs,
+ .init_hctx = ublk_init_hctx,
+ .timeout = ublk_timeout,
+};
+
static void ublk_queue_reinit(struct ublk_device *ub, struct ublk_queue *ubq)
{
int i;
@@ -2120,6 +2332,56 @@ static void ublk_cancel_cmd(struct ublk_queue *ubq, unsigned tag,
io_uring_cmd_done(io->cmd, UBLK_IO_RES_ABORT, issue_flags);
}
+static void ublk_batch_cancel_cmd(struct ublk_queue *ubq,
+ struct ublk_batch_fcmd *fcmd,
+ unsigned int issue_flags)
+{
+ bool done;
+
+ spin_lock(&ubq->evts_lock);
+ done = (ubq->active_fcmd != fcmd);
+ if (done)
+ list_del(&fcmd->node);
+ spin_unlock(&ubq->evts_lock);
+
+ if (done) {
+ io_uring_cmd_done(fcmd->cmd, UBLK_IO_RES_ABORT, issue_flags);
+ ublk_batch_free_fcmd(fcmd);
+ }
+}
+
+static void ublk_batch_cancel_queue(struct ublk_queue *ubq)
+{
+ LIST_HEAD(fcmd_list);
+
+ spin_lock(&ubq->evts_lock);
+ ubq->force_abort = true;
+ list_splice_init(&ubq->fcmd_head, &fcmd_list);
+ if (ubq->active_fcmd)
+ list_move(&ubq->active_fcmd->node, &ubq->fcmd_head);
+ spin_unlock(&ubq->evts_lock);
+
+ while (!list_empty(&fcmd_list)) {
+ struct ublk_batch_fcmd *fcmd = list_first_entry(&fcmd_list,
+ struct ublk_batch_fcmd, node);
+
+ ublk_batch_cancel_cmd(ubq, fcmd, IO_URING_F_UNLOCKED);
+ }
+}
+
+static void ublk_batch_cancel_fn(struct io_uring_cmd *cmd,
+ unsigned int issue_flags)
+{
+ struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
+ struct ublk_batch_fcmd *fcmd = pdu->fcmd;
+ struct ublk_queue *ubq = pdu->ubq;
+
+ if (!ubq->canceling)
+ ublk_start_cancel(ubq->dev);
+
+ ublk_batch_cancel_cmd(ubq, fcmd, issue_flags);
+}
+
/*
* The ublk char device won't be closed when calling cancel fn, so both
* ublk device and queue are guaranteed to be live
@@ -2171,6 +2433,11 @@ static void ublk_cancel_queue(struct ublk_queue *ubq)
{
int i;
+ if (ublk_support_batch_io(ubq)) {
+ ublk_batch_cancel_queue(ubq);
+ return;
+ }
+
for (i = 0; i < ubq->q_depth; i++)
ublk_cancel_cmd(ubq, i, IO_URING_F_UNLOCKED);
}
@@ -3091,6 +3358,74 @@ static int ublk_check_batch_cmd(const struct ublk_batch_io_data *data)
return ublk_check_batch_cmd_flags(uc);
}
+static int ublk_batch_attach(struct ublk_queue *ubq,
+ struct ublk_batch_io_data *data,
+ struct ublk_batch_fcmd *fcmd)
+{
+ struct ublk_batch_fcmd *new_fcmd = NULL;
+ bool free = false;
+
+ spin_lock(&ubq->evts_lock);
+ if (unlikely(ubq->force_abort || ubq->canceling)) {
+ free = true;
+ } else {
+ list_add_tail(&fcmd->node, &ubq->fcmd_head);
+ new_fcmd = __ublk_acquire_fcmd(ubq);
+ }
+ spin_unlock(&ubq->evts_lock);
+
+ /*
+ * If the two fetch commands are originated from same io_ring_ctx,
+ * run batch dispatch directly. Otherwise, schedule task work for
+ * doing it.
+ */
+ if (new_fcmd && io_uring_cmd_ctx_handle(new_fcmd->cmd) ==
+ io_uring_cmd_ctx_handle(fcmd->cmd)) {
+ data->cmd = new_fcmd->cmd;
+ ublk_batch_dispatch(ubq, data, new_fcmd);
+ } else if (new_fcmd) {
+ io_uring_cmd_complete_in_task(new_fcmd->cmd,
+ ublk_batch_tw_cb);
+ }
+
+ if (free) {
+ ublk_batch_free_fcmd(fcmd);
+ return -ENODEV;
+ }
+ return -EIOCBQUEUED;
+}
+
+static int ublk_handle_batch_fetch_cmd(struct ublk_batch_io_data *data)
+{
+ struct ublk_queue *ubq = ublk_get_queue(data->ub, data->header.q_id);
+ struct ublk_batch_fcmd *fcmd = ublk_batch_alloc_fcmd(data->cmd);
+ struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(data->cmd);
+
+ if (!fcmd)
+ return -ENOMEM;
+
+ pdu->ubq = ubq;
+ pdu->fcmd = fcmd;
+ io_uring_cmd_mark_cancelable(data->cmd, data->issue_flags);
+
+ return ublk_batch_attach(ubq, data, fcmd);
+}
+
+static int ublk_validate_batch_fetch_cmd(struct ublk_batch_io_data *data,
+ const struct ublk_batch_io *uc)
+{
+ if (!(data->cmd->flags & IORING_URING_CMD_MULTISHOT))
+ return -EINVAL;
+
+ if (uc->elem_bytes != sizeof(__u16))
+ return -EINVAL;
+
+ if (uc->flags != 0)
+ return -E2BIG;
+
+ return 0;
+}
+
static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
unsigned int issue_flags)
{
@@ -3113,6 +3448,11 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
if (data.header.q_id >= ub->dev_info.nr_hw_queues)
goto out;
+ if (unlikely(issue_flags & IO_URING_F_CANCEL)) {
+ ublk_batch_cancel_fn(cmd, issue_flags);
+ return 0;
+ }
+
switch (cmd_op) {
case UBLK_U_IO_PREP_IO_CMDS:
ret = ublk_check_batch_cmd(&data);
@@ -3126,6 +3466,12 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
goto out;
ret = ublk_handle_batch_commit_cmd(&data);
break;
+ case UBLK_U_IO_FETCH_IO_CMDS:
+ ret = ublk_validate_batch_fetch_cmd(&data, uc);
+ if (ret)
+ goto out;
+ ret = ublk_handle_batch_fetch_cmd(&data);
+ break;
default:
ret = -EOPNOTSUPP;
}
@@ -3327,6 +3673,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
ret = ublk_io_evts_init(ubq, ubq->q_depth, numa_node);
if (ret)
goto fail;
+ INIT_LIST_HEAD(&ubq->fcmd_head);
}
ub->queues[q_id] = ubq;
ubq->dev = ub;
@@ -3451,7 +3798,10 @@ static void ublk_align_max_io_size(struct ublk_device *ub)
static int ublk_add_tag_set(struct ublk_device *ub)
{
- ub->tag_set.ops = &ublk_mq_ops;
+ if (ublk_dev_support_batch_io(ub))
+ ub->tag_set.ops = &ublk_batch_mq_ops;
+ else
+ ub->tag_set.ops = &ublk_mq_ops;
ub->tag_set.nr_hw_queues = ub->dev_info.nr_hw_queues;
ub->tag_set.queue_depth = ub->dev_info.queue_depth;
ub->tag_set.numa_node = NUMA_NO_NODE;
diff --git a/include/uapi/linux/ublk_cmd.h b/include/uapi/linux/ublk_cmd.h
index 295ec8f34173..cd894c1d188e 100644
--- a/include/uapi/linux/ublk_cmd.h
+++ b/include/uapi/linux/ublk_cmd.h
@@ -120,6 +120,13 @@
#define UBLK_U_IO_COMMIT_IO_CMDS \
_IOWR('u', 0x26, struct ublk_batch_io)
+/*
+ * Fetch io commands to provided buffer in multishot style,
+ * `IORING_URING_CMD_MULTISHOT` is required for this command.
+ */
+#define UBLK_U_IO_FETCH_IO_CMDS \
+ _IOWR('u', 0x27, struct ublk_batch_io)
+
/* only ABORT means that no re-fetch */
#define UBLK_IO_RES_OK 0
#define UBLK_IO_RES_NEED_GET_DATA 1
--
2.47.0
On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
>
> Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> multiple I/O commands in a single operation, significantly reducing
> submission overhead compared to individual FETCH_REQ* commands.
>
> Key Design Features:
>
> 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
> commands, with the batch size limited by the provided buffer length.
>
> 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
> simultaneously, but only one is active at any time. This enables
> efficient load distribution across multiple server task contexts.
>
> 3. Implicit State Management: The implementation uses three key variables
> to track state:
> - evts_fifo: Queue of request tags awaiting processing
> - fcmd_head: List of available fetch commands
> - active_fcmd: Currently active fetch command (NULL = none active)
>
> States are derived implicitly:
> - IDLE: No fetch commands available
> - READY: Fetch commands available, none active
> - ACTIVE: One fetch command processing events
>
> 4. Lockless Reader Optimization: The active fetch command can read from
> evts_fifo without locking (single reader guarantee), while writers
> (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
> barrier pairing plays key role for the single lockless reader
> optimization.
>
> Implementation Details:
>
> - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> - __ublk_pick_active_fcmd() selects an available fetch command when
> events arrive and no command is currently active
What is __ublk_pick_active_fcmd()? I don't see a function with that name.
> - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
> buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> - State transitions are coordinated via evts_lock to maintain consistency
>
> Signed-off-by: Ming Lei <ming.lei@redhat.com>
> ---
> drivers/block/ublk_drv.c | 412 +++++++++++++++++++++++++++++++---
> include/uapi/linux/ublk_cmd.h | 7 +
> 2 files changed, 388 insertions(+), 31 deletions(-)
>
> diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> index cc9c92d97349..2e5e392c939e 100644
> --- a/drivers/block/ublk_drv.c
> +++ b/drivers/block/ublk_drv.c
> @@ -93,6 +93,7 @@
>
> /* ublk batch fetch uring_cmd */
> struct ublk_batch_fcmd {
> + struct list_head node;
> struct io_uring_cmd *cmd;
> unsigned short buf_group;
> };
> @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
> */
> struct ublk_queue *ubq;
>
> - u16 tag;
> + union {
> + u16 tag;
> + struct ublk_batch_fcmd *fcmd; /* batch io only */
> + };
> };
>
> struct ublk_batch_io_data {
> @@ -229,18 +233,36 @@ struct ublk_queue {
> struct ublk_device *dev;
>
> /*
> - * Inflight ublk request tag is saved in this fifo
> + * Batch I/O State Management:
> + *
> + * The batch I/O system uses implicit state management based on the
> + * combination of three key variables below.
> + *
> + * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> + * No fetch commands available, events queue in evts_fifo
> + *
> + * - READY: !list_empty(&fcmd_head) && !active_fcmd
> + * Fetch commands available but none processing events
> *
> - * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> - * so lock is required for storing request tag to fifo
> + * - ACTIVE: active_fcmd
> + * One fetch command actively processing events from evts_fifo
> *
> - * Make sure just one reader for fetching request from task work
> - * function to ublk server, so no need to grab the lock in reader
> - * side.
> + * Key Invariants:
> + * - At most one active_fcmd at any time (single reader)
> + * - active_fcmd is always from fcmd_head list when non-NULL
> + * - evts_fifo can be read locklessly by the single active reader
> + * - All state transitions require evts_lock protection
> + * - Multiple writers to evts_fifo require lock protection
> */
> struct {
> DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> spinlock_t evts_lock;
> +
> + /* List of fetch commands available to process events */
> + struct list_head fcmd_head;
> +
> + /* Currently active fetch command (NULL = none active) */
> + struct ublk_batch_fcmd *active_fcmd;
> }____cacheline_aligned_in_smp;
>
> struct ublk_io ios[] __counted_by(q_depth);
> @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
> static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
> u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
> static inline unsigned int ublk_req_build_flags(struct request *req);
> +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> + struct ublk_batch_io_data *data,
> + struct ublk_batch_fcmd *fcmd);
>
> static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
> {
> return false;
> }
>
> +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> +{
> + return false;
> +}
> +
> static inline void ublk_io_lock(struct ublk_io *io)
> {
> spin_lock(&io->lock);
> @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq; /* wait until one idr is freed */
>
> static DEFINE_MUTEX(ublk_ctl_mutex);
>
> +static struct ublk_batch_fcmd *
> +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> +{
> + struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
An allocation in the I/O path seems unfortunate. Is there not room to
store the struct ublk_batch_fcmd in the io_uring_cmd pdu?
> +
> + if (fcmd) {
> + fcmd->cmd = cmd;
> + fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
Is it necessary to store sample this here just to pass it back to the
io_uring layer? Wouldn't the io_uring layer already have access to it
in struct io_kiocb's buf_index field?
> + }
> + return fcmd;
> +}
> +
> +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> +{
> + kfree(fcmd);
> +}
> +
> +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> +{
> + WRITE_ONCE(ubq->active_fcmd, NULL);
> +}
>
> -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> +/*
> + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> + * dispatching
> + */
> +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> + const struct ublk_batch_io_data *data,
> struct ublk_batch_fcmd *fcmd,
> int res)
> {
> + spin_lock(&ubq->evts_lock);
> + list_del(&fcmd->node);
> + WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> + __ublk_release_fcmd(ubq);
> + spin_unlock(&ubq->evts_lock);
> +
> io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> - fcmd->cmd = NULL;
> + ublk_batch_free_fcmd(fcmd);
> }
>
> static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> bool needs_filter;
> int ret;
>
> + WARN_ON_ONCE(data->cmd != fcmd->cmd);
> +
> sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> data->issue_flags);
> if (sel.val < 0)
> @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> return ret;
> }
>
> -static __maybe_unused int
> -ublk_batch_dispatch(struct ublk_queue *ubq,
> - const struct ublk_batch_io_data *data,
> - struct ublk_batch_fcmd *fcmd)
> +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> + struct ublk_queue *ubq)
> +{
> + struct ublk_batch_fcmd *fcmd;
> +
> + lockdep_assert_held(&ubq->evts_lock);
> +
> + /*
> + * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> + *
> + * The pair is the smp_mb() in ublk_batch_dispatch().
> + *
> + * If ubq->active_fcmd is observed as non-NULL, the new added tags
> + * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> + */
> + smp_mb();
> + if (READ_ONCE(ubq->active_fcmd)) {
> + fcmd = NULL;
> + } else {
> + fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> + struct ublk_batch_fcmd, node);
> + WRITE_ONCE(ubq->active_fcmd, fcmd);
> + }
> + return fcmd;
> +}
> +
> +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> + unsigned int issue_flags)
> +{
> + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> + struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> + struct ublk_batch_io_data data = {
> + .ub = pdu->ubq->dev,
> + .cmd = fcmd->cmd,
> + .issue_flags = issue_flags,
> + };
> +
> + WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> +
> + ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> +}
> +
> +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> + struct ublk_batch_io_data *data,
> + struct ublk_batch_fcmd *fcmd)
> {
> + struct ublk_batch_fcmd *new_fcmd;
Is the new_fcmd variable necessary? Can fcmd be reused instead?
> + void *handle;
> + bool empty;
> int ret = 0;
>
> +again:
> while (!ublk_io_evts_empty(ubq)) {
> ret = __ublk_batch_dispatch(ubq, data, fcmd);
> if (ret <= 0)
> break;
> }
>
> - if (ret < 0)
> - ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> + if (ret < 0) {
> + ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> + return;
> + }
>
> - return ret;
> + handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> + __ublk_release_fcmd(ubq);
> + /*
> + * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> + * checking ubq->evts_fifo.
> + *
> + * The pair is the smp_mb() in __ublk_acquire_fcmd().
> + */
> + smp_mb();
> + empty = ublk_io_evts_empty(ubq);
> + if (likely(empty))
nit: empty variable seems unnecessary
> + return;
> +
> + spin_lock(&ubq->evts_lock);
> + new_fcmd = __ublk_acquire_fcmd(ubq);
> + spin_unlock(&ubq->evts_lock);
> +
> + if (!new_fcmd)
> + return;
> + if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
This check seems to be meant to decide whether the new and old
UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?
But belonging to the same io_uring context doesn't necessarily mean
that the same task issued them. It seems like it would be safer to
always dispatch new_fcmd->cmd to task work.
> + data->cmd = new_fcmd->cmd;
> + fcmd = new_fcmd;
> + goto again;
> + }
> + io_uring_cmd_complete_in_task(new_fcmd->cmd, ublk_batch_tw_cb);
> }
>
> static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> @@ -1576,13 +1711,27 @@ static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> ublk_dispatch_req(ubq, pdu->req, issue_flags);
> }
>
> -static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq)
> +static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq, bool last)
> {
> - struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> - struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> + if (ublk_support_batch_io(ubq)) {
> + unsigned short tag = rq->tag;
> + struct ublk_batch_fcmd *fcmd = NULL;
>
> - pdu->req = rq;
> - io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> + spin_lock(&ubq->evts_lock);
> + kfifo_put(&ubq->evts_fifo, tag);
> + if (last)
> + fcmd = __ublk_acquire_fcmd(ubq);
> + spin_unlock(&ubq->evts_lock);
> +
> + if (fcmd)
> + io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> + } else {
> + struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> +
> + pdu->req = rq;
> + io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> + }
> }
>
> static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> @@ -1600,14 +1749,44 @@ static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> } while (rq);
> }
>
> -static void ublk_queue_cmd_list(struct ublk_io *io, struct rq_list *l)
> +static void ublk_batch_queue_cmd_list(struct ublk_queue *ubq, struct rq_list *l)
> {
> - struct io_uring_cmd *cmd = io->cmd;
> - struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> + unsigned short tags[MAX_NR_TAG];
> + struct ublk_batch_fcmd *fcmd;
> + struct request *rq;
> + unsigned cnt = 0;
> +
> + spin_lock(&ubq->evts_lock);
> + rq_list_for_each(l, rq) {
> + tags[cnt++] = (unsigned short)rq->tag;
> + if (cnt >= MAX_NR_TAG) {
> + kfifo_in(&ubq->evts_fifo, tags, cnt);
> + cnt = 0;
> + }
> + }
> + if (cnt)
> + kfifo_in(&ubq->evts_fifo, tags, cnt);
> + fcmd = __ublk_acquire_fcmd(ubq);
> + spin_unlock(&ubq->evts_lock);
>
> - pdu->req_list = rq_list_peek(l);
> rq_list_init(l);
> - io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> + if (fcmd)
> + io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> +}
> +
> +static void ublk_queue_cmd_list(struct ublk_queue *ubq, struct ublk_io *io,
> + struct rq_list *l, bool batch)
> +{
> + if (batch) {
> + ublk_batch_queue_cmd_list(ubq, l);
> + } else {
> + struct io_uring_cmd *cmd = io->cmd;
> + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> +
> + pdu->req_list = rq_list_peek(l);
> + rq_list_init(l);
> + io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> + }
> }
>
> static enum blk_eh_timer_return ublk_timeout(struct request *rq)
> @@ -1686,7 +1865,7 @@ static blk_status_t ublk_queue_rq(struct blk_mq_hw_ctx *hctx,
> return BLK_STS_OK;
> }
>
> - ublk_queue_cmd(ubq, rq);
> + ublk_queue_cmd(ubq, rq, bd->last);
> return BLK_STS_OK;
> }
>
> @@ -1698,11 +1877,25 @@ static inline bool ublk_belong_to_same_batch(const struct ublk_io *io,
> (io->task == io2->task);
> }
>
> -static void ublk_queue_rqs(struct rq_list *rqlist)
> +static void ublk_commit_rqs(struct blk_mq_hw_ctx *hctx)
> +{
> + struct ublk_queue *ubq = hctx->driver_data;
> + struct ublk_batch_fcmd *fcmd;
> +
> + spin_lock(&ubq->evts_lock);
> + fcmd = __ublk_acquire_fcmd(ubq);
> + spin_unlock(&ubq->evts_lock);
> +
> + if (fcmd)
> + io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> +}
> +
> +static void __ublk_queue_rqs(struct rq_list *rqlist, bool batch)
> {
> struct rq_list requeue_list = { };
> struct rq_list submit_list = { };
> struct ublk_io *io = NULL;
> + struct ublk_queue *ubq = NULL;
> struct request *req;
>
> while ((req = rq_list_pop(rqlist))) {
> @@ -1716,16 +1909,27 @@ static void ublk_queue_rqs(struct rq_list *rqlist)
>
> if (io && !ublk_belong_to_same_batch(io, this_io) &&
> !rq_list_empty(&submit_list))
> - ublk_queue_cmd_list(io, &submit_list);
> + ublk_queue_cmd_list(ubq, io, &submit_list, batch);
This seems to assume that all the requests belong to the same
ublk_queue, which isn't required
> io = this_io;
> + ubq = this_q;
> rq_list_add_tail(&submit_list, req);
> }
>
> if (!rq_list_empty(&submit_list))
> - ublk_queue_cmd_list(io, &submit_list);
> + ublk_queue_cmd_list(ubq, io, &submit_list, batch);
Same here
> *rqlist = requeue_list;
> }
>
> +static void ublk_queue_rqs(struct rq_list *rqlist)
> +{
> + __ublk_queue_rqs(rqlist, false);
> +}
> +
> +static void ublk_batch_queue_rqs(struct rq_list *rqlist)
> +{
> + __ublk_queue_rqs(rqlist, true);
> +}
> +
> static int ublk_init_hctx(struct blk_mq_hw_ctx *hctx, void *driver_data,
> unsigned int hctx_idx)
> {
> @@ -1743,6 +1947,14 @@ static const struct blk_mq_ops ublk_mq_ops = {
> .timeout = ublk_timeout,
> };
>
> +static const struct blk_mq_ops ublk_batch_mq_ops = {
> + .commit_rqs = ublk_commit_rqs,
> + .queue_rq = ublk_queue_rq,
> + .queue_rqs = ublk_batch_queue_rqs,
> + .init_hctx = ublk_init_hctx,
> + .timeout = ublk_timeout,
> +};
> +
> static void ublk_queue_reinit(struct ublk_device *ub, struct ublk_queue *ubq)
> {
> int i;
> @@ -2120,6 +2332,56 @@ static void ublk_cancel_cmd(struct ublk_queue *ubq, unsigned tag,
> io_uring_cmd_done(io->cmd, UBLK_IO_RES_ABORT, issue_flags);
> }
>
> +static void ublk_batch_cancel_cmd(struct ublk_queue *ubq,
> + struct ublk_batch_fcmd *fcmd,
> + unsigned int issue_flags)
> +{
> + bool done;
> +
> + spin_lock(&ubq->evts_lock);
> + done = (ubq->active_fcmd != fcmd);
Needs to use READ_ONCE() since __ublk_release_fcmd() can be called
without holding evts_lock?
> + if (done)
> + list_del(&fcmd->node);
> + spin_unlock(&ubq->evts_lock);
> +
> + if (done) {
> + io_uring_cmd_done(fcmd->cmd, UBLK_IO_RES_ABORT, issue_flags);
> + ublk_batch_free_fcmd(fcmd);
> + }
> +}
> +
> +static void ublk_batch_cancel_queue(struct ublk_queue *ubq)
> +{
> + LIST_HEAD(fcmd_list);
> +
> + spin_lock(&ubq->evts_lock);
> + ubq->force_abort = true;
> + list_splice_init(&ubq->fcmd_head, &fcmd_list);
> + if (ubq->active_fcmd)
> + list_move(&ubq->active_fcmd->node, &ubq->fcmd_head);
Similarly, needs READ_ONCE()?
> + spin_unlock(&ubq->evts_lock);
> +
> + while (!list_empty(&fcmd_list)) {
> + struct ublk_batch_fcmd *fcmd = list_first_entry(&fcmd_list,
> + struct ublk_batch_fcmd, node);
> +
> + ublk_batch_cancel_cmd(ubq, fcmd, IO_URING_F_UNLOCKED);
> + }
> +}
> +
> +static void ublk_batch_cancel_fn(struct io_uring_cmd *cmd,
> + unsigned int issue_flags)
> +{
> + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> + struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> + struct ublk_queue *ubq = pdu->ubq;
> +
> + if (!ubq->canceling)
Is it not racy to access ubq->canceling without any lock held?
> + ublk_start_cancel(ubq->dev);
> +
> + ublk_batch_cancel_cmd(ubq, fcmd, issue_flags);
> +}
> +
> /*
> * The ublk char device won't be closed when calling cancel fn, so both
> * ublk device and queue are guaranteed to be live
> @@ -2171,6 +2433,11 @@ static void ublk_cancel_queue(struct ublk_queue *ubq)
> {
> int i;
>
> + if (ublk_support_batch_io(ubq)) {
> + ublk_batch_cancel_queue(ubq);
> + return;
> + }
> +
> for (i = 0; i < ubq->q_depth; i++)
> ublk_cancel_cmd(ubq, i, IO_URING_F_UNLOCKED);
> }
> @@ -3091,6 +3358,74 @@ static int ublk_check_batch_cmd(const struct ublk_batch_io_data *data)
> return ublk_check_batch_cmd_flags(uc);
> }
>
> +static int ublk_batch_attach(struct ublk_queue *ubq,
> + struct ublk_batch_io_data *data,
> + struct ublk_batch_fcmd *fcmd)
> +{
> + struct ublk_batch_fcmd *new_fcmd = NULL;
> + bool free = false;
> +
> + spin_lock(&ubq->evts_lock);
> + if (unlikely(ubq->force_abort || ubq->canceling)) {
> + free = true;
> + } else {
> + list_add_tail(&fcmd->node, &ubq->fcmd_head);
> + new_fcmd = __ublk_acquire_fcmd(ubq);
> + }
> + spin_unlock(&ubq->evts_lock);
> +
> + /*
> + * If the two fetch commands are originated from same io_ring_ctx,
> + * run batch dispatch directly. Otherwise, schedule task work for
> + * doing it.
> + */
> + if (new_fcmd && io_uring_cmd_ctx_handle(new_fcmd->cmd) ==
> + io_uring_cmd_ctx_handle(fcmd->cmd)) {
> + data->cmd = new_fcmd->cmd;
> + ublk_batch_dispatch(ubq, data, new_fcmd);
> + } else if (new_fcmd) {
> + io_uring_cmd_complete_in_task(new_fcmd->cmd,
> + ublk_batch_tw_cb);
> + }
Return early if (!new_fcmd) to reduce indentation?
> +
> + if (free) {
> + ublk_batch_free_fcmd(fcmd);
> + return -ENODEV;
> + }
Move the if (free) check directly after spin_unlock(&ubq->evts_lock)?
> + return -EIOCBQUEUED;
> +}
> +
> +static int ublk_handle_batch_fetch_cmd(struct ublk_batch_io_data *data)
> +{
> + struct ublk_queue *ubq = ublk_get_queue(data->ub, data->header.q_id);
> + struct ublk_batch_fcmd *fcmd = ublk_batch_alloc_fcmd(data->cmd);
> + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(data->cmd);
> +
> + if (!fcmd)
> + return -ENOMEM;
> +
> + pdu->ubq = ubq;
> + pdu->fcmd = fcmd;
> + io_uring_cmd_mark_cancelable(data->cmd, data->issue_flags);
> +
> + return ublk_batch_attach(ubq, data, fcmd);
> +}
> +
> +static int ublk_validate_batch_fetch_cmd(struct ublk_batch_io_data *data,
> + const struct ublk_batch_io *uc)
> +{
> + if (!(data->cmd->flags & IORING_URING_CMD_MULTISHOT))
> + return -EINVAL;
> +
> + if (uc->elem_bytes != sizeof(__u16))
> + return -EINVAL;
> +
> + if (uc->flags != 0)
> + return -E2BIG;
> +
> + return 0;
> +}
> +
> static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> unsigned int issue_flags)
> {
> @@ -3113,6 +3448,11 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> if (data.header.q_id >= ub->dev_info.nr_hw_queues)
> goto out;
>
> + if (unlikely(issue_flags & IO_URING_F_CANCEL)) {
> + ublk_batch_cancel_fn(cmd, issue_flags);
> + return 0;
> + }
Move this to the top of the function before the other logic that's not
necessary in the cancel case?
Best,
Caleb
> +
> switch (cmd_op) {
> case UBLK_U_IO_PREP_IO_CMDS:
> ret = ublk_check_batch_cmd(&data);
> @@ -3126,6 +3466,12 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> goto out;
> ret = ublk_handle_batch_commit_cmd(&data);
> break;
> + case UBLK_U_IO_FETCH_IO_CMDS:
> + ret = ublk_validate_batch_fetch_cmd(&data, uc);
> + if (ret)
> + goto out;
> + ret = ublk_handle_batch_fetch_cmd(&data);
> + break;
> default:
> ret = -EOPNOTSUPP;
> }
> @@ -3327,6 +3673,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
> ret = ublk_io_evts_init(ubq, ubq->q_depth, numa_node);
> if (ret)
> goto fail;
> + INIT_LIST_HEAD(&ubq->fcmd_head);
> }
> ub->queues[q_id] = ubq;
> ubq->dev = ub;
> @@ -3451,7 +3798,10 @@ static void ublk_align_max_io_size(struct ublk_device *ub)
>
> static int ublk_add_tag_set(struct ublk_device *ub)
> {
> - ub->tag_set.ops = &ublk_mq_ops;
> + if (ublk_dev_support_batch_io(ub))
> + ub->tag_set.ops = &ublk_batch_mq_ops;
> + else
> + ub->tag_set.ops = &ublk_mq_ops;
> ub->tag_set.nr_hw_queues = ub->dev_info.nr_hw_queues;
> ub->tag_set.queue_depth = ub->dev_info.queue_depth;
> ub->tag_set.numa_node = NUMA_NO_NODE;
> diff --git a/include/uapi/linux/ublk_cmd.h b/include/uapi/linux/ublk_cmd.h
> index 295ec8f34173..cd894c1d188e 100644
> --- a/include/uapi/linux/ublk_cmd.h
> +++ b/include/uapi/linux/ublk_cmd.h
> @@ -120,6 +120,13 @@
> #define UBLK_U_IO_COMMIT_IO_CMDS \
> _IOWR('u', 0x26, struct ublk_batch_io)
>
> +/*
> + * Fetch io commands to provided buffer in multishot style,
> + * `IORING_URING_CMD_MULTISHOT` is required for this command.
> + */
> +#define UBLK_U_IO_FETCH_IO_CMDS \
> + _IOWR('u', 0x27, struct ublk_batch_io)
> +
> /* only ABORT means that no re-fetch */
> #define UBLK_IO_RES_OK 0
> #define UBLK_IO_RES_NEED_GET_DATA 1
> --
> 2.47.0
>
On Sun, Nov 30, 2025 at 09:55:47PM -0800, Caleb Sander Mateos wrote:
> On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> >
> > Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> > of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> > multiple I/O commands in a single operation, significantly reducing
> > submission overhead compared to individual FETCH_REQ* commands.
> >
> > Key Design Features:
> >
> > 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
> > commands, with the batch size limited by the provided buffer length.
> >
> > 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
> > simultaneously, but only one is active at any time. This enables
> > efficient load distribution across multiple server task contexts.
> >
> > 3. Implicit State Management: The implementation uses three key variables
> > to track state:
> > - evts_fifo: Queue of request tags awaiting processing
> > - fcmd_head: List of available fetch commands
> > - active_fcmd: Currently active fetch command (NULL = none active)
> >
> > States are derived implicitly:
> > - IDLE: No fetch commands available
> > - READY: Fetch commands available, none active
> > - ACTIVE: One fetch command processing events
> >
> > 4. Lockless Reader Optimization: The active fetch command can read from
> > evts_fifo without locking (single reader guarantee), while writers
> > (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
> > barrier pairing plays key role for the single lockless reader
> > optimization.
> >
> > Implementation Details:
> >
> > - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> > - __ublk_pick_active_fcmd() selects an available fetch command when
> > events arrive and no command is currently active
>
> What is __ublk_pick_active_fcmd()? I don't see a function with that name.
It is renamed as __ublk_acquire_fcmd(), and its counter pair is
__ublk_release_fcmd().
>
> > - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
> > buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> > - State transitions are coordinated via evts_lock to maintain consistency
> >
> > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > ---
> > drivers/block/ublk_drv.c | 412 +++++++++++++++++++++++++++++++---
> > include/uapi/linux/ublk_cmd.h | 7 +
> > 2 files changed, 388 insertions(+), 31 deletions(-)
> >
> > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > index cc9c92d97349..2e5e392c939e 100644
> > --- a/drivers/block/ublk_drv.c
> > +++ b/drivers/block/ublk_drv.c
> > @@ -93,6 +93,7 @@
> >
> > /* ublk batch fetch uring_cmd */
> > struct ublk_batch_fcmd {
> > + struct list_head node;
> > struct io_uring_cmd *cmd;
> > unsigned short buf_group;
> > };
> > @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
> > */
> > struct ublk_queue *ubq;
> >
> > - u16 tag;
> > + union {
> > + u16 tag;
> > + struct ublk_batch_fcmd *fcmd; /* batch io only */
> > + };
> > };
> >
> > struct ublk_batch_io_data {
> > @@ -229,18 +233,36 @@ struct ublk_queue {
> > struct ublk_device *dev;
> >
> > /*
> > - * Inflight ublk request tag is saved in this fifo
> > + * Batch I/O State Management:
> > + *
> > + * The batch I/O system uses implicit state management based on the
> > + * combination of three key variables below.
> > + *
> > + * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> > + * No fetch commands available, events queue in evts_fifo
> > + *
> > + * - READY: !list_empty(&fcmd_head) && !active_fcmd
> > + * Fetch commands available but none processing events
> > *
> > - * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> > - * so lock is required for storing request tag to fifo
> > + * - ACTIVE: active_fcmd
> > + * One fetch command actively processing events from evts_fifo
> > *
> > - * Make sure just one reader for fetching request from task work
> > - * function to ublk server, so no need to grab the lock in reader
> > - * side.
> > + * Key Invariants:
> > + * - At most one active_fcmd at any time (single reader)
> > + * - active_fcmd is always from fcmd_head list when non-NULL
> > + * - evts_fifo can be read locklessly by the single active reader
> > + * - All state transitions require evts_lock protection
> > + * - Multiple writers to evts_fifo require lock protection
> > */
> > struct {
> > DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> > spinlock_t evts_lock;
> > +
> > + /* List of fetch commands available to process events */
> > + struct list_head fcmd_head;
> > +
> > + /* Currently active fetch command (NULL = none active) */
> > + struct ublk_batch_fcmd *active_fcmd;
> > }____cacheline_aligned_in_smp;
> >
> > struct ublk_io ios[] __counted_by(q_depth);
> > @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
> > static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
> > u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
> > static inline unsigned int ublk_req_build_flags(struct request *req);
> > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > + struct ublk_batch_io_data *data,
> > + struct ublk_batch_fcmd *fcmd);
> >
> > static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
> > {
> > return false;
> > }
> >
> > +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> > +{
> > + return false;
> > +}
> > +
> > static inline void ublk_io_lock(struct ublk_io *io)
> > {
> > spin_lock(&io->lock);
> > @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq; /* wait until one idr is freed */
> >
> > static DEFINE_MUTEX(ublk_ctl_mutex);
> >
> > +static struct ublk_batch_fcmd *
> > +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> > +{
> > + struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
>
> An allocation in the I/O path seems unfortunate. Is there not room to
> store the struct ublk_batch_fcmd in the io_uring_cmd pdu?
It is allocated once for one mshot request, which covers many IOs.
It can't be held in uring_cmd pdu, but the allocation can be optimized in
future. Not a big deal in enablement stage.
> > +
> > + if (fcmd) {
> > + fcmd->cmd = cmd;
> > + fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
>
> Is it necessary to store sample this here just to pass it back to the
> io_uring layer? Wouldn't the io_uring layer already have access to it
> in struct io_kiocb's buf_index field?
->buf_group is used by io_uring_cmd_buffer_select(), and this way also
follows ->buf_index uses in both io_uring/net.c and io_uring/rw.c.
More importantly req->buf_index is used in io_uring/kbuf.c internally, see
io_ring_buffer_select(), so we can't reuse req->buf_index here.
>
> > + }
> > + return fcmd;
> > +}
> > +
> > +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> > +{
> > + kfree(fcmd);
> > +}
> > +
> > +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> > +{
> > + WRITE_ONCE(ubq->active_fcmd, NULL);
> > +}
> >
> > -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > +/*
> > + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> > + * dispatching
> > + */
> > +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> > + const struct ublk_batch_io_data *data,
> > struct ublk_batch_fcmd *fcmd,
> > int res)
> > {
> > + spin_lock(&ubq->evts_lock);
> > + list_del(&fcmd->node);
> > + WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> > + __ublk_release_fcmd(ubq);
> > + spin_unlock(&ubq->evts_lock);
> > +
> > io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > - fcmd->cmd = NULL;
> > + ublk_batch_free_fcmd(fcmd);
> > }
> >
> > static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > bool needs_filter;
> > int ret;
> >
> > + WARN_ON_ONCE(data->cmd != fcmd->cmd);
> > +
> > sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > data->issue_flags);
> > if (sel.val < 0)
> > @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > return ret;
> > }
> >
> > -static __maybe_unused int
> > -ublk_batch_dispatch(struct ublk_queue *ubq,
> > - const struct ublk_batch_io_data *data,
> > - struct ublk_batch_fcmd *fcmd)
> > +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> > + struct ublk_queue *ubq)
> > +{
> > + struct ublk_batch_fcmd *fcmd;
> > +
> > + lockdep_assert_held(&ubq->evts_lock);
> > +
> > + /*
> > + * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> > + *
> > + * The pair is the smp_mb() in ublk_batch_dispatch().
> > + *
> > + * If ubq->active_fcmd is observed as non-NULL, the new added tags
> > + * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> > + */
> > + smp_mb();
> > + if (READ_ONCE(ubq->active_fcmd)) {
> > + fcmd = NULL;
> > + } else {
> > + fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> > + struct ublk_batch_fcmd, node);
> > + WRITE_ONCE(ubq->active_fcmd, fcmd);
> > + }
> > + return fcmd;
> > +}
> > +
> > +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> > + unsigned int issue_flags)
> > +{
> > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > + struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > + struct ublk_batch_io_data data = {
> > + .ub = pdu->ubq->dev,
> > + .cmd = fcmd->cmd,
> > + .issue_flags = issue_flags,
> > + };
> > +
> > + WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> > +
> > + ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> > +}
> > +
> > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > + struct ublk_batch_io_data *data,
> > + struct ublk_batch_fcmd *fcmd)
> > {
> > + struct ublk_batch_fcmd *new_fcmd;
>
> Is the new_fcmd variable necessary? Can fcmd be reused instead?
>
> > + void *handle;
> > + bool empty;
> > int ret = 0;
> >
> > +again:
> > while (!ublk_io_evts_empty(ubq)) {
> > ret = __ublk_batch_dispatch(ubq, data, fcmd);
> > if (ret <= 0)
> > break;
> > }
> >
> > - if (ret < 0)
> > - ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> > + if (ret < 0) {
> > + ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> > + return;
> > + }
> >
> > - return ret;
> > + handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> > + __ublk_release_fcmd(ubq);
> > + /*
> > + * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> > + * checking ubq->evts_fifo.
> > + *
> > + * The pair is the smp_mb() in __ublk_acquire_fcmd().
> > + */
> > + smp_mb();
> > + empty = ublk_io_evts_empty(ubq);
> > + if (likely(empty))
>
> nit: empty variable seems unnecessary
>
> > + return;
> > +
> > + spin_lock(&ubq->evts_lock);
> > + new_fcmd = __ublk_acquire_fcmd(ubq);
> > + spin_unlock(&ubq->evts_lock);
> > +
> > + if (!new_fcmd)
> > + return;
> > + if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
>
> This check seems to be meant to decide whether the new and old
> UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?
Actually not.
> But belonging to the same io_uring context doesn't necessarily mean
> that the same task issued them. It seems like it would be safer to
> always dispatch new_fcmd->cmd to task work.
What matters is just that ctx->uring_lock & issue_flag matches from ublk
viewpoint, so it is safe to do so.
However, given it is hit in slow path, so starting new dispatch
is easier.
>
> > + data->cmd = new_fcmd->cmd;
> > + fcmd = new_fcmd;
> > + goto again;
> > + }
> > + io_uring_cmd_complete_in_task(new_fcmd->cmd, ublk_batch_tw_cb);
> > }
> >
> > static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> > @@ -1576,13 +1711,27 @@ static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> > ublk_dispatch_req(ubq, pdu->req, issue_flags);
> > }
> >
> > -static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq)
> > +static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq, bool last)
> > {
> > - struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> > - struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > + if (ublk_support_batch_io(ubq)) {
> > + unsigned short tag = rq->tag;
> > + struct ublk_batch_fcmd *fcmd = NULL;
> >
> > - pdu->req = rq;
> > - io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> > + spin_lock(&ubq->evts_lock);
> > + kfifo_put(&ubq->evts_fifo, tag);
> > + if (last)
> > + fcmd = __ublk_acquire_fcmd(ubq);
> > + spin_unlock(&ubq->evts_lock);
> > +
> > + if (fcmd)
> > + io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > + } else {
> > + struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > +
> > + pdu->req = rq;
> > + io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> > + }
> > }
> >
> > static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> > @@ -1600,14 +1749,44 @@ static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> > } while (rq);
> > }
> >
> > -static void ublk_queue_cmd_list(struct ublk_io *io, struct rq_list *l)
> > +static void ublk_batch_queue_cmd_list(struct ublk_queue *ubq, struct rq_list *l)
> > {
> > - struct io_uring_cmd *cmd = io->cmd;
> > - struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > + unsigned short tags[MAX_NR_TAG];
> > + struct ublk_batch_fcmd *fcmd;
> > + struct request *rq;
> > + unsigned cnt = 0;
> > +
> > + spin_lock(&ubq->evts_lock);
> > + rq_list_for_each(l, rq) {
> > + tags[cnt++] = (unsigned short)rq->tag;
> > + if (cnt >= MAX_NR_TAG) {
> > + kfifo_in(&ubq->evts_fifo, tags, cnt);
> > + cnt = 0;
> > + }
> > + }
> > + if (cnt)
> > + kfifo_in(&ubq->evts_fifo, tags, cnt);
> > + fcmd = __ublk_acquire_fcmd(ubq);
> > + spin_unlock(&ubq->evts_lock);
> >
> > - pdu->req_list = rq_list_peek(l);
> > rq_list_init(l);
> > - io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> > + if (fcmd)
> > + io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > +}
> > +
> > +static void ublk_queue_cmd_list(struct ublk_queue *ubq, struct ublk_io *io,
> > + struct rq_list *l, bool batch)
> > +{
> > + if (batch) {
> > + ublk_batch_queue_cmd_list(ubq, l);
> > + } else {
> > + struct io_uring_cmd *cmd = io->cmd;
> > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > +
> > + pdu->req_list = rq_list_peek(l);
> > + rq_list_init(l);
> > + io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> > + }
> > }
> >
> > static enum blk_eh_timer_return ublk_timeout(struct request *rq)
> > @@ -1686,7 +1865,7 @@ static blk_status_t ublk_queue_rq(struct blk_mq_hw_ctx *hctx,
> > return BLK_STS_OK;
> > }
> >
> > - ublk_queue_cmd(ubq, rq);
> > + ublk_queue_cmd(ubq, rq, bd->last);
> > return BLK_STS_OK;
> > }
> >
> > @@ -1698,11 +1877,25 @@ static inline bool ublk_belong_to_same_batch(const struct ublk_io *io,
> > (io->task == io2->task);
> > }
> >
> > -static void ublk_queue_rqs(struct rq_list *rqlist)
> > +static void ublk_commit_rqs(struct blk_mq_hw_ctx *hctx)
> > +{
> > + struct ublk_queue *ubq = hctx->driver_data;
> > + struct ublk_batch_fcmd *fcmd;
> > +
> > + spin_lock(&ubq->evts_lock);
> > + fcmd = __ublk_acquire_fcmd(ubq);
> > + spin_unlock(&ubq->evts_lock);
> > +
> > + if (fcmd)
> > + io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > +}
> > +
> > +static void __ublk_queue_rqs(struct rq_list *rqlist, bool batch)
> > {
> > struct rq_list requeue_list = { };
> > struct rq_list submit_list = { };
> > struct ublk_io *io = NULL;
> > + struct ublk_queue *ubq = NULL;
> > struct request *req;
> >
> > while ((req = rq_list_pop(rqlist))) {
> > @@ -1716,16 +1909,27 @@ static void ublk_queue_rqs(struct rq_list *rqlist)
> >
> > if (io && !ublk_belong_to_same_batch(io, this_io) &&
> > !rq_list_empty(&submit_list))
> > - ublk_queue_cmd_list(io, &submit_list);
> > + ublk_queue_cmd_list(ubq, io, &submit_list, batch);
>
> This seems to assume that all the requests belong to the same
> ublk_queue, which isn't required
Here, it is required for BATCH_IO, which needs new __ublk_queue_rqs()
implementation now.
Nice catch!
>
> > io = this_io;
> > + ubq = this_q;
> > rq_list_add_tail(&submit_list, req);
> > }
> >
> > if (!rq_list_empty(&submit_list))
> > - ublk_queue_cmd_list(io, &submit_list);
> > + ublk_queue_cmd_list(ubq, io, &submit_list, batch);
>
> Same here
>
> > *rqlist = requeue_list;
> > }
> >
> > +static void ublk_queue_rqs(struct rq_list *rqlist)
> > +{
> > + __ublk_queue_rqs(rqlist, false);
> > +}
> > +
> > +static void ublk_batch_queue_rqs(struct rq_list *rqlist)
> > +{
> > + __ublk_queue_rqs(rqlist, true);
> > +}
> > +
> > static int ublk_init_hctx(struct blk_mq_hw_ctx *hctx, void *driver_data,
> > unsigned int hctx_idx)
> > {
> > @@ -1743,6 +1947,14 @@ static const struct blk_mq_ops ublk_mq_ops = {
> > .timeout = ublk_timeout,
> > };
> >
> > +static const struct blk_mq_ops ublk_batch_mq_ops = {
> > + .commit_rqs = ublk_commit_rqs,
> > + .queue_rq = ublk_queue_rq,
> > + .queue_rqs = ublk_batch_queue_rqs,
> > + .init_hctx = ublk_init_hctx,
> > + .timeout = ublk_timeout,
> > +};
> > +
> > static void ublk_queue_reinit(struct ublk_device *ub, struct ublk_queue *ubq)
> > {
> > int i;
> > @@ -2120,6 +2332,56 @@ static void ublk_cancel_cmd(struct ublk_queue *ubq, unsigned tag,
> > io_uring_cmd_done(io->cmd, UBLK_IO_RES_ABORT, issue_flags);
> > }
> >
> > +static void ublk_batch_cancel_cmd(struct ublk_queue *ubq,
> > + struct ublk_batch_fcmd *fcmd,
> > + unsigned int issue_flags)
> > +{
> > + bool done;
> > +
> > + spin_lock(&ubq->evts_lock);
> > + done = (ubq->active_fcmd != fcmd);
>
> Needs to use READ_ONCE() since __ublk_release_fcmd() can be called
> without holding evts_lock?
OK.
>
> > + if (done)
> > + list_del(&fcmd->node);
> > + spin_unlock(&ubq->evts_lock);
> > +
> > + if (done) {
> > + io_uring_cmd_done(fcmd->cmd, UBLK_IO_RES_ABORT, issue_flags);
> > + ublk_batch_free_fcmd(fcmd);
> > + }
> > +}
> > +
> > +static void ublk_batch_cancel_queue(struct ublk_queue *ubq)
> > +{
> > + LIST_HEAD(fcmd_list);
> > +
> > + spin_lock(&ubq->evts_lock);
> > + ubq->force_abort = true;
> > + list_splice_init(&ubq->fcmd_head, &fcmd_list);
> > + if (ubq->active_fcmd)
> > + list_move(&ubq->active_fcmd->node, &ubq->fcmd_head);
>
> Similarly, needs READ_ONCE()?
OK.
But this one may not be necessary, since now everything is just quiesced,
and the lockless code path won't hit any more.
>
> > + spin_unlock(&ubq->evts_lock);
> > +
> > + while (!list_empty(&fcmd_list)) {
> > + struct ublk_batch_fcmd *fcmd = list_first_entry(&fcmd_list,
> > + struct ublk_batch_fcmd, node);
> > +
> > + ublk_batch_cancel_cmd(ubq, fcmd, IO_URING_F_UNLOCKED);
> > + }
> > +}
> > +
> > +static void ublk_batch_cancel_fn(struct io_uring_cmd *cmd,
> > + unsigned int issue_flags)
> > +{
> > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > + struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > + struct ublk_queue *ubq = pdu->ubq;
> > +
> > + if (!ubq->canceling)
>
> Is it not racy to access ubq->canceling without any lock held?
OK, will switch to call ublk_start_cancel() unconditionally.
>
> > + ublk_start_cancel(ubq->dev);
> > +
> > + ublk_batch_cancel_cmd(ubq, fcmd, issue_flags);
> > +}
> > +
> > /*
> > * The ublk char device won't be closed when calling cancel fn, so both
> > * ublk device and queue are guaranteed to be live
> > @@ -2171,6 +2433,11 @@ static void ublk_cancel_queue(struct ublk_queue *ubq)
> > {
> > int i;
> >
> > + if (ublk_support_batch_io(ubq)) {
> > + ublk_batch_cancel_queue(ubq);
> > + return;
> > + }
> > +
> > for (i = 0; i < ubq->q_depth; i++)
> > ublk_cancel_cmd(ubq, i, IO_URING_F_UNLOCKED);
> > }
> > @@ -3091,6 +3358,74 @@ static int ublk_check_batch_cmd(const struct ublk_batch_io_data *data)
> > return ublk_check_batch_cmd_flags(uc);
> > }
> >
> > +static int ublk_batch_attach(struct ublk_queue *ubq,
> > + struct ublk_batch_io_data *data,
> > + struct ublk_batch_fcmd *fcmd)
> > +{
> > + struct ublk_batch_fcmd *new_fcmd = NULL;
> > + bool free = false;
> > +
> > + spin_lock(&ubq->evts_lock);
> > + if (unlikely(ubq->force_abort || ubq->canceling)) {
> > + free = true;
> > + } else {
> > + list_add_tail(&fcmd->node, &ubq->fcmd_head);
> > + new_fcmd = __ublk_acquire_fcmd(ubq);
> > + }
> > + spin_unlock(&ubq->evts_lock);
> > +
> > + /*
> > + * If the two fetch commands are originated from same io_ring_ctx,
> > + * run batch dispatch directly. Otherwise, schedule task work for
> > + * doing it.
> > + */
> > + if (new_fcmd && io_uring_cmd_ctx_handle(new_fcmd->cmd) ==
> > + io_uring_cmd_ctx_handle(fcmd->cmd)) {
> > + data->cmd = new_fcmd->cmd;
> > + ublk_batch_dispatch(ubq, data, new_fcmd);
> > + } else if (new_fcmd) {
> > + io_uring_cmd_complete_in_task(new_fcmd->cmd,
> > + ublk_batch_tw_cb);
> > + }
>
> Return early if (!new_fcmd) to reduce indentation?
>
> > +
> > + if (free) {
> > + ublk_batch_free_fcmd(fcmd);
> > + return -ENODEV;
> > + }
>
> Move the if (free) check directly after spin_unlock(&ubq->evts_lock)?
Yeah, this is better.
>
> > + return -EIOCBQUEUED;
>
> > +}
> > +
> > +static int ublk_handle_batch_fetch_cmd(struct ublk_batch_io_data *data)
> > +{
> > + struct ublk_queue *ubq = ublk_get_queue(data->ub, data->header.q_id);
> > + struct ublk_batch_fcmd *fcmd = ublk_batch_alloc_fcmd(data->cmd);
> > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(data->cmd);
> > +
> > + if (!fcmd)
> > + return -ENOMEM;
> > +
> > + pdu->ubq = ubq;
> > + pdu->fcmd = fcmd;
> > + io_uring_cmd_mark_cancelable(data->cmd, data->issue_flags);
> > +
> > + return ublk_batch_attach(ubq, data, fcmd);
> > +}
> > +
> > +static int ublk_validate_batch_fetch_cmd(struct ublk_batch_io_data *data,
> > + const struct ublk_batch_io *uc)
> > +{
> > + if (!(data->cmd->flags & IORING_URING_CMD_MULTISHOT))
> > + return -EINVAL;
> > +
> > + if (uc->elem_bytes != sizeof(__u16))
> > + return -EINVAL;
> > +
> > + if (uc->flags != 0)
> > + return -E2BIG;
> > +
> > + return 0;
> > +}
> > +
> > static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> > unsigned int issue_flags)
> > {
> > @@ -3113,6 +3448,11 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> > if (data.header.q_id >= ub->dev_info.nr_hw_queues)
> > goto out;
> >
> > + if (unlikely(issue_flags & IO_URING_F_CANCEL)) {
> > + ublk_batch_cancel_fn(cmd, issue_flags);
> > + return 0;
> > + }
>
> Move this to the top of the function before the other logic that's not
> necessary in the cancel case?
Yeah, looks better.
Thanks,
Ming
On Mon, Dec 1, 2025 at 1:42 AM Ming Lei <ming.lei@redhat.com> wrote:
>
> On Sun, Nov 30, 2025 at 09:55:47PM -0800, Caleb Sander Mateos wrote:
> > On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> > >
> > > Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> > > of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> > > multiple I/O commands in a single operation, significantly reducing
> > > submission overhead compared to individual FETCH_REQ* commands.
> > >
> > > Key Design Features:
> > >
> > > 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
> > > commands, with the batch size limited by the provided buffer length.
> > >
> > > 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
> > > simultaneously, but only one is active at any time. This enables
> > > efficient load distribution across multiple server task contexts.
> > >
> > > 3. Implicit State Management: The implementation uses three key variables
> > > to track state:
> > > - evts_fifo: Queue of request tags awaiting processing
> > > - fcmd_head: List of available fetch commands
> > > - active_fcmd: Currently active fetch command (NULL = none active)
> > >
> > > States are derived implicitly:
> > > - IDLE: No fetch commands available
> > > - READY: Fetch commands available, none active
> > > - ACTIVE: One fetch command processing events
> > >
> > > 4. Lockless Reader Optimization: The active fetch command can read from
> > > evts_fifo without locking (single reader guarantee), while writers
> > > (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
> > > barrier pairing plays key role for the single lockless reader
> > > optimization.
> > >
> > > Implementation Details:
> > >
> > > - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> > > - __ublk_pick_active_fcmd() selects an available fetch command when
> > > events arrive and no command is currently active
> >
> > What is __ublk_pick_active_fcmd()? I don't see a function with that name.
>
> It is renamed as __ublk_acquire_fcmd(), and its counter pair is
> __ublk_release_fcmd().
Okay, update the commit message then?
>
> >
> > > - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
> > > buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> > > - State transitions are coordinated via evts_lock to maintain consistency
> > >
> > > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > > ---
> > > drivers/block/ublk_drv.c | 412 +++++++++++++++++++++++++++++++---
> > > include/uapi/linux/ublk_cmd.h | 7 +
> > > 2 files changed, 388 insertions(+), 31 deletions(-)
> > >
> > > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > > index cc9c92d97349..2e5e392c939e 100644
> > > --- a/drivers/block/ublk_drv.c
> > > +++ b/drivers/block/ublk_drv.c
> > > @@ -93,6 +93,7 @@
> > >
> > > /* ublk batch fetch uring_cmd */
> > > struct ublk_batch_fcmd {
> > > + struct list_head node;
> > > struct io_uring_cmd *cmd;
> > > unsigned short buf_group;
> > > };
> > > @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
> > > */
> > > struct ublk_queue *ubq;
> > >
> > > - u16 tag;
> > > + union {
> > > + u16 tag;
> > > + struct ublk_batch_fcmd *fcmd; /* batch io only */
> > > + };
> > > };
> > >
> > > struct ublk_batch_io_data {
> > > @@ -229,18 +233,36 @@ struct ublk_queue {
> > > struct ublk_device *dev;
> > >
> > > /*
> > > - * Inflight ublk request tag is saved in this fifo
> > > + * Batch I/O State Management:
> > > + *
> > > + * The batch I/O system uses implicit state management based on the
> > > + * combination of three key variables below.
> > > + *
> > > + * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> > > + * No fetch commands available, events queue in evts_fifo
> > > + *
> > > + * - READY: !list_empty(&fcmd_head) && !active_fcmd
> > > + * Fetch commands available but none processing events
> > > *
> > > - * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> > > - * so lock is required for storing request tag to fifo
> > > + * - ACTIVE: active_fcmd
> > > + * One fetch command actively processing events from evts_fifo
> > > *
> > > - * Make sure just one reader for fetching request from task work
> > > - * function to ublk server, so no need to grab the lock in reader
> > > - * side.
> > > + * Key Invariants:
> > > + * - At most one active_fcmd at any time (single reader)
> > > + * - active_fcmd is always from fcmd_head list when non-NULL
> > > + * - evts_fifo can be read locklessly by the single active reader
> > > + * - All state transitions require evts_lock protection
> > > + * - Multiple writers to evts_fifo require lock protection
> > > */
> > > struct {
> > > DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> > > spinlock_t evts_lock;
> > > +
> > > + /* List of fetch commands available to process events */
> > > + struct list_head fcmd_head;
> > > +
> > > + /* Currently active fetch command (NULL = none active) */
> > > + struct ublk_batch_fcmd *active_fcmd;
> > > }____cacheline_aligned_in_smp;
> > >
> > > struct ublk_io ios[] __counted_by(q_depth);
> > > @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
> > > static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
> > > u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
> > > static inline unsigned int ublk_req_build_flags(struct request *req);
> > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > + struct ublk_batch_io_data *data,
> > > + struct ublk_batch_fcmd *fcmd);
> > >
> > > static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
> > > {
> > > return false;
> > > }
> > >
> > > +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> > > +{
> > > + return false;
> > > +}
> > > +
> > > static inline void ublk_io_lock(struct ublk_io *io)
> > > {
> > > spin_lock(&io->lock);
> > > @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq; /* wait until one idr is freed */
> > >
> > > static DEFINE_MUTEX(ublk_ctl_mutex);
> > >
> > > +static struct ublk_batch_fcmd *
> > > +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> > > +{
> > > + struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
> >
> > An allocation in the I/O path seems unfortunate. Is there not room to
> > store the struct ublk_batch_fcmd in the io_uring_cmd pdu?
>
> It is allocated once for one mshot request, which covers many IOs.
>
> It can't be held in uring_cmd pdu, but the allocation can be optimized in
> future. Not a big deal in enablement stage.
Okay, seems fine to optimize it in the future.
>
> > > +
> > > + if (fcmd) {
> > > + fcmd->cmd = cmd;
> > > + fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
> >
> > Is it necessary to store sample this here just to pass it back to the
> > io_uring layer? Wouldn't the io_uring layer already have access to it
> > in struct io_kiocb's buf_index field?
>
> ->buf_group is used by io_uring_cmd_buffer_select(), and this way also
> follows ->buf_index uses in both io_uring/net.c and io_uring/rw.c.
>
>
> io_ring_buffer_select(), so we can't reuse req->buf_index here.
But io_uring/net.c and io_uring/rw.c both retrieve the buf_group value
from req->buf_index instead of the SQE, for example:
if (req->flags & REQ_F_BUFFER_SELECT)
sr->buf_group = req->buf_index;
Seems like it would make sense to do the same for
UBLK_U_IO_FETCH_IO_CMDS. That also saves one pointer dereference here.
>
> >
> > > + }
> > > + return fcmd;
> > > +}
> > > +
> > > +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> > > +{
> > > + kfree(fcmd);
> > > +}
> > > +
> > > +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> > > +{
> > > + WRITE_ONCE(ubq->active_fcmd, NULL);
> > > +}
> > >
> > > -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > > +/*
> > > + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> > > + * dispatching
> > > + */
> > > +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> > > + const struct ublk_batch_io_data *data,
> > > struct ublk_batch_fcmd *fcmd,
> > > int res)
> > > {
> > > + spin_lock(&ubq->evts_lock);
> > > + list_del(&fcmd->node);
> > > + WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> > > + __ublk_release_fcmd(ubq);
> > > + spin_unlock(&ubq->evts_lock);
> > > +
> > > io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > > - fcmd->cmd = NULL;
> > > + ublk_batch_free_fcmd(fcmd);
> > > }
> > >
> > > static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > > @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > bool needs_filter;
> > > int ret;
> > >
> > > + WARN_ON_ONCE(data->cmd != fcmd->cmd);
> > > +
> > > sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > > data->issue_flags);
> > > if (sel.val < 0)
> > > @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > return ret;
> > > }
> > >
> > > -static __maybe_unused int
> > > -ublk_batch_dispatch(struct ublk_queue *ubq,
> > > - const struct ublk_batch_io_data *data,
> > > - struct ublk_batch_fcmd *fcmd)
> > > +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> > > + struct ublk_queue *ubq)
> > > +{
> > > + struct ublk_batch_fcmd *fcmd;
> > > +
> > > + lockdep_assert_held(&ubq->evts_lock);
> > > +
> > > + /*
> > > + * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> > > + *
> > > + * The pair is the smp_mb() in ublk_batch_dispatch().
> > > + *
> > > + * If ubq->active_fcmd is observed as non-NULL, the new added tags
> > > + * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> > > + */
> > > + smp_mb();
> > > + if (READ_ONCE(ubq->active_fcmd)) {
> > > + fcmd = NULL;
> > > + } else {
> > > + fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> > > + struct ublk_batch_fcmd, node);
> > > + WRITE_ONCE(ubq->active_fcmd, fcmd);
> > > + }
> > > + return fcmd;
> > > +}
> > > +
> > > +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> > > + unsigned int issue_flags)
> > > +{
> > > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > + struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > > + struct ublk_batch_io_data data = {
> > > + .ub = pdu->ubq->dev,
> > > + .cmd = fcmd->cmd,
> > > + .issue_flags = issue_flags,
> > > + };
> > > +
> > > + WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> > > +
> > > + ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> > > +}
> > > +
> > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > + struct ublk_batch_io_data *data,
> > > + struct ublk_batch_fcmd *fcmd)
> > > {
> > > + struct ublk_batch_fcmd *new_fcmd;
> >
> > Is the new_fcmd variable necessary? Can fcmd be reused instead?
> >
> > > + void *handle;
> > > + bool empty;
> > > int ret = 0;
> > >
> > > +again:
> > > while (!ublk_io_evts_empty(ubq)) {
> > > ret = __ublk_batch_dispatch(ubq, data, fcmd);
> > > if (ret <= 0)
> > > break;
> > > }
> > >
> > > - if (ret < 0)
> > > - ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> > > + if (ret < 0) {
> > > + ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> > > + return;
> > > + }
> > >
> > > - return ret;
> > > + handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> > > + __ublk_release_fcmd(ubq);
> > > + /*
> > > + * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> > > + * checking ubq->evts_fifo.
> > > + *
> > > + * The pair is the smp_mb() in __ublk_acquire_fcmd().
> > > + */
> > > + smp_mb();
> > > + empty = ublk_io_evts_empty(ubq);
> > > + if (likely(empty))
> >
> > nit: empty variable seems unnecessary
> >
> > > + return;
> > > +
> > > + spin_lock(&ubq->evts_lock);
> > > + new_fcmd = __ublk_acquire_fcmd(ubq);
> > > + spin_unlock(&ubq->evts_lock);
> > > +
> > > + if (!new_fcmd)
> > > + return;
> > > + if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
> >
> > This check seems to be meant to decide whether the new and old
> > UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?
>
> Actually not.
>
> > But belonging to the same io_uring context doesn't necessarily mean
> > that the same task issued them. It seems like it would be safer to
> > always dispatch new_fcmd->cmd to task work.
>
> What matters is just that ctx->uring_lock & issue_flag matches from ublk
> viewpoint, so it is safe to do so.
Okay, that makes sense.
>
> However, given it is hit in slow path, so starting new dispatch
> is easier.
Yeah, I'd agree it makes sense to keep the unexpected path code
simpler. There may also be fairness concerns from looping indefinitely
here if the evts_fifo continues to be nonempty, so dispatching to task
work seems safer.
>
> >
> > > + data->cmd = new_fcmd->cmd;
> > > + fcmd = new_fcmd;
> > > + goto again;
> > > + }
> > > + io_uring_cmd_complete_in_task(new_fcmd->cmd, ublk_batch_tw_cb);
> > > }
> > >
> > > static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> > > @@ -1576,13 +1711,27 @@ static void ublk_cmd_tw_cb(struct io_uring_cmd *cmd,
> > > ublk_dispatch_req(ubq, pdu->req, issue_flags);
> > > }
> > >
> > > -static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq)
> > > +static void ublk_queue_cmd(struct ublk_queue *ubq, struct request *rq, bool last)
> > > {
> > > - struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> > > - struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > + if (ublk_support_batch_io(ubq)) {
> > > + unsigned short tag = rq->tag;
> > > + struct ublk_batch_fcmd *fcmd = NULL;
> > >
> > > - pdu->req = rq;
> > > - io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> > > + spin_lock(&ubq->evts_lock);
> > > + kfifo_put(&ubq->evts_fifo, tag);
> > > + if (last)
> > > + fcmd = __ublk_acquire_fcmd(ubq);
> > > + spin_unlock(&ubq->evts_lock);
> > > +
> > > + if (fcmd)
> > > + io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > > + } else {
> > > + struct io_uring_cmd *cmd = ubq->ios[rq->tag].cmd;
> > > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > +
> > > + pdu->req = rq;
> > > + io_uring_cmd_complete_in_task(cmd, ublk_cmd_tw_cb);
> > > + }
> > > }
> > >
> > > static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> > > @@ -1600,14 +1749,44 @@ static void ublk_cmd_list_tw_cb(struct io_uring_cmd *cmd,
> > > } while (rq);
> > > }
> > >
> > > -static void ublk_queue_cmd_list(struct ublk_io *io, struct rq_list *l)
> > > +static void ublk_batch_queue_cmd_list(struct ublk_queue *ubq, struct rq_list *l)
> > > {
> > > - struct io_uring_cmd *cmd = io->cmd;
> > > - struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > + unsigned short tags[MAX_NR_TAG];
> > > + struct ublk_batch_fcmd *fcmd;
> > > + struct request *rq;
> > > + unsigned cnt = 0;
> > > +
> > > + spin_lock(&ubq->evts_lock);
> > > + rq_list_for_each(l, rq) {
> > > + tags[cnt++] = (unsigned short)rq->tag;
> > > + if (cnt >= MAX_NR_TAG) {
> > > + kfifo_in(&ubq->evts_fifo, tags, cnt);
> > > + cnt = 0;
> > > + }
> > > + }
> > > + if (cnt)
> > > + kfifo_in(&ubq->evts_fifo, tags, cnt);
> > > + fcmd = __ublk_acquire_fcmd(ubq);
> > > + spin_unlock(&ubq->evts_lock);
> > >
> > > - pdu->req_list = rq_list_peek(l);
> > > rq_list_init(l);
> > > - io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> > > + if (fcmd)
> > > + io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > > +}
> > > +
> > > +static void ublk_queue_cmd_list(struct ublk_queue *ubq, struct ublk_io *io,
> > > + struct rq_list *l, bool batch)
> > > +{
> > > + if (batch) {
> > > + ublk_batch_queue_cmd_list(ubq, l);
> > > + } else {
> > > + struct io_uring_cmd *cmd = io->cmd;
> > > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > +
> > > + pdu->req_list = rq_list_peek(l);
> > > + rq_list_init(l);
> > > + io_uring_cmd_complete_in_task(cmd, ublk_cmd_list_tw_cb);
> > > + }
> > > }
> > >
> > > static enum blk_eh_timer_return ublk_timeout(struct request *rq)
> > > @@ -1686,7 +1865,7 @@ static blk_status_t ublk_queue_rq(struct blk_mq_hw_ctx *hctx,
> > > return BLK_STS_OK;
> > > }
> > >
> > > - ublk_queue_cmd(ubq, rq);
> > > + ublk_queue_cmd(ubq, rq, bd->last);
> > > return BLK_STS_OK;
> > > }
> > >
> > > @@ -1698,11 +1877,25 @@ static inline bool ublk_belong_to_same_batch(const struct ublk_io *io,
> > > (io->task == io2->task);
> > > }
> > >
> > > -static void ublk_queue_rqs(struct rq_list *rqlist)
> > > +static void ublk_commit_rqs(struct blk_mq_hw_ctx *hctx)
> > > +{
> > > + struct ublk_queue *ubq = hctx->driver_data;
> > > + struct ublk_batch_fcmd *fcmd;
> > > +
> > > + spin_lock(&ubq->evts_lock);
> > > + fcmd = __ublk_acquire_fcmd(ubq);
> > > + spin_unlock(&ubq->evts_lock);
> > > +
> > > + if (fcmd)
> > > + io_uring_cmd_complete_in_task(fcmd->cmd, ublk_batch_tw_cb);
> > > +}
> > > +
> > > +static void __ublk_queue_rqs(struct rq_list *rqlist, bool batch)
> > > {
> > > struct rq_list requeue_list = { };
> > > struct rq_list submit_list = { };
> > > struct ublk_io *io = NULL;
> > > + struct ublk_queue *ubq = NULL;
> > > struct request *req;
> > >
> > > while ((req = rq_list_pop(rqlist))) {
> > > @@ -1716,16 +1909,27 @@ static void ublk_queue_rqs(struct rq_list *rqlist)
> > >
> > > if (io && !ublk_belong_to_same_batch(io, this_io) &&
> > > !rq_list_empty(&submit_list))
> > > - ublk_queue_cmd_list(io, &submit_list);
> > > + ublk_queue_cmd_list(ubq, io, &submit_list, batch);
> >
> > This seems to assume that all the requests belong to the same
> > ublk_queue, which isn't required
>
> Here, it is required for BATCH_IO, which needs new __ublk_queue_rqs()
> implementation now.
>
> Nice catch!
>
> >
> > > io = this_io;
> > > + ubq = this_q;
> > > rq_list_add_tail(&submit_list, req);
> > > }
> > >
> > > if (!rq_list_empty(&submit_list))
> > > - ublk_queue_cmd_list(io, &submit_list);
> > > + ublk_queue_cmd_list(ubq, io, &submit_list, batch);
> >
> > Same here
> >
> > > *rqlist = requeue_list;
> > > }
> > >
> > > +static void ublk_queue_rqs(struct rq_list *rqlist)
> > > +{
> > > + __ublk_queue_rqs(rqlist, false);
> > > +}
> > > +
> > > +static void ublk_batch_queue_rqs(struct rq_list *rqlist)
> > > +{
> > > + __ublk_queue_rqs(rqlist, true);
> > > +}
> > > +
> > > static int ublk_init_hctx(struct blk_mq_hw_ctx *hctx, void *driver_data,
> > > unsigned int hctx_idx)
> > > {
> > > @@ -1743,6 +1947,14 @@ static const struct blk_mq_ops ublk_mq_ops = {
> > > .timeout = ublk_timeout,
> > > };
> > >
> > > +static const struct blk_mq_ops ublk_batch_mq_ops = {
> > > + .commit_rqs = ublk_commit_rqs,
> > > + .queue_rq = ublk_queue_rq,
> > > + .queue_rqs = ublk_batch_queue_rqs,
> > > + .init_hctx = ublk_init_hctx,
> > > + .timeout = ublk_timeout,
> > > +};
> > > +
> > > static void ublk_queue_reinit(struct ublk_device *ub, struct ublk_queue *ubq)
> > > {
> > > int i;
> > > @@ -2120,6 +2332,56 @@ static void ublk_cancel_cmd(struct ublk_queue *ubq, unsigned tag,
> > > io_uring_cmd_done(io->cmd, UBLK_IO_RES_ABORT, issue_flags);
> > > }
> > >
> > > +static void ublk_batch_cancel_cmd(struct ublk_queue *ubq,
> > > + struct ublk_batch_fcmd *fcmd,
> > > + unsigned int issue_flags)
> > > +{
> > > + bool done;
> > > +
> > > + spin_lock(&ubq->evts_lock);
> > > + done = (ubq->active_fcmd != fcmd);
> >
> > Needs to use READ_ONCE() since __ublk_release_fcmd() can be called
> > without holding evts_lock?
>
> OK.
>
> >
> > > + if (done)
> > > + list_del(&fcmd->node);
> > > + spin_unlock(&ubq->evts_lock);
> > > +
> > > + if (done) {
> > > + io_uring_cmd_done(fcmd->cmd, UBLK_IO_RES_ABORT, issue_flags);
> > > + ublk_batch_free_fcmd(fcmd);
> > > + }
> > > +}
> > > +
> > > +static void ublk_batch_cancel_queue(struct ublk_queue *ubq)
> > > +{
> > > + LIST_HEAD(fcmd_list);
> > > +
> > > + spin_lock(&ubq->evts_lock);
> > > + ubq->force_abort = true;
> > > + list_splice_init(&ubq->fcmd_head, &fcmd_list);
> > > + if (ubq->active_fcmd)
> > > + list_move(&ubq->active_fcmd->node, &ubq->fcmd_head);
> >
> > Similarly, needs READ_ONCE()?
>
> OK.
>
> But this one may not be necessary, since now everything is just quiesced,
> and the lockless code path won't hit any more.
Good point. I think a comment to that effect would be helpful.
Best,
Caleb
>
> >
> > > + spin_unlock(&ubq->evts_lock);
> > > +
> > > + while (!list_empty(&fcmd_list)) {
> > > + struct ublk_batch_fcmd *fcmd = list_first_entry(&fcmd_list,
> > > + struct ublk_batch_fcmd, node);
> > > +
> > > + ublk_batch_cancel_cmd(ubq, fcmd, IO_URING_F_UNLOCKED);
> > > + }
> > > +}
> > > +
> > > +static void ublk_batch_cancel_fn(struct io_uring_cmd *cmd,
> > > + unsigned int issue_flags)
> > > +{
> > > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > + struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > > + struct ublk_queue *ubq = pdu->ubq;
> > > +
> > > + if (!ubq->canceling)
> >
> > Is it not racy to access ubq->canceling without any lock held?
>
> OK, will switch to call ublk_start_cancel() unconditionally.
>
> >
> > > + ublk_start_cancel(ubq->dev);
> > > +
> > > + ublk_batch_cancel_cmd(ubq, fcmd, issue_flags);
> > > +}
> > > +
> > > /*
> > > * The ublk char device won't be closed when calling cancel fn, so both
> > > * ublk device and queue are guaranteed to be live
> > > @@ -2171,6 +2433,11 @@ static void ublk_cancel_queue(struct ublk_queue *ubq)
> > > {
> > > int i;
> > >
> > > + if (ublk_support_batch_io(ubq)) {
> > > + ublk_batch_cancel_queue(ubq);
> > > + return;
> > > + }
> > > +
> > > for (i = 0; i < ubq->q_depth; i++)
> > > ublk_cancel_cmd(ubq, i, IO_URING_F_UNLOCKED);
> > > }
> > > @@ -3091,6 +3358,74 @@ static int ublk_check_batch_cmd(const struct ublk_batch_io_data *data)
> > > return ublk_check_batch_cmd_flags(uc);
> > > }
> > >
> > > +static int ublk_batch_attach(struct ublk_queue *ubq,
> > > + struct ublk_batch_io_data *data,
> > > + struct ublk_batch_fcmd *fcmd)
> > > +{
> > > + struct ublk_batch_fcmd *new_fcmd = NULL;
> > > + bool free = false;
> > > +
> > > + spin_lock(&ubq->evts_lock);
> > > + if (unlikely(ubq->force_abort || ubq->canceling)) {
> > > + free = true;
> > > + } else {
> > > + list_add_tail(&fcmd->node, &ubq->fcmd_head);
> > > + new_fcmd = __ublk_acquire_fcmd(ubq);
> > > + }
> > > + spin_unlock(&ubq->evts_lock);
> > > +
> > > + /*
> > > + * If the two fetch commands are originated from same io_ring_ctx,
> > > + * run batch dispatch directly. Otherwise, schedule task work for
> > > + * doing it.
> > > + */
> > > + if (new_fcmd && io_uring_cmd_ctx_handle(new_fcmd->cmd) ==
> > > + io_uring_cmd_ctx_handle(fcmd->cmd)) {
> > > + data->cmd = new_fcmd->cmd;
> > > + ublk_batch_dispatch(ubq, data, new_fcmd);
> > > + } else if (new_fcmd) {
> > > + io_uring_cmd_complete_in_task(new_fcmd->cmd,
> > > + ublk_batch_tw_cb);
> > > + }
> >
> > Return early if (!new_fcmd) to reduce indentation?
> >
> > > +
> > > + if (free) {
> > > + ublk_batch_free_fcmd(fcmd);
> > > + return -ENODEV;
> > > + }
> >
> > Move the if (free) check directly after spin_unlock(&ubq->evts_lock)?
>
> Yeah, this is better.
>
> >
> > > + return -EIOCBQUEUED;
> >
> > > +}
> > > +
> > > +static int ublk_handle_batch_fetch_cmd(struct ublk_batch_io_data *data)
> > > +{
> > > + struct ublk_queue *ubq = ublk_get_queue(data->ub, data->header.q_id);
> > > + struct ublk_batch_fcmd *fcmd = ublk_batch_alloc_fcmd(data->cmd);
> > > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(data->cmd);
> > > +
> > > + if (!fcmd)
> > > + return -ENOMEM;
> > > +
> > > + pdu->ubq = ubq;
> > > + pdu->fcmd = fcmd;
> > > + io_uring_cmd_mark_cancelable(data->cmd, data->issue_flags);
> > > +
> > > + return ublk_batch_attach(ubq, data, fcmd);
> > > +}
> > > +
> > > +static int ublk_validate_batch_fetch_cmd(struct ublk_batch_io_data *data,
> > > + const struct ublk_batch_io *uc)
> > > +{
> > > + if (!(data->cmd->flags & IORING_URING_CMD_MULTISHOT))
> > > + return -EINVAL;
> > > +
> > > + if (uc->elem_bytes != sizeof(__u16))
> > > + return -EINVAL;
> > > +
> > > + if (uc->flags != 0)
> > > + return -E2BIG;
> > > +
> > > + return 0;
> > > +}
> > > +
> > > static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> > > unsigned int issue_flags)
> > > {
> > > @@ -3113,6 +3448,11 @@ static int ublk_ch_batch_io_uring_cmd(struct io_uring_cmd *cmd,
> > > if (data.header.q_id >= ub->dev_info.nr_hw_queues)
> > > goto out;
> > >
> > > + if (unlikely(issue_flags & IO_URING_F_CANCEL)) {
> > > + ublk_batch_cancel_fn(cmd, issue_flags);
> > > + return 0;
> > > + }
> >
> > Move this to the top of the function before the other logic that's not
> > necessary in the cancel case?
>
> Yeah, looks better.
>
>
> Thanks,
> Ming
>
On Mon, Dec 01, 2025 at 09:51:59AM -0800, Caleb Sander Mateos wrote:
> On Mon, Dec 1, 2025 at 1:42 AM Ming Lei <ming.lei@redhat.com> wrote:
> >
> > On Sun, Nov 30, 2025 at 09:55:47PM -0800, Caleb Sander Mateos wrote:
> > > On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> > > >
> > > > Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> > > > of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> > > > multiple I/O commands in a single operation, significantly reducing
> > > > submission overhead compared to individual FETCH_REQ* commands.
> > > >
> > > > Key Design Features:
> > > >
> > > > 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
> > > > commands, with the batch size limited by the provided buffer length.
> > > >
> > > > 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
> > > > simultaneously, but only one is active at any time. This enables
> > > > efficient load distribution across multiple server task contexts.
> > > >
> > > > 3. Implicit State Management: The implementation uses three key variables
> > > > to track state:
> > > > - evts_fifo: Queue of request tags awaiting processing
> > > > - fcmd_head: List of available fetch commands
> > > > - active_fcmd: Currently active fetch command (NULL = none active)
> > > >
> > > > States are derived implicitly:
> > > > - IDLE: No fetch commands available
> > > > - READY: Fetch commands available, none active
> > > > - ACTIVE: One fetch command processing events
> > > >
> > > > 4. Lockless Reader Optimization: The active fetch command can read from
> > > > evts_fifo without locking (single reader guarantee), while writers
> > > > (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
> > > > barrier pairing plays key role for the single lockless reader
> > > > optimization.
> > > >
> > > > Implementation Details:
> > > >
> > > > - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> > > > - __ublk_pick_active_fcmd() selects an available fetch command when
> > > > events arrive and no command is currently active
> > >
> > > What is __ublk_pick_active_fcmd()? I don't see a function with that name.
> >
> > It is renamed as __ublk_acquire_fcmd(), and its counter pair is
> > __ublk_release_fcmd().
>
> Okay, update the commit message then?
>
> >
> > >
> > > > - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
> > > > buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> > > > - State transitions are coordinated via evts_lock to maintain consistency
> > > >
> > > > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > > > ---
> > > > drivers/block/ublk_drv.c | 412 +++++++++++++++++++++++++++++++---
> > > > include/uapi/linux/ublk_cmd.h | 7 +
> > > > 2 files changed, 388 insertions(+), 31 deletions(-)
> > > >
> > > > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > > > index cc9c92d97349..2e5e392c939e 100644
> > > > --- a/drivers/block/ublk_drv.c
> > > > +++ b/drivers/block/ublk_drv.c
> > > > @@ -93,6 +93,7 @@
> > > >
> > > > /* ublk batch fetch uring_cmd */
> > > > struct ublk_batch_fcmd {
> > > > + struct list_head node;
> > > > struct io_uring_cmd *cmd;
> > > > unsigned short buf_group;
> > > > };
> > > > @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
> > > > */
> > > > struct ublk_queue *ubq;
> > > >
> > > > - u16 tag;
> > > > + union {
> > > > + u16 tag;
> > > > + struct ublk_batch_fcmd *fcmd; /* batch io only */
> > > > + };
> > > > };
> > > >
> > > > struct ublk_batch_io_data {
> > > > @@ -229,18 +233,36 @@ struct ublk_queue {
> > > > struct ublk_device *dev;
> > > >
> > > > /*
> > > > - * Inflight ublk request tag is saved in this fifo
> > > > + * Batch I/O State Management:
> > > > + *
> > > > + * The batch I/O system uses implicit state management based on the
> > > > + * combination of three key variables below.
> > > > + *
> > > > + * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> > > > + * No fetch commands available, events queue in evts_fifo
> > > > + *
> > > > + * - READY: !list_empty(&fcmd_head) && !active_fcmd
> > > > + * Fetch commands available but none processing events
> > > > *
> > > > - * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> > > > - * so lock is required for storing request tag to fifo
> > > > + * - ACTIVE: active_fcmd
> > > > + * One fetch command actively processing events from evts_fifo
> > > > *
> > > > - * Make sure just one reader for fetching request from task work
> > > > - * function to ublk server, so no need to grab the lock in reader
> > > > - * side.
> > > > + * Key Invariants:
> > > > + * - At most one active_fcmd at any time (single reader)
> > > > + * - active_fcmd is always from fcmd_head list when non-NULL
> > > > + * - evts_fifo can be read locklessly by the single active reader
> > > > + * - All state transitions require evts_lock protection
> > > > + * - Multiple writers to evts_fifo require lock protection
> > > > */
> > > > struct {
> > > > DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> > > > spinlock_t evts_lock;
> > > > +
> > > > + /* List of fetch commands available to process events */
> > > > + struct list_head fcmd_head;
> > > > +
> > > > + /* Currently active fetch command (NULL = none active) */
> > > > + struct ublk_batch_fcmd *active_fcmd;
> > > > }____cacheline_aligned_in_smp;
> > > >
> > > > struct ublk_io ios[] __counted_by(q_depth);
> > > > @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
> > > > static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
> > > > u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
> > > > static inline unsigned int ublk_req_build_flags(struct request *req);
> > > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > + struct ublk_batch_io_data *data,
> > > > + struct ublk_batch_fcmd *fcmd);
> > > >
> > > > static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
> > > > {
> > > > return false;
> > > > }
> > > >
> > > > +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> > > > +{
> > > > + return false;
> > > > +}
> > > > +
> > > > static inline void ublk_io_lock(struct ublk_io *io)
> > > > {
> > > > spin_lock(&io->lock);
> > > > @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq; /* wait until one idr is freed */
> > > >
> > > > static DEFINE_MUTEX(ublk_ctl_mutex);
> > > >
> > > > +static struct ublk_batch_fcmd *
> > > > +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> > > > +{
> > > > + struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
> > >
> > > An allocation in the I/O path seems unfortunate. Is there not room to
> > > store the struct ublk_batch_fcmd in the io_uring_cmd pdu?
> >
> > It is allocated once for one mshot request, which covers many IOs.
> >
> > It can't be held in uring_cmd pdu, but the allocation can be optimized in
> > future. Not a big deal in enablement stage.
>
> Okay, seems fine to optimize it in the future.
>
> >
> > > > +
> > > > + if (fcmd) {
> > > > + fcmd->cmd = cmd;
> > > > + fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
> > >
> > > Is it necessary to store sample this here just to pass it back to the
> > > io_uring layer? Wouldn't the io_uring layer already have access to it
> > > in struct io_kiocb's buf_index field?
> >
> > ->buf_group is used by io_uring_cmd_buffer_select(), and this way also
> > follows ->buf_index uses in both io_uring/net.c and io_uring/rw.c.
> >
> >
> > io_ring_buffer_select(), so we can't reuse req->buf_index here.
>
> But io_uring/net.c and io_uring/rw.c both retrieve the buf_group value
> from req->buf_index instead of the SQE, for example:
> if (req->flags & REQ_F_BUFFER_SELECT)
> sr->buf_group = req->buf_index;
>
> Seems like it would make sense to do the same for
> UBLK_U_IO_FETCH_IO_CMDS. That also saves one pointer dereference here.
IMO we shouldn't encourage driver to access `io_kiocb`, however, cmd->sqe
is exposed to driver explicitly.
>
> >
> > >
> > > > + }
> > > > + return fcmd;
> > > > +}
> > > > +
> > > > +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> > > > +{
> > > > + kfree(fcmd);
> > > > +}
> > > > +
> > > > +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> > > > +{
> > > > + WRITE_ONCE(ubq->active_fcmd, NULL);
> > > > +}
> > > >
> > > > -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > > > +/*
> > > > + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> > > > + * dispatching
> > > > + */
> > > > +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> > > > + const struct ublk_batch_io_data *data,
> > > > struct ublk_batch_fcmd *fcmd,
> > > > int res)
> > > > {
> > > > + spin_lock(&ubq->evts_lock);
> > > > + list_del(&fcmd->node);
> > > > + WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> > > > + __ublk_release_fcmd(ubq);
> > > > + spin_unlock(&ubq->evts_lock);
> > > > +
> > > > io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > > > - fcmd->cmd = NULL;
> > > > + ublk_batch_free_fcmd(fcmd);
> > > > }
> > > >
> > > > static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > > > @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > bool needs_filter;
> > > > int ret;
> > > >
> > > > + WARN_ON_ONCE(data->cmd != fcmd->cmd);
> > > > +
> > > > sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > > > data->issue_flags);
> > > > if (sel.val < 0)
> > > > @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > return ret;
> > > > }
> > > >
> > > > -static __maybe_unused int
> > > > -ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > - const struct ublk_batch_io_data *data,
> > > > - struct ublk_batch_fcmd *fcmd)
> > > > +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> > > > + struct ublk_queue *ubq)
> > > > +{
> > > > + struct ublk_batch_fcmd *fcmd;
> > > > +
> > > > + lockdep_assert_held(&ubq->evts_lock);
> > > > +
> > > > + /*
> > > > + * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> > > > + *
> > > > + * The pair is the smp_mb() in ublk_batch_dispatch().
> > > > + *
> > > > + * If ubq->active_fcmd is observed as non-NULL, the new added tags
> > > > + * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> > > > + */
> > > > + smp_mb();
> > > > + if (READ_ONCE(ubq->active_fcmd)) {
> > > > + fcmd = NULL;
> > > > + } else {
> > > > + fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> > > > + struct ublk_batch_fcmd, node);
> > > > + WRITE_ONCE(ubq->active_fcmd, fcmd);
> > > > + }
> > > > + return fcmd;
> > > > +}
> > > > +
> > > > +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> > > > + unsigned int issue_flags)
> > > > +{
> > > > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > > + struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > > > + struct ublk_batch_io_data data = {
> > > > + .ub = pdu->ubq->dev,
> > > > + .cmd = fcmd->cmd,
> > > > + .issue_flags = issue_flags,
> > > > + };
> > > > +
> > > > + WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> > > > +
> > > > + ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> > > > +}
> > > > +
> > > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > + struct ublk_batch_io_data *data,
> > > > + struct ublk_batch_fcmd *fcmd)
> > > > {
> > > > + struct ublk_batch_fcmd *new_fcmd;
> > >
> > > Is the new_fcmd variable necessary? Can fcmd be reused instead?
> > >
> > > > + void *handle;
> > > > + bool empty;
> > > > int ret = 0;
> > > >
> > > > +again:
> > > > while (!ublk_io_evts_empty(ubq)) {
> > > > ret = __ublk_batch_dispatch(ubq, data, fcmd);
> > > > if (ret <= 0)
> > > > break;
> > > > }
> > > >
> > > > - if (ret < 0)
> > > > - ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> > > > + if (ret < 0) {
> > > > + ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> > > > + return;
> > > > + }
> > > >
> > > > - return ret;
> > > > + handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> > > > + __ublk_release_fcmd(ubq);
> > > > + /*
> > > > + * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> > > > + * checking ubq->evts_fifo.
> > > > + *
> > > > + * The pair is the smp_mb() in __ublk_acquire_fcmd().
> > > > + */
> > > > + smp_mb();
> > > > + empty = ublk_io_evts_empty(ubq);
> > > > + if (likely(empty))
> > >
> > > nit: empty variable seems unnecessary
> > >
> > > > + return;
> > > > +
> > > > + spin_lock(&ubq->evts_lock);
> > > > + new_fcmd = __ublk_acquire_fcmd(ubq);
> > > > + spin_unlock(&ubq->evts_lock);
> > > > +
> > > > + if (!new_fcmd)
> > > > + return;
> > > > + if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
> > >
> > > This check seems to be meant to decide whether the new and old
> > > UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?
> >
> > Actually not.
> >
> > > But belonging to the same io_uring context doesn't necessarily mean
> > > that the same task issued them. It seems like it would be safer to
> > > always dispatch new_fcmd->cmd to task work.
> >
> > What matters is just that ctx->uring_lock & issue_flag matches from ublk
> > viewpoint, so it is safe to do so.
>
> Okay, that makes sense.
>
> >
> > However, given it is hit in slow path, so starting new dispatch
> > is easier.
>
> Yeah, I'd agree it makes sense to keep the unexpected path code
> simpler. There may also be fairness concerns from looping indefinitely
> here if the evts_fifo continues to be nonempty, so dispatching to task
> work seems safer.
Fair enough.
Thanks,
Ming
On Mon, Dec 1, 2025 at 5:27 PM Ming Lei <ming.lei@redhat.com> wrote:
>
> On Mon, Dec 01, 2025 at 09:51:59AM -0800, Caleb Sander Mateos wrote:
> > On Mon, Dec 1, 2025 at 1:42 AM Ming Lei <ming.lei@redhat.com> wrote:
> > >
> > > On Sun, Nov 30, 2025 at 09:55:47PM -0800, Caleb Sander Mateos wrote:
> > > > On Thu, Nov 20, 2025 at 6:00 PM Ming Lei <ming.lei@redhat.com> wrote:
> > > > >
> > > > > Add UBLK_U_IO_FETCH_IO_CMDS command to enable efficient batch processing
> > > > > of I/O requests. This multishot uring_cmd allows the ublk server to fetch
> > > > > multiple I/O commands in a single operation, significantly reducing
> > > > > submission overhead compared to individual FETCH_REQ* commands.
> > > > >
> > > > > Key Design Features:
> > > > >
> > > > > 1. Multishot Operation: One UBLK_U_IO_FETCH_IO_CMDS can fetch many I/O
> > > > > commands, with the batch size limited by the provided buffer length.
> > > > >
> > > > > 2. Dynamic Load Balancing: Multiple fetch commands can be submitted
> > > > > simultaneously, but only one is active at any time. This enables
> > > > > efficient load distribution across multiple server task contexts.
> > > > >
> > > > > 3. Implicit State Management: The implementation uses three key variables
> > > > > to track state:
> > > > > - evts_fifo: Queue of request tags awaiting processing
> > > > > - fcmd_head: List of available fetch commands
> > > > > - active_fcmd: Currently active fetch command (NULL = none active)
> > > > >
> > > > > States are derived implicitly:
> > > > > - IDLE: No fetch commands available
> > > > > - READY: Fetch commands available, none active
> > > > > - ACTIVE: One fetch command processing events
> > > > >
> > > > > 4. Lockless Reader Optimization: The active fetch command can read from
> > > > > evts_fifo without locking (single reader guarantee), while writers
> > > > > (ublk_queue_rq/ublk_queue_rqs) use evts_lock protection. The memory
> > > > > barrier pairing plays key role for the single lockless reader
> > > > > optimization.
> > > > >
> > > > > Implementation Details:
> > > > >
> > > > > - ublk_queue_rq() and ublk_queue_rqs() save request tags to evts_fifo
> > > > > - __ublk_pick_active_fcmd() selects an available fetch command when
> > > > > events arrive and no command is currently active
> > > >
> > > > What is __ublk_pick_active_fcmd()? I don't see a function with that name.
> > >
> > > It is renamed as __ublk_acquire_fcmd(), and its counter pair is
> > > __ublk_release_fcmd().
> >
> > Okay, update the commit message then?
> >
> > >
> > > >
> > > > > - ublk_batch_dispatch() moves tags from evts_fifo to the fetch command's
> > > > > buffer and posts completion via io_uring_mshot_cmd_post_cqe()
> > > > > - State transitions are coordinated via evts_lock to maintain consistency
> > > > >
> > > > > Signed-off-by: Ming Lei <ming.lei@redhat.com>
> > > > > ---
> > > > > drivers/block/ublk_drv.c | 412 +++++++++++++++++++++++++++++++---
> > > > > include/uapi/linux/ublk_cmd.h | 7 +
> > > > > 2 files changed, 388 insertions(+), 31 deletions(-)
> > > > >
> > > > > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > > > > index cc9c92d97349..2e5e392c939e 100644
> > > > > --- a/drivers/block/ublk_drv.c
> > > > > +++ b/drivers/block/ublk_drv.c
> > > > > @@ -93,6 +93,7 @@
> > > > >
> > > > > /* ublk batch fetch uring_cmd */
> > > > > struct ublk_batch_fcmd {
> > > > > + struct list_head node;
> > > > > struct io_uring_cmd *cmd;
> > > > > unsigned short buf_group;
> > > > > };
> > > > > @@ -117,7 +118,10 @@ struct ublk_uring_cmd_pdu {
> > > > > */
> > > > > struct ublk_queue *ubq;
> > > > >
> > > > > - u16 tag;
> > > > > + union {
> > > > > + u16 tag;
> > > > > + struct ublk_batch_fcmd *fcmd; /* batch io only */
> > > > > + };
> > > > > };
> > > > >
> > > > > struct ublk_batch_io_data {
> > > > > @@ -229,18 +233,36 @@ struct ublk_queue {
> > > > > struct ublk_device *dev;
> > > > >
> > > > > /*
> > > > > - * Inflight ublk request tag is saved in this fifo
> > > > > + * Batch I/O State Management:
> > > > > + *
> > > > > + * The batch I/O system uses implicit state management based on the
> > > > > + * combination of three key variables below.
> > > > > + *
> > > > > + * - IDLE: list_empty(&fcmd_head) && !active_fcmd
> > > > > + * No fetch commands available, events queue in evts_fifo
> > > > > + *
> > > > > + * - READY: !list_empty(&fcmd_head) && !active_fcmd
> > > > > + * Fetch commands available but none processing events
> > > > > *
> > > > > - * There are multiple writer from ublk_queue_rq() or ublk_queue_rqs(),
> > > > > - * so lock is required for storing request tag to fifo
> > > > > + * - ACTIVE: active_fcmd
> > > > > + * One fetch command actively processing events from evts_fifo
> > > > > *
> > > > > - * Make sure just one reader for fetching request from task work
> > > > > - * function to ublk server, so no need to grab the lock in reader
> > > > > - * side.
> > > > > + * Key Invariants:
> > > > > + * - At most one active_fcmd at any time (single reader)
> > > > > + * - active_fcmd is always from fcmd_head list when non-NULL
> > > > > + * - evts_fifo can be read locklessly by the single active reader
> > > > > + * - All state transitions require evts_lock protection
> > > > > + * - Multiple writers to evts_fifo require lock protection
> > > > > */
> > > > > struct {
> > > > > DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> > > > > spinlock_t evts_lock;
> > > > > +
> > > > > + /* List of fetch commands available to process events */
> > > > > + struct list_head fcmd_head;
> > > > > +
> > > > > + /* Currently active fetch command (NULL = none active) */
> > > > > + struct ublk_batch_fcmd *active_fcmd;
> > > > > }____cacheline_aligned_in_smp;
> > > > >
> > > > > struct ublk_io ios[] __counted_by(q_depth);
> > > > > @@ -292,12 +314,20 @@ static void ublk_abort_queue(struct ublk_device *ub, struct ublk_queue *ubq);
> > > > > static inline struct request *__ublk_check_and_get_req(struct ublk_device *ub,
> > > > > u16 q_id, u16 tag, struct ublk_io *io, size_t offset);
> > > > > static inline unsigned int ublk_req_build_flags(struct request *req);
> > > > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > > + struct ublk_batch_io_data *data,
> > > > > + struct ublk_batch_fcmd *fcmd);
> > > > >
> > > > > static inline bool ublk_dev_support_batch_io(const struct ublk_device *ub)
> > > > > {
> > > > > return false;
> > > > > }
> > > > >
> > > > > +static inline bool ublk_support_batch_io(const struct ublk_queue *ubq)
> > > > > +{
> > > > > + return false;
> > > > > +}
> > > > > +
> > > > > static inline void ublk_io_lock(struct ublk_io *io)
> > > > > {
> > > > > spin_lock(&io->lock);
> > > > > @@ -624,13 +654,45 @@ static wait_queue_head_t ublk_idr_wq; /* wait until one idr is freed */
> > > > >
> > > > > static DEFINE_MUTEX(ublk_ctl_mutex);
> > > > >
> > > > > +static struct ublk_batch_fcmd *
> > > > > +ublk_batch_alloc_fcmd(struct io_uring_cmd *cmd)
> > > > > +{
> > > > > + struct ublk_batch_fcmd *fcmd = kzalloc(sizeof(*fcmd), GFP_NOIO);
> > > >
> > > > An allocation in the I/O path seems unfortunate. Is there not room to
> > > > store the struct ublk_batch_fcmd in the io_uring_cmd pdu?
> > >
> > > It is allocated once for one mshot request, which covers many IOs.
> > >
> > > It can't be held in uring_cmd pdu, but the allocation can be optimized in
> > > future. Not a big deal in enablement stage.
> >
> > Okay, seems fine to optimize it in the future.
> >
> > >
> > > > > +
> > > > > + if (fcmd) {
> > > > > + fcmd->cmd = cmd;
> > > > > + fcmd->buf_group = READ_ONCE(cmd->sqe->buf_index);
> > > >
> > > > Is it necessary to store sample this here just to pass it back to the
> > > > io_uring layer? Wouldn't the io_uring layer already have access to it
> > > > in struct io_kiocb's buf_index field?
> > >
> > > ->buf_group is used by io_uring_cmd_buffer_select(), and this way also
> > > follows ->buf_index uses in both io_uring/net.c and io_uring/rw.c.
> > >
> > >
> > > io_ring_buffer_select(), so we can't reuse req->buf_index here.
> >
> > But io_uring/net.c and io_uring/rw.c both retrieve the buf_group value
> > from req->buf_index instead of the SQE, for example:
> > if (req->flags & REQ_F_BUFFER_SELECT)
> > sr->buf_group = req->buf_index;
> >
> > Seems like it would make sense to do the same for
> > UBLK_U_IO_FETCH_IO_CMDS. That also saves one pointer dereference here.
>
> IMO we shouldn't encourage driver to access `io_kiocb`, however, cmd->sqe
> is exposed to driver explicitly.
Right, but we can add a helper in include/linux/io_uring/cmd.h to
encapsulate accessing the io_kiocb field.
Best,
Caleb
>
> >
> > >
> > > >
> > > > > + }
> > > > > + return fcmd;
> > > > > +}
> > > > > +
> > > > > +static void ublk_batch_free_fcmd(struct ublk_batch_fcmd *fcmd)
> > > > > +{
> > > > > + kfree(fcmd);
> > > > > +}
> > > > > +
> > > > > +static void __ublk_release_fcmd(struct ublk_queue *ubq)
> > > > > +{
> > > > > + WRITE_ONCE(ubq->active_fcmd, NULL);
> > > > > +}
> > > > >
> > > > > -static void ublk_batch_deinit_fetch_buf(const struct ublk_batch_io_data *data,
> > > > > +/*
> > > > > + * Nothing can move on, so clear ->active_fcmd, and the caller should stop
> > > > > + * dispatching
> > > > > + */
> > > > > +static void ublk_batch_deinit_fetch_buf(struct ublk_queue *ubq,
> > > > > + const struct ublk_batch_io_data *data,
> > > > > struct ublk_batch_fcmd *fcmd,
> > > > > int res)
> > > > > {
> > > > > + spin_lock(&ubq->evts_lock);
> > > > > + list_del(&fcmd->node);
> > > > > + WARN_ON_ONCE(fcmd != ubq->active_fcmd);
> > > > > + __ublk_release_fcmd(ubq);
> > > > > + spin_unlock(&ubq->evts_lock);
> > > > > +
> > > > > io_uring_cmd_done(fcmd->cmd, res, data->issue_flags);
> > > > > - fcmd->cmd = NULL;
> > > > > + ublk_batch_free_fcmd(fcmd);
> > > > > }
> > > > >
> > > > > static int ublk_batch_fetch_post_cqe(struct ublk_batch_fcmd *fcmd,
> > > > > @@ -1491,6 +1553,8 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > > bool needs_filter;
> > > > > int ret;
> > > > >
> > > > > + WARN_ON_ONCE(data->cmd != fcmd->cmd);
> > > > > +
> > > > > sel = io_uring_cmd_buffer_select(fcmd->cmd, fcmd->buf_group, &len,
> > > > > data->issue_flags);
> > > > > if (sel.val < 0)
> > > > > @@ -1548,23 +1612,94 @@ static int __ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > > return ret;
> > > > > }
> > > > >
> > > > > -static __maybe_unused int
> > > > > -ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > > - const struct ublk_batch_io_data *data,
> > > > > - struct ublk_batch_fcmd *fcmd)
> > > > > +static struct ublk_batch_fcmd *__ublk_acquire_fcmd(
> > > > > + struct ublk_queue *ubq)
> > > > > +{
> > > > > + struct ublk_batch_fcmd *fcmd;
> > > > > +
> > > > > + lockdep_assert_held(&ubq->evts_lock);
> > > > > +
> > > > > + /*
> > > > > + * Ordering updating ubq->evts_fifo and checking ubq->active_fcmd.
> > > > > + *
> > > > > + * The pair is the smp_mb() in ublk_batch_dispatch().
> > > > > + *
> > > > > + * If ubq->active_fcmd is observed as non-NULL, the new added tags
> > > > > + * can be visisible in ublk_batch_dispatch() with the barrier pairing.
> > > > > + */
> > > > > + smp_mb();
> > > > > + if (READ_ONCE(ubq->active_fcmd)) {
> > > > > + fcmd = NULL;
> > > > > + } else {
> > > > > + fcmd = list_first_entry_or_null(&ubq->fcmd_head,
> > > > > + struct ublk_batch_fcmd, node);
> > > > > + WRITE_ONCE(ubq->active_fcmd, fcmd);
> > > > > + }
> > > > > + return fcmd;
> > > > > +}
> > > > > +
> > > > > +static void ublk_batch_tw_cb(struct io_uring_cmd *cmd,
> > > > > + unsigned int issue_flags)
> > > > > +{
> > > > > + struct ublk_uring_cmd_pdu *pdu = ublk_get_uring_cmd_pdu(cmd);
> > > > > + struct ublk_batch_fcmd *fcmd = pdu->fcmd;
> > > > > + struct ublk_batch_io_data data = {
> > > > > + .ub = pdu->ubq->dev,
> > > > > + .cmd = fcmd->cmd,
> > > > > + .issue_flags = issue_flags,
> > > > > + };
> > > > > +
> > > > > + WARN_ON_ONCE(pdu->ubq->active_fcmd != fcmd);
> > > > > +
> > > > > + ublk_batch_dispatch(pdu->ubq, &data, fcmd);
> > > > > +}
> > > > > +
> > > > > +static void ublk_batch_dispatch(struct ublk_queue *ubq,
> > > > > + struct ublk_batch_io_data *data,
> > > > > + struct ublk_batch_fcmd *fcmd)
> > > > > {
> > > > > + struct ublk_batch_fcmd *new_fcmd;
> > > >
> > > > Is the new_fcmd variable necessary? Can fcmd be reused instead?
> > > >
> > > > > + void *handle;
> > > > > + bool empty;
> > > > > int ret = 0;
> > > > >
> > > > > +again:
> > > > > while (!ublk_io_evts_empty(ubq)) {
> > > > > ret = __ublk_batch_dispatch(ubq, data, fcmd);
> > > > > if (ret <= 0)
> > > > > break;
> > > > > }
> > > > >
> > > > > - if (ret < 0)
> > > > > - ublk_batch_deinit_fetch_buf(data, fcmd, ret);
> > > > > + if (ret < 0) {
> > > > > + ublk_batch_deinit_fetch_buf(ubq, data, fcmd, ret);
> > > > > + return;
> > > > > + }
> > > > >
> > > > > - return ret;
> > > > > + handle = io_uring_cmd_ctx_handle(fcmd->cmd);
> > > > > + __ublk_release_fcmd(ubq);
> > > > > + /*
> > > > > + * Order clearing ubq->active_fcmd from __ublk_release_fcmd() and
> > > > > + * checking ubq->evts_fifo.
> > > > > + *
> > > > > + * The pair is the smp_mb() in __ublk_acquire_fcmd().
> > > > > + */
> > > > > + smp_mb();
> > > > > + empty = ublk_io_evts_empty(ubq);
> > > > > + if (likely(empty))
> > > >
> > > > nit: empty variable seems unnecessary
> > > >
> > > > > + return;
> > > > > +
> > > > > + spin_lock(&ubq->evts_lock);
> > > > > + new_fcmd = __ublk_acquire_fcmd(ubq);
> > > > > + spin_unlock(&ubq->evts_lock);
> > > > > +
> > > > > + if (!new_fcmd)
> > > > > + return;
> > > > > + if (handle == io_uring_cmd_ctx_handle(new_fcmd->cmd)) {
> > > >
> > > > This check seems to be meant to decide whether the new and old
> > > > UBLK_U_IO_FETCH_IO_CMDS commands can execute in the same task work?
> > >
> > > Actually not.
> > >
> > > > But belonging to the same io_uring context doesn't necessarily mean
> > > > that the same task issued them. It seems like it would be safer to
> > > > always dispatch new_fcmd->cmd to task work.
> > >
> > > What matters is just that ctx->uring_lock & issue_flag matches from ublk
> > > viewpoint, so it is safe to do so.
> >
> > Okay, that makes sense.
> >
> > >
> > > However, given it is hit in slow path, so starting new dispatch
> > > is easier.
> >
> > Yeah, I'd agree it makes sense to keep the unexpected path code
> > simpler. There may also be fairness concerns from looping indefinitely
> > here if the evts_fifo continues to be nonempty, so dispatching to task
> > work seems safer.
>
> Fair enough.
>
> Thanks,
> Ming
>
© 2016 - 2025 Red Hat, Inc.