https://docs.kernel.org/filesystems/fuse-io-uring.html
As described in the kernel documentation, after FUSE-over-io_uring
initialization and handshake, FUSE interacts with the kernel using
SQE/CQE to send requests and receive responses. This corresponds to
the "Sending requests with CQEs" section in the docs.
This patch implements three key parts: registering the CQE handler
(fuse_uring_cqe_handler), processing FUSE requests
(fuse_uring_co_process_request), and sending responses
(fuse_uring_send_response). It also merges the traditional /dev/fuse
request handling with the FUSE-over-io_uring handling functions.
Suggested-by: Kevin Wolf <kwolf@redhat.com>
Suggested-by: Stefan Hajnoczi <stefanha@redhat.com>
Signed-off-by: Brian Song <hibriansong@gmail.com>
---
block/export/fuse.c | 457 ++++++++++++++++++++++++++++++--------------
1 file changed, 309 insertions(+), 148 deletions(-)
diff --git a/block/export/fuse.c b/block/export/fuse.c
index 19bf9e5f74..07f74fc8ec 100644
--- a/block/export/fuse.c
+++ b/block/export/fuse.c
@@ -310,6 +310,47 @@ static const BlockDevOps fuse_export_blk_dev_ops = {
};
#ifdef CONFIG_LINUX_IO_URING
+static void coroutine_fn fuse_uring_co_process_request(FuseRingEnt *ent);
+
+static void coroutine_fn co_fuse_uring_queue_handle_cqes(void *opaque)
+{
+ FuseRingEnt *ent = opaque;
+ FuseExport *exp = ent->rq->q->exp;
+
+ /* Going to process requests */
+ fuse_inc_in_flight(exp);
+
+ /* A ring entry returned */
+ fuse_uring_co_process_request(ent);
+
+ /* Finished processing requests */
+ fuse_dec_in_flight(exp);
+}
+
+static void fuse_uring_cqe_handler(CqeHandler *cqe_handler)
+{
+ FuseRingEnt *ent = container_of(cqe_handler, FuseRingEnt, fuse_cqe_handler);
+ Coroutine *co;
+ FuseExport *exp = ent->rq->q->exp;
+
+ if (unlikely(exp->halted)) {
+ return;
+ }
+
+ int err = cqe_handler->cqe.res;
+
+ if (err != 0) {
+ /* -ENOTCONN is ok on umount */
+ if (err != -EINTR && err != -EAGAIN &&
+ err != -ENOTCONN) {
+ fuse_export_halt(exp);
+ }
+ } else {
+ co = qemu_coroutine_create(co_fuse_uring_queue_handle_cqes, ent);
+ qemu_coroutine_enter(co);
+ }
+}
+
static void fuse_uring_sqe_set_req_data(struct fuse_uring_cmd_req *req,
const unsigned int rqid,
const unsigned int commit_id)
@@ -1213,6 +1254,9 @@ fuse_co_read(FuseExport *exp, void **bufptr, uint64_t offset, uint32_t size)
* Data in @in_place_buf is assumed to be overwritten after yielding, so will
* be copied to a bounce buffer beforehand. @spillover_buf in contrast is
* assumed to be exclusively owned and will be used as-is.
+ * In FUSE-over-io_uring mode, the actual op_payload content is stored in
+ * @spillover_buf. To ensure this buffer is used for writing, @in_place_buf
+ * is explicitly set to NULL.
* Return the number of bytes written to *out on success, and -errno on error.
*/
static ssize_t coroutine_fn
@@ -1220,8 +1264,8 @@ fuse_co_write(FuseExport *exp, struct fuse_write_out *out,
uint64_t offset, uint32_t size,
const void *in_place_buf, const void *spillover_buf)
{
- size_t in_place_size;
- void *copied;
+ size_t in_place_size = 0;
+ void *copied = NULL;
int64_t blk_len;
int ret;
struct iovec iov[2];
@@ -1236,10 +1280,12 @@ fuse_co_write(FuseExport *exp, struct fuse_write_out *out,
return -EACCES;
}
- /* Must copy to bounce buffer before potentially yielding */
- in_place_size = MIN(size, FUSE_IN_PLACE_WRITE_BYTES);
- copied = blk_blockalign(exp->common.blk, in_place_size);
- memcpy(copied, in_place_buf, in_place_size);
+ if (in_place_buf) {
+ /* Must copy to bounce buffer before potentially yielding */
+ in_place_size = MIN(size, FUSE_IN_PLACE_WRITE_BYTES);
+ copied = blk_blockalign(exp->common.blk, in_place_size);
+ memcpy(copied, in_place_buf, in_place_size);
+ }
/**
* Clients will expect short writes at EOF, so we have to limit
@@ -1263,26 +1309,38 @@ fuse_co_write(FuseExport *exp, struct fuse_write_out *out,
}
}
- iov[0] = (struct iovec) {
- .iov_base = copied,
- .iov_len = in_place_size,
- };
- if (size > FUSE_IN_PLACE_WRITE_BYTES) {
- assert(size - FUSE_IN_PLACE_WRITE_BYTES <= FUSE_SPILLOVER_BUF_SIZE);
- iov[1] = (struct iovec) {
- .iov_base = (void *)spillover_buf,
- .iov_len = size - FUSE_IN_PLACE_WRITE_BYTES,
+ if (in_place_buf) {
+ iov[0] = (struct iovec) {
+ .iov_base = copied,
+ .iov_len = in_place_size,
};
- qemu_iovec_init_external(&qiov, iov, 2);
+ if (size > FUSE_IN_PLACE_WRITE_BYTES) {
+ assert(size - FUSE_IN_PLACE_WRITE_BYTES <= FUSE_SPILLOVER_BUF_SIZE);
+ iov[1] = (struct iovec) {
+ .iov_base = (void *)spillover_buf,
+ .iov_len = size - FUSE_IN_PLACE_WRITE_BYTES,
+ };
+ qemu_iovec_init_external(&qiov, iov, 2);
+ } else {
+ qemu_iovec_init_external(&qiov, iov, 1);
+ }
} else {
+ /* fuse over io_uring */
+ iov[0] = (struct iovec) {
+ .iov_base = (void *)spillover_buf,
+ .iov_len = size,
+ };
qemu_iovec_init_external(&qiov, iov, 1);
}
+
ret = blk_co_pwritev(exp->common.blk, offset, size, &qiov, 0);
if (ret < 0) {
goto fail_free_buffer;
}
- qemu_vfree(copied);
+ if (in_place_buf) {
+ qemu_vfree(copied);
+ }
*out = (struct fuse_write_out) {
.size = size,
@@ -1290,7 +1348,9 @@ fuse_co_write(FuseExport *exp, struct fuse_write_out *out,
return sizeof(*out);
fail_free_buffer:
- qemu_vfree(copied);
+ if (in_place_buf) {
+ qemu_vfree(copied);
+ }
return ret;
}
@@ -1578,173 +1638,151 @@ static int fuse_write_buf_response(int fd, uint32_t req_id,
}
}
-/*
- * For use in fuse_co_process_request():
- * Returns a pointer to the parameter object for the given operation (inside of
- * queue->request_buf, which is assumed to hold a fuse_in_header first).
- * Verifies that the object is complete (queue->request_buf is large enough to
- * hold it in one piece, and the request length includes the whole object).
- *
- * Note that queue->request_buf may be overwritten after yielding, so the
- * returned pointer must not be used across a function that may yield!
- */
-#define FUSE_IN_OP_STRUCT(op_name, queue) \
+#define FUSE_IN_OP_STRUCT_LEGACY(in_buf) \
({ \
- const struct fuse_in_header *__in_hdr = \
- (const struct fuse_in_header *)(queue)->request_buf; \
- const struct fuse_##op_name##_in *__in = \
- (const struct fuse_##op_name##_in *)(__in_hdr + 1); \
- const size_t __param_len = sizeof(*__in_hdr) + sizeof(*__in); \
- uint32_t __req_len; \
- \
- QEMU_BUILD_BUG_ON(sizeof((queue)->request_buf) < __param_len); \
- \
- __req_len = __in_hdr->len; \
- if (__req_len < __param_len) { \
- warn_report("FUSE request truncated (%" PRIu32 " < %zu)", \
- __req_len, __param_len); \
- ret = -EINVAL; \
- break; \
- } \
- __in; \
+ (void *)(((struct fuse_in_header *)in_buf) + 1); \
})
-/*
- * For use in fuse_co_process_request():
- * Returns a pointer to the return object for the given operation (inside of
- * out_buf, which is assumed to hold a fuse_out_header first).
- * Verifies that out_buf is large enough to hold the whole object.
- *
- * (out_buf should be a char[] array.)
- */
-#define FUSE_OUT_OP_STRUCT(op_name, out_buf) \
+#define FUSE_OUT_OP_STRUCT_LEGACY(out_buf) \
({ \
- struct fuse_out_header *__out_hdr = \
- (struct fuse_out_header *)(out_buf); \
- struct fuse_##op_name##_out *__out = \
- (struct fuse_##op_name##_out *)(__out_hdr + 1); \
- \
- QEMU_BUILD_BUG_ON(sizeof(*__out_hdr) + sizeof(*__out) > \
- sizeof(out_buf)); \
- \
- __out; \
+ (void *)(((struct fuse_out_header *)out_buf) + 1); \
})
-/**
- * Process a FUSE request, incl. writing the response.
- *
- * Note that yielding in any request-processing function can overwrite the
- * contents of q->request_buf. Anything that takes a buffer needs to take
- * care that the content is copied before yielding.
- *
- * @spillover_buf can contain the tail of a write request too large to fit into
- * q->request_buf. This function takes ownership of it (i.e. will free it),
- * which assumes that its contents will not be overwritten by concurrent
- * requests (as opposed to q->request_buf).
+
+/*
+ * Shared helper for FUSE request processing. Handles both legacy and io_uring
+ * paths.
*/
-static void coroutine_fn
-fuse_co_process_request(FuseQueue *q, void *spillover_buf)
+static void coroutine_fn fuse_co_process_request_common(
+ FuseExport *exp,
+ uint32_t opcode,
+ uint64_t req_id,
+ void *in_buf,
+ void *spillover_buf,
+ void *out_buf,
+ int fd, /* -1 for uring */
+ void (*send_response)(void *opaque, uint32_t req_id, ssize_t ret,
+ const void *buf, void *out_buf),
+ void *opaque /* FuseQueue* or FuseRingEnt* */)
{
- FuseExport *exp = q->exp;
- uint32_t opcode;
- uint64_t req_id;
- /*
- * Return buffer. Must be large enough to hold all return headers, but does
- * not include space for data returned by read requests.
- * (FUSE_IN_OP_STRUCT() verifies at compile time that out_buf is indeed
- * large enough.)
- */
- char out_buf[sizeof(struct fuse_out_header) +
- MAX_CONST(sizeof(struct fuse_init_out),
- MAX_CONST(sizeof(struct fuse_open_out),
- MAX_CONST(sizeof(struct fuse_attr_out),
- MAX_CONST(sizeof(struct fuse_write_out),
- sizeof(struct fuse_lseek_out)))))];
- struct fuse_out_header *out_hdr = (struct fuse_out_header *)out_buf;
- /* For read requests: Data to be returned */
void *out_data_buffer = NULL;
- ssize_t ret;
+ ssize_t ret = 0;
- /* Limit scope to ensure pointer is no longer used after yielding */
- {
- const struct fuse_in_header *in_hdr =
- (const struct fuse_in_header *)q->request_buf;
+ void *op_in_buf = (void *)FUSE_IN_OP_STRUCT_LEGACY(in_buf);
+ void *op_out_buf = (void *)FUSE_OUT_OP_STRUCT_LEGACY(out_buf);
- opcode = in_hdr->opcode;
- req_id = in_hdr->unique;
+#ifdef CONFIG_LINUX_IO_URING
+ if (opcode != FUSE_INIT && exp->is_uring) {
+ op_in_buf = (void *)in_buf;
+ op_out_buf = (void *)out_buf;
}
+#endif
switch (opcode) {
case FUSE_INIT: {
- const struct fuse_init_in *in = FUSE_IN_OP_STRUCT(init, q);
- ret = fuse_co_init(exp, FUSE_OUT_OP_STRUCT(init, out_buf),
- in->max_readahead, in->flags);
+ const struct fuse_init_in *in =
+ (const struct fuse_init_in *)FUSE_IN_OP_STRUCT_LEGACY(in_buf);
+
+ struct fuse_init_out *out =
+ (struct fuse_init_out *)FUSE_OUT_OP_STRUCT_LEGACY(out_buf);
+
+ ret = fuse_co_init(exp, out, in->max_readahead, in);
break;
}
- case FUSE_OPEN:
- ret = fuse_co_open(exp, FUSE_OUT_OP_STRUCT(open, out_buf));
+ case FUSE_OPEN: {
+ struct fuse_open_out *out =
+ (struct fuse_open_out *)op_out_buf;
+
+ ret = fuse_co_open(exp, out);
break;
+ }
case FUSE_RELEASE:
ret = 0;
break;
case FUSE_LOOKUP:
- ret = -ENOENT; /* There is no node but the root node */
+ ret = -ENOENT;
break;
- case FUSE_GETATTR:
- ret = fuse_co_getattr(exp, FUSE_OUT_OP_STRUCT(attr, out_buf));
+ case FUSE_GETATTR: {
+ struct fuse_attr_out *out =
+ (struct fuse_attr_out *)op_out_buf;
+
+ ret = fuse_co_getattr(exp, out);
break;
+ }
case FUSE_SETATTR: {
- const struct fuse_setattr_in *in = FUSE_IN_OP_STRUCT(setattr, q);
- ret = fuse_co_setattr(exp, FUSE_OUT_OP_STRUCT(attr, out_buf),
- in->valid, in->size, in->mode, in->uid, in->gid);
+ const struct fuse_setattr_in *in =
+ (const struct fuse_setattr_in *)op_in_buf;
+
+ struct fuse_attr_out *out =
+ (struct fuse_attr_out *)op_out_buf;
+
+ ret = fuse_co_setattr(exp, out, in->valid, in->size, in->mode,
+ in->uid, in->gid);
break;
}
case FUSE_READ: {
- const struct fuse_read_in *in = FUSE_IN_OP_STRUCT(read, q);
+ const struct fuse_read_in *in =
+ (const struct fuse_read_in *)op_in_buf;
+
ret = fuse_co_read(exp, &out_data_buffer, in->offset, in->size);
break;
}
case FUSE_WRITE: {
- const struct fuse_write_in *in = FUSE_IN_OP_STRUCT(write, q);
- uint32_t req_len;
-
- req_len = ((const struct fuse_in_header *)q->request_buf)->len;
- if (unlikely(req_len < sizeof(struct fuse_in_header) + sizeof(*in) +
- in->size)) {
- warn_report("FUSE WRITE truncated; received %zu bytes of %" PRIu32,
- req_len - sizeof(struct fuse_in_header) - sizeof(*in),
- in->size);
- ret = -EINVAL;
- break;
+ const struct fuse_write_in *in =
+ (const struct fuse_write_in *)op_in_buf;
+
+ struct fuse_write_out *out =
+ (struct fuse_write_out *)op_out_buf;
+
+#ifdef CONFIG_LINUX_IO_URING
+ if (!exp->is_uring) {
+#endif
+ uint32_t req_len = ((const struct fuse_in_header *)in_buf)->len;
+
+ if (unlikely(req_len < sizeof(struct fuse_in_header) + sizeof(*in) +
+ in->size)) {
+ warn_report("FUSE WRITE truncated; received %zu bytes of %"
+ PRIu32,
+ req_len - sizeof(struct fuse_in_header) - sizeof(*in),
+ in->size);
+ ret = -EINVAL;
+ break;
+ }
+#ifdef CONFIG_LINUX_IO_URING
+ } else {
+ assert(in->size <=
+ ((FuseRingEnt *)opaque)->req_header.ring_ent_in_out.payload_sz);
}
+#endif
- /*
- * poll_fuse_fd() has checked that in_hdr->len matches the number of
- * bytes read, which cannot exceed the max_write value we set
- * (FUSE_MAX_WRITE_BYTES). So we know that FUSE_MAX_WRITE_BYTES >=
- * in_hdr->len >= in->size + X, so this assertion must hold.
- */
assert(in->size <= FUSE_MAX_WRITE_BYTES);
- /*
- * Passing a pointer to `in` (i.e. the request buffer) is fine because
- * fuse_co_write() takes care to copy its contents before potentially
- * yielding.
- */
- ret = fuse_co_write(exp, FUSE_OUT_OP_STRUCT(write, out_buf),
- in->offset, in->size, in + 1, spillover_buf);
+ const void *in_place_buf = in + 1;
+ const void *spill_buf = spillover_buf;
+
+#ifdef CONFIG_LINUX_IO_URING
+ if (exp->is_uring) {
+ in_place_buf = NULL;
+ spill_buf = out_buf;
+ }
+#endif
+
+ ret = fuse_co_write(exp, out, in->offset, in->size,
+ in_place_buf, spill_buf);
break;
}
case FUSE_FALLOCATE: {
- const struct fuse_fallocate_in *in = FUSE_IN_OP_STRUCT(fallocate, q);
+ const struct fuse_fallocate_in *in =
+ (const struct fuse_fallocate_in *)op_in_buf;
+
ret = fuse_co_fallocate(exp, in->offset, in->length, in->mode);
break;
}
@@ -1759,9 +1797,13 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
#ifdef CONFIG_FUSE_LSEEK
case FUSE_LSEEK: {
- const struct fuse_lseek_in *in = FUSE_IN_OP_STRUCT(lseek, q);
- ret = fuse_co_lseek(exp, FUSE_OUT_OP_STRUCT(lseek, out_buf),
- in->offset, in->whence);
+ const struct fuse_lseek_in *in =
+ (const struct fuse_lseek_in *)op_in_buf;
+
+ struct fuse_lseek_out *out =
+ (struct fuse_lseek_out *)op_out_buf;
+
+ ret = fuse_co_lseek(exp, out, in->offset, in->whence);
break;
}
#endif
@@ -1770,28 +1812,147 @@ fuse_co_process_request(FuseQueue *q, void *spillover_buf)
ret = -ENOSYS;
}
- /* Ignore errors from fuse_write*(), nothing we can do anyway */
+ send_response(opaque, req_id, ret, out_data_buffer, out_buf);
+
if (out_data_buffer) {
- assert(ret >= 0);
- fuse_write_buf_response(q->fuse_fd, req_id, out_hdr,
- out_data_buffer, ret);
qemu_vfree(out_data_buffer);
+ }
+
+ if (fd != -1) {
+ qemu_vfree(spillover_buf);
+ }
+
#ifdef CONFIG_LINUX_IO_URING
+ /* Handle FUSE initialization errors */
+ if (unlikely(opcode == FUSE_INIT && ret == -ENODEV)) {
+ error_report("System doesn't support FUSE-over-io_uring");
+ fuse_export_halt(exp);
+ return;
+ }
+
/* Handle FUSE-over-io_uring initialization */
- if (unlikely(opcode == FUSE_INIT && exp->is_uring)) {
+ if (unlikely(opcode == FUSE_INIT && exp->is_uring && fd != -1)) {
struct fuse_init_out *out =
- (struct fuse_init_out *)FUSE_OUT_OP_STRUCT(out_buf);
+ (struct fuse_init_out *)FUSE_OUT_OP_STRUCT_LEGACY(out_buf);
fuse_uring_start(exp, out);
}
#endif
+}
+
+/* Helper to send response for legacy */
+static void send_response_legacy(void *opaque, uint32_t req_id, ssize_t ret,
+ const void *buf, void *out_buf)
+{
+ FuseQueue *q = (FuseQueue *)opaque;
+ struct fuse_out_header *out_hdr = (struct fuse_out_header *)out_buf;
+ if (buf) {
+ assert(ret >= 0);
+ fuse_write_buf_response(q->fuse_fd, req_id, out_hdr, buf, ret);
} else {
fuse_write_response(q->fuse_fd, req_id, out_hdr,
ret < 0 ? ret : 0,
ret < 0 ? 0 : ret);
}
+}
- qemu_vfree(spillover_buf);
+static void coroutine_fn
+fuse_co_process_request(FuseQueue *q, void *spillover_buf)
+{
+ FuseExport *exp = q->exp;
+ uint32_t opcode;
+ uint64_t req_id;
+
+ /*
+ * Return buffer. Must be large enough to hold all return headers, but does
+ * not include space for data returned by read requests.
+ */
+ char out_buf[sizeof(struct fuse_out_header) +
+ MAX_CONST(sizeof(struct fuse_init_out),
+ MAX_CONST(sizeof(struct fuse_open_out),
+ MAX_CONST(sizeof(struct fuse_attr_out),
+ MAX_CONST(sizeof(struct fuse_write_out),
+ sizeof(struct fuse_lseek_out)))))] = {0};
+
+ /* Limit scope to ensure pointer is no longer used after yielding */
+ {
+ const struct fuse_in_header *in_hdr =
+ (const struct fuse_in_header *)q->request_buf;
+
+ opcode = in_hdr->opcode;
+ req_id = in_hdr->unique;
+ }
+
+ fuse_co_process_request_common(exp, opcode, req_id, q->request_buf,
+ spillover_buf, out_buf, q->fuse_fd, send_response_legacy, q);
+}
+
+#ifdef CONFIG_LINUX_IO_URING
+static void fuse_uring_prep_sqe_commit(struct io_uring_sqe *sqe, void *opaque)
+{
+ FuseRingEnt *ent = opaque;
+ struct fuse_uring_cmd_req *req = (void *)&sqe->cmd[0];
+
+ fuse_uring_sqe_prepare(sqe, ent->rq->q, FUSE_IO_URING_CMD_COMMIT_AND_FETCH);
+ fuse_uring_sqe_set_req_data(req, ent->rq->rqid,
+ ent->req_commit_id);
+}
+
+static void
+fuse_uring_send_response(FuseRingEnt *ent, uint32_t req_id, ssize_t ret,
+ const void *out_data_buffer)
+{
+ FuseExport *exp = ent->rq->q->exp;
+
+ struct fuse_uring_req_header *rrh = &ent->req_header;
+ struct fuse_out_header *out_header = (struct fuse_out_header *)&rrh->in_out;
+ struct fuse_uring_ent_in_out *ent_in_out =
+ (struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
+
+ /* FUSE_READ */
+ if (out_data_buffer && ret > 0) {
+ memcpy(ent->op_payload, out_data_buffer, ret);
+ }
+
+ out_header->error = ret < 0 ? ret : 0;
+ out_header->unique = req_id;
+ /* out_header->len = ret > 0 ? ret : 0; */
+ ent_in_out->payload_sz = ret > 0 ? ret : 0;
+ aio_add_sqe(fuse_uring_prep_sqe_commit, ent,
+ &ent->fuse_cqe_handler);
+}
+
+/* Helper to send response for uring */
+static void send_response_uring(void *opaque, uint32_t req_id, ssize_t ret,
+ const void *out_data_buffer, void *payload)
+{
+ FuseRingEnt *ent = (FuseRingEnt *)opaque;
+
+ fuse_uring_send_response(ent, req_id, ret, out_data_buffer);
+}
+
+static void coroutine_fn fuse_uring_co_process_request(FuseRingEnt *ent)
+{
+ FuseExport *exp = ent->rq->q->exp;
+ struct fuse_uring_req_header *rrh = &ent->req_header;
+ struct fuse_uring_ent_in_out *ent_in_out =
+ (struct fuse_uring_ent_in_out *)&rrh->ring_ent_in_out;
+ struct fuse_in_header *in_hdr =
+ (struct fuse_in_header *)&rrh->in_out;
+ uint32_t opcode = in_hdr->opcode;
+ uint64_t req_id = in_hdr->unique;
+ ent->req_commit_id = ent_in_out->commit_id;
+
+ if (unlikely(ent->req_commit_id == 0)) {
+ error_report("If this happens kernel will not find the response - "
+ "it will be stuck forever - better to abort immediately.");
+ fuse_export_halt(exp);
+ return;
+ }
+
+ fuse_co_process_request_common(exp, opcode, req_id, &rrh->op_in,
+ NULL, ent->op_payload, -1, send_response_uring, ent);
}
+#endif
const BlockExportDriver blk_exp_fuse = {
.type = BLOCK_EXPORT_TYPE_FUSE,
--
2.45.2
Am 30.08.2025 um 04:50 hat Brian Song geschrieben:
> https://docs.kernel.org/filesystems/fuse-io-uring.html
>
> As described in the kernel documentation, after FUSE-over-io_uring
> initialization and handshake, FUSE interacts with the kernel using
> SQE/CQE to send requests and receive responses. This corresponds to
> the "Sending requests with CQEs" section in the docs.
>
> This patch implements three key parts: registering the CQE handler
> (fuse_uring_cqe_handler), processing FUSE requests
> (fuse_uring_co_process_request), and sending responses
> (fuse_uring_send_response). It also merges the traditional /dev/fuse
> request handling with the FUSE-over-io_uring handling functions.
>
> Suggested-by: Kevin Wolf <kwolf@redhat.com>
> Suggested-by: Stefan Hajnoczi <stefanha@redhat.com>
> Signed-off-by: Brian Song <hibriansong@gmail.com>

A general remark first: I think this would be easier to review if it
were split into multiple patches. For example, at first sight it looks
to me like I'd split at least:

- Factor out fuse_co_process_request_common() from
  fuse_co_process_request(). This would be a pure code movement patch
  with no intention to change the behaviour (i.e. it doesn't add any
  io_uring code yet). It is very common to have such refactoring
  commits in preparation for the addition of a new feature later.

- Change fuse_co_write() to allow a NULL in_place_buf

- Add io_uring request processing

All three are logically independent changes and can be reviewed on
their own. Maybe further splitting is possible that would only become
obvious when looking at the smaller patches.

> +static void coroutine_fn co_fuse_uring_queue_handle_cqes(void *opaque)
> +{
> +    FuseRingEnt *ent = opaque;
> +    FuseExport *exp = ent->rq->q->exp;
> +
> +    /* Going to process requests */
> +    fuse_inc_in_flight(exp);

I think this can be too late. The in_flight counter must be increased
before we start processing something that must be waited for in a
drain. Can't it happen here that a drain in the main thread already
returns while the CQE is still pending in an iothread, but nothing
stops it from being processed and starting new requests even though
we're supposedly in a drained section now?

> +    int err = cqe_handler->cqe.res;
> +
> +    if (err != 0) {
> +        /* -ENOTCONN is ok on umount */
> +        if (err != -EINTR && err != -EAGAIN &&
> +            err != -ENOTCONN) {
> +            fuse_export_halt(exp);
> +        }

This fits on a single line (but I think the result was that you'll
remove some error codes anyway).

> -    __req_len = __in_hdr->len;                                        \
> -    if (__req_len < __param_len) {                                    \
> -        warn_report("FUSE request truncated (%" PRIu32 " < %zu)",     \
> -                    __req_len, __param_len);                          \
> -        ret = -EINVAL;                                                \
> -        break;                                                        \
> -    }                                                                 \

This check made sure that we don't access in_buf out of bounds. What is
the replacement for it?

> +#ifdef CONFIG_LINUX_IO_URING
> +    if (opcode != FUSE_INIT && exp->is_uring) {

Maybe add a comment explaining that FUSE_INIT is always delivered
through /dev/fuse, even if we want to enable io_uring?

> +        const struct fuse_init_in *in =
> +            (const struct fuse_init_in *)FUSE_IN_OP_STRUCT_LEGACY(in_buf);
> +
> +        struct fuse_init_out *out =
> +            (struct fuse_init_out *)FUSE_OUT_OP_STRUCT_LEGACY(out_buf);

FUSE_IN_OP_STRUCT_LEGACY() returns a void *, so the explicit casts are
unnecessary. This applies to all of the commands below, too.

>      case FUSE_LOOKUP:
> -        ret = -ENOENT; /* There is no node but the root node */
> +        ret = -ENOENT;
>          break;

Why are you removing the comment?

> +#ifdef CONFIG_LINUX_IO_URING
> +        if (!exp->is_uring) {
> +#endif

I wonder if it wouldn't be better to have exp->is_uring available even
without CONFIG_LINUX_IO_URING, it would just always be false. It would
be nice to avoid #ifdefs in the middle of the function if they aren't
strictly necessary.

> -        /*
> -         * poll_fuse_fd() has checked that in_hdr->len matches the number of
> -         * bytes read, which cannot exceed the max_write value we set
> -         * (FUSE_MAX_WRITE_BYTES). So we know that FUSE_MAX_WRITE_BYTES >=
> -         * in_hdr->len >= in->size + X, so this assertion must hold.
> -         */
>          assert(in->size <= FUSE_MAX_WRITE_BYTES);

Instead of deleting the comment explaining why this is true, can you
just add a second paragraph explaining why it's true for io_uring, too?

> -        /*
> -         * Passing a pointer to `in` (i.e. the request buffer) is fine because
> -         * fuse_co_write() takes care to copy its contents before potentially
> -         * yielding.
> -         */

Why did you delete this comment? It's still true.

Kevin
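To make the ordering concern above concrete, here is a minimal sketch
(the plain atomic counter and the helper names are stand-ins for
illustration, not QEMU APIs; in QEMU the counter is the export's
in_flight counter manipulated via fuse_inc_in_flight() and
fuse_dec_in_flight(), and the work would run in a coroutine):

#include <stdatomic.h>

static atomic_int in_flight;   /* stand-in for the export's counter */

static void process_request(void)
{
    /* ... handle one FUSE request, possibly yielding ... */
    atomic_fetch_sub(&in_flight, 1);   /* request no longer in flight */
}

static void cqe_handler(void)
{
    /*
     * Raise the counter here, before the work is scheduled. A drain
     * that runs in between then sees in_flight > 0 and waits. Raising
     * it only inside the scheduled coroutine leaves a window in which
     * drain can return while the CQE is still pending in an iothread.
     */
    atomic_fetch_add(&in_flight, 1);
    process_request();   /* in QEMU: create and enter a coroutine */
}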
On Fri, Aug 29, 2025 at 10:50:23PM -0400, Brian Song wrote:
> This patch implements three key parts: registering the CQE handler
> (fuse_uring_cqe_handler), processing FUSE requests
> (fuse_uring_co_process_request), and sending responses
> (fuse_uring_send_response). It also merges the traditional /dev/fuse
> request handling with the FUSE-over-io_uring handling functions.
>
> +static void coroutine_fn co_fuse_uring_queue_handle_cqes(void *opaque)

This function appears to handle exactly one cqe. A singular function
name would be clearer than a plural: co_fuse_uring_queue_handle_cqe().

> +{
> +    FuseRingEnt *ent = opaque;
> +    FuseExport *exp = ent->rq->q->exp;
> +
> +    /* Going to process requests */
> +    fuse_inc_in_flight(exp);

What is the rationale for taking a reference here? Normally something
already holds a reference (e.g. the request itself) and it will be
dropped somewhere inside a function we're about to call, but we still
need to access exp afterwards, so we temporarily take a reference.
Please document the specifics in a comment.

I think blk_exp_ref()/blk_exp_unref() are appropriate instead of
fuse_inc_in_flight()/fuse_dec_in_flight() since we only need to hold
onto the export and don't care about drain behavior.

> +    int err = cqe_handler->cqe.res;
> +
> +    if (err != 0) {
> +        /* -ENOTCONN is ok on umount */
> +        if (err != -EINTR && err != -EAGAIN &&
> +            err != -ENOTCONN) {
> +            fuse_export_halt(exp);
> +        }

How are EINTR and EAGAIN handled if they are silently ignored? When did
you encounter these error codes?
On 9/3/25 7:51 AM, Stefan Hajnoczi wrote:
> On Fri, Aug 29, 2025 at 10:50:23PM -0400, Brian Song wrote:
>> +    /* Going to process requests */
>> +    fuse_inc_in_flight(exp);
>
> What is the rationale for taking a reference here? Normally something
> already holds a reference (e.g. the request itself) and it will be
> dropped somewhere inside a function we're about to call, but we still
> need to access exp afterwards, so we temporarily take a reference.
> Please document the specifics in a comment.
>
> I think blk_exp_ref()/blk_exp_unref() are appropriate instead of
> fuse_inc_in_flight()/fuse_dec_in_flight() since we only need to hold
> onto the export and don't care about drain behavior.

Stefan:

When handling FUSE requests, we don't want the FuseExport to be
accidentally deleted. Therefore, we use fuse_inc_in_flight in the CQE
handler to increment the in_flight counter, and when a request is
completed, we call fuse_dec_in_flight to decrement it. Once the last
request has been processed, fuse_dec_in_flight brings the in_flight
counter down to 0, indicating that the export can safely be deleted.
The usage of in_flight follows the same logic as in traditional FUSE
request handling.

Since submitted SQEs for FUSE cannot be canceled, once we register or
commit them we must wait for the kernel to return a CQE. Otherwise, the
kernel may deliver a CQE and invoke its handler after the export has
already been deleted. For this reason, we directly call blk_exp_ref and
blk_exp_unref when submitting an SQE and when receiving its CQE, to
explicitly control the export reference and prevent accidental
deletion.

The doc/comment for co_fuse_uring_queue_handle_cqe:

Protect FuseExport from premature deletion while handling FUSE
requests. CQE handlers inc/dec the in_flight counter; when it reaches
0, the export can be freed. This follows the same logic as traditional
FUSE. Since FUSE SQEs cannot be canceled, a CQE may arrive after commit
even if the export is deleted. To prevent this, we ref/unref the export
explicitly at SQE submission and CQE completion.

> How are EINTR and EAGAIN handled if they are silently ignored? When
> did you encounter these error codes?

Bernd:

I have the same question about this. As for how the kernel returns
errors, I haven't studied each case yet. In libfuse it's implemented
the same way. Could you briefly explain why we choose to ignore these
two errors, and under what circumstances we might encounter them?

Thanks,
Brian
On Mon, Sep 08, 2025 at 03:09:57PM -0400, Brian Song wrote:
> Stefan:
>
> When handling FUSE requests, we don't want the FuseExport to be
> accidentally deleted. Therefore, we use fuse_inc_in_flight in the CQE
> handler to increment the in_flight counter, and when a request is
> completed, we call fuse_dec_in_flight to decrement it. Once the last
> request has been processed, fuse_dec_in_flight brings the in_flight
> counter down to 0, indicating that the export can safely be deleted.
> The usage of in_flight follows the same logic as in traditional FUSE
> request handling.
>
> Since submitted SQEs for FUSE cannot be canceled, once we register or
> commit them we must wait for the kernel to return a CQE. Otherwise,
> the kernel may deliver a CQE and invoke its handler after the export
> has already been deleted. For this reason, we directly call
> blk_exp_ref and blk_exp_unref when submitting an SQE and when
> receiving its CQE, to explicitly control the export reference and
> prevent accidental deletion.

I looked at your "final" branch on GitHub and the refcount changes
there match what I was thinking of.

In case it helps for writing comments, I'll try to describe my mental
model of the refcounts:

- fuse_inc_in_flight()/fuse_dec_in_flight() must wrap the lifecycle of
  FUSE requests that the server is processing. This ensures that the
  block layer's drain operation waits for requests to complete and that
  the export cannot be deleted while the requests are still in
  progress.

- blk_exp_ref()/blk_exp_unref() prevents the export from being deleted
  while something that still depends on it remains outstanding.

How this maps to FUSE-over-io_uring:

- When an SQE is submitted blk_exp_ref() must be called. After the CQE
  has been processed, blk_exp_unref() must be called. This way the
  export cannot be deleted before all CQEs have been handled.

- The coroutine that processes a FUSE request must call
  fuse_inc_in_flight() before processing begins and
  fuse_dec_in_flight() after processing ends.

Thanks,
Stefan
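Read as code, that mental model might look like the following minimal
sketch (ExportStub and the plain atomics are hypothetical stand-ins for
this example; the real counterparts are blk_exp_ref()/blk_exp_unref()
and fuse_inc_in_flight()/fuse_dec_in_flight()):

#include <stdatomic.h>

typedef struct ExportStub {
    atomic_int refcount;    /* lifetime, cf. blk_exp_ref()/blk_exp_unref() */
    atomic_int in_flight;   /* drain, cf. fuse_inc/dec_in_flight() */
} ExportStub;

/* SQE submission: the export must stay alive until the CQE is handled. */
static void submit_sqe(ExportStub *exp)
{
    atomic_fetch_add(&exp->refcount, 1);   /* paired with handle_cqe() */
    /* ... build the SQE and hand it to the ring ... */
}

/* CQE completion: wrap the request processing in the in-flight counter. */
static void handle_cqe(ExportStub *exp)
{
    atomic_fetch_add(&exp->in_flight, 1);  /* drain now waits for this */
    /* ... process the FUSE request ... */
    atomic_fetch_sub(&exp->in_flight, 1);

    atomic_fetch_sub(&exp->refcount, 1);   /* matches submit_sqe() */
}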
On 9/8/25 21:09, Brian Song wrote:
> On 9/3/25 7:51 AM, Stefan Hajnoczi wrote:
>> How are EINTR and EAGAIN handled if they are silently ignored? When
>> did you encounter these error codes?
>
> I have the same question about this. As for how the kernel returns
> errors, I haven't studied each case yet. In libfuse it's implemented
> the same way. Could you briefly explain why we choose to ignore these
> two errors, and under what circumstances we might encounter them?

I think I remember why I had added these. Initially the ring threads
didn't inherit the signal handlers libfuse worker threads have. I had
fixed that later and these error conditions are a leftover.

In libfuse the idea is that the main thread gets all signals and then
sets se->exited - worker threads, including ring threads, are not
supposed to get or handle signals at all, but have to monitor
se->exited.

Good catch Stefan, I think I can remove these conditions in libfuse.

Thanks,
Bernd
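As a rough, self-contained illustration of the design described above
(all names here are made up for the example; this is not libfuse code):
the main thread blocks the signals, waits for one, and flips an exit
flag, while the worker never touches signal handling and only polls the
flag, playing the role of se->exited.

#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

static atomic_bool exited;          /* plays the role of se->exited */

static void *ring_worker(void *arg)
{
    (void)arg;
    /* Workers never handle signals; they only monitor the flag. */
    while (!atomic_load(&exited)) {
        /* ... wait for / process one CQE ... */
        usleep(1000);
    }
    return NULL;
}

int main(void)
{
    pthread_t worker;
    sigset_t set;
    int sig;

    /* Block SIGINT before spawning so the worker inherits the mask. */
    sigemptyset(&set);
    sigaddset(&set, SIGINT);
    pthread_sigmask(SIG_BLOCK, &set, NULL);
    pthread_create(&worker, NULL, ring_worker, NULL);

    /* The main thread alone waits for the signal and sets the flag. */
    sigwait(&set, &sig);
    atomic_store(&exited, true);

    pthread_join(worker, NULL);
    printf("clean shutdown\n");
    return 0;
}

Compiled with -pthread and interrupted with Ctrl-C, the worker exits
via the flag rather than via a signal handler.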
On 9/8/25 3:45 PM, Bernd Schubert wrote:
> I think I remember why I had added these. Initially the ring threads
> didn't inherit the signal handlers libfuse worker threads have. I had
> fixed that later and these error conditions are a leftover.
>
> In libfuse the idea is that the main thread gets all signals and then
> sets se->exited - worker threads, including ring threads, are not
> supposed to get or handle signals at all, but have to monitor
> se->exited.
>
> Good catch Stefan, I think I can remove these conditions in libfuse.

In libfuse:

static int fuse_uring_queue_handle_cqes(struct fuse_ring_queue *queue)
{
    struct fuse_ring_pool *ring_pool = queue->ring_pool;
    struct fuse_session *se = ring_pool->se;
    size_t num_completed = 0;
    struct io_uring_cqe *cqe;
    unsigned int head;
    int ret = 0;

    io_uring_for_each_cqe(&queue->ring, head, cqe) {
        int err = 0;

        num_completed++;

        err = cqe->res;
        if (err != 0) {
            if (err > 0 &&
                ((uintptr_t)io_uring_cqe_get_data(cqe) ==
                 (unsigned int)queue->eventfd)) {
                /* teardown from eventfd */
                return -ENOTCONN;
            }

            // XXX: Needs rate limited logs, otherwise log spam
            //fuse_log(FUSE_LOG_ERR, "cqe res: %d\n", cqe->res);

            /* -ENOTCONN is ok on umount */
            if (err != -EINTR && err != -EAGAIN && err != -ENOTCONN) {
                se->error = cqe->res;

                /* return first error */
                if (ret == 0)
                    ret = err;
            }
        } else {
            fuse_uring_handle_cqe(queue, cqe);
        }
    }

    if (num_completed)
        io_uring_cq_advance(&queue->ring, num_completed);

    return ret == 0 ? 0 : num_completed;
}

If err > 0 && ((uintptr_t)io_uring_cqe_get_data(cqe) ==
(unsigned int)queue->eventfd), it will return the negative value
-ENOTCONN so that the caller sets se->exited = 1. Then, under what
circumstances is err > 0? When is err < 0? The current code also
doesn't seem to handle the case where err is negative?