From nobody Sun May 24 17:49:15 2026 Received: from stravinsky.debian.org (stravinsky.debian.org [82.195.75.108]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by smtp.subspace.kernel.org (Postfix) with ESMTPS id 8D85C3FF1; Sun, 24 May 2026 14:45:31 +0000 (UTC) Authentication-Results: smtp.subspace.kernel.org; arc=none smtp.client-ip=82.195.75.108 ARC-Seal: i=1; a=rsa-sha256; d=subspace.kernel.org; s=arc-20240116; t=1779633933; cv=none; b=Abc+8JVPVaJ9vdz99y3RHZu2sopp+DkETxPzoCQFbJK9rZVX64LYOdhm1yGLzJs+eQi0t24iY4iW+IswrvELdib9o4Jw8oQmBwdVkde+PCUaJSqz41J1vfjCTuKvV5ER8rvhuLehSkamTNfVCDFypz2xr0tv06L28MMwWjOxwaQ= ARC-Message-Signature: i=1; a=rsa-sha256; d=subspace.kernel.org; s=arc-20240116; t=1779633933; c=relaxed/simple; bh=wsFayDdvCcbbdN70RHKxk0+ylgtcDhZ7jVKM6uw29Z8=; h=From:Date:Subject:MIME-Version:Content-Type:Message-Id:References: In-Reply-To:To:Cc; b=VxsbG6sVULd/37ccjLWHtqADaDRTQSz8NdtV+iFEM4MNip6dNR2SZbGUEJOJKXODzGpN4l0SVsLiw0cHT7s8ZeTpx163aYWdLCZ+6tPhm5OULAv62r/aVfROXc1Hpt+MatzSkABYKgLD8Sy0KUnCXWgvHMYzigH6lMuBGemkdjE= ARC-Authentication-Results: i=1; smtp.subspace.kernel.org; dmarc=pass (p=none dis=none) header.from=debian.org; spf=pass smtp.mailfrom=debian.org; dkim=pass (2048-bit key) header.d=debian.org header.i=@debian.org header.b=GANjCxHh; arc=none smtp.client-ip=82.195.75.108 Authentication-Results: smtp.subspace.kernel.org; dmarc=pass (p=none dis=none) header.from=debian.org Authentication-Results: smtp.subspace.kernel.org; spf=pass smtp.mailfrom=debian.org Authentication-Results: smtp.subspace.kernel.org; dkim=pass (2048-bit key) header.d=debian.org header.i=@debian.org header.b="GANjCxHh" DKIM-Signature: v=1; a=rsa-sha256; q=dns/txt; c=relaxed/relaxed; d=debian.org; s=smtpauto.stravinsky; h=X-Debian-User:Cc:To:In-Reply-To:References: Message-Id:Content-Transfer-Encoding:Content-Type:MIME-Version:Subject:Date: From:Reply-To:Content-ID:Content-Description; bh=gqIwuyUbYsnIH/lFXH+STqAFeNJuSo2bMCvkxRl3oiM=; b=GANjCxHhG2ThtV5ryC5NjAH5FS i2DiGrIUjnfMEto5Wo0ssn2lApNGeTHRdgV5GiqIEw+umUM2yjyxZPJbCML9ZSpwSgB/LQ4+v349W v8RFYb7z1jc0aYlwxdgZuMQRK7UOp18k9jLffh0PSj3WEYb9+Co/8RkXhZDIXlXou0k+cQosBz0NL paDroAzY+zu7wl2ilsoDBNQZv1Mt2KayIe7i02hArvRd8l094iykitLfmseL9FAbyR+F8AavRDRZb a4dmYVIHmZ5S98GoEn+L9LBDIynomHXn20lDxlu3KOdN22z/lHB13PokF7DmieAnvjRFWR7YBsytl FfLRVoLQ==; Received: from authenticated-user by stravinsky.debian.org with esmtpsa (TLS1.3:ECDHE_X25519__RSA_PSS_RSAE_SHA256__AES_256_GCM:256) (Exim 4.96) (envelope-from ) id 1wRA4y-000s6b-21; Sun, 24 May 2026 14:45:25 +0000 From: Breno Leitao Date: Sun, 24 May 2026 07:44:58 -0700 Subject: [PATCH v3 1/2] fs/pipe: pre-allocate pages outside pipe->mutex in anon_pipe_write Precedence: bulk X-Mailing-List: linux-kernel@vger.kernel.org List-Id: List-Subscribe: List-Unsubscribe: MIME-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: quoted-printable Message-Id: <20260524-fix_pipe-v3-1-bb4a75d23a90@debian.org> References: <20260524-fix_pipe-v3-0-bb4a75d23a90@debian.org> In-Reply-To: <20260524-fix_pipe-v3-0-bb4a75d23a90@debian.org> To: Alexander Viro , Christian Brauner , Jan Kara , Shuah Khan , Mateusz Guzik Cc: linux-fsdevel@vger.kernel.org, linux-kernel@vger.kernel.org, linux-kselftest@vger.kernel.org, shakeel.butt@linux.dev, jlayton@kernel.org, oleg@redhat.com, axboe@kernel.dk, Breno Leitao , kernel-team@meta.com X-Mailer: b4 0.16-dev-d5d98 X-Developer-Signature: v=1; a=openpgp-sha256; l=5719; i=leitao@debian.org; h=from:subject:message-id; bh=wsFayDdvCcbbdN70RHKxk0+ylgtcDhZ7jVKM6uw29Z8=; b=owEBbQKS/ZANAwAIATWjk5/8eHdtAcsmYgBqEw760bfHkAkogCdjDJLUCxGX2ApEqgXbEa0Q3 oi77WclSXiJAjMEAAEIAB0WIQSshTmm6PRnAspKQ5s1o5Of/Hh3bQUCahMO+gAKCRA1o5Of/Hh3 bfrOEACUhLTb+2dnykUdoR6/cvALnNxNBWH0wOaj4x6aLEIkQOQEONJCrB00iPI9upb09HyfAZ8 WF7pJuj0Pe4GgLdr4IJOx/Kmut6xsN9mG9ykav06JbBjPtqdWCKRZOM+qrcV4h2XjPpIY2VgmMA 3ozH9A2hIKnn9z0vxbJrqPF84ACF41x2PYECwrh6zQbINIJWme+mK8WAtyiLk2NCJ+9zbFdhGPk GumehOQT2sPO2PjIGyGvS3xuAFpiAcfmIXHS3Qg4mtFKUMt0R7SUiFk43pO3wqhuWnZ96IptoH9 SuxlfEgHDuasE7rPhDoHZElmnHO2QaMuXJEyGoHSyj72xiUFjYaoMKpukSOk2cXbCCPBTE+eUV9 ix+gBhbJcJqivPHI7kLjuI0WjAQeE9W34WQdtMW57kswJyZxLnVEbyXiQUDDSaoxWAe13NvvElK XHxfgnOuo5TKtRGPhzuHFzBVBIDelYH72E30zNwIa9iar8+YXPmfuJNzoZjMmsknyRhK/iuJsWm UkvjLgloOk+/fouRbwGsLNy/4tr0hOAidZsmEUU3xVDz2tRmE3hI69XPYbXjblqRYwXkE3f7Gp8 zmgyO7tD3fLwvrYZlpt+981CmqlTPKHwdup1/Ye/zhYkEqoxn9fFx6zp6oCN9Mg4v5dJ6mjO5Lo S0baeD1SpU/2UiA== X-Developer-Key: i=leitao@debian.org; a=openpgp; fpr=AC8539A6E8F46702CA4A439B35A3939FFC78776D X-Debian-User: leitao anon_pipe_write() takes pipe->mutex (aka "mutex protecting the whole thing") and then, from the per-iteration anon_pipe_get_page() helper, used to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) once per page while still holding it. That allocation can sleep doing direct reclaim and/or runs memcg charging, which extends the critical section and stalls a concurrent reader on the very same mutex. Just pre-alloc the required pages before the lock in an array and just pop them inside the lock. This can improve the pipe throughput up to 48% and reduce the latency in 33%, easily seen when there is memory pressure and direct reclaim. Reviewed-by: Mateusz Guzik Reviewed-by: Jeff Layton Signed-off-by: Breno Leitao --- fs/pipe.c | 103 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++= ++-- 1 file changed, 100 insertions(+), 3 deletions(-) diff --git a/fs/pipe.c b/fs/pipe.c index 9841648c9cf3..e15795cf0c76 100644 --- a/fs/pipe.c +++ b/fs/pipe.c @@ -111,16 +111,76 @@ void pipe_double_lock(struct pipe_inode_info *pipe1, pipe_lock(pipe2); } =20 -static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe) +#define PIPE_PREALLOC_MAX 8 + +struct anon_pipe_prealloc { + struct page *pages[PIPE_PREALLOC_MAX]; + unsigned int count; +}; + +/* + * Pre-allocate pages outside pipe->mutex for multi-page writes. + * alloc_page() with GFP_HIGHUSER can sleep in reclaim and runs memcg + * charging; doing it under the mutex stalls a concurrent reader. + * + * Loop alloc_page() instead of alloc_pages_bulk_*(): the bulk path refuses + * __GFP_ACCOUNT under memcg (see commit 8dcb3060d81d "memcg: page_alloc: + * skip bulk allocator for __GFP_ACCOUNT") and silently degrades to a sing= le + * page. A per-page loop keeps memcg accounting and the task NUMA mempolicy + * honoured for every page; the per-call overhead is small compared to the + * pipe->mutex hold-time being shrunk. Any shortfall is covered by the + * in-lock alloc_page() fallback in anon_pipe_get_page(). + */ +static void anon_pipe_get_page_prealloc(struct anon_pipe_prealloc *preallo= c, + size_t total_len) +{ + unsigned int want, i; + struct page *page; + + prealloc->count =3D 0; + if (total_len <=3D PAGE_SIZE) + return; + + want =3D min_t(unsigned int, DIV_ROUND_UP(total_len, PAGE_SIZE), + PIPE_PREALLOC_MAX); + + for (i =3D 0; i < want; i++) { + page =3D alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT); + if (!page) + break; + prealloc->pages[prealloc->count++] =3D page; + } +} + +static struct page *anon_pipe_prealloc_pop(struct anon_pipe_prealloc *prea= lloc) +{ + if (!prealloc->count) + return NULL; + + prealloc->count--; + + return prealloc->pages[prealloc->count]; +} + +static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe, + struct anon_pipe_prealloc *prealloc) { + struct page *page; + + /* Drain prealloc first to keep tmp_page[] hot for later small writes. */ + page =3D anon_pipe_prealloc_pop(prealloc); + if (page) + return page; + for (int i =3D 0; i < ARRAY_SIZE(pipe->tmp_page); i++) { if (pipe->tmp_page[i]) { - struct page *page =3D pipe->tmp_page[i]; + page =3D pipe->tmp_page[i]; pipe->tmp_page[i] =3D NULL; return page; } } =20 + /* FWIW: This is called with pipe->mutex held */ return alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT); } =20 @@ -139,6 +199,38 @@ static void anon_pipe_put_page(struct pipe_inode_info = *pipe, put_page(page); } =20 +/* + * Stash leftover prealloc pages in tmp_page[] so the next write to this + * pipe gets a hot page without entering the allocator. + */ +static void anon_pipe_refill_tmp_pages(struct pipe_inode_info *pipe, + struct anon_pipe_prealloc *prealloc) +{ + int i, idx; + + if (!prealloc->count) + return; + + for (i =3D 0; i < ARRAY_SIZE(pipe->tmp_page); i++) { + if (pipe->tmp_page[i]) + continue; + if (!prealloc->count) + return; + idx =3D --prealloc->count; + pipe->tmp_page[i] =3D prealloc->pages[idx]; + prealloc->pages[idx] =3D NULL; + } +} + +/* Runs after mutex_unlock() to keep put_page() out of the critical sectio= n. */ +static void anon_pipe_free_pages(struct anon_pipe_prealloc *prealloc) +{ + while (prealloc->count) { + prealloc->count--; + put_page(prealloc->pages[prealloc->count]); + } +} + static void anon_pipe_buf_release(struct pipe_inode_info *pipe, struct pipe_buffer *buf) { @@ -432,6 +524,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *fr= om) { struct file *filp =3D iocb->ki_filp; struct pipe_inode_info *pipe =3D filp->private_data; + struct anon_pipe_prealloc prealloc; unsigned int head; ssize_t ret =3D 0; size_t total_len =3D iov_iter_count(from); @@ -455,6 +548,8 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *fr= om) if (unlikely(total_len =3D=3D 0)) return 0; =20 + anon_pipe_get_page_prealloc(&prealloc, total_len); + mutex_lock(&pipe->mutex); =20 if (!pipe->readers) { @@ -512,7 +607,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *fr= om) struct page *page; int copied; =20 - page =3D anon_pipe_get_page(pipe); + page =3D anon_pipe_get_page(pipe, &prealloc); if (unlikely(!page)) { if (!ret) ret =3D -ENOMEM; @@ -576,9 +671,11 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *f= rom) wake_next_writer =3D true; } out: + anon_pipe_refill_tmp_pages(pipe, &prealloc); if (pipe_is_full(pipe)) wake_next_writer =3D false; mutex_unlock(&pipe->mutex); + anon_pipe_free_pages(&prealloc); =20 /* * If we do do a wakeup event, we do a 'sync' wakeup, because we --=20 2.54.0 From nobody Sun May 24 17:49:15 2026 Received: from stravinsky.debian.org (stravinsky.debian.org [82.195.75.108]) (using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)) (No client certificate requested) by smtp.subspace.kernel.org (Postfix) with ESMTPS id E623139890D; Sun, 24 May 2026 14:45:38 +0000 (UTC) Authentication-Results: smtp.subspace.kernel.org; arc=none smtp.client-ip=82.195.75.108 ARC-Seal: i=1; a=rsa-sha256; d=subspace.kernel.org; s=arc-20240116; t=1779633941; cv=none; b=jLa9CzZRDOZ3EQtezyeM+iCPxt0tS6cbjnSebBpj8V/YNLaRjLrpalg2bZ7mt6Jc6MwxOJRx0GqBwkerbM5SQKkEi6tJr2DLau3zr2+I6BiNGSzwIemSVTpdrd8LSlyPHV/t3n4Ug0n/nz3sIQpk/B21s2BNIQOuj1AVcGfcdtI= ARC-Message-Signature: i=1; a=rsa-sha256; d=subspace.kernel.org; s=arc-20240116; t=1779633941; c=relaxed/simple; bh=vrgPMxOumI6zrhKkRv/KGS1G1qcBT0IVwthBlUkvXsI=; h=From:Date:Subject:MIME-Version:Content-Type:Message-Id:References: In-Reply-To:To:Cc; b=hw4c3fGfakZiqhVLOktTX+kVjdFnK0jTT0tiwGTg9mpALGtJScdseXReDozt4pTuoelLcz9PuRL173Zr41tBZaxQN+paDg0El1rwK5Z/UBzN6S1fPLpA/DgtCQd+r2/C4CB4RkHvcqea1nX7Mp0QsNwhc+o7ectl8JlFoLV3+Lg= ARC-Authentication-Results: i=1; smtp.subspace.kernel.org; dmarc=pass (p=none dis=none) header.from=debian.org; spf=pass smtp.mailfrom=debian.org; dkim=pass (2048-bit key) header.d=debian.org header.i=@debian.org header.b=VXnTE9uS; arc=none smtp.client-ip=82.195.75.108 Authentication-Results: smtp.subspace.kernel.org; dmarc=pass (p=none dis=none) header.from=debian.org Authentication-Results: smtp.subspace.kernel.org; spf=pass smtp.mailfrom=debian.org Authentication-Results: smtp.subspace.kernel.org; dkim=pass (2048-bit key) header.d=debian.org header.i=@debian.org header.b="VXnTE9uS" DKIM-Signature: v=1; a=rsa-sha256; q=dns/txt; c=relaxed/relaxed; d=debian.org; s=smtpauto.stravinsky; h=X-Debian-User:Cc:To:In-Reply-To:References: Message-Id:Content-Transfer-Encoding:Content-Type:MIME-Version:Subject:Date: From:Reply-To:Content-ID:Content-Description; bh=AisOawA93lKhnrPRg87kaIuZBpJLV4Bu9j9lJhZlX0k=; b=VXnTE9uSrUN4McRRJpRPakN+Ex wpVvzQj9iqY4jwLSCUlc2tZ0VKgvi7m476Mh0CBVc5/v9fsnv0jLXgkfF23wH/Qz1+eY5w9C4wbMe +E3DmB745vZskXi88sQHq44RwR/tXUEDmdKYGX770neD9mQoI4aM21pqn7loyxPdMKezmMZ0Sv4fP xUUxjW3lZmx6OV3MxDWWQfzoemViIbEXuNGznX8NNHwIvCR1s5vIUHvgjYGdZNwPbiCgis/oeezDI qeGzZiaOsdOE51aZu4sjKK6cilCedVq558q8ZUjRJKxmuOi7AzsTN/w2At1ZDx/W7GUoEaExAncyr /riBpTVA==; Received: from authenticated-user by stravinsky.debian.org with esmtpsa (TLS1.3:ECDHE_X25519__RSA_PSS_RSAE_SHA256__AES_256_GCM:256) (Exim 4.96) (envelope-from ) id 1wRA53-000s6p-0w; Sun, 24 May 2026 14:45:29 +0000 From: Breno Leitao Date: Sun, 24 May 2026 07:44:59 -0700 Subject: [PATCH v3 2/2] selftests/pipe: add pipe_bench microbenchmark Precedence: bulk X-Mailing-List: linux-kernel@vger.kernel.org List-Id: List-Subscribe: List-Unsubscribe: MIME-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: quoted-printable Message-Id: <20260524-fix_pipe-v3-2-bb4a75d23a90@debian.org> References: <20260524-fix_pipe-v3-0-bb4a75d23a90@debian.org> In-Reply-To: <20260524-fix_pipe-v3-0-bb4a75d23a90@debian.org> To: Alexander Viro , Christian Brauner , Jan Kara , Shuah Khan , Mateusz Guzik Cc: linux-fsdevel@vger.kernel.org, linux-kernel@vger.kernel.org, linux-kselftest@vger.kernel.org, shakeel.butt@linux.dev, jlayton@kernel.org, oleg@redhat.com, axboe@kernel.dk, Breno Leitao , kernel-team@meta.com X-Mailer: b4 0.16-dev-d5d98 X-Developer-Signature: v=1; a=openpgp-sha256; l=18756; i=leitao@debian.org; h=from:subject:message-id; bh=vrgPMxOumI6zrhKkRv/KGS1G1qcBT0IVwthBlUkvXsI=; b=owEBbQKS/ZANAwAIATWjk5/8eHdtAcsmYgBqEw76jAJeWx5kyt2l6RmomvqTPkyY8wSvX2Igs wIureG0q/WJAjMEAAEIAB0WIQSshTmm6PRnAspKQ5s1o5Of/Hh3bQUCahMO+gAKCRA1o5Of/Hh3 bZq+D/9N1HN6Y6nrmwLV3uGzH0/W6SGGXaHncsfFqBqQO72kCyXOBZjiU86yMJrWuj3Fesy71tJ bGMDqRRhuw0d+QhzcZgL0cTRgsOnNWC1jFdwCUM0j5NuXxSQ1/L6Ge3Dj6hwRqhf+mW2wBSbjgP RQWrQyzDxrjIFzAWmW2upp16Y3CbZWYO5dLkqHTm4FwirtNsAzjQqLw60JZe2awW3leIY8JqoVJ iMcHd5BLx2q19ONXCGibJKMZfMLhGUAWZpePWft2i+5JB+x/805rEwaohP4RX5D7j0uO0Z3DiDa FXALye/jbgU6lfhahmU2SMVLMKKRO7W9w/C7CypNOpxZqnzYR2GeZP5GpiAKsTBZoeM9uKjzw/K t6JxfLw1sj1Ck460DmJWn9/5Tde8mllEppqXlyH9oGQVX5parNHI1wbTMs2ZIDrNTq526L7DJvI UYWo34y+EwfEj+YyX3oxHWJDDBbDqm+A8wyEA3T3eSuUGDFxQKdaXiZjO7d9hPouvzfva5YLviR r0V17BpE19S2INA7B8nO6X4MyDM+VUWhgKYf1Esaz+bbBHw4WCPsN+pwUySaSA1JDDjfKRhy662 scQ5wRWh1OBYR3yNCPYOfAKCW97A4mHlVQLRzvtoXR2XsR5tSSyUOSMnkuaLjOhsyLr+79T9RdQ 42qla5zybosU/VQ== X-Developer-Key: i=leitao@debian.org; a=openpgp; fpr=AC8539A6E8F46702CA4A439B35A3939FFC78776D X-Debian-User: leitao Add a small selftest that stresses pipe->mutex contention by spawning N writer threads that hammer a single pipe with multi-page writes, plus M reader threads that drain. Each writer records its own write() latency samples into a log2-bucketed histogram; main aggregates and prints total writes, throughput, average and percentile (p50/p99) latencies, and the maximum observed latency. Pass --memory-pressure to fork stress-ng (--vm 4 --vm-bytes 80% --vm-method all) for the duration of the run, so alloc_page() in anon_pipe_write() routinely hits direct reclaim. The flag fails fast if stress-ng is not on $PATH. Program print something like the following, for different writes, readers, msgsizes and memory pressure: config: writers=3DX readers=3DY msgsize=3DZ duration=3D3 pipe_size=3D10485= 76 memory_pressure=3D[no|yes] writes: total=3D54451 rate=3D18150/s throughput_MBps: 1134.40 lat_avg_ns: 275355 lat_p50_ns_upper: 262143 lat_p99_ns_upper: 1048575 lat_max_ns: 2145633 Reviewed-by: Jeff Layton Signed-off-by: Breno Leitao --- tools/testing/selftests/Makefile | 1 + tools/testing/selftests/pipe/.gitignore | 1 + tools/testing/selftests/pipe/Makefile | 9 + tools/testing/selftests/pipe/pipe_bench.c | 616 ++++++++++++++++++++++++++= ++++ 4 files changed, 627 insertions(+) diff --git a/tools/testing/selftests/Makefile b/tools/testing/selftests/Mak= efile index 6e59b8f63e41..bcd9db9d292c 100644 --- a/tools/testing/selftests/Makefile +++ b/tools/testing/selftests/Makefile @@ -91,6 +91,7 @@ TARGETS +=3D pcie_bwctrl TARGETS +=3D perf_events TARGETS +=3D pidfd TARGETS +=3D pid_namespace +TARGETS +=3D pipe TARGETS +=3D power_supply TARGETS +=3D powerpc TARGETS +=3D prctl diff --git a/tools/testing/selftests/pipe/.gitignore b/tools/testing/selfte= sts/pipe/.gitignore new file mode 100644 index 000000000000..20b549361a15 --- /dev/null +++ b/tools/testing/selftests/pipe/.gitignore @@ -0,0 +1 @@ +pipe_bench diff --git a/tools/testing/selftests/pipe/Makefile b/tools/testing/selftest= s/pipe/Makefile new file mode 100644 index 000000000000..1810c680117b --- /dev/null +++ b/tools/testing/selftests/pipe/Makefile @@ -0,0 +1,9 @@ +# SPDX-License-Identifier: GPL-2.0 +# Copyright (c) 2026 Meta Platforms, Inc. and affiliates +# Copyright (c) 2026 Breno Leitao + +CFLAGS +=3D -O2 -Wall -Wextra -pthread + +TEST_GEN_PROGS :=3D pipe_bench + +include ../lib.mk diff --git a/tools/testing/selftests/pipe/pipe_bench.c b/tools/testing/self= tests/pipe/pipe_bench.c new file mode 100644 index 000000000000..7e96429b8fb4 --- /dev/null +++ b/tools/testing/selftests/pipe/pipe_bench.c @@ -0,0 +1,616 @@ +// SPDX-License-Identifier: GPL-2.0 +/* + * pipe_bench - exercise concurrent pipe operation + * + * N writer threads hammer a single pipe with multi-page writes; M reader + * threads drain it. Each writer records its own write() latency histogram. + * Multi-page writes (msgsize >=3D PAGE_SIZE) force the loop in + * anon_pipe_write() to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) under + * pipe->mutex, which is the critical section the patch shrinks. + * + * By default the benchmark sweeps writers in {1, 2, 5} x readers in + * {1, 5, 10} and prints one block per configuration so two runs (e.g. + * baseline vs patched) can be diffed directly. Pass -w and -r to run a + * single configuration instead. Pass --memory-pressure to spawn stress-ng + * alongside the sweep so the per-page alloc_page() path under pipe->mutex + * has to dip into reclaim. + * + * Copyright (c) 2026 Meta Platforms, Inc. and affiliates + * Copyright (c) 2026 Breno Leitao + */ + +#define _GNU_SOURCE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0])) +#define HIST_BUCKETS 32 + +static size_t g_msgsize =3D 16 * 4096; +static int g_duration =3D 3; +static int g_pipe_size =3D 1024 * 1024; +static int g_memory_pressure; + +static atomic_int g_stop; +static int g_pipe[2]; + +struct wstats { + uint64_t writes; + uint64_t bytes; + uint64_t lat_sum_ns; + uint64_t lat_max_ns; + uint64_t lat_hist[HIST_BUCKETS]; + char *buf; +}; + +struct rstats { + char *buf; +}; + +struct hist_totals { + uint64_t writes; + uint64_t bytes; + uint64_t lat_sum; + uint64_t lat_max; +}; + +static inline uint64_t now_ns(void) +{ + struct timespec ts; + + clock_gettime(CLOCK_MONOTONIC, &ts); + return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec; +} + +static inline int log2_bucket(uint64_t v) +{ + int b =3D 0; + + if (!v) + return 0; + while (v >>=3D 1) + b++; + return b < HIST_BUCKETS ? b : HIST_BUCKETS - 1; +} + +static void *writer(void *arg) +{ + struct wstats *s =3D arg; + + while (!atomic_load_explicit(&g_stop, memory_order_relaxed)) { + uint64_t t0 =3D now_ns(); + ssize_t n =3D write(g_pipe[1], s->buf, g_msgsize); + uint64_t dt =3D now_ns() - t0; + + if (n > 0) { + s->writes++; + s->bytes +=3D (uint64_t)n; + s->lat_sum_ns +=3D dt; + if (dt > s->lat_max_ns) + s->lat_max_ns =3D dt; + s->lat_hist[log2_bucket(dt)]++; + } else if (n < 0 && (errno =3D=3D EPIPE || errno =3D=3D EBADF)) { + break; + } + } + return NULL; +} + +static void *reader(void *arg) +{ + struct rstats *s =3D arg; + + /* + * Drain until EOF (write end closed by main). g_stop is not checked + * here on purpose: writers may be blocked in write() with the pipe + * full when g_stop is set, so the reader must keep draining until + * main closes the write end. + */ + for (;;) { + ssize_t n =3D read(g_pipe[0], s->buf, g_msgsize); + + if (n <=3D 0) + break; + } + return NULL; +} + +/* Sum per-writer stats and per-bucket counts into the caller's aggregates= . */ +static void aggregate_wstats(struct wstats *all, int nw, + uint64_t agg[HIST_BUCKETS], + struct hist_totals *t) +{ + memset(t, 0, sizeof(*t)); + for (int i =3D 0; i < nw; i++) { + t->writes +=3D all[i].writes; + t->bytes +=3D all[i].bytes; + t->lat_sum +=3D all[i].lat_sum_ns; + if (all[i].lat_max_ns > t->lat_max) + t->lat_max =3D all[i].lat_max_ns; + for (int b =3D 0; b < HIST_BUCKETS; b++) + agg[b] +=3D all[i].lat_hist[b]; + } +} + +/* + * Walk @agg in order, returning the inclusive upper bound (in ns) of the + * log2 bucket where the running sum first reaches @target. + * + * A percentile is undefined with zero samples, and with very low sample + * counts integer truncation could make @target zero -- then "cum >=3D 0" + * would latch on the first (possibly empty) bucket. Callers must pass + * @target >=3D 1. + */ +static uint64_t bucket_at(const uint64_t agg[HIST_BUCKETS], uint64_t targe= t) +{ + uint64_t cum =3D 0; + + for (int b =3D 0; b < HIST_BUCKETS; b++) { + /* HIST_BUCKETS <=3D 63, so (b + 1) is always a safe shift. */ + uint64_t upper =3D (1ULL << (b + 1)) - 1; + + cum +=3D agg[b]; + if (cum >=3D target) + return upper; + } + return 0; +} + +static void compute_p50_p99(const uint64_t agg[HIST_BUCKETS], uint64_t wri= tes, + uint64_t *p50, uint64_t *p99) +{ + uint64_t p50_target, p99_target; + + *p50 =3D *p99 =3D 0; + if (!writes) + return; + + p50_target =3D writes * 50 / 100; + p99_target =3D writes * 99 / 100; + if (!p50_target) + p50_target =3D 1; + if (!p99_target) + p99_target =3D 1; + + *p50 =3D bucket_at(agg, p50_target); + *p99 =3D bucket_at(agg, p99_target); +} + +static void print_summary(int nw, int nr, const struct hist_totals *t, + uint64_t p50, uint64_t p99) +{ + double sec =3D g_duration; + uint64_t avg_ns =3D t->writes ? t->lat_sum / t->writes : 0; + + printf("config: writers=3D%d readers=3D%d msgsize=3D%zu duration=3D%d pip= e_size=3D%d memory_pressure=3D%s\n", + nw, nr, g_msgsize, g_duration, g_pipe_size, + g_memory_pressure ? "yes" : "no"); + printf("writes: total=3D%llu rate=3D%.0f/s\n", + (unsigned long long)t->writes, (double)t->writes / sec); + printf("throughput_MBps: %.2f\n", + ((double)t->bytes / sec) / (1024.0 * 1024.0)); + printf("lat_avg_ns: %llu\n", (unsigned long long)avg_ns); + printf("lat_p50_ns_upper: %llu\n", (unsigned long long)p50); + printf("lat_p99_ns_upper: %llu\n", (unsigned long long)p99); + printf("lat_max_ns: %llu\n", (unsigned long long)t->lat_max); +} + +static void summarize(struct wstats *all, int nw, int nr) +{ + uint64_t agg[HIST_BUCKETS] =3D {0}; + struct hist_totals t; + uint64_t p50, p99; + + aggregate_wstats(all, nw, agg, &t); + compute_p50_p99(agg, t.writes, &p50, &p99); + print_summary(nw, nr, &t, p50, p99); +} + +/* + * Child branch of fork(): restore SIGPIPE to default (parent ignores it), + * exec stress-ng, and on failure write the reason into @hs_wr before + * exiting. The parent observes EOF on hs_wr (closed via O_CLOEXEC) when + * exec succeeds. + */ +static void stress_ng_child(int hs_wr) __attribute__((noreturn)); +static void stress_ng_child(int hs_wr) +{ + char errbuf[256]; + + signal(SIGPIPE, SIG_DFL); + execlp("stress-ng", "stress-ng", + "--vm", "4", "--vm-bytes", "80%", + "--vm-method", "all", + (char *)NULL); + snprintf(errbuf, sizeof(errbuf), + "exec stress-ng failed: %s\n", strerror(errno)); + (void)!write(hs_wr, errbuf, strlen(errbuf)); + _exit(127); +} + +/* + * Read from the O_CLOEXEC handshake pipe. Anything readable means the + * child wrote an error before exec; EOF (n =3D=3D 0) means the write-end + * closed because exec succeeded. Returns 0 on exec success, -1 if the + * child failed and was reaped. + */ +static int stress_ng_wait_handshake(int hs_rd, pid_t pid) +{ + struct pollfd pfd =3D { .fd =3D hs_rd, .events =3D POLLIN }; + char errbuf[256]; + int status; + int ret; + + ret =3D poll(&pfd, 1, 500); + if (ret <=3D 0) + return 0; + + ssize_t n =3D read(hs_rd, errbuf, sizeof(errbuf) - 1); + + if (n > 0) { + errbuf[n] =3D '\0'; + fputs(errbuf, stderr); + waitpid(pid, &status, 0); + return -1; + } + return 0; +} + +static pid_t spawn_stress_ng(void) +{ + int hs[2]; + pid_t pid; + + /* + * Handshake pipe: child writes one byte and _exit()s on exec + * failure. On exec success the O_CLOEXEC flag closes the write + * end, which the parent observes as EOF. This makes the "is + * stress-ng on $PATH?" check fail fast rather than silently. + */ + if (pipe2(hs, O_CLOEXEC) < 0) { + perror("pipe2"); + return -1; + } + + pid =3D fork(); + if (pid < 0) { + perror("fork"); + close(hs[0]); + close(hs[1]); + return -1; + } + if (pid =3D=3D 0) { + close(hs[0]); + stress_ng_child(hs[1]); + } + + close(hs[1]); + if (stress_ng_wait_handshake(hs[0], pid) < 0) { + close(hs[0]); + return -1; + } + close(hs[0]); + + /* Give stress-ng a moment to map its VM regions before measuring. */ + sleep(1); + return pid; +} + +static void kill_stress_ng(pid_t pid) +{ + int status; + + if (pid <=3D 0) + return; + kill(pid, SIGTERM); + for (int i =3D 0; i < 20; i++) { + if (waitpid(pid, &status, WNOHANG) > 0) + return; + usleep(100 * 1000); + } + kill(pid, SIGKILL); + waitpid(pid, &status, 0); +} + +/* + * Allocate per-thread page-aligned buffers in main so a failed + * aligned_alloc() aborts the run before any thread starts. Workers used + * to allocate their own buffer and return NULL on failure, which left + * peers blocked in write()/read() with nobody to unblock them. + */ +static int alloc_thread_bufs(struct wstats *ws, int nw, + struct rstats *rs, int nr) +{ + for (int i =3D 0; i < nw; i++) { + ws[i].buf =3D aligned_alloc(4096, g_msgsize); + if (!ws[i].buf) { + fprintf(stderr, "writer %d: aligned_alloc(%zu) failed\n", + i, g_msgsize); + return -1; + } + memset(ws[i].buf, 0xAA, g_msgsize); + } + for (int i =3D 0; i < nr; i++) { + rs[i].buf =3D aligned_alloc(4096, g_msgsize); + if (!rs[i].buf) { + fprintf(stderr, "reader %d: aligned_alloc(%zu) failed\n", + i, g_msgsize); + return -1; + } + } + return 0; +} + +static void free_thread_bufs(struct wstats *ws, int nw, + struct rstats *rs, int nr) +{ + if (ws) + for (int i =3D 0; i < nw; i++) + free(ws[i].buf); + if (rs) + for (int i =3D 0; i < nr; i++) + free(rs[i].buf); +} + +static int start_readers(pthread_t *rt, struct rstats *rs, int nr, + int *created) +{ + for (int i =3D 0; i < nr; i++) { + int err =3D pthread_create(&rt[i], NULL, reader, &rs[i]); + + if (err) { + fprintf(stderr, "pthread_create reader %d: %s\n", + i, strerror(err)); + return -1; + } + (*created)++; + } + return 0; +} + +static int start_writers(pthread_t *wt, struct wstats *ws, int nw, + int *created) +{ + for (int i =3D 0; i < nw; i++) { + int err =3D pthread_create(&wt[i], NULL, writer, &ws[i]); + + if (err) { + fprintf(stderr, "pthread_create writer %d: %s\n", + i, strerror(err)); + return -1; + } + (*created)++; + } + return 0; +} + +static int open_bench_pipe(void) +{ + if (pipe(g_pipe) < 0) { + perror("pipe"); + return -1; + } + if (fcntl(g_pipe[1], F_SETPIPE_SZ, g_pipe_size) < 0) + perror("F_SETPIPE_SZ (continuing)"); + return 0; +} + +/* + * Normal termination: g_stop tells writers to leave the loop after the + * current write() returns. Closing the shared write-end fd means once + * the in-flight writes drain, readers see EOF and exit. Writers are not + * unblocked by EPIPE here -- g_pipe[0] stays open so readers can keep + * draining. + * + * Error path: some threads may have been created and others skipped, so + * writers could be blocked in write() with no reader making progress. + * Close both ends -- closing the read end is what delivers EPIPE to a + * blocked writer. + */ +static void stop_and_join(pthread_t *wt, int nw_created, + pthread_t *rt, int nr_created, int rc) +{ + atomic_store(&g_stop, 1); + close(g_pipe[1]); + if (rc < 0) + close(g_pipe[0]); + for (int i =3D 0; i < nw_created; i++) + pthread_join(wt[i], NULL); + for (int i =3D 0; i < nr_created; i++) + pthread_join(rt[i], NULL); + if (rc =3D=3D 0) + close(g_pipe[0]); +} + +static int run_one(int nw, int nr) +{ + pthread_t *wt =3D NULL, *rt =3D NULL; + struct wstats *ws =3D NULL; + struct rstats *rs =3D NULL; + int nw_created =3D 0, nr_created =3D 0; + int rc =3D 0; + + atomic_store(&g_stop, 0); + + if (open_bench_pipe() < 0) + return -1; + + wt =3D calloc((size_t)nw, sizeof(*wt)); + rt =3D calloc((size_t)nr, sizeof(*rt)); + ws =3D calloc((size_t)nw, sizeof(*ws)); + rs =3D calloc((size_t)nr, sizeof(*rs)); + if (!wt || !rt || !ws || !rs) { + fprintf(stderr, "alloc failed\n"); + rc =3D -1; + goto teardown; + } + + if (alloc_thread_bufs(ws, nw, rs, nr) < 0) { + rc =3D -1; + goto teardown; + } + + if (start_readers(rt, rs, nr, &nr_created) < 0 || + start_writers(wt, ws, nw, &nw_created) < 0) { + rc =3D -1; + goto teardown; + } + + sleep((unsigned int)g_duration); + +teardown: + stop_and_join(wt, nw_created, rt, nr_created, rc); + + if (rc =3D=3D 0) { + summarize(ws, nw, nr); + fflush(stdout); + } + + free_thread_bufs(ws, nw, rs, nr); + free(wt); + free(rt); + free(ws); + free(rs); + return rc; +} + +static void usage(const char *prog) +{ + fprintf(stderr, + "usage: %s [-w writers] [-r readers] [-s msgsize] [-d secs] [-p pipe_siz= e] [--memory-pressure]\n" + " default: sweep writers=3D{1,2,5} x readers=3D{1,5,10}\n" + " --memory-pressure: spawn stress-ng (--vm 4 --vm-bytes 80%% --vm-metho= d all) for the run\n", + prog); +} + +static int parse_args(int argc, char **argv, + int *writers_override, int *readers_override) +{ + static const struct option long_opts[] =3D { + {"memory-pressure", no_argument, NULL, 'M'}, + {0, 0, 0, 0}, + }; + int opt; + + while ((opt =3D getopt_long(argc, argv, "w:r:s:d:p:", + long_opts, NULL)) !=3D -1) { + switch (opt) { + case 'w': + *writers_override =3D atoi(optarg); + break; + case 'r': + *readers_override =3D atoi(optarg); + break; + case 's': + g_msgsize =3D (size_t)atol(optarg); + break; + case 'd': + g_duration =3D atoi(optarg); + break; + case 'p': + g_pipe_size =3D atoi(optarg); + break; + case 'M': + g_memory_pressure =3D 1; + break; + default: + usage(argv[0]); + return -1; + } + } + return 0; +} + +/* + * aligned_alloc(4096, size) requires size to be a multiple of the + * alignment (C11); glibc returns NULL otherwise, which would make + * writer/reader threads silently exit and the run report zero writes. + * Validate up front instead. + */ +static int validate_args(void) +{ + if (g_msgsize =3D=3D 0 || g_msgsize % 4096 !=3D 0) { + fprintf(stderr, + "msgsize must be a positive multiple of 4096 (got %zu)\n", + g_msgsize); + return -1; + } + if (g_duration <=3D 0) { + fprintf(stderr, "duration must be > 0 seconds (got %d)\n", + g_duration); + return -1; + } + if (g_pipe_size <=3D 0) { + fprintf(stderr, "pipe_size must be > 0 bytes (got %d)\n", + g_pipe_size); + return -1; + } + return 0; +} + +static int run_sweep(void) +{ + static const int writers_sweep[] =3D {1, 2, 5}; + static const int readers_sweep[] =3D {1, 5, 10}; + + for (size_t i =3D 0; i < ARRAY_SIZE(writers_sweep); i++) { + for (size_t j =3D 0; j < ARRAY_SIZE(readers_sweep); j++) { + printf("---\n"); + if (run_one(writers_sweep[i], readers_sweep[j]) < 0) + return -1; + } + } + return 0; +} + +int main(int argc, char **argv) +{ + int writers_override =3D 0, readers_override =3D 0; + pid_t stress_pid =3D -1; + int rc =3D 0; + + if (parse_args(argc, argv, &writers_override, &readers_override) < 0) + return 1; + if (validate_args() < 0) + return 1; + + signal(SIGPIPE, SIG_IGN); + setvbuf(stdout, NULL, _IOLBF, 0); + setvbuf(stderr, NULL, _IOLBF, 0); + + fprintf(stderr, "pid=3D%d\n", getpid()); + fflush(stderr); + + if (g_memory_pressure) { + stress_pid =3D spawn_stress_ng(); + if (stress_pid < 0) { + fprintf(stderr, + "memory_pressure requested but stress-ng could not be spawned\n"); + return 1; + } + } + + if (writers_override > 0 || readers_override > 0) { + int nw =3D writers_override > 0 ? writers_override : 1; + int nr =3D readers_override > 0 ? readers_override : 1; + + rc =3D run_one(nw, nr) < 0 ? 1 : 0; + } else { + rc =3D run_sweep() < 0 ? 1 : 0; + } + + kill_stress_ng(stress_pid); + return rc; +} --=20 2.54.0