Implement the CQE handler for FUSE-over-io_uring. Upon receiving a
FUSE request via a Completion Queue Entry (CQE), the handler processes
it and submits the response back to the kernel via the
FUSE_IO_URING_CMD_COMMIT_AND_FETCH command.

The request processing logic shared between the legacy and io_uring
modes is extracted into fuse_co_process_request_common(), and the
execution flow now dispatches each request to the appropriate
mode-specific logic based on the uring_started flag.
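
The io_uring completion path as implemented here is:

    CQE arrives
      -> fuse_uring_co_process_request(ent)
           (opcode/req_id read from ent->req_header; commit_id recorded)
      -> fuse_co_process_request_common(..., send_response_uring, ent)
      -> fuse_uring_send_response()
           (fuse_out_header filled in; READ data copied to ent->req_payload)
      -> aio_add_sqe(FUSE_IO_URING_CMD_COMMIT_AND_FETCH)
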
Suggested-by: Kevin Wolf <kwolf@redhat.com>
Suggested-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Brian Song <hibriansong@gmail.com>
---
block/export/fuse.c | 400 +++++++++++++++++++++++++++++++++-----------
1 file changed, 301 insertions(+), 99 deletions(-)
diff --git a/block/export/fuse.c b/block/export/fuse.c
index 867752555a..c117e081cd 100644
--- a/block/export/fuse.c
+++ b/block/export/fuse.c
@@ -138,8 +138,8 @@ struct FuseQueue {
* FUSE_MIN_READ_BUFFER (from linux/fuse.h) bytes.
* This however is just the first part of the buffer; every read is given
* a vector of this buffer (which should be enough for all normal requests,
- * which we check via the static assertion in FUSE_IN_OP_STRUCT()) and the
- * spill-over buffer below.
+ * which we check via the static assertion in FUSE_IN_OP_STRUCT_LEGACY())
+ * and the spill-over buffer below.
* Therefore, the size of this buffer plus FUSE_SPILLOVER_BUF_SIZE must be
* FUSE_MIN_READ_BUFFER or more (checked via static assertion below).
*/
@@ -912,6 +912,7 @@ static void coroutine_fn co_read_from_fuse_fd(void *opaque)
}
fuse_co_process_request(q, spillover_buf);
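+    /*
+     * fuse_co_process_request() no longer takes ownership of spillover_buf,
+     * so free it here once the request has been processed.
+     */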
+ qemu_vfree(spillover_buf);
no_request:
fuse_dec_in_flight(exp);
@@ -1684,100 +1685,75 @@ static int fuse_write_buf_response(int fd, uint32_t req_id,
}
/*
- * For use in fuse_co_process_request():
+ * For use in fuse_co_process_request_common():
* Returns a pointer to the parameter object for the given operation (inside of
- * queue->request_buf, which is assumed to hold a fuse_in_header first).
- * Verifies that the object is complete (queue->request_buf is large enough to
- * hold it in one piece, and the request length includes the whole object).
+ * queue->request_buf, which is assumed to hold a fuse_in_header first).
+ * Verifies that the object is complete (queue->request_buf is large enough
+ * to hold it in one piece, and the request length includes the whole
+ * object). On a truncated request, sets ret to -EINVAL and evaluates to
+ * NULL, so callers must check the result. Only used on the legacy path.
*
* Note that queue->request_buf may be overwritten after yielding, so the
* returned pointer must not be used across a function that may yield!
*/
-#define FUSE_IN_OP_STRUCT(op_name, queue) \
+#define FUSE_IN_OP_STRUCT_LEGACY(op_name, queue) \
({ \
const struct fuse_in_header *__in_hdr = \
(const struct fuse_in_header *)(queue)->request_buf; \
const struct fuse_##op_name##_in *__in = \
(const struct fuse_##op_name##_in *)(__in_hdr + 1); \
const size_t __param_len = sizeof(*__in_hdr) + sizeof(*__in); \
- uint32_t __req_len; \
\
- QEMU_BUILD_BUG_ON(sizeof((queue)->request_buf) < __param_len); \
+ QEMU_BUILD_BUG_ON(sizeof((queue)->request_buf) < \
+ (sizeof(struct fuse_in_header) + \
+ sizeof(struct fuse_##op_name##_in))); \
\
- __req_len = __in_hdr->len; \
+ uint32_t __req_len = __in_hdr->len; \
if (__req_len < __param_len) { \
warn_report("FUSE request truncated (%" PRIu32 " < %zu)", \
__req_len, __param_len); \
ret = -EINVAL; \
- break; \
+ __in = NULL; \
} \
__in; \
})
/*
- * For use in fuse_co_process_request():
+ * For use in fuse_co_process_request_common():
* Returns a pointer to the return object for the given operation (inside of
* out_buf, which is assumed to hold a fuse_out_header first).
- * Verifies that out_buf is large enough to hold the whole object.
+ * Performs no size verification itself: the caller
+ * (fuse_co_process_request()) checks via static assertions that its local
+ * out_buf array is large enough for every return structure.
*
- * (out_buf should be a char[] array.)
+ * (out_buf should be a char[] array in the caller.)
*/
-#define FUSE_OUT_OP_STRUCT(op_name, out_buf) \
+#define FUSE_OUT_OP_STRUCT_LEGACY(op_name, out_buf) \
({ \
struct fuse_out_header *__out_hdr = \
(struct fuse_out_header *)(out_buf); \
struct fuse_##op_name##_out *__out = \
(struct fuse_##op_name##_out *)(__out_hdr + 1); \
\
- QEMU_BUILD_BUG_ON(sizeof(*__out_hdr) + sizeof(*__out) > \
- sizeof(out_buf)); \
- \
__out; \
})
/**
- * Process a FUSE request, incl. writing the response.
- *
- * Note that yielding in any request-processing function can overwrite the
- * contents of q->request_buf. Anything that takes a buffer needs to take
- * care that the content is copied before yielding.
- *
- * @spillover_buf can contain the tail of a write request too large to fit into
- * q->request_buf. This function takes ownership of it (i.e. will free it),
- * which assumes that its contents will not be overwritten by concurrent
- * requests (as opposed to q->request_buf).
+ * Shared helper for FUSE request processing, used by both the legacy
+ * (/dev/fuse) and io_uring paths. @opaque is a FuseQueue * (legacy) or a
+ * FuseUringEnt * (io_uring) and is passed through to @send_response.
+ * Unlike the old fuse_co_process_request(), this function does not take
+ * ownership of @spillover_buf; the legacy caller frees it.
*/
-static void coroutine_fn
-fuse_co_process_request(FuseQueue *q, void *spillover_buf)
+static void coroutine_fn fuse_co_process_request_common(
+ FuseExport *exp,
+ uint32_t opcode,
+ uint64_t req_id,
+ void *in_buf,
+ void *spillover_buf,
+ void *out_buf,
+ void (*send_response)(void *opaque, uint32_t req_id, int ret,
+ const void *buf, void *out_buf),
+ void *opaque /* FuseQueue* or FuseUringEnt* */)
{
- FuseExport *exp = q->exp;
- uint32_t opcode;
- uint64_t req_id;
- /*
- * Return buffer. Must be large enough to hold all return headers, but does
- * not include space for data returned by read requests.
- * (FUSE_IN_OP_STRUCT() verifies at compile time that out_buf is indeed
- * large enough.)
- */
- char out_buf[sizeof(struct fuse_out_header) +
- MAX_CONST(sizeof(struct fuse_init_out),
- MAX_CONST(sizeof(struct fuse_open_out),
- MAX_CONST(sizeof(struct fuse_attr_out),
- MAX_CONST(sizeof(struct fuse_write_out),
- sizeof(struct fuse_lseek_out)))))];
- struct fuse_out_header *out_hdr = (struct fuse_out_header *)out_buf;
- /* For read requests: Data to be returned */
void *out_data_buffer = NULL;
- ssize_t ret;
-
- /* Limit scope to ensure pointer is no longer used after yielding */
- {
- const struct fuse_in_header *in_hdr =
- (const struct fuse_in_header *)q->request_buf;
-
- opcode = in_hdr->opcode;
- req_id = in_hdr->unique;
- }
+ int ret = 0;
#ifdef CONFIG_LINUX_IO_URING
/*
@@ -1794,15 +1770,32 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
switch (opcode) {
case FUSE_INIT: {
- const struct fuse_init_in *in = FUSE_IN_OP_STRUCT(init, q);
- ret = fuse_co_init(exp, FUSE_OUT_OP_STRUCT(init, out_buf),
- in->max_readahead, in);
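+        /*
+         * FUSE_INIT always arrives via the legacy path; io_uring is only
+         * started after INIT has been processed (see below).
+         */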
+ FuseQueue *q = opaque;
+ const struct fuse_init_in *in =
+ FUSE_IN_OP_STRUCT_LEGACY(init, q);
+ if (!in) {
+ break;
+ }
+
+ struct fuse_init_out *out =
+ FUSE_OUT_OP_STRUCT_LEGACY(init, out_buf);
+
+ ret = fuse_co_init(exp, out, in->max_readahead, in);
break;
}
- case FUSE_OPEN:
- ret = fuse_co_open(exp, FUSE_OUT_OP_STRUCT(open, out_buf));
+ case FUSE_OPEN: {
+ struct fuse_open_out *out;
+
+ if (exp->uring_started) {
+ out = out_buf;
+ } else {
+ out = FUSE_OUT_OP_STRUCT_LEGACY(open, out_buf);
+ }
+
+ ret = fuse_co_open(exp, out);
break;
+ }
case FUSE_RELEASE:
ret = 0;
@@ -1812,37 +1805,105 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
ret = -ENOENT; /* There is no node but the root node */
break;
- case FUSE_GETATTR:
- ret = fuse_co_getattr(exp, FUSE_OUT_OP_STRUCT(attr, out_buf));
+ case FUSE_GETATTR: {
+ struct fuse_attr_out *out;
+
+ if (exp->uring_started) {
+ out = out_buf;
+ } else {
+ out = FUSE_OUT_OP_STRUCT_LEGACY(attr, out_buf);
+ }
+
+ ret = fuse_co_getattr(exp, out);
break;
+ }
case FUSE_SETATTR: {
- const struct fuse_setattr_in *in = FUSE_IN_OP_STRUCT(setattr, q);
- ret = fuse_co_setattr(exp, FUSE_OUT_OP_STRUCT(attr, out_buf),
- in->valid, in->size, in->mode, in->uid, in->gid);
+ const struct fuse_setattr_in *in;
+ struct fuse_attr_out *out;
+
+ if (exp->uring_started) {
+ in = in_buf;
+ out = out_buf;
+ } else {
+ FuseQueue *q = opaque;
+ in = FUSE_IN_OP_STRUCT_LEGACY(setattr, q);
+ if (!in) {
+ break;
+ }
+
+ out = FUSE_OUT_OP_STRUCT_LEGACY(attr, out_buf);
+ }
+
+ ret = fuse_co_setattr(exp, out, in->valid, in->size, in->mode,
+ in->uid, in->gid);
break;
}
case FUSE_READ: {
- const struct fuse_read_in *in = FUSE_IN_OP_STRUCT(read, q);
+ const struct fuse_read_in *in;
+
+ if (exp->uring_started) {
+ in = in_buf;
+ } else {
+ FuseQueue *q = opaque;
+ in = FUSE_IN_OP_STRUCT_LEGACY(read, q);
+ if (!in) {
+ break;
+ }
+ }
+
ret = fuse_co_read(exp, &out_data_buffer, in->offset, in->size);
break;
}
case FUSE_WRITE: {
- const struct fuse_write_in *in = FUSE_IN_OP_STRUCT(write, q);
- uint32_t req_len;
-
- req_len = ((const struct fuse_in_header *)q->request_buf)->len;
- if (unlikely(req_len < sizeof(struct fuse_in_header) + sizeof(*in) +
- in->size)) {
- warn_report("FUSE WRITE truncated; received %zu bytes of %" PRIu32,
- req_len - sizeof(struct fuse_in_header) - sizeof(*in),
- in->size);
- ret = -EINVAL;
- break;
- }
+ const struct fuse_write_in *in;
+ struct fuse_write_out *out;
+ const void *in_place_buf;
+ const void *spill_buf;
+
+ if (exp->uring_started) {
+ FuseUringEnt *ent = opaque;
+
+ in = in_buf;
+ out = out_buf;
+
+ assert(in->size <= ent->req_header.ring_ent_in_out.payload_sz);
+ /*
+             * In uring mode, out_buf (ent->req_payload) actually holds the
+             * input data for WRITE requests, so it is passed on as the
+             * spillover buffer below.
+ */
+ in_place_buf = NULL;
+ spill_buf = out_buf;
+ } else {
+ FuseQueue *q = opaque;
+ in = FUSE_IN_OP_STRUCT_LEGACY(write, q);
+ if (!in) {
+ break;
+ }
+
+ out = FUSE_OUT_OP_STRUCT_LEGACY(write, out_buf);
+
+ /* Additional check for WRITE: verify the request includes data */
+ uint32_t req_len =
+ ((const struct fuse_in_header *)(q->request_buf))->len;
+
+ if (unlikely(req_len < sizeof(struct fuse_in_header) + sizeof(*in) +
+ in->size)) {
+ warn_report("FUSE WRITE truncated; received %zu bytes of %"
+ PRIu32,
+ req_len - sizeof(struct fuse_in_header) - sizeof(*in),
+ in->size);
+ ret = -EINVAL;
+ break;
+ }
+
+ /* Legacy buffer setup */
+ in_place_buf = in + 1;
+ spill_buf = spillover_buf;
+ }
/*
* poll_fuse_fd() has checked that in_hdr->len matches the number of
* bytes read, which cannot exceed the max_write value we set
@@ -1856,13 +1917,24 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
* fuse_co_write() takes care to copy its contents before potentially
* yielding.
*/
- ret = fuse_co_write(exp, FUSE_OUT_OP_STRUCT(write, out_buf),
- in->offset, in->size, in + 1, spillover_buf);
+ ret = fuse_co_write(exp, out, in->offset, in->size,
+ in_place_buf, spill_buf);
break;
}
case FUSE_FALLOCATE: {
- const struct fuse_fallocate_in *in = FUSE_IN_OP_STRUCT(fallocate, q);
+ const struct fuse_fallocate_in *in;
+
+ if (exp->uring_started) {
+ in = in_buf;
+ } else {
+ FuseQueue *q = opaque;
+ in = FUSE_IN_OP_STRUCT_LEGACY(fallocate, q);
+ if (!in) {
+ break;
+ }
+ }
+
ret = fuse_co_fallocate(exp, in->offset, in->length, in->mode);
break;
}
@@ -1877,9 +1949,23 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
#ifdef CONFIG_FUSE_LSEEK
case FUSE_LSEEK: {
- const struct fuse_lseek_in *in = FUSE_IN_OP_STRUCT(lseek, q);
- ret = fuse_co_lseek(exp, FUSE_OUT_OP_STRUCT(lseek, out_buf),
- in->offset, in->whence);
+ const struct fuse_lseek_in *in;
+ struct fuse_lseek_out *out;
+
+ if (exp->uring_started) {
+ in = in_buf;
+ out = out_buf;
+ } else {
+ FuseQueue *q = opaque;
+ in = FUSE_IN_OP_STRUCT_LEGACY(lseek, q);
+ if (!in) {
+ break;
+ }
+
+ out = FUSE_OUT_OP_STRUCT_LEGACY(lseek, out_buf);
+ }
+
+ ret = fuse_co_lseek(exp, out, in->offset, in->whence);
break;
}
#endif
@@ -1888,20 +1974,12 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
ret = -ENOSYS;
}
- /* Ignore errors from fuse_write*(), nothing we can do anyway */
+ send_response(opaque, req_id, ret, out_data_buffer, out_buf);
+
if (out_data_buffer) {
- assert(ret >= 0);
- fuse_write_buf_response(q->fuse_fd, req_id, out_hdr,
- out_data_buffer, ret);
qemu_vfree(out_data_buffer);
- } else {
- fuse_write_response(q->fuse_fd, req_id, out_hdr,
- ret < 0 ? ret : 0,
- ret < 0 ? 0 : ret);
}
- qemu_vfree(spillover_buf);
-
#ifdef CONFIG_LINUX_IO_URING
if (unlikely(opcode == FUSE_INIT) && uring_initially_enabled) {
if (exp->is_uring && !exp->uring_started) {
@@ -1910,7 +1988,8 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
* If io_uring mode was requested for this export but it has not
* been started yet, start it now.
*/
- struct fuse_init_out *out = FUSE_OUT_OP_STRUCT(init, out_buf);
+ struct fuse_init_out *out =
+ FUSE_OUT_OP_STRUCT_LEGACY(init, out_buf);
fuse_uring_start(exp, out);
} else if (ret == -EOPNOTSUPP) {
/*
@@ -1923,12 +2002,135 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
}
#endif
}
+/* Response callback for the legacy path */
+static void send_response_legacy(void *opaque, uint32_t req_id, int ret,
+ const void *buf, void *out_buf)
+{
+ FuseQueue *q = (FuseQueue *)opaque;
+ struct fuse_out_header *out_hdr = (struct fuse_out_header *)out_buf;
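+    /* Ignore errors from fuse_write*(), nothing we can do anyway */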
+ if (buf) {
+ assert(ret >= 0);
+ fuse_write_buf_response(q->fuse_fd, req_id, out_hdr, buf, ret);
+ } else {
+ fuse_write_response(q->fuse_fd, req_id, out_hdr,
+ ret < 0 ? ret : 0,
+ ret < 0 ? 0 : ret);
+ }
+}
+
+static void coroutine_fn
+fuse_co_process_request(FuseQueue *q, void *spillover_buf)
+{
+ FuseExport *exp = q->exp;
+ uint32_t opcode;
+ uint64_t req_id;
+
+ /*
+ * Return buffer. Must be large enough to hold all return headers, but does
+ * not include space for data returned by read requests.
+ */
+ char out_buf[sizeof(struct fuse_out_header) +
+ MAX_CONST(sizeof(struct fuse_init_out),
+ MAX_CONST(sizeof(struct fuse_open_out),
+ MAX_CONST(sizeof(struct fuse_attr_out),
+ MAX_CONST(sizeof(struct fuse_write_out),
+ sizeof(struct fuse_lseek_out)))))] = {0};
+
+ /* Verify that out_buf is large enough for all output structures */
+ QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
+ sizeof(struct fuse_init_out) > sizeof(out_buf));
+ QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
+ sizeof(struct fuse_open_out) > sizeof(out_buf));
+ QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
+ sizeof(struct fuse_attr_out) > sizeof(out_buf));
+ QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
+ sizeof(struct fuse_write_out) > sizeof(out_buf));
+#ifdef CONFIG_FUSE_LSEEK
+ QEMU_BUILD_BUG_ON(sizeof(struct fuse_out_header) +
+ sizeof(struct fuse_lseek_out) > sizeof(out_buf));
+#endif
+
+ /* Limit scope to ensure pointer is no longer used after yielding */
+ {
+ const struct fuse_in_header *in_hdr =
+ (const struct fuse_in_header *)q->request_buf;
+
+ opcode = in_hdr->opcode;
+ req_id = in_hdr->unique;
+ }
+
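+    /*
+     * in_buf is NULL here: the legacy path reads request parameters
+     * directly from q->request_buf via the *_LEGACY() macros.
+     */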
+ fuse_co_process_request_common(exp, opcode, req_id, NULL, spillover_buf,
+ out_buf, send_response_legacy, q);
+}
#ifdef CONFIG_LINUX_IO_URING
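+/*
+ * SQE preparation callback: commit our response for the completed request
+ * and fetch the next one in a single FUSE_IO_URING_CMD_COMMIT_AND_FETCH
+ * command.
+ */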
+static void fuse_uring_prep_sqe_commit(struct io_uring_sqe *sqe, void *opaque)
+{
+ FuseUringEnt *ent = opaque;
+ struct fuse_uring_cmd_req *req = (void *)&sqe->cmd[0];
+
+ ent->last_cmd = FUSE_IO_URING_CMD_COMMIT_AND_FETCH;
+
+ fuse_uring_sqe_prepare(sqe, ent->rq->q, ent->last_cmd);
+ fuse_uring_sqe_set_req_data(req, ent->rq->rqid, ent->req_commit_id);
+}
+
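+/*
+ * Fill the response headers in ent->req_header, copy any READ data into
+ * ent->req_payload, and queue the COMMIT_AND_FETCH SQE.
+ */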
+static void
+fuse_uring_send_response(FuseUringEnt *ent, uint32_t req_id, int ret,
+ const void *out_data_buffer)
+{
+ FuseExport *exp = ent->rq->q->exp;
+
+ struct fuse_uring_req_header *rrh = &ent->req_header;
+ struct fuse_out_header *out_header = (struct fuse_out_header *)&rrh->in_out;
+ struct fuse_uring_ent_in_out *ent_in_out =
+ (struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
+
+ /* FUSE_READ */
+ if (out_data_buffer && ret > 0) {
+ memcpy(ent->req_payload, out_data_buffer, ret);
+ }
+
+ out_header->error = ret < 0 ? ret : 0;
+ out_header->unique = req_id;
+ ent_in_out->payload_sz = ret > 0 ? ret : 0;
+
+ /* Commit and fetch a uring entry */
+ blk_exp_ref(&exp->common);
+ aio_add_sqe(fuse_uring_prep_sqe_commit, ent, &ent->fuse_cqe_handler);
+}
+
+/* Response callback for the io_uring path */
+static void send_response_uring(void *opaque, uint32_t req_id, int ret,
+ const void *out_data_buffer, void *payload)
+{
+ FuseUringEnt *ent = (FuseUringEnt *)opaque;
+
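+    /*
+     * The payload argument is ignored here: non-READ results were already
+     * written in place into ent->req_payload, and READ data is copied by
+     * fuse_uring_send_response().
+     */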
+ fuse_uring_send_response(ent, req_id, ret, out_data_buffer);
+}
+
static void coroutine_fn fuse_uring_co_process_request(FuseUringEnt *ent)
{
- /* TODO */
- (void)ent;
+ FuseExport *exp = ent->rq->q->exp;
+ struct fuse_uring_req_header *rrh = &ent->req_header;
+ struct fuse_uring_ent_in_out *ent_in_out =
+ (struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
+ struct fuse_in_header *in_hdr =
+ (struct fuse_in_header *)&rrh->in_out;
+ uint32_t opcode = in_hdr->opcode;
+ uint64_t req_id = in_hdr->unique;
+
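+    /* Remember the commit ID that COMMIT_AND_FETCH must hand back */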
+ ent->req_commit_id = ent_in_out->commit_id;
+
+ if (unlikely(ent->req_commit_id == 0)) {
+ error_report("If this happens kernel will not find the response - "
+ "it will be stuck forever - better to abort immediately.");
+ fuse_export_halt(exp);
+ return;
+ }
+
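+    /*
+     * No spillover buffer in io_uring mode: WRITE data arrives in
+     * ent->req_payload, which is passed here as the out_buf/payload.
+     */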
+ fuse_co_process_request_common(exp, opcode, req_id, &rrh->op_in,
+ NULL, ent->req_payload, send_response_uring, ent);
}
#endif /* CONFIG_LINUX_IO_URING */
--
2.43.0