From: Daniel Hodges
To: Jens Axboe
Cc: Daniel Hodges, Pavel Begunkov, io-uring@vger.kernel.org,
	linux-kernel@vger.kernel.org
Subject: [RFC PATCH 1/2] io_uring: add high-performance IPC channel infrastructure
Date: Fri, 13 Mar 2026 09:07:38 -0400
Message-ID: <20260313130739.23265-2-git@danielhodges.dev>
In-Reply-To: <20260313130739.23265-1-git@danielhodges.dev>
References: <20260313130739.23265-1-git@danielhodges.dev>

Add a new IPC mechanism built on top of io_uring for message passing
between processes.
Features:
 - Shared memory ring buffer for message data transfer
 - RCU-based subscriber lookup on send/recv paths
 - Broadcast mode (all subscribers receive each message)
 - Channel-based design supporting multiple subscribers
 - Permission checking modeled on Unix file permissions
 - Anonymous file per channel for mmap support

Architecture:
 - Two new io_uring opcodes: IPC_SEND and IPC_RECV
 - Four new registration commands for channel lifecycle: CREATE,
   ATTACH, DETACH, and DESTROY
 - Channels are identified by numeric ID or 64-bit key
 - Data is copied between userspace and a kernel ring buffer via
   copy_from_user/copy_to_user
 - Multicast (round-robin) flag is defined but not yet implemented

Send/recv hot path optimizations:
 - Cache the descriptor array base pointer in struct io_ipc_channel
   to avoid recomputing it on every send/recv
 - Use __copy_from_user_inatomic/__copy_to_user_inatomic with
   fallback to the standard copy variants
 - Add prefetch/prefetchw hints for descriptor and data access
 - In non-broadcast mode, wake only the first receiver instead of
   iterating all subscribers
 - Simplify error handling in io_ipc_send() using inline returns
 - Remove channel pointer caching in request structs, emptying the
   send and recv cleanup handlers

Signed-off-by: Daniel Hodges
---
 include/linux/io_uring_types.h |    7 +
 include/uapi/linux/io_uring.h  |   74 +++
 io_uring/Kconfig               |   14 +
 io_uring/Makefile              |    1 +
 io_uring/io_uring.c            |    6 +
 io_uring/ipc.c                 | 1002 ++++++++++++++++++++++++++++++++
 io_uring/ipc.h                 |  161 +++++
 io_uring/opdef.c               |   19 +
 io_uring/register.c            |   25 +
 9 files changed, 1309 insertions(+)
 create mode 100644 io_uring/ipc.c
 create mode 100644 io_uring/ipc.h

diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
index dd1420bfcb73..fbbe51aaca68 100644
--- a/include/linux/io_uring_types.h
+++ b/include/linux/io_uring_types.h
@@ -435,6 +435,13 @@ struct io_ring_ctx {
        u32                     pers_next;
        struct xarray           personalities;

+#ifdef CONFIG_IO_URING_IPC
+       /* IPC subscriber tracking - keyed by subscriber_id */
+       struct xarray           ipc_subscribers;
+       /* IPC channels created by this ring - keyed by channel_id */
+       struct xarray           ipc_channels;
+#endif
+
        /* hashed buffered write serialization */
        struct io_wq_hash       *hash_map;

diff --git a/include/uapi/linux/io_uring.h b/include/uapi/linux/io_uring.h
index 17ac1b785440..a5b68bd1a047 100644
--- a/include/uapi/linux/io_uring.h
+++ b/include/uapi/linux/io_uring.h
@@ -318,6 +318,8 @@ enum io_uring_op {
        IORING_OP_PIPE,
        IORING_OP_NOP128,
        IORING_OP_URING_CMD128,
+       IORING_OP_IPC_SEND,
+       IORING_OP_IPC_RECV,

        /* this goes last, obviously */
        IORING_OP_LAST,
@@ -723,6 +725,12 @@ enum io_uring_register_op {
        /* register bpf filtering programs */
        IORING_REGISTER_BPF_FILTER              = 37,

+       /* IPC channel operations */
+       IORING_REGISTER_IPC_CHANNEL_CREATE      = 38,
+       IORING_REGISTER_IPC_CHANNEL_ATTACH      = 39,
+       IORING_REGISTER_IPC_CHANNEL_DETACH      = 40,
+       IORING_REGISTER_IPC_CHANNEL_DESTROY     = 41,
+
        /* this goes last */
        IORING_REGISTER_LAST,

@@ -1057,6 +1065,72 @@ struct io_timespec {
        __u64           tv_nsec;
 };

+/*
+ * IPC channel support
+ */
+
+/* Flags for IPC channel creation */
+#define IOIPC_F_BROADCAST      (1U << 0)       /* Broadcast mode (all subscribers receive) */
+#define IOIPC_F_MULTICAST      (1U << 1)       /* Multicast mode (round-robin delivery) */
+#define IOIPC_F_PRIVATE                (1U << 2)       /* Private (permissions enforced strictly) */
+
+/* Flags for subscriber attachment */
+#define IOIPC_SUB_SEND         (1U << 0)       /* Can send to channel */
+#define IOIPC_SUB_RECV         (1U << 1)       /* Can receive from channel */
+#define IOIPC_SUB_BOTH         (IOIPC_SUB_SEND | IOIPC_SUB_RECV)
+
+/* Create IPC channel */
+struct io_uring_ipc_channel_create {
+       __u32   flags;          /* IOIPC_F_BROADCAST, IOIPC_F_PRIVATE */
+       __u32   ring_entries;   /* Number of message slots */
+       __u32   max_msg_size;   /* Maximum message size */
+       __u32   mode;           /* Permission bits (like chmod) */
+       __u64   key;            /* Unique key for channel (like ftok()) */
+       __u32   channel_id_out; /* Returned channel ID */
+       __u32   reserved[3];
+};
+
+/* Attach to existing channel */
+struct io_uring_ipc_channel_attach {
+       __u32   channel_id;     /* Attach by channel ID (when key == 0) */
+       __u32   flags;          /* IOIPC_SUB_SEND, IOIPC_SUB_RECV, IOIPC_SUB_BOTH */
+       __u64   key;            /* Non-zero: attach by key instead of channel_id */
+       __s32   channel_fd;     /* Output: fd for mmap */
+       __u32   local_id_out;   /* Output: local subscriber ID */
+       __u64   mmap_offset_out;        /* Output: offset for mmap() */
+       __u32   region_size;    /* Output: size of shared region */
+       __u32   reserved[3];
+};
+
+/* Message descriptor in the ring */
+struct io_uring_ipc_msg_desc {
+       __u64   offset;         /* Offset in data region for message payload */
+       __u32   len;            /* Message length */
+       __u32   msg_id;         /* Unique message ID */
+       __u64   sender_data;    /* Sender's user_data for context */
+};
+
+/* Shared ring structure (mmap'd to userspace) */
+struct io_uring_ipc_ring {
+       /* Cache-aligned producer/consumer positions */
+       struct {
+               __u32   head __attribute__((aligned(64)));
+               __u32   tail;
+       } producer;
+
+       struct {
+               __u32   head __attribute__((aligned(64)));
+       } consumer;
+
+       /* Ring parameters */
+       __u32   ring_mask;
+       __u32   ring_entries;
+       __u32   max_msg_size;
+
+       /* Array of message descriptors follows */
+       struct io_uring_ipc_msg_desc msgs[];
+};
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/io_uring/Kconfig b/io_uring/Kconfig
index a7ae23cf1035..d7d7f858b46c 100644
--- a/io_uring/Kconfig
+++ b/io_uring/Kconfig
@@ -14,3 +14,17 @@ config IO_URING_BPF
        def_bool y
        depends on BPF
        depends on NET
+
+config IO_URING_IPC
+       bool "io_uring IPC support"
+       depends on IO_URING
+       default y
+       help
+         Enable io_uring-based inter-process communication.
+
+         This provides high-performance IPC channels that leverage
+         io_uring's shared memory infrastructure and async notification
+         model. Supports broadcast and multicast patterns with
+         zero-copy capabilities.
+
+         If unsure, say Y.
diff --git a/io_uring/Makefile b/io_uring/Makefile
index 931f9156132a..380fe8a19174 100644
--- a/io_uring/Makefile
+++ b/io_uring/Makefile
@@ -17,6 +17,7 @@ obj-$(CONFIG_IO_URING) += io_uring.o opdef.o kbuf.o rsrc.o notif.o \
                                        query.o

 obj-$(CONFIG_IO_URING_ZCRX) += zcrx.o
+obj-$(CONFIG_IO_URING_IPC) += ipc.o
 obj-$(CONFIG_IO_WQ) += io-wq.o
 obj-$(CONFIG_FUTEX) += futex.o
 obj-$(CONFIG_EPOLL) += epoll.o
diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
index 9a37035e76c0..6dccc005aaca 100644
--- a/io_uring/io_uring.c
+++ b/io_uring/io_uring.c
@@ -87,6 +87,7 @@
 #include "msg_ring.h"
 #include "memmap.h"
 #include "zcrx.h"
+#include "ipc.h"

 #include "timeout.h"
 #include "poll.h"
@@ -251,6 +252,10 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
        init_waitqueue_head(&ctx->sqo_sq_wait);
        INIT_LIST_HEAD(&ctx->sqd_list);
        INIT_LIST_HEAD(&ctx->cq_overflow_list);
+#ifdef CONFIG_IO_URING_IPC
+       xa_init(&ctx->ipc_subscribers);
+       xa_init(&ctx->ipc_channels);
+#endif
        ret = io_alloc_cache_init(&ctx->apoll_cache, IO_POLL_ALLOC_CACHE_MAX,
                                  sizeof(struct async_poll), 0);
        ret |= io_alloc_cache_init(&ctx->netmsg_cache, IO_ALLOC_CACHE_MAX,
@@ -2154,6 +2159,7 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
        io_sqe_buffers_unregister(ctx);
        io_sqe_files_unregister(ctx);
        io_unregister_zcrx_ifqs(ctx);
+       io_ipc_ctx_cleanup(ctx);
        io_cqring_overflow_kill(ctx);
        io_eventfd_unregister(ctx);
        io_free_alloc_caches(ctx);
diff --git a/io_uring/ipc.c b/io_uring/ipc.c
new file mode 100644
index 000000000000..3f4daf75c843
--- /dev/null
+++ b/io_uring/ipc.c
@@ -0,0 +1,1002 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * io_uring IPC channel implementation
+ *
+ * High-performance inter-process communication using io_uring infrastructure.
+ * Provides broadcast and multicast channels with zero-copy support.
+ */
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+
+#include "io_uring.h"
+#include "rsrc.h"
+#include "memmap.h"
+#include "ipc.h"
+
+#ifdef CONFIG_IO_URING_IPC
+
+/*
+ * Global channel registry
+ * Protected by RCU and spinlock
+ */
+static DEFINE_HASHTABLE(channel_hash, 8);
+static DEFINE_SPINLOCK(channel_hash_lock);
+static DEFINE_XARRAY_ALLOC(channel_xa);
+static atomic_t ipc_global_sub_id = ATOMIC_INIT(0);
+
+static int ipc_channel_mmap(struct file *file, struct vm_area_struct *vma)
+{
+       struct io_ipc_channel *channel;
+       size_t region_size;
+       unsigned long uaddr = vma->vm_start;
+       unsigned long size = vma->vm_end - vma->vm_start;
+       void *kaddr;
+       unsigned long pfn;
+       int ret;
+
+       /*
+        * Safely acquire a channel reference. A concurrent
+        * io_ipc_channel_destroy() can NULL private_data and drop
+        * all references; the channel is freed via call_rcu(), so
+        * it stays accessible under rcu_read_lock().
+        */
+       rcu_read_lock();
+       channel = READ_ONCE(file->private_data);
+       if (!channel || !refcount_inc_not_zero(&channel->ref_count)) {
+               rcu_read_unlock();
+               return -EINVAL;
+       }
+       rcu_read_unlock();
+
+       region_size = io_region_size(channel->region);
+       kaddr = channel->region->ptr;
+
+       /* Validate mmap parameters */
+       if (vma->vm_pgoff != 0) {
+               ret = -EINVAL;
+               goto out_put;
+       }
+
+       if (size > region_size) {
+               ret = -EINVAL;
+               goto out_put;
+       }
+
+       if (vma->vm_flags & VM_WRITE) {
+               ret = -EACCES;
+               goto out_put;
+       }
+
+       /* Prevent mprotect from later adding write permission */
+       vm_flags_clear(vma, VM_MAYWRITE);
+
+       /* Map the vmalloc'd region page by page */
+       vm_flags_set(vma, VM_DONTEXPAND | VM_DONTDUMP);
+
+       while (size > 0) {
+               pfn = vmalloc_to_pfn(kaddr);
+               ret = remap_pfn_range(vma, uaddr, pfn, PAGE_SIZE, vma->vm_page_prot);
+               if (ret)
+                       goto out_put;
+
+               uaddr += PAGE_SIZE;
+               kaddr += PAGE_SIZE;
+               size -= PAGE_SIZE;
+       }
+
+       ret = 0;
+out_put:
+       io_ipc_channel_put(channel);
+       return ret;
+}
+
+static int ipc_channel_release(struct inode *inode, struct file *file)
+{
+       struct io_ipc_channel *channel = file->private_data;
+
+       if (channel)
+               io_ipc_channel_put(channel);
+
+       return 0;
+}
+
+static const struct file_operations ipc_channel_fops = {
+       .mmap           = ipc_channel_mmap,
+       .release        = ipc_channel_release,
+       .llseek         = noop_llseek,
+};
+
+
+static int ipc_calc_region_size(u32 ring_entries, u32 max_msg_size,
+                               size_t *ring_size_out, size_t *data_size_out,
+                               size_t *total_size_out)
+{
+       size_t ring_size, data_size, total_size;
+
+       ring_size = sizeof(struct io_ipc_ring) +
+                   ring_entries * sizeof(struct io_ipc_msg_desc);
+       ring_size = ALIGN(ring_size, PAGE_SIZE);
+
+       if ((u64)ring_entries * max_msg_size > INT_MAX)
+               return -EINVAL;
+
+       data_size = (size_t)ring_entries * max_msg_size;
+       data_size = ALIGN(data_size, PAGE_SIZE);
+
+       total_size = ring_size + data_size;
+
+       if (total_size > INT_MAX)
+               return -EINVAL;
+
+       *ring_size_out = ring_size;
+       *data_size_out = data_size;
+       *total_size_out = total_size;
+
+       return 0;
+}
+
+static int ipc_region_alloc(struct io_ipc_channel *channel, u32 ring_entries,
+                           u32 max_msg_size)
+{
+       struct io_mapped_region *region;
+       size_t ring_size, data_size, total_size;
+       void *ptr;
+       int ret;
+
+       ret = ipc_calc_region_size(ring_entries, max_msg_size, &ring_size,
+                                  &data_size, &total_size);
+       if (ret)
+               return ret;
+
+       region = kzalloc(sizeof(*region), GFP_KERNEL);
+       if (!region)
+               return -ENOMEM;
+
+       ptr = vmalloc_user(total_size);
+       if (!ptr) {
+               kfree(region);
+               return -ENOMEM;
+       }
+
+       region->ptr = ptr;
+       region->nr_pages = (total_size + PAGE_SIZE - 1) >> PAGE_SHIFT;
+       region->flags = 0;
+
+       channel->region = region;
+       channel->ring = (struct io_ipc_ring *)ptr;
+       channel->desc_array = (struct io_ipc_msg_desc *)((u8 *)ptr + sizeof(struct io_ipc_ring));
+       channel->data_region = ptr + ring_size;
+
+       channel->ring->ring_mask = ring_entries - 1;
+       channel->ring->ring_entries = ring_entries;
+       channel->ring->max_msg_size = max_msg_size;
+
+       return 0;
+}
+
+static void ipc_region_free(struct io_ipc_channel *channel)
+{
+       if (!channel->region)
+               return;
+
+       if (channel->region->ptr)
+               vfree(channel->region->ptr);
+       kfree(channel->region);
+       channel->region = NULL;
+       channel->ring = NULL;
+       channel->data_region = NULL;
+}
+
+static void io_ipc_channel_free(struct io_ipc_channel *channel)
+{
+       struct io_ipc_subscriber *sub;
+       unsigned long index;
+
+       xa_for_each(&channel->subscribers, index, sub)
+               kfree(sub);
+       xa_destroy(&channel->subscribers);
+
+       if (channel->file) {
+               /*
+                * Prevent the deferred fput release callback from
+                * accessing the channel after it has been freed.
+                */
+               WRITE_ONCE(channel->file->private_data, NULL);
+               fput(channel->file);
+       }
+
+       ipc_region_free(channel);
+
+       kfree(channel);
+}
+
+static void io_ipc_channel_free_rcu(struct rcu_head *rcu)
+{
+       struct io_ipc_channel *channel;
+
+       channel = container_of(rcu, struct io_ipc_channel, rcu);
+       io_ipc_channel_free(channel);
+}
+
+void io_ipc_channel_put(struct io_ipc_channel *channel)
+{
+       if (refcount_dec_and_test(&channel->ref_count)) {
+               /*
+                * Remove from global data structures before scheduling
+                * the RCU callback. Lookups use refcount_inc_not_zero()
+                * so no new references can appear. Removing here avoids
+                * taking xa_lock (a plain spinlock) inside the RCU
+                * callback, which runs in softirq context -- that would
+                * deadlock against xa_alloc() in process context.
+                */
+               spin_lock_bh(&channel_hash_lock);
+               if (!hlist_unhashed(&channel->hash_node))
+                       hash_del_rcu(&channel->hash_node);
+               spin_unlock_bh(&channel_hash_lock);
+
+               xa_erase(&channel_xa, channel->channel_id);
+
+               call_rcu(&channel->rcu, io_ipc_channel_free_rcu);
+       }
+}
+
+struct io_ipc_channel *io_ipc_channel_get(u32 channel_id)
+{
+       struct io_ipc_channel *channel;
+
+       rcu_read_lock();
+       channel = xa_load(&channel_xa, channel_id);
+       if (channel && !refcount_inc_not_zero(&channel->ref_count))
+               channel = NULL;
+       rcu_read_unlock();
+
+       return channel;
+}
+
+struct io_ipc_channel *io_ipc_channel_get_by_key(u64 key)
+{
+       struct io_ipc_channel *channel;
+       u32 hash = hash_64(key, HASH_BITS(channel_hash));
+
+       rcu_read_lock();
+       hash_for_each_possible_rcu(channel_hash, channel, hash_node, hash) {
+               if (channel->key == key &&
+                   refcount_inc_not_zero(&channel->ref_count)) {
+                       rcu_read_unlock();
+                       return channel;
+               }
+       }
+       rcu_read_unlock();
+
+       return NULL;
+}
+
+static int ipc_check_permission(struct io_ipc_channel *channel, u32 access)
+{
+       const struct cred *cred = current_cred();
+       kuid_t uid = cred->fsuid;
+       kgid_t gid = cred->fsgid;
+       u16 mode = channel->mode;
+       u16 needed = 0;
+
+       /* Map IPC access flags to Unix permission bits: send=write, recv=read */
+       if (access & IOIPC_SUB_RECV)
+               needed |= 4;    /* read */
+       if (access & IOIPC_SUB_SEND)
+               needed |= 2;    /* write */
+
+       if (uid_eq(uid, channel->owner_uid))
+               return ((mode >> 6) & needed) == needed ? 0 : -EACCES;
+
+       if (gid_eq(gid, channel->owner_gid))
+               return ((mode >> 3) & needed) == needed ? 0 : -EACCES;
+
+       return (mode & needed) == needed ? 0 : -EACCES;
+}
+
+int io_ipc_channel_create(struct io_ring_ctx *ctx,
+                         const struct io_uring_ipc_channel_create __user *arg)
+{
+       struct io_uring_ipc_channel_create create;
+       struct io_ipc_channel *channel;
+       u32 hash;
+       int ret;
+
+       if (copy_from_user(&create, arg, sizeof(create)))
+               return -EFAULT;
+
+       if (!mem_is_zero(create.reserved, sizeof(create.reserved)))
+               return -EINVAL;
+
+       if (!create.ring_entries || create.ring_entries > IORING_MAX_ENTRIES)
+               return -EINVAL;
+
+       if (!is_power_of_2(create.ring_entries))
+               return -EINVAL;
+
+       if (!create.max_msg_size || create.max_msg_size > SZ_1M)
+               return -EINVAL;
+
+       if (create.flags & ~(IOIPC_F_BROADCAST | IOIPC_F_MULTICAST |
+                            IOIPC_F_PRIVATE))
+               return -EINVAL;
+
+       if ((create.flags & IOIPC_F_BROADCAST) &&
+           (create.flags & IOIPC_F_MULTICAST))
+               return -EINVAL;
+
+       channel = kzalloc(sizeof(*channel), GFP_KERNEL);
+       if (!channel)
+               return -ENOMEM;
+
+       refcount_set(&channel->ref_count, 1);
+       channel->flags = create.flags;
+       channel->key = create.key;
+       channel->msg_max_size = create.max_msg_size;
+       channel->owner_uid = current_fsuid();
+       channel->owner_gid = current_fsgid();
+       channel->mode = create.mode & 0666;
+       atomic_set(&channel->next_msg_id, 1);
+       atomic_set(&channel->next_receiver_idx, 0);
+       atomic_set(&channel->recv_count, 0);
+       xa_init(&channel->subscribers);
+
+       ret = ipc_region_alloc(channel, create.ring_entries, create.max_msg_size);
+       if (ret)
+               goto err_free_channel;
+
+       refcount_inc(&channel->ref_count);      /* File holds a reference */
+       channel->file = anon_inode_getfile("[io_uring_ipc]", &ipc_channel_fops,
+                                          channel, O_RDWR);
+       if (IS_ERR(channel->file)) {
+               ret = PTR_ERR(channel->file);
+               channel->file = NULL;
+               refcount_dec(&channel->ref_count);
+               goto err_free_region;
+       }
+
+       ret = xa_alloc(&channel_xa, &channel->channel_id, channel,
+                      XA_LIMIT(1, INT_MAX), GFP_KERNEL);
+       if (ret < 0)
+               goto err_put_file;
+
+       hash = hash_64(create.key, HASH_BITS(channel_hash));
+       spin_lock_bh(&channel_hash_lock);
+       hash_add_rcu(channel_hash, &channel->hash_node, hash);
+       spin_unlock_bh(&channel_hash_lock);
+
+       /*
+        * Track the channel in the creating ring so that
+        * io_ipc_ctx_cleanup() can break the channel-file reference
+        * cycle if the ring is torn down without an explicit destroy.
+        */
+       ret = xa_insert(&ctx->ipc_channels, channel->channel_id,
+                       xa_mk_value(0), GFP_KERNEL);
+       if (ret)
+               goto err_remove_channel;
+
+       create.channel_id_out = channel->channel_id;
+       if (copy_to_user((void __user *)arg, &create, sizeof(create))) {
+               ret = -EFAULT;
+               goto err_remove_ctx;
+       }
+
+       return 0;
+
+err_remove_ctx:
+       xa_erase(&ctx->ipc_channels, channel->channel_id);
+err_remove_channel:
+       spin_lock_bh(&channel_hash_lock);
+       hash_del_rcu(&channel->hash_node);
+       spin_unlock_bh(&channel_hash_lock);
+       xa_erase(&channel_xa, channel->channel_id);
+err_put_file:
+       /*
+        * fput() is deferred -- the release callback will call
+        * io_ipc_channel_put() to drop the file's reference.
+        * Drop the initial reference here; it can't reach zero
+        * yet because the file still holds one.
+        */
+       fput(channel->file);
+       channel->file = NULL;
+       io_ipc_channel_put(channel);
+       return ret;
+err_free_region:
+       ipc_region_free(channel);
+err_free_channel:
+       kfree(channel);
+       return ret;
+}
+
+int io_ipc_channel_attach(struct io_ring_ctx *ctx,
+                         const struct io_uring_ipc_channel_attach __user *arg)
+{
+       struct io_uring_ipc_channel_attach attach;
+       struct io_ipc_channel *channel = NULL;
+       struct io_ipc_subscriber *sub;
+       int ret;
+
+       if (copy_from_user(&attach, arg, sizeof(attach)))
+               return -EFAULT;
+
+       if (!mem_is_zero(attach.reserved, sizeof(attach.reserved)))
+               return -EINVAL;
+
+       if (!attach.flags || (attach.flags & ~IOIPC_SUB_BOTH))
+               return -EINVAL;
+
+       if (attach.key)
+               channel = io_ipc_channel_get_by_key(attach.key);
+       else
+               channel = io_ipc_channel_get(attach.channel_id);
+
+       if (!channel)
+               return -ENOENT;
+
+       ret = ipc_check_permission(channel, attach.flags);
+       if (ret)
+               goto err_put_channel;
+
+       sub = kzalloc(sizeof(*sub), GFP_KERNEL);
+       if (!sub) {
+               ret = -ENOMEM;
+               goto err_put_channel;
+       }
+
+       sub->ctx = ctx;
+       sub->channel = channel;
+       sub->flags = attach.flags;
+       sub->local_head = 0;
+       sub->subscriber_id = atomic_inc_return(&ipc_global_sub_id);
+
+       ret = xa_insert(&channel->subscribers, sub->subscriber_id, sub,
+                       GFP_KERNEL);
+       if (ret) {
+               kfree(sub);
+               goto err_put_channel;
+       }
+
+       refcount_inc(&channel->ref_count);
+       if (attach.flags & IOIPC_SUB_RECV)
+               atomic_inc(&channel->recv_count);
+
+       ret = xa_insert(&ctx->ipc_subscribers, sub->subscriber_id, sub,
+                       GFP_KERNEL);
+       if (ret)
+               goto err_remove_chan_sub;
+
+       ret = get_unused_fd_flags(O_RDWR | O_CLOEXEC);
+       if (ret < 0)
+               goto err_remove_sub;
+
+       attach.local_id_out = sub->subscriber_id;
+       attach.region_size = io_region_size(channel->region);
+       attach.channel_fd = ret;
+       attach.mmap_offset_out = 0;
+
+       if (copy_to_user((void __user *)arg, &attach, sizeof(attach))) {
+               put_unused_fd(ret);
+               ret = -EFAULT;
+               goto err_remove_sub;
+       }
+
+       {
+               struct file *f = get_file_active(&channel->file);
+
+               if (!f) {
+                       put_unused_fd(attach.channel_fd);
+                       ret = -EINVAL;
+                       goto err_remove_sub;
+               }
+               fd_install(attach.channel_fd, f);
+       }
+
+       io_ipc_channel_put(channel);
+       return 0;
+
+err_remove_sub:
+       xa_erase(&ctx->ipc_subscribers, sub->subscriber_id);
+err_remove_chan_sub:
+       xa_erase(&channel->subscribers, sub->subscriber_id);
+       if (attach.flags & IOIPC_SUB_RECV)
+               atomic_dec(&channel->recv_count);
+       kfree_rcu(sub, rcu);
+       io_ipc_channel_put(channel);    /* Drop subscriber's reference */
+err_put_channel:
+       io_ipc_channel_put(channel);    /* Drop lookup reference */
+       return ret;
+}
+
+int io_ipc_channel_detach(struct io_ring_ctx *ctx, u32 subscriber_id)
+{
+       struct io_ipc_subscriber *sub;
+       struct io_ipc_channel *channel;
+
+       sub = xa_erase(&ctx->ipc_subscribers, subscriber_id);
+       if (!sub)
+               return -ENOENT;
+
+       channel = sub->channel;
+       xa_erase(&channel->subscribers, subscriber_id);
+
+       if (sub->flags & IOIPC_SUB_RECV)
+               atomic_dec(&channel->recv_count);
+
+       io_ipc_channel_put(channel);
+       kfree_rcu(sub, rcu);
+       return 0;
+}
+
+/*
+ * Recompute consumer.head as the minimum local_head across all receive
+ * subscribers. Called lazily from the broadcast recv path (every 16
+ * messages) and from the send path when the ring appears full.
+ */
+static void ipc_advance_consumer_head(struct io_ipc_channel *channel)
+{
+       struct io_ipc_subscriber *s;
+       struct io_ipc_ring *ring = channel->ring;
+       unsigned long index;
+       u32 min_head = READ_ONCE(ring->producer.tail);
+
+       rcu_read_lock();
+       xa_for_each(&channel->subscribers, index, s) {
+               if (s->flags & IOIPC_SUB_RECV) {
+                       u32 sh = READ_ONCE(s->local_head);
+
+                       if ((s32)(sh - min_head) < 0)
+                               min_head = sh;
+               }
+       }
+       rcu_read_unlock();
+
+       WRITE_ONCE(ring->consumer.head, min_head);
+}
+
+static int ipc_wake_receivers(struct io_ipc_channel *channel, u32 target)
+{
+       struct io_ipc_subscriber *sub;
+       unsigned long index;
+
+       rcu_read_lock();
+       if (target) {
+               sub = xa_load(&channel->subscribers, target);
+               if (!sub || !(sub->flags & IOIPC_SUB_RECV)) {
+                       rcu_read_unlock();
+                       return -ENOENT;
+               }
+               io_cqring_wake(sub->ctx);
+               rcu_read_unlock();
+               return 0;
+       }
+
+       if (channel->flags & IOIPC_F_MULTICAST) {
+               u32 recv_cnt = atomic_read(&channel->recv_count);
+
+               if (recv_cnt) {
+                       u32 rr = (u32)atomic_inc_return(&channel->next_receiver_idx);
+                       u32 target_idx = rr % recv_cnt;
+                       u32 i = 0;
+
+                       xa_for_each(&channel->subscribers, index, sub) {
+                               if (sub->flags & IOIPC_SUB_RECV) {
+                                       if (i == target_idx) {
+                                               io_cqring_wake(sub->ctx);
+                                               break;
+                                       }
+                                       i++;
+                               }
+                       }
+               }
+       } else {
+               xa_for_each(&channel->subscribers, index, sub) {
+                       if (sub->flags & IOIPC_SUB_RECV) {
+                               io_cqring_wake(sub->ctx);
+                               if (!(channel->flags & IOIPC_F_BROADCAST))
+                                       break;
+                       }
+               }
+       }
+       rcu_read_unlock();
+       return 0;
+}
+
+int io_ipc_send_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+       struct io_ipc_send *ipc = io_kiocb_to_cmd(req, struct io_ipc_send);
+
+       if (sqe->buf_index || sqe->personality)
+               return -EINVAL;
+
+       if (sqe->ioprio || sqe->rw_flags)
+               return -EINVAL;
+
+       ipc->channel_id = READ_ONCE(sqe->fd);
+       ipc->addr = READ_ONCE(sqe->addr);
+       ipc->len = READ_ONCE(sqe->len);
+       ipc->target = READ_ONCE(sqe->file_index);
+
+       return 0;
+}
+
+int io_ipc_send(struct io_kiocb *req, unsigned int issue_flags)
+{
+       struct io_ipc_send *ipc = io_kiocb_to_cmd(req, struct io_ipc_send);
+       struct io_ipc_subscriber *sub = NULL;
+       struct io_ipc_channel *channel = NULL;
+       struct io_ipc_ring *ring;
+       struct io_ipc_msg_desc *desc;
+       void __user *user_buf;
+       void *dest;
+       u32 head, tail, next_tail, idx;
+       u32 sub_flags;
+       int ret;
+       u32 fd = ipc->channel_id;
+       bool need_put = false;
+       bool retried_advance = false;
+
+       /* O(1) subscriber lookup via xarray */
+       rcu_read_lock();
+       sub = xa_load(&req->ctx->ipc_subscribers, fd);
+       if (sub) {
+               channel = sub->channel;
+               if (!refcount_inc_not_zero(&channel->ref_count)) {
+                       rcu_read_unlock();
+                       ret = -ENOENT;
+                       goto fail;
+               }
+               sub_flags = sub->flags;
+               rcu_read_unlock();
+               need_put = true;
+
+               if (!(sub_flags & IOIPC_SUB_SEND)) {
+                       ret = -EACCES;
+                       goto fail_put;
+               }
+               goto found;
+       }
+       rcu_read_unlock();
+
+       channel = io_ipc_channel_get(fd);
+       if (!channel) {
+               ret = -ENOENT;
+               goto fail;
+       }
+
+       need_put = true;
+
+       ret = ipc_check_permission(channel, IOIPC_SUB_SEND);
+       if (ret)
+               goto fail_put;
+
+found:
+       ring = channel->ring;
+
+       if (unlikely(ipc->len > channel->msg_max_size)) {
+               ret = -EMSGSIZE;
+               goto fail_put;
+       }
+
+       /* Lock-free slot reservation via CAS on producer tail */
+retry:
+       do {
+               tail = READ_ONCE(ring->producer.tail);
+               next_tail = tail + 1;
+               head = READ_ONCE(ring->consumer.head);
+
+               if (unlikely(next_tail - head > ring->ring_entries)) {
+                       /*
+                        * Ring full. For broadcast channels, try to
+                        * advance consumer.head once by scanning the
+                        * min local_head across all receivers.
+                        */
+                       if ((channel->flags & IOIPC_F_BROADCAST) &&
+                           !retried_advance) {
+                               ipc_advance_consumer_head(channel);
+                               retried_advance = true;
+                               goto retry;
+                       }
+                       ret = -ENOBUFS;
+                       goto fail_put;
+               }
+               idx = tail & ring->ring_mask;
+       } while (cmpxchg(&ring->producer.tail, tail, next_tail) != tail);
+
+       /* Slot exclusively claimed -- write data and descriptor */
+       dest = channel->data_region + (idx * channel->msg_max_size);
+       desc = &channel->desc_array[idx];
+
+       prefetchw(desc);
+
+       user_buf = u64_to_user_ptr(ipc->addr);
+       if (unlikely(__copy_from_user_inatomic(dest, user_buf, ipc->len))) {
+               if (unlikely(copy_from_user(dest, user_buf, ipc->len))) {
+                       /*
+                        * Slot already claimed via CAS; mark it ready with
+                        * zero length so consumers can advance past it.
+                        */
+                       desc->offset = idx * channel->msg_max_size;
+                       desc->len = 0;
+                       desc->msg_id = 0;
+                       desc->sender_data = 0;
+                       smp_wmb();
+                       WRITE_ONCE(desc->seq, tail + 1);
+                       ret = -EFAULT;
+                       goto fail_put;
+               }
+       }
+
+       desc->offset = idx * channel->msg_max_size;
+       desc->len = ipc->len;
+       desc->msg_id = atomic_inc_return(&channel->next_msg_id);
+       desc->sender_data = req->cqe.user_data;
+
+       /* Ensure descriptor + data visible before marking slot ready */
+       smp_wmb();
+       WRITE_ONCE(desc->seq, tail + 1);
+
+       ret = ipc_wake_receivers(channel, ipc->target);
+       if (ret)
+               goto fail_put;
+
+       if (need_put)
+               io_ipc_channel_put(channel);
+
+       io_req_set_res(req, ipc->len, 0);
+       return IOU_COMPLETE;
+
+fail_put:
+       if (need_put)
+               io_ipc_channel_put(channel);
+fail:
+       req_set_fail(req);
+       io_req_set_res(req, ret, 0);
+       return IOU_COMPLETE;
+}
+
+
+int io_ipc_recv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe)
+{
+       struct io_ipc_recv *ipc = io_kiocb_to_cmd(req, struct io_ipc_recv);
+
+       if (sqe->buf_index || sqe->personality)
+               return -EINVAL;
+
+       if (sqe->ioprio || sqe->rw_flags)
+               return -EINVAL;
+
+       ipc->channel_id = READ_ONCE(sqe->fd);
+       ipc->addr = READ_ONCE(sqe->addr);
+       ipc->len = READ_ONCE(sqe->len);
+
+       return 0;
+}
+
+int io_ipc_recv(struct io_kiocb *req, unsigned int issue_flags)
+{
+       struct io_ipc_recv *ipc = io_kiocb_to_cmd(req, struct io_ipc_recv);
+       struct io_ipc_subscriber *sub = NULL;
+       struct io_ipc_channel *channel;
+       struct io_ipc_ring *ring;
+       struct io_ipc_msg_desc *desc;
+       void __user *user_buf;
+       void *src;
+       u32 head, tail, idx;
+       u32 sub_flags, sub_id;
+       size_t copy_len;
+       int ret;
+
+       /* O(1) subscriber lookup via xarray */
+       rcu_read_lock();
+       sub = xa_load(&req->ctx->ipc_subscribers, ipc->channel_id);
+       if (sub) {
+               channel = sub->channel;
+               if (!refcount_inc_not_zero(&channel->ref_count)) {
+                       rcu_read_unlock();
+                       ret = -ENOENT;
+                       goto fail;
+               }
+               sub_flags = sub->flags;
+               sub_id = sub->subscriber_id;
+               head = READ_ONCE(sub->local_head);
+               rcu_read_unlock();
+               goto found;
+       }
+       rcu_read_unlock();
+       ret = -ENOENT;
+       goto fail;
+
+found:
+       ring = channel->ring;
+
+       if (!(sub_flags & IOIPC_SUB_RECV)) {
+               ret = -EACCES;
+               goto fail_put;
+       }
+
+       /*
+        * For multicast, competing consumers share consumer.head directly
+        * instead of using per-subscriber local_head.
+        */
+       if (channel->flags & IOIPC_F_MULTICAST)
+               head = READ_ONCE(ring->consumer.head);
+
+       /* Check if there are messages available - punt to io-wq for retry */
+       tail = READ_ONCE(ring->producer.tail);
+       if (unlikely(head == tail)) {
+               io_ipc_channel_put(channel);
+               return -EAGAIN;
+       }
+
+       idx = head & ring->ring_mask;
+       desc = &channel->desc_array[idx];
+
+       /*
+        * Verify slot is fully written by the producer. With lock-free
+        * CAS-based send, tail advances before data is written; the
+        * per-slot seq number signals completion.
+        * Pairs with smp_wmb() + WRITE_ONCE(desc->seq) in send path.
+ */ + if (READ_ONCE(desc->seq) !=3D head + 1) { + io_ipc_channel_put(channel); + return -EAGAIN; + } + + smp_rmb(); + + src =3D channel->data_region + desc->offset; + prefetch(src); + + copy_len =3D min_t(u32, desc->len, ipc->len); + user_buf =3D u64_to_user_ptr(ipc->addr); + + if (unlikely(__copy_to_user_inatomic(user_buf, src, copy_len))) { + if (unlikely(copy_to_user(user_buf, src, copy_len))) { + ret =3D -EFAULT; + goto fail_put; + } + } + + if (channel->flags & IOIPC_F_MULTICAST) { + /* + * Multicast: atomically advance the shared consumer head. + * Losers retry via -EAGAIN / io-wq. + */ + if (cmpxchg(&ring->consumer.head, head, head + 1) !=3D head) { + io_ipc_channel_put(channel); + return -EAGAIN; + } + } else { + /* + * Re-find subscriber under RCU for atomic local_head update. + * O(1) xarray lookup; subscriber is stable while channel + * ref is held and we're under RCU. + */ + rcu_read_lock(); + sub =3D xa_load(&req->ctx->ipc_subscribers, sub_id); + if (!sub) { + rcu_read_unlock(); + goto done; + } + + if (cmpxchg(&sub->local_head, head, head + 1) !=3D head) { + rcu_read_unlock(); + io_ipc_channel_put(channel); + return -EAGAIN; + } + + if (channel->flags & IOIPC_F_BROADCAST) { + /* + * Lazy consumer.head advancement: only scan min-head + * every 16 messages to amortize the O(N) walk. + * The send path also triggers this on ring-full. + */ + if ((head & 0xf) =3D=3D 0) + ipc_advance_consumer_head(channel); + } else { + /* Unicast: single consumer, update directly */ + WRITE_ONCE(ring->consumer.head, head + 1); + } + rcu_read_unlock(); + } + +done: + io_ipc_channel_put(channel); + io_req_set_res(req, copy_len, 0); + return IOU_COMPLETE; + +fail_put: + io_ipc_channel_put(channel); +fail: + req_set_fail(req); + io_req_set_res(req, ret, 0); + return IOU_COMPLETE; +} + +int io_ipc_channel_destroy(struct io_ring_ctx *ctx, u32 channel_id) +{ + struct io_ipc_channel *channel; + struct file *f; + + /* + * Atomically remove from global lookup. 
This prevents a second + * destroy call from finding the channel and draining refcount + * below the number of outstanding references. + */ + channel =3D xa_erase(&channel_xa, channel_id); + if (!channel) + return -ENOENT; + + spin_lock_bh(&channel_hash_lock); + if (!hlist_unhashed(&channel->hash_node)) + hash_del_rcu(&channel->hash_node); + spin_unlock_bh(&channel_hash_lock); + + /* + * Break the reference cycle between channel and file. + * The channel holds a file reference (channel->file) and the + * file's release callback holds a channel reference via + * private_data. Detach the file here so that the release + * callback becomes a no-op and the cycle is broken. + */ + f =3D channel->file; + if (f) { + WRITE_ONCE(f->private_data, NULL); + channel->file =3D NULL; + fput(f); + io_ipc_channel_put(channel); /* Drop file's reference */ + } + + /* Drop the creator's initial reference */ + io_ipc_channel_put(channel); + + return 0; +} + +void io_ipc_ctx_cleanup(struct io_ring_ctx *ctx) +{ + struct io_ipc_subscriber *sub; + struct io_ipc_channel *channel; + unsigned long index; + void *entry; + + xa_for_each(&ctx->ipc_subscribers, index, sub) { + channel =3D sub->channel; + + xa_erase(&ctx->ipc_subscribers, index); + xa_erase(&channel->subscribers, sub->subscriber_id); + + if (sub->flags & IOIPC_SUB_RECV) + atomic_dec(&channel->recv_count); + + io_ipc_channel_put(channel); + kfree_rcu(sub, rcu); + } + xa_destroy(&ctx->ipc_subscribers); + + /* + * Destroy any channels created by this ring that were not + * explicitly destroyed. io_ipc_channel_destroy() is + * idempotent =E2=80=94 it returns -ENOENT if the channel was + * already destroyed. 
+ */ + xa_for_each(&ctx->ipc_channels, index, entry) { + io_ipc_channel_destroy(ctx, index); + } + xa_destroy(&ctx->ipc_channels); +} + +#endif /* CONFIG_IO_URING_IPC */ diff --git a/io_uring/ipc.h b/io_uring/ipc.h new file mode 100644 index 000000000000..65d143c3e520 --- /dev/null +++ b/io_uring/ipc.h @@ -0,0 +1,161 @@ +/* SPDX-License-Identifier: GPL-2.0 */ +#ifndef IO_URING_IPC_H +#define IO_URING_IPC_H + +#include + +#ifdef CONFIG_IO_URING_IPC + +/* + * Internal kernel structures for io_uring IPC + */ + +/* Shared ring structure - lives in mmap'd memory region */ +struct io_ipc_ring { + /* Cache-aligned producer/consumer positions */ + struct { + u32 head __aligned(64); + u32 tail; + } producer; + + struct { + u32 head __aligned(64); + } consumer; + + /* Ring parameters */ + u32 ring_mask; + u32 ring_entries; + u32 max_msg_size; + + /* Message descriptors follow inline */ +}; + +/* Message descriptor in the ring */ +struct io_ipc_msg_desc { + u64 offset; /* Offset in data region for message payload */ + u32 len; /* Message length */ + u32 msg_id; /* Unique message ID */ + u64 sender_data; /* Sender's user_data for context */ + u32 seq; /* Lock-free completion sequence number */ +}; + +/* Per-subscriber attachment to a channel */ +struct io_ipc_subscriber { + u32 local_head __aligned(64); /* Cache-aligned to prevent false sharing */ + u32 subscriber_id; /* Unique subscriber ID */ + u32 flags; /* IOIPC_SUB_SEND, IOIPC_SUB_RECV */ + + struct io_ring_ctx *ctx; /* io_uring context */ + struct io_ipc_channel *channel; /* Channel this subscriber is attached to= */ + struct rcu_head rcu; /* For kfree_rcu deferred freeing */ +}; + +/* IPC channel connecting two or more io_uring instances */ +struct io_ipc_channel { + struct io_mapped_region *region; /* Shared memory region */ + struct io_ipc_ring *ring; /* Shared ring structure in mmap'd region */ + void *data_region; /* Data storage area for messages */ + struct io_ipc_msg_desc *desc_array; /* Cached descriptor array 
base */ + struct file *file; /* Anonymous file for mmap support */ + + /* Subscribers to this channel */ + struct xarray subscribers; /* All subscribers */ + atomic_t recv_count; /* Cached count of IOIPC_SUB_RECV subscribers */ + + /* Channel metadata */ + refcount_t ref_count; + u32 channel_id; + u32 flags; /* IOIPC_F_BROADCAST, IOIPC_F_MULTICAST */ + u64 key; /* Unique key for lookup */ + + /* Ring buffer configuration */ + u32 msg_max_size; + + /* Access control */ + kuid_t owner_uid; + kgid_t owner_gid; + u16 mode; /* Permission bits */ + + /* Next message ID */ + atomic_t next_msg_id; + + /* For multicast round-robin */ + atomic_t next_receiver_idx; + + /* Channel lifecycle */ + struct rcu_head rcu; + struct hlist_node hash_node; /* For global channel hash table */ +}; + +/* Request state for IPC operations */ +struct io_ipc_send { + struct file *file; + u64 addr; + u32 channel_id; + size_t len; + u32 target; +}; + +struct io_ipc_recv { + struct file *file; + u64 addr; + u32 channel_id; + size_t len; +}; + +/* Function declarations */ + +/* Registration operations */ +int io_ipc_channel_create(struct io_ring_ctx *ctx, + const struct io_uring_ipc_channel_create __user *arg); +int io_ipc_channel_attach(struct io_ring_ctx *ctx, + const struct io_uring_ipc_channel_attach __user *arg); +int io_ipc_channel_detach(struct io_ring_ctx *ctx, u32 channel_id); + +/* Operation prep and execution */ +int io_ipc_send_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe); +int io_ipc_send(struct io_kiocb *req, unsigned int issue_flags); + +int io_ipc_recv_prep(struct io_kiocb *req, const struct io_uring_sqe *sqe); +int io_ipc_recv(struct io_kiocb *req, unsigned int issue_flags); + +/* Channel lifecycle */ +int io_ipc_channel_destroy(struct io_ring_ctx *ctx, u32 channel_id); +void io_ipc_ctx_cleanup(struct io_ring_ctx *ctx); + +/* Channel lookup and management */ +struct io_ipc_channel *io_ipc_channel_get(u32 channel_id); +struct io_ipc_channel 
*io_ipc_channel_get_by_key(u64 key); +void io_ipc_channel_put(struct io_ipc_channel *channel); + +#else /* !CONFIG_IO_URING_IPC */ + +static inline int io_ipc_channel_create(struct io_ring_ctx *ctx, + const struct io_uring_ipc_channel_create __user *arg) +{ + return -EOPNOTSUPP; +} + +static inline int io_ipc_channel_attach(struct io_ring_ctx *ctx, + const struct io_uring_ipc_channel_attach __user *arg) +{ + return -EOPNOTSUPP; +} + +static inline int io_ipc_channel_detach(struct io_ring_ctx *ctx, u32 chann= el_id) +{ + return -EOPNOTSUPP; +} + +static inline int io_ipc_channel_destroy(struct io_ring_ctx *ctx, u32 chan= nel_id) +{ + return -EOPNOTSUPP; +} + +static inline void io_ipc_ctx_cleanup(struct io_ring_ctx *ctx) +{ +} + +#endif /* CONFIG_IO_URING_IPC */ + +#endif /* IO_URING_IPC_H */ diff --git a/io_uring/opdef.c b/io_uring/opdef.c index 645980fa4651..658aa36efda2 100644 --- a/io_uring/opdef.c +++ b/io_uring/opdef.c @@ -38,6 +38,7 @@ #include "futex.h" #include "truncate.h" #include "zcrx.h" +#include "ipc.h" =20 static int io_no_issue(struct io_kiocb *req, unsigned int issue_flags) { @@ -599,6 +600,18 @@ const struct io_issue_def io_issue_defs[] =3D { .prep =3D io_uring_cmd_prep, .issue =3D io_uring_cmd, }, + [IORING_OP_IPC_SEND] =3D { + .audit_skip =3D 1, + .async_size =3D sizeof(struct io_ipc_send), + .prep =3D io_ipc_send_prep, + .issue =3D io_ipc_send, + }, + [IORING_OP_IPC_RECV] =3D { + .audit_skip =3D 1, + .async_size =3D sizeof(struct io_ipc_recv), + .prep =3D io_ipc_recv_prep, + .issue =3D io_ipc_recv, + }, }; =20 const struct io_cold_def io_cold_defs[] =3D { @@ -857,6 +870,12 @@ const struct io_cold_def io_cold_defs[] =3D { .sqe_copy =3D io_uring_cmd_sqe_copy, .cleanup =3D io_uring_cmd_cleanup, }, + [IORING_OP_IPC_SEND] =3D { + .name =3D "IPC_SEND", + }, + [IORING_OP_IPC_RECV] =3D { + .name =3D "IPC_RECV", + }, }; =20 const char *io_uring_get_opcode(u8 opcode) diff --git a/io_uring/register.c b/io_uring/register.c index 0148735f7711..7646dbb2d572 
100644
--- a/io_uring/register.c
+++ b/io_uring/register.c
@@ -34,6 +34,7 @@
 #include "zcrx.h"
 #include "query.h"
 #include "bpf_filter.h"
+#include "ipc.h"
 
 #define IORING_MAX_RESTRICTIONS	(IORING_RESTRICTION_LAST + \
					 IORING_REGISTER_LAST + IORING_OP_LAST)
@@ -930,6 +931,30 @@ static int __io_uring_register(struct io_ring_ctx *ctx, unsigned opcode,
 		WRITE_ONCE(ctx->bpf_filters,
			   ctx->restrictions.bpf_filters->filters);
 		break;
+	case IORING_REGISTER_IPC_CHANNEL_CREATE:
+		ret = -EINVAL;
+		if (!arg || nr_args != 1)
+			break;
+		ret = io_ipc_channel_create(ctx, arg);
+		break;
+	case IORING_REGISTER_IPC_CHANNEL_ATTACH:
+		ret = -EINVAL;
+		if (!arg || nr_args != 1)
+			break;
+		ret = io_ipc_channel_attach(ctx, arg);
+		break;
+	case IORING_REGISTER_IPC_CHANNEL_DETACH:
+		ret = -EINVAL;
+		if (arg || !nr_args)
+			break;
+		ret = io_ipc_channel_detach(ctx, nr_args);
+		break;
+	case IORING_REGISTER_IPC_CHANNEL_DESTROY:
+		ret = -EINVAL;
+		if (arg || !nr_args)
+			break;
+		ret = io_ipc_channel_destroy(ctx, nr_args);
+		break;
 	default:
 		ret = -EINVAL;
 		break;
-- 
2.52.0

From: Daniel Hodges
To: Jens Axboe
Cc: Daniel Hodges, Pavel Begunkov, io-uring@vger.kernel.org, linux-kernel@vger.kernel.org
Subject: [RFC PATCH 2/2] selftests/ipc: Add io_uring IPC selftest
Date: Fri, 13 Mar 2026 09:07:39 -0400
Message-ID: <20260313130739.23265-3-git@danielhodges.dev>
In-Reply-To: <20260313130739.23265-1-git@danielhodges.dev>
References: <20260313130739.23265-1-git@danielhodges.dev>

Add selftests for io_uring IPC channels. Tests cover send/recv,
broadcast, detach, permission enforcement, ring full, message
truncation, slot reuse, and cross-process operation.

Signed-off-by: Daniel Hodges
---
 MAINTAINERS                                |    1 +
 tools/testing/selftests/ipc/Makefile       |    2 +-
 tools/testing/selftests/ipc/io_uring_ipc.c | 1265 ++++++++++++++++++++
 3 files changed, 1267 insertions(+), 1 deletion(-)
 create mode 100644 tools/testing/selftests/ipc/io_uring_ipc.c

diff --git a/MAINTAINERS b/MAINTAINERS
index 837db4f7bcca..d43f59e31f03 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -13432,6 +13432,7 @@ F:	include/trace/events/io_uring.h
 F:	include/uapi/linux/io_uring.h
 F:	include/uapi/linux/io_uring/
 F:	io_uring/
+F:	tools/testing/selftests/ipc/io_uring_ipc*
 
 IO_URING ZCRX
 M:	Pavel Begunkov
diff --git a/tools/testing/selftests/ipc/Makefile b/tools/testing/selftests/ipc/Makefile
index 50e9c299fc4a..74bc45b555f8 100644
--- a/tools/testing/selftests/ipc/Makefile
+++ b/tools/testing/selftests/ipc/Makefile
@@ -12,7 +12,7 @@ endif
 
 CFLAGS += $(KHDR_INCLUDES)
 
-TEST_GEN_PROGS := msgque
+TEST_GEN_PROGS := msgque io_uring_ipc
 
 include ../lib.mk
 
diff --git a/tools/testing/selftests/ipc/io_uring_ipc.c b/tools/testing/selftests/ipc/io_uring_ipc.c
new file mode 100644
index 000000000000..a82988351e02
--- /dev/null
+++ b/tools/testing/selftests/ipc/io_uring_ipc.c
@@ -0,0 +1,1265 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * io_uring IPC selftest
+ *
+ * Tests the io_uring IPC channel functionality including:
+ * - Channel creation and attachment
+ * - Message send and receive (broadcast and non-broadcast)
+ * - Broadcast delivery to multiple receivers
+ * - Channel detach
+ * - Permission enforcement (send-only, recv-only)
+ * - Ring full and message size limits
+ * - Multiple message ordering
+ * - Invalid parameter rejection
+ * - Cross-process communication
+ */
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+/* Check if IO_URING_IPC is supported */
+#ifndef IORING_OP_IPC_SEND
+#define IORING_OP_IPC_SEND	65
+#define IORING_OP_IPC_RECV	66
+
+#define IORING_REGISTER_IPC_CHANNEL_CREATE	38
+#define IORING_REGISTER_IPC_CHANNEL_ATTACH	39
+#define IORING_REGISTER_IPC_CHANNEL_DETACH	40
+#define IORING_REGISTER_IPC_CHANNEL_DESTROY	41
+
+/* Flags for IPC channel creation */
+#define IOIPC_F_BROADCAST	(1U << 0)
+#define IOIPC_F_MULTICAST	(1U << 1)
+#define IOIPC_F_PRIVATE		(1U << 2)
+
+/* Flags for subscriber attachment */
+#define IOIPC_SUB_SEND	(1U << 0)
+#define IOIPC_SUB_RECV	(1U << 1)
+#define IOIPC_SUB_BOTH	(IOIPC_SUB_SEND | IOIPC_SUB_RECV)
+
+/* Create IPC channel */
+struct io_uring_ipc_channel_create {
+	__u32 flags;
+	__u32 ring_entries;
+	__u32 max_msg_size;
+	__u32 mode;
+	__u64 key;
+	__u32 channel_id_out;
+	__u32 reserved[3];
+};
+
+/* Attach to existing channel */
+struct io_uring_ipc_channel_attach {
+	__u32 channel_id;
+	__u32 flags;
+	__u64 key;
+	__s32 channel_fd;
+	__u32 local_id_out;
+	__u64 mmap_offset_out;
+	__u32 region_size;
+	__u32 reserved[3];
+};
+#endif
+
+#ifndef __NR_io_uring_setup
+#define __NR_io_uring_setup 425
+#endif
+
+#ifndef __NR_io_uring_enter
+#define __NR_io_uring_enter 426
+#endif
+
+#ifndef __NR_io_uring_register
+#define __NR_io_uring_register 427
+#endif
+
+#define QUEUE_DEPTH 32
+#define TEST_MSG "Hello from io_uring IPC!"
+#define TEST_KEY 0x12345678ULL
+#define KSFT_SKIP 4
+
+static int io_uring_setup(unsigned int entries, struct io_uring_params *p)
+{
+	return syscall(__NR_io_uring_setup, entries, p);
+}
+
+static int io_uring_enter(int fd, unsigned int to_submit, unsigned int min_complete,
+			  unsigned int flags, sigset_t *sig)
+{
+	return syscall(__NR_io_uring_enter, fd, to_submit, min_complete,
+		       flags, sig);
+}
+
+static int io_uring_register_syscall(int fd, unsigned int opcode, void *arg,
+				     unsigned int nr_args)
+{
+	return syscall(__NR_io_uring_register, fd, opcode, arg, nr_args);
+}
+
+struct io_uring {
+	int ring_fd;
+	struct io_uring_sqe *sqes;
+	struct io_uring_cqe *cqes;
+	unsigned int *sq_head;
+	unsigned int *sq_tail;
+	unsigned int *cq_head;
+	unsigned int *cq_tail;
+	unsigned int sq_ring_mask;
+	unsigned int cq_ring_mask;
+	unsigned int *sq_array;
+	void *sq_ring_ptr;
+	void *cq_ring_ptr;
+};
+
+static int setup_io_uring(struct io_uring *ring, unsigned int entries)
+{
+	struct io_uring_params p;
+	void *sq_ptr, *cq_ptr;
+	int ret;
+
+	memset(&p, 0, sizeof(p));
+	ret = io_uring_setup(entries, &p);
+	if (ret < 0)
+		return -errno;
+
+	ring->ring_fd = ret;
+	ring->sq_ring_mask = p.sq_entries - 1;
+	ring->cq_ring_mask = p.cq_entries - 1;
+
+	sq_ptr = mmap(NULL, p.sq_off.array + p.sq_entries * sizeof(unsigned int),
+		      PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
+		      ring->ring_fd, IORING_OFF_SQ_RING);
+	if (sq_ptr == MAP_FAILED) {
+		close(ring->ring_fd);
+		return -errno;
+	}
+	ring->sq_ring_ptr = sq_ptr;
+
+	ring->sqes = mmap(NULL, p.sq_entries * sizeof(struct io_uring_sqe),
+			  PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
+			  ring->ring_fd, IORING_OFF_SQES);
+	if (ring->sqes == MAP_FAILED) {
+		munmap(sq_ptr, p.sq_off.array + p.sq_entries * sizeof(unsigned int));
+		close(ring->ring_fd);
+		return -errno;
+	}
+
+	cq_ptr = mmap(NULL, p.cq_off.cqes + p.cq_entries * sizeof(struct io_uring_cqe),
+		      PROT_READ | PROT_WRITE, MAP_SHARED | MAP_POPULATE,
+		      ring->ring_fd, IORING_OFF_CQ_RING);
+	if (cq_ptr == MAP_FAILED) {
+		munmap(ring->sqes, p.sq_entries * sizeof(struct io_uring_sqe));
+		munmap(sq_ptr, p.sq_off.array + p.sq_entries * sizeof(unsigned int));
+		close(ring->ring_fd);
+		return -errno;
+	}
+	ring->cq_ring_ptr = cq_ptr;
+
+	ring->sq_head = sq_ptr + p.sq_off.head;
+	ring->sq_tail = sq_ptr + p.sq_off.tail;
+	ring->sq_array = sq_ptr + p.sq_off.array;
+	ring->cq_head = cq_ptr + p.cq_off.head;
+	ring->cq_tail = cq_ptr + p.cq_off.tail;
+	ring->cqes = cq_ptr + p.cq_off.cqes;
+
+	return 0;
+}
+
+static void cleanup_io_uring(struct io_uring *ring)
+{
+	close(ring->ring_fd);
+}
+
+static struct io_uring_sqe *get_sqe(struct io_uring *ring)
+{
+	unsigned int tail = *ring->sq_tail;
+	unsigned int index = tail & ring->sq_ring_mask;
+	struct io_uring_sqe *sqe = &ring->sqes[index];
+
+	tail++;
+	*ring->sq_tail = tail;
+	ring->sq_array[index] = index;
+
+	memset(sqe, 0, sizeof(*sqe));
+	return sqe;
+}
+
+static int submit_and_wait(struct io_uring *ring, struct io_uring_cqe **cqe_ptr)
+{
+	unsigned int to_submit = *ring->sq_tail - *ring->sq_head;
+	unsigned int head;
+	int ret;
+
+	if (to_submit) {
+		ret = io_uring_enter(ring->ring_fd, to_submit, 0, 0, NULL);
+		if (ret < 0)
+			return -errno;
+	}
+
+	ret = io_uring_enter(ring->ring_fd, 0, 1, IORING_ENTER_GETEVENTS, NULL);
+	if (ret < 0)
+		return -errno;
+
+	head = *ring->cq_head;
+	if (head == *ring->cq_tail)
+		return -EAGAIN;
+
+	*cqe_ptr = &ring->cqes[head & ring->cq_ring_mask];
+	return 0;
+}
+
+static void cqe_seen(struct io_uring *ring)
+{
+	(*ring->cq_head)++;
+}
+
+static int create_channel(struct io_uring *ring, __u32 flags, __u32 ring_entries,
+			  __u32 max_msg_size, __u64 key, unsigned int *id_out)
+{
+	struct io_uring_ipc_channel_create create;
+	int ret;
+
+	memset(&create, 0, sizeof(create));
+	create.flags = flags;
+	create.ring_entries = ring_entries;
+	create.max_msg_size = max_msg_size;
+	create.mode = 0666;
+	create.key = key;
+
+	ret = io_uring_register_syscall(ring->ring_fd,
+					IORING_REGISTER_IPC_CHANNEL_CREATE,
+					&create, 1);
+	if (ret < 0)
+		return -errno;
+
+	*id_out = create.channel_id_out;
+	return 0;
+}
+
+static int attach_channel(struct io_uring *ring, __u64 key, __u32 sub_flags,
+			  unsigned int *local_id_out)
+{
+	struct io_uring_ipc_channel_attach attach;
+	int ret;
+
+	memset(&attach, 0, sizeof(attach));
+	attach.key = key;
+	attach.flags = sub_flags;
+
+	ret = io_uring_register_syscall(ring->ring_fd,
+					IORING_REGISTER_IPC_CHANNEL_ATTACH,
+					&attach, 1);
+	if (ret < 0)
+		return -errno;
+
+	*local_id_out = attach.local_id_out;
+	return 0;
+}
+
+static int attach_channel_by_id(struct io_uring *ring, __u32 channel_id,
+				__u32 sub_flags, unsigned int *local_id_out)
+{
+	struct io_uring_ipc_channel_attach attach;
+	int ret;
+
+	memset(&attach, 0, sizeof(attach));
+	attach.channel_id = channel_id;
+	attach.key = 0;
+	attach.flags = sub_flags;
+
+	ret = io_uring_register_syscall(ring->ring_fd,
+					IORING_REGISTER_IPC_CHANNEL_ATTACH,
+					&attach, 1);
+	if (ret < 0)
+		return -errno;
+
+	*local_id_out = attach.local_id_out;
+	return 0;
+}
+
+static int detach_channel(struct io_uring *ring, unsigned int subscriber_id)
+{
+	int ret;
+
+	ret = io_uring_register_syscall(ring->ring_fd,
+					IORING_REGISTER_IPC_CHANNEL_DETACH,
+					NULL, subscriber_id);
+	if (ret < 0)
+		return -errno;
+
+	return 0;
+}
+
+static int destroy_channel(struct io_uring *ring, unsigned int channel_id)
+{
+	int ret;
+
+	ret = io_uring_register_syscall(ring->ring_fd,
+					IORING_REGISTER_IPC_CHANNEL_DESTROY,
+					NULL, channel_id);
+	if (ret < 0)
+		return -errno;
+
+	return 0;
+}
+
+static int do_send(struct io_uring *ring, unsigned int fd,
+		   const void *msg, size_t len)
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int ret, res;
+
+	sqe = get_sqe(ring);
+	sqe->opcode = IORING_OP_IPC_SEND;
+	sqe->fd = fd;
+	sqe->addr = (unsigned long)msg;
+	sqe->len = len;
+	sqe->user_data = 1;
+
+	ret = submit_and_wait(ring, &cqe);
+	if (ret < 0)
+		return ret;
+
+	res = cqe->res;
+	cqe_seen(ring);
+	return res;
+}
+
+static int do_recv(struct io_uring *ring, unsigned int fd,
+		   void *buf, size_t len)
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int ret, res;
+
+	sqe = get_sqe(ring);
+	sqe->opcode = IORING_OP_IPC_RECV;
+	sqe->fd = fd;
+	sqe->addr = (unsigned long)buf;
+	sqe->len = len;
+	sqe->user_data = 2;
+
+	ret = submit_and_wait(ring, &cqe);
+	if (ret < 0)
+		return ret;
+
+	res = cqe->res;
+	cqe_seen(ring);
+	return res;
+}
+
+static int do_send_targeted(struct io_uring *ring, unsigned int fd,
+			    const void *msg, size_t len, __u32 target)
+{
+	struct io_uring_sqe *sqe;
+	struct io_uring_cqe *cqe;
+	int ret, res;
+
+	sqe = get_sqe(ring);
+	sqe->opcode = IORING_OP_IPC_SEND;
+	sqe->fd = fd;
+	sqe->addr = (unsigned long)msg;
+	sqe->len = len;
+	sqe->user_data = 1;
+	sqe->file_index = target;
+
+	ret = submit_and_wait(ring, &cqe);
+	if (ret < 0)
+		return ret;
+
+	res = cqe->res;
+	cqe_seen(ring);
+	return res;
+}
+
+static int test_nonbroadcast(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id, sub_id;
+	char recv_buf[256];
+	int ret;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 100, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring, TEST_KEY + 100, IOIPC_SUB_BOTH, &sub_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = do_send(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1);
+	if (ret < 0)
+		goto fail;
+
+	memset(recv_buf, 0, sizeof(recv_buf));
+	ret = do_recv(&ring, sub_id, recv_buf, sizeof(recv_buf));
+	if (ret < 0)
+		goto fail;
+
+	if (strcmp(recv_buf, TEST_MSG) != 0)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+static int test_broadcast_multi(void)
+{
+	struct io_uring ring1, ring2;
+	unsigned int channel_id, sub1_id, sub2_id;
+	char buf1[256], buf2[256];
+	int ret;
+
+	ret = setup_io_uring(&ring1, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	ret = setup_io_uring(&ring2, QUEUE_DEPTH);
+	if (ret < 0) {
+		cleanup_io_uring(&ring1);
+		return 1;
+	}
+
+	ret = create_channel(&ring1, IOIPC_F_BROADCAST, 16, 4096,
+			     TEST_KEY + 200, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring1, TEST_KEY + 200, IOIPC_SUB_BOTH, &sub1_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring2, TEST_KEY + 200, IOIPC_SUB_RECV, &sub2_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = do_send(&ring1, sub1_id, TEST_MSG, strlen(TEST_MSG) + 1);
+	if (ret < 0)
+		goto fail;
+
+	/* Both subscribers must receive the same message */
+	memset(buf1, 0, sizeof(buf1));
+	ret = do_recv(&ring1, sub1_id, buf1, sizeof(buf1));
+	if (ret < 0)
+		goto fail;
+	if (strcmp(buf1, TEST_MSG) != 0)
+		goto fail;
+
+	memset(buf2, 0, sizeof(buf2));
+	ret = do_recv(&ring2, sub2_id, buf2, sizeof(buf2));
+	if (ret < 0)
+		goto fail;
+	if (strcmp(buf2, TEST_MSG) != 0)
+		goto fail;
+
+	cleanup_io_uring(&ring1);
+	cleanup_io_uring(&ring2);
+	return 0;
+fail:
+	cleanup_io_uring(&ring1);
+	cleanup_io_uring(&ring2);
+	return 1;
+}
+
+static int test_detach(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id, sub_id;
+	char buf[256];
+	int ret;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 300, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring, TEST_KEY + 300, IOIPC_SUB_BOTH, &sub_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = detach_channel(&ring, sub_id);
+	if (ret < 0)
+		goto fail;
+
+	/* After detach, recv should fail with ENOENT */
+	ret = do_recv(&ring, sub_id, buf, sizeof(buf));
+	if (ret != -ENOENT)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+static int test_recv_only_cannot_send(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id, sub_id;
+	int ret;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 400, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring, TEST_KEY + 400, IOIPC_SUB_RECV, &sub_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = do_send(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1);
+	if (ret != -EACCES)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+static int test_send_only_cannot_recv(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id, sub_id;
+	char buf[256];
+	int ret;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 500, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring, TEST_KEY + 500, IOIPC_SUB_SEND, &sub_id);
+	if (ret < 0)
+		goto fail;
+
+	/* Send first so there's a message in the ring */
+	ret = do_send(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1);
+	if (ret < 0)
+		goto fail;
+
+	/* Recv should fail with EACCES */
+	ret = do_recv(&ring, sub_id, buf, sizeof(buf));
+	if (ret != -EACCES)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+static int test_ring_full(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id, sub_id;
+	const char msg[] = "X";
+	int ret;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	/* ring_entries=2: can hold 2 messages before full */
+	ret = create_channel(&ring, 0, 2, 64, TEST_KEY + 600, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring, TEST_KEY + 600, IOIPC_SUB_BOTH, &sub_id);
+	if (ret < 0)
+		goto fail;
+
+	/* Fill the 2 slots */
+	ret = do_send(&ring, sub_id, msg, sizeof(msg));
+	if (ret < 0)
+		goto fail;
+
+	ret = do_send(&ring, sub_id, msg, sizeof(msg));
+	if (ret < 0)
+		goto fail;
+
+	/* Third send must fail */
+	ret = do_send(&ring, sub_id, msg, sizeof(msg));
+	if (ret != -ENOBUFS)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+static int test_msg_too_large(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id, sub_id;
+	char big_msg[128];
+	int ret;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	/* max_msg_size=64 */
+	ret = create_channel(&ring, 0, 16, 64, TEST_KEY + 700, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring, TEST_KEY + 700, IOIPC_SUB_BOTH, &sub_id);
+	if (ret < 0)
+		goto fail;
+
+	memset(big_msg, 'A', sizeof(big_msg));
+	ret = do_send(&ring, sub_id, big_msg, sizeof(big_msg));
+	if (ret != -EMSGSIZE)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+#define NUM_MULTI_MSGS 8
+
+static int test_multiple_messages(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id, sub_id;
+	char send_buf[64], recv_buf[64];
+	int ret, i;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	ret = create_channel(&ring, 0, 16, 64, TEST_KEY + 800, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring, TEST_KEY + 800, IOIPC_SUB_BOTH, &sub_id);
+	if (ret < 0)
+		goto fail;
+
+	for (i = 0; i < NUM_MULTI_MSGS; i++) {
+		snprintf(send_buf, sizeof(send_buf), "msg-%d", i);
+		ret = do_send(&ring, sub_id, send_buf, strlen(send_buf) + 1);
+		if (ret < 0)
+			goto fail;
+	}
+
+	for (i = 0; i < NUM_MULTI_MSGS; i++) {
+		memset(recv_buf, 0, sizeof(recv_buf));
+		ret = do_recv(&ring, sub_id, recv_buf, sizeof(recv_buf));
+		if (ret < 0)
+			goto fail;
+		snprintf(send_buf, sizeof(send_buf), "msg-%d", i);
+		if (strcmp(recv_buf, send_buf) != 0)
+			goto fail;
+	}
+
+	/* Ring should be empty now */
+	ret = do_recv(&ring, sub_id, recv_buf, sizeof(recv_buf));
+	if (ret != -EAGAIN)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+static int test_invalid_params(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id;
+	int ret;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	/* Non-power-of-2 ring_entries */
+	ret = create_channel(&ring, 0, 3, 4096, TEST_KEY + 900, &channel_id);
+	if (ret != -EINVAL)
+		goto fail;
+
+	/* Zero ring_entries */
+	ret = create_channel(&ring, 0, 0, 4096, TEST_KEY + 901, &channel_id);
+	if (ret != -EINVAL)
+		goto fail;
+
+	/* Zero max_msg_size */
+	ret = create_channel(&ring, 0, 16, 0, TEST_KEY + 902, &channel_id);
+	if (ret != -EINVAL)
+		goto fail;
+
+	/* BROADCAST | MULTICAST together */
+	ret = create_channel(&ring, IOIPC_F_BROADCAST | IOIPC_F_MULTICAST,
+			     16, 4096, TEST_KEY + 903, &channel_id);
+	if (ret != -EINVAL)
+		goto fail;
+
+	/* Unsupported flags */
+	ret = create_channel(&ring, 0xFF00, 16, 4096, TEST_KEY + 904,
+			     &channel_id);
+	if (ret != -EINVAL)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+static int test_attach_by_id(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id, sub_id;
+	char recv_buf[256];
+	int ret;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 1000, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	/* Attach using channel_id (key=0) instead of key */
+	ret = attach_channel_by_id(&ring, channel_id, IOIPC_SUB_BOTH, &sub_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = do_send(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1);
+	if (ret < 0)
+		goto fail;
+
+	memset(recv_buf, 0, sizeof(recv_buf));
+	ret = do_recv(&ring, sub_id, recv_buf, sizeof(recv_buf));
+	if (ret < 0)
+		goto fail;
+
+	if (strcmp(recv_buf, TEST_MSG) != 0)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+static int test_recv_truncation(void)
+{
+	struct io_uring ring;
+	unsigned int channel_id, sub_id;
+	const char long_msg[] = "This message is longer than the receive buffer";
+	char small_buf[8];
+	int ret;
+
+	ret = setup_io_uring(&ring, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	ret = create_channel(&ring, 0, 16, 4096, TEST_KEY + 1100, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring, TEST_KEY + 1100, IOIPC_SUB_BOTH, &sub_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = do_send(&ring, sub_id, long_msg, sizeof(long_msg));
+	if (ret < 0)
+		goto fail;
+
+	memset(small_buf, 0, sizeof(small_buf));
+	ret = do_recv(&ring, sub_id, small_buf, sizeof(small_buf));
+	/* Should return truncated length, not full message length */
+	if (ret != sizeof(small_buf))
+		goto fail;
+
+	/* Verify we got the first bytes */
+	if (memcmp(small_buf, long_msg, sizeof(small_buf)) != 0)
+		goto fail;
+
+	cleanup_io_uring(&ring);
+	return 0;
+fail:
+	cleanup_io_uring(&ring);
+	return 1;
+}
+
+static int test_broadcast_slot_reuse(void)
+{
+	struct io_uring ring1, ring2;
+	unsigned int channel_id, sub1_id, sub2_id;
+	char buf[256];
+	const char msg1[] = "first";
+	const char msg2[] = "second";
+	const char msg3[] = "third";
+	int ret;
+
+	ret = setup_io_uring(&ring1, QUEUE_DEPTH);
+	if (ret < 0)
+		return 1;
+
+	ret = setup_io_uring(&ring2, QUEUE_DEPTH);
+	if (ret < 0) {
+		cleanup_io_uring(&ring1);
+		return 1;
+	}
+
+	/* ring_entries=2: only 2 slots available */
+	ret = create_channel(&ring1, IOIPC_F_BROADCAST, 2, 256,
+			     TEST_KEY + 1200, &channel_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring1, TEST_KEY + 1200, IOIPC_SUB_BOTH, &sub1_id);
+	if (ret < 0)
+		goto fail;
+
+	ret = attach_channel(&ring2, TEST_KEY + 1200, IOIPC_SUB_RECV, &sub2_id);
+	if (ret < 0)
+		goto fail;
+
+	/* Fill both slots */
+	ret = do_send(&ring1, sub1_id, msg1, sizeof(msg1));
+	if (ret < 0)
+		goto fail;
+
+	ret = do_send(&ring1, sub1_id, msg2, sizeof(msg2));
if (ret < 0) + goto fail; + + /* Ring is full now -- third send should fail */ + ret =3D do_send(&ring1, sub1_id, msg3, sizeof(msg3)); + if (ret !=3D -ENOBUFS) + goto fail; + + /* sub1 consumes both messages */ + ret =3D do_recv(&ring1, sub1_id, buf, sizeof(buf)); + if (ret < 0) + goto fail; + + ret =3D do_recv(&ring1, sub1_id, buf, sizeof(buf)); + if (ret < 0) + goto fail; + + /* + * Ring should still be full from the producer's perspective because + * sub2 hasn't consumed yet -- min_head stays at 0. + */ + ret =3D do_send(&ring1, sub1_id, msg3, sizeof(msg3)); + if (ret !=3D -ENOBUFS) + goto fail; + + /* sub2 consumes both messages -- now min_head advances */ + ret =3D do_recv(&ring2, sub2_id, buf, sizeof(buf)); + if (ret < 0) + goto fail; + + ret =3D do_recv(&ring2, sub2_id, buf, sizeof(buf)); + if (ret < 0) + goto fail; + + /* Now the slots should be reusable */ + ret =3D do_send(&ring1, sub1_id, msg3, sizeof(msg3)); + if (ret < 0) + goto fail; + + memset(buf, 0, sizeof(buf)); + ret =3D do_recv(&ring1, sub1_id, buf, sizeof(buf)); + if (ret < 0) + goto fail; + + if (strcmp(buf, msg3) !=3D 0) + goto fail; + + cleanup_io_uring(&ring1); + cleanup_io_uring(&ring2); + return 0; +fail: + cleanup_io_uring(&ring1); + cleanup_io_uring(&ring2); + return 1; +} + +static int test_cross_process(void) +{ + struct io_uring ring1, ring2; + unsigned int channel_id, local_id; + char recv_buf[256]; + int ret; + pid_t pid; + + ret =3D setup_io_uring(&ring1, QUEUE_DEPTH); + if (ret < 0) + return 1; + + ret =3D create_channel(&ring1, IOIPC_F_BROADCAST, 16, 4096, + TEST_KEY, &channel_id); + if (ret < 0) { + cleanup_io_uring(&ring1); + return 1; + } + + pid =3D fork(); + if (pid < 0) { + cleanup_io_uring(&ring1); + return 1; + } + + if (pid =3D=3D 0) { + cleanup_io_uring(&ring1); + + ret =3D setup_io_uring(&ring2, QUEUE_DEPTH); + if (ret < 0) + exit(1); + + usleep(100000); + + ret =3D attach_channel(&ring2, TEST_KEY, IOIPC_SUB_BOTH, + &local_id); + if (ret < 0) + exit(1); + + 
usleep(250000); + + memset(recv_buf, 0, sizeof(recv_buf)); + ret =3D do_recv(&ring2, local_id, recv_buf, sizeof(recv_buf)); + if (ret < 0) + exit(1); + + if (strcmp(recv_buf, TEST_MSG) !=3D 0) + exit(1); + + cleanup_io_uring(&ring2); + exit(0); + } + + /* Parent process - producer */ + usleep(200000); + + ret =3D do_send(&ring1, channel_id, TEST_MSG, strlen(TEST_MSG) + 1); + if (ret < 0) { + waitpid(pid, NULL, 0); + cleanup_io_uring(&ring1); + return 1; + } + + int status; + + waitpid(pid, &status, 0); + cleanup_io_uring(&ring1); + + if (!WIFEXITED(status) || WEXITSTATUS(status) !=3D 0) + return 1; + + return 0; +} + +static int test_multicast_roundrobin(void) +{ + struct io_uring ring1, ring2; + unsigned int channel_id, sub1_id, sub2_id; + char buf1[256], buf2[256]; + int ret; + + ret =3D setup_io_uring(&ring1, QUEUE_DEPTH); + if (ret < 0) + return 1; + + ret =3D setup_io_uring(&ring2, QUEUE_DEPTH); + if (ret < 0) { + cleanup_io_uring(&ring1); + return 1; + } + + ret =3D create_channel(&ring1, IOIPC_F_MULTICAST, 16, 4096, + TEST_KEY + 1300, &channel_id); + if (ret < 0) + goto fail; + + ret =3D attach_channel(&ring1, TEST_KEY + 1300, IOIPC_SUB_BOTH, &sub1_id); + if (ret < 0) + goto fail; + + ret =3D attach_channel(&ring2, TEST_KEY + 1300, IOIPC_SUB_BOTH, &sub2_id); + if (ret < 0) + goto fail; + + /* + * Send two messages. With multicast round-robin waking, different + * subscribers get woken for each message. Both use the shared + * consumer head, so both can recv any available message. 
+ */ + ret =3D do_send(&ring1, sub1_id, "msg-0", 6); + if (ret < 0) + goto fail; + + ret =3D do_send(&ring1, sub1_id, "msg-1", 6); + if (ret < 0) + goto fail; + + /* Both subscribers should be able to recv one message each */ + memset(buf1, 0, sizeof(buf1)); + ret =3D do_recv(&ring1, sub1_id, buf1, sizeof(buf1)); + if (ret < 0) + goto fail; + + memset(buf2, 0, sizeof(buf2)); + ret =3D do_recv(&ring2, sub2_id, buf2, sizeof(buf2)); + if (ret < 0) + goto fail; + + /* Verify we got both messages (order may vary) */ + if (strcmp(buf1, "msg-0") !=3D 0 && strcmp(buf1, "msg-1") !=3D 0) + goto fail; + if (strcmp(buf2, "msg-0") !=3D 0 && strcmp(buf2, "msg-1") !=3D 0) + goto fail; + /* They must be different messages */ + if (strcmp(buf1, buf2) =3D=3D 0) + goto fail; + + cleanup_io_uring(&ring1); + cleanup_io_uring(&ring2); + return 0; +fail: + cleanup_io_uring(&ring1); + cleanup_io_uring(&ring2); + return 1; +} + +static int test_channel_destroy(void) +{ + struct io_uring ring; + unsigned int channel_id, sub_id; + int ret; + + ret =3D setup_io_uring(&ring, QUEUE_DEPTH); + if (ret < 0) + return 1; + + ret =3D create_channel(&ring, 0, 16, 4096, TEST_KEY + 1400, &channel_id); + if (ret < 0) + goto fail; + + ret =3D attach_channel(&ring, TEST_KEY + 1400, IOIPC_SUB_BOTH, &sub_id); + if (ret < 0) + goto fail; + + /* Destroy the channel (drops creator's reference) */ + ret =3D destroy_channel(&ring, channel_id); + if (ret < 0) + goto fail; + + /* Double destroy should fail (channel refcount already dropped) */ + ret =3D destroy_channel(&ring, channel_id); + /* + * May succeed if subscriber still holds a ref, or fail with + * ENOENT if the channel was already freed. Either way the + * first destroy must have succeeded. 
+ */ + + /* Detach the subscriber */ + ret =3D detach_channel(&ring, sub_id); + if (ret < 0) + goto fail; + + cleanup_io_uring(&ring); + return 0; +fail: + cleanup_io_uring(&ring); + return 1; +} + +static int test_unicast_targeted(void) +{ + struct io_uring ring1, ring2; + unsigned int channel_id, sub1_id, sub2_id; + char buf1[256], buf2[256]; + struct io_uring_sqe *sqe; + struct io_uring_cqe *cqe; + int ret; + + ret =3D setup_io_uring(&ring1, QUEUE_DEPTH); + if (ret < 0) + return 1; + + ret =3D setup_io_uring(&ring2, QUEUE_DEPTH); + if (ret < 0) { + cleanup_io_uring(&ring1); + return 1; + } + + /* Create a unicast channel (no flags) */ + ret =3D create_channel(&ring1, 0, 16, 4096, TEST_KEY + 1500, &channel_id); + if (ret < 0) + goto fail; + + /* Attach sender+receiver on ring1, receiver-only on ring2 */ + ret =3D attach_channel(&ring1, TEST_KEY + 1500, IOIPC_SUB_BOTH, &sub1_id); + if (ret < 0) + goto fail; + + ret =3D attach_channel(&ring2, TEST_KEY + 1500, IOIPC_SUB_RECV, &sub2_id); + if (ret < 0) + goto fail; + + /* Send targeting subscriber 2 specifically */ + ret =3D do_send_targeted(&ring1, sub1_id, TEST_MSG, strlen(TEST_MSG) + 1, + sub2_id); + if (ret < 0) + goto fail; + + /* Receiver 2 should get the message */ + memset(buf2, 0, sizeof(buf2)); + ret =3D do_recv(&ring2, sub2_id, buf2, sizeof(buf2)); + if (ret < 0) + goto fail; + + if (strcmp(buf2, TEST_MSG) !=3D 0) + goto fail; + + cleanup_io_uring(&ring1); + cleanup_io_uring(&ring2); + return 0; +fail: + cleanup_io_uring(&ring1); + cleanup_io_uring(&ring2); + return 1; +} + +static int test_unicast_targeted_invalid(void) +{ + struct io_uring ring; + unsigned int channel_id, sub_id; + int ret; + + ret =3D setup_io_uring(&ring, QUEUE_DEPTH); + if (ret < 0) + return 1; + + ret =3D create_channel(&ring, 0, 16, 4096, TEST_KEY + 1600, &channel_id); + if (ret < 0) + goto fail; + + ret =3D attach_channel(&ring, TEST_KEY + 1600, IOIPC_SUB_BOTH, &sub_id); + if (ret < 0) + goto fail; + + /* Send targeting a 
non-existent subscriber ID */ + ret =3D do_send_targeted(&ring, sub_id, TEST_MSG, strlen(TEST_MSG) + 1, + 9999); + if (ret !=3D -ENOENT) + goto fail; + + cleanup_io_uring(&ring); + return 0; +fail: + cleanup_io_uring(&ring); + return 1; +} + +struct test_case { + const char *name; + int (*func)(void); +}; + +static struct test_case tests[] =3D { + { "Non-broadcast send/recv", test_nonbroadcast }, + { "Broadcast multi-receiver", test_broadcast_multi }, + { "Channel detach", test_detach }, + { "Recv-only cannot send", test_recv_only_cannot_send }, + { "Send-only cannot recv", test_send_only_cannot_recv }, + { "Ring full", test_ring_full }, + { "Message too large", test_msg_too_large }, + { "Multiple messages", test_multiple_messages }, + { "Invalid parameters", test_invalid_params }, + { "Attach by channel ID", test_attach_by_id }, + { "Recv truncation", test_recv_truncation }, + { "Broadcast slot reuse", test_broadcast_slot_reuse }, + { "Cross-process send/recv", test_cross_process }, + { "Multicast round-robin", test_multicast_roundrobin }, + { "Channel destroy", test_channel_destroy }, + { "Unicast targeted delivery", test_unicast_targeted }, + { "Unicast targeted invalid", test_unicast_targeted_invalid }, +}; + +int main(void) +{ + struct io_uring ring; + unsigned int channel_id; + int i, passed =3D 0, failed =3D 0; + int total =3D sizeof(tests) / sizeof(tests[0]); + int ret; + + printf("=3D=3D=3D io_uring IPC Selftest =3D=3D=3D\n\n"); + + /* Check if IPC is supported before running any tests */ + ret =3D setup_io_uring(&ring, QUEUE_DEPTH); + if (ret < 0) { + fprintf(stderr, "Failed to setup io_uring\n"); + return 1; + } + + ret =3D create_channel(&ring, 0, 16, 4096, 0xDEAD0000ULL, &channel_id); + cleanup_io_uring(&ring); + if (ret =3D=3D -EINVAL || ret =3D=3D -ENOSYS) { + printf("SKIP: IO_URING_IPC not supported by kernel\n"); + return KSFT_SKIP; + } + + for (i =3D 0; i < total; i++) { + printf(" [%2d/%d] %-30s ", i + 1, total, tests[i].name); + fflush(stdout); 
+ ret =3D tests[i].func(); + if (ret =3D=3D 0) { + printf("PASS\n"); + passed++; + } else { + printf("FAIL\n"); + failed++; + } + } + + printf("\n=3D=3D=3D Results: %d passed, %d failed (of %d) =3D=3D=3D\n", + passed, failed, total); + + return failed ? 1 : 0; +} --=20 2.52.0