From: Breno Leitao
Date: Fri, 22 May 2026 09:44:21 -0700
Subject: [PATCH v2 1/2] fs/pipe: pre-allocate pages outside pipe->mutex in anon_pipe_write
Message-Id: <20260522-fix_pipe-v2-1-a8b35a78244e@debian.org>
In-Reply-To: <20260522-fix_pipe-v2-0-a8b35a78244e@debian.org>
To: Alexander Viro, Christian Brauner, Jan Kara, Shuah Khan, Mateusz Guzik
Cc: linux-fsdevel@vger.kernel.org, linux-kernel@vger.kernel.org, linux-kselftest@vger.kernel.org, shakeel.butt@linux.dev, jlayton@kernel.org, oleg@redhat.com, axboe@kernel.dk, Breno Leitao, kernel-team@meta.com

anon_pipe_write() takes pipe->mutex (aka "mutex protecting the whole
thing"). From the per-iteration anon_pipe_get_page() helper it then
called alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) once per page while still
holding that mutex. The allocation can sleep in direct reclaim and also
performs memcg charging. Both extend the critical section and stall any
concurrent reader waiting on the same mutex.

Pre-allocate the required pages into an array before taking the lock,
then pop them from the array while holding it.

This can improve pipe throughput by up to 48% and reduce latency by 33%.
The effect is easiest to see under memory pressure, when direct reclaim
is active.

Signed-off-by: Breno Leitao
Reviewed-by: Jeff Layton
Reviewed-by: Mateusz Guzik
---
 fs/pipe.c | 105 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 102 insertions(+), 3 deletions(-)

diff --git a/fs/pipe.c b/fs/pipe.c
index 9841648c9cf3e..cff255217bbfe 100644
--- a/fs/pipe.c
+++ b/fs/pipe.c
@@ -111,16 +111,76 @@ void pipe_double_lock(struct pipe_inode_info *pipe1,
 	pipe_lock(pipe2);
 }
 
-static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe)
+#define PIPE_PREALLOC_MAX 8
+
+struct anon_pipe_prealloc {
+	struct page *pages[PIPE_PREALLOC_MAX];
+	unsigned int count;
+};
+
+/*
+ * Pre-allocate pages outside pipe->mutex for multi-page writes.
+ * alloc_page() with GFP_HIGHUSER can sleep in reclaim and runs memcg
+ * charging; doing it under the mutex stalls a concurrent reader.
+ *
+ * Loop alloc_page() instead of alloc_pages_bulk_*(): the bulk path refuses
+ * __GFP_ACCOUNT under memcg (see commit 8dcb3060d81d "memcg: page_alloc:
+ * skip bulk allocator for __GFP_ACCOUNT") and silently degrades to a single
+ * page. A per-page loop keeps memcg accounting and the task NUMA mempolicy
+ * honoured for every page; the per-call overhead is small compared to the
+ * pipe->mutex hold-time being shrunk. Any shortfall is covered by the
+ * in-lock alloc_page() fallback in anon_pipe_get_page().
+ */
+static void anon_pipe_get_page_prealloc(struct anon_pipe_prealloc *prealloc,
+					size_t total_len)
+{
+	unsigned int want, i;
+	struct page *page;
+
+	prealloc->count = 0;
+	if (total_len <= PAGE_SIZE)
+		return;
+
+	want = min_t(unsigned int, DIV_ROUND_UP(total_len, PAGE_SIZE),
+		     PIPE_PREALLOC_MAX);
+
+	for (i = 0; i < want; i++) {
+		page = alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
+		if (!page)
+			break;
+		prealloc->pages[prealloc->count++] = page;
+	}
+}
+
+static struct page *anon_pipe_prealloc_pop(struct anon_pipe_prealloc *prealloc)
+{
+	if (!prealloc->count)
+		return NULL;
+
+	prealloc->count--;
+
+	return prealloc->pages[prealloc->count];
+}
+
+static struct page *anon_pipe_get_page(struct pipe_inode_info *pipe,
+				       struct anon_pipe_prealloc *prealloc)
 {
+	struct page *page;
+
+	/* Drain prealloc first to keep tmp_page[] hot for later small writes. */
+	page = anon_pipe_prealloc_pop(prealloc);
+	if (page)
+		return page;
+
 	for (int i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) {
 		if (pipe->tmp_page[i]) {
-			struct page *page = pipe->tmp_page[i];
+			page = pipe->tmp_page[i];
 			pipe->tmp_page[i] = NULL;
 			return page;
 		}
 	}
 
+	/* FWIW: This is called with pipe->mutex held */
 	return alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT);
 }
 
@@ -139,6 +199,38 @@ static void anon_pipe_put_page(struct pipe_inode_info *pipe,
 	put_page(page);
 }
 
+/*
+ * Stash leftover prealloc pages in tmp_page[] so the next write to this
+ * pipe gets a hot page without entering the allocator.
+ */
+static void anon_pipe_refill_tmp_pages(struct pipe_inode_info *pipe,
+				       struct anon_pipe_prealloc *prealloc)
+{
+	int i, idx;
+
+	if (!prealloc->count)
+		return;
+
+	for (i = 0; i < ARRAY_SIZE(pipe->tmp_page); i++) {
+		if (pipe->tmp_page[i])
+			continue;
+		if (!prealloc->count)
+			return;
+		idx = --prealloc->count;
+		pipe->tmp_page[i] = prealloc->pages[idx];
+		prealloc->pages[idx] = NULL;
+	}
+}
+
+/* Runs after mutex_unlock() to keep put_page() out of the critical section. */
+static void anon_pipe_free_pages(struct anon_pipe_prealloc *prealloc)
+{
+	while (prealloc->count) {
+		prealloc->count--;
+		put_page(prealloc->pages[prealloc->count]);
+	}
+}
+
 static void anon_pipe_buf_release(struct pipe_inode_info *pipe,
 				  struct pipe_buffer *buf)
 {
@@ -432,6 +524,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
 {
 	struct file *filp = iocb->ki_filp;
 	struct pipe_inode_info *pipe = filp->private_data;
+	struct anon_pipe_prealloc prealloc;
 	unsigned int head;
 	ssize_t ret = 0;
 	size_t total_len = iov_iter_count(from);
@@ -455,6 +548,8 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
 	if (unlikely(total_len == 0))
 		return 0;
 
+	anon_pipe_get_page_prealloc(&prealloc, total_len);
+
 	mutex_lock(&pipe->mutex);
 
 	if (!pipe->readers) {
@@ -512,7 +607,7 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
 			struct page *page;
 			int copied;
 
-			page = anon_pipe_get_page(pipe);
+			page = anon_pipe_get_page(pipe, &prealloc);
 			if (unlikely(!page)) {
 				if (!ret)
 					ret = -ENOMEM;
@@ -566,7 +661,9 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
 		 * after waiting we need to re-check whether the pipe
 		 * become empty while we dropped the lock.
 		 */
+		anon_pipe_refill_tmp_pages(pipe, &prealloc);
 		mutex_unlock(&pipe->mutex);
+		anon_pipe_free_pages(&prealloc);
 		if (was_empty)
 			wake_up_interruptible_sync_poll(&pipe->rd_wait, EPOLLIN | EPOLLRDNORM);
 		kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
@@ -576,9 +673,11 @@ anon_pipe_write(struct kiocb *iocb, struct iov_iter *from)
 		wake_next_writer = true;
 	}
 out:
+	anon_pipe_refill_tmp_pages(pipe, &prealloc);
 	if (pipe_is_full(pipe))
 		wake_next_writer = false;
 	mutex_unlock(&pipe->mutex);
+	anon_pipe_free_pages(&prealloc);
 
 	/*
 	 * If we do do a wakeup event, we do a 'sync' wakeup, because we
-- 
2.54.0

From: Breno Leitao
Date: Fri, 22 May 2026 09:44:22 -0700
Subject: [PATCH v2 2/2] selftests/pipe: add pipe_bench microbenchmark
Message-Id: <20260522-fix_pipe-v2-2-a8b35a78244e@debian.org>
In-Reply-To: <20260522-fix_pipe-v2-0-a8b35a78244e@debian.org>
To: Alexander Viro, Christian Brauner, Jan Kara, Shuah Khan, Mateusz Guzik
Cc: linux-fsdevel@vger.kernel.org, linux-kernel@vger.kernel.org, linux-kselftest@vger.kernel.org, shakeel.butt@linux.dev, jlayton@kernel.org, oleg@redhat.com, axboe@kernel.dk, Breno Leitao, kernel-team@meta.com

Add a small selftest that stresses pipe->mutex contention. It spawns N
writer threads that hammer a single pipe with multi-page writes, plus M
reader threads that drain it. Each writer records its own write()
latency samples in a log2-bucketed histogram. The main thread aggregates
these and prints total writes, throughput, average and percentile
(p50/p99) latencies, and the maximum observed latency.

Pass --memory-pressure to fork stress-ng (--vm 4 --vm-bytes 80% --vm-method all)
for the duration of the run, so that alloc_page() in anon_pipe_write()
routinely hits direct reclaim. The flag fails fast if stress-ng is not
on $PATH.

For each configuration it runs, the program prints a block like the
following. The writers, readers, msgsize and memory_pressure fields
reflect the configuration:

  config: writers=X readers=Y msgsize=Z duration=3 pipe_size=1048576 memory_pressure=[no|yes]
  writes: total=54451 rate=18150/s
  throughput_MBps: 1134.40
  lat_avg_ns: 275355
  lat_p50_ns_upper: 262143
  lat_p99_ns_upper: 1048575
  lat_max_ns: 2145633

Signed-off-by: Breno Leitao
Reviewed-by: Jeff Layton
---
 tools/testing/selftests/Makefile          |   1 +
 tools/testing/selftests/pipe/.gitignore   |   1 +
 tools/testing/selftests/pipe/Makefile     |   9 +
 tools/testing/selftests/pipe/pipe_bench.c | 616 ++++++++++++++++++++++++++++++
 4 files changed, 627 insertions(+)

diff --git a/tools/testing/selftests/Makefile b/tools/testing/selftests/Makefile
index 6e59b8f63e416..bcd9db9d292ca 100644
--- a/tools/testing/selftests/Makefile
+++ b/tools/testing/selftests/Makefile
@@ -91,6 +91,7 @@ TARGETS += pcie_bwctrl
 TARGETS += perf_events
 TARGETS += pidfd
 TARGETS += pid_namespace
+TARGETS += pipe
 TARGETS += power_supply
 TARGETS += powerpc
 TARGETS += prctl
diff --git a/tools/testing/selftests/pipe/.gitignore b/tools/testing/selftests/pipe/.gitignore
new file mode 100644
index 0000000000000..20b549361a152
--- /dev/null
+++ b/tools/testing/selftests/pipe/.gitignore
@@ -0,0 +1 @@
+pipe_bench
diff --git a/tools/testing/selftests/pipe/Makefile b/tools/testing/selftests/pipe/Makefile
new file mode 100644
index 0000000000000..1810c680117b3
--- /dev/null
+++ b/tools/testing/selftests/pipe/Makefile
@@ -0,0 +1,9 @@
+# SPDX-License-Identifier: GPL-2.0
+# Copyright (c) 2026 Meta Platforms, Inc. and affiliates
+# Copyright (c) 2026 Breno Leitao
+
+CFLAGS += -O2 -Wall -Wextra -pthread
+
+TEST_GEN_PROGS := pipe_bench
+
+include ../lib.mk
diff --git a/tools/testing/selftests/pipe/pipe_bench.c b/tools/testing/selftests/pipe/pipe_bench.c
new file mode 100644
index 0000000000000..7e96429b8fb4d
--- /dev/null
+++ b/tools/testing/selftests/pipe/pipe_bench.c
@@ -0,0 +1,616 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * pipe_bench - exercise concurrent pipe operations
+ *
+ * N writer threads hammer a single pipe with multi-page writes; M reader
+ * threads drain it. Each writer records its own write() latency histogram.
+ * Multi-page writes (msgsize > PAGE_SIZE) force the loop in
+ * anon_pipe_write() to call alloc_page(GFP_HIGHUSER | __GFP_ACCOUNT) under
+ * pipe->mutex, which is the critical section the patch shrinks.
+ *
+ * By default the benchmark sweeps writers in {1, 2, 5} x readers in
+ * {1, 5, 10} and prints one block per configuration, so two runs (e.g.
+ * baseline vs patched) can be diffed directly. Pass -w and -r to run a
+ * single configuration instead. Pass --memory-pressure to spawn stress-ng
+ * alongside the sweep so that the per-page alloc_page() path under
+ * pipe->mutex has to dip into reclaim.
+ *
+ * Copyright (c) 2026 Meta Platforms, Inc. and affiliates
+ * Copyright (c) 2026 Breno Leitao
+ */
+
+#define _GNU_SOURCE
+#include <errno.h>
+#include <fcntl.h>
+#include <getopt.h>
+#include <poll.h>
+#include <pthread.h>
+#include <signal.h>
+#include <stdatomic.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <sys/wait.h>
+#include <time.h>
+#include <unistd.h>
+
+#define ARRAY_SIZE(a) (sizeof(a) / sizeof((a)[0]))
+#define HIST_BUCKETS 32
+
+static size_t g_msgsize = 16 * 4096;
+static int g_duration = 3;
+static int g_pipe_size = 1024 * 1024;
+static int g_memory_pressure;
+
+static atomic_int g_stop;
+static int g_pipe[2];
+
+struct wstats {
+	uint64_t writes;
+	uint64_t bytes;
+	uint64_t lat_sum_ns;
+	uint64_t lat_max_ns;
+	uint64_t lat_hist[HIST_BUCKETS];
+	char *buf;
+};
+
+struct rstats {
+	char *buf;
+};
+
+struct hist_totals {
+	uint64_t writes;
+	uint64_t bytes;
+	uint64_t lat_sum;
+	uint64_t lat_max;
+};
+
+static inline uint64_t now_ns(void)
+{
+	struct timespec ts;
+
+	clock_gettime(CLOCK_MONOTONIC, &ts);
+	return (uint64_t)ts.tv_sec * 1000000000ull + (uint64_t)ts.tv_nsec;
+}
+
+static inline int log2_bucket(uint64_t v)
+{
+	int b = 0;
+
+	if (!v)
+		return 0;
+	while (v >>= 1)
+		b++;
+	return b < HIST_BUCKETS ? b : HIST_BUCKETS - 1;
+}
+
+static void *writer(void *arg)
+{
+	struct wstats *s = arg;
+
+	while (!atomic_load_explicit(&g_stop, memory_order_relaxed)) {
+		uint64_t t0 = now_ns();
+		ssize_t n = write(g_pipe[1], s->buf, g_msgsize);
+		uint64_t dt = now_ns() - t0;
+
+		if (n > 0) {
+			s->writes++;
+			s->bytes += (uint64_t)n;
+			s->lat_sum_ns += dt;
+			if (dt > s->lat_max_ns)
+				s->lat_max_ns = dt;
+			s->lat_hist[log2_bucket(dt)]++;
+		} else if (n < 0 && (errno == EPIPE || errno == EBADF)) {
+			break;
+		}
+	}
+	return NULL;
+}
+
+static void *reader(void *arg)
+{
+	struct rstats *s = arg;
+
+	/*
+	 * Drain until EOF (write end closed by main). g_stop is not checked
+	 * here on purpose: writers may be blocked in write() with the pipe
+	 * full when g_stop is set, so the reader must keep draining until
+	 * main closes the write end.
+	 */
+	for (;;) {
+		ssize_t n = read(g_pipe[0], s->buf, g_msgsize);
+
+		if (n <= 0)
+			break;
+	}
+	return NULL;
+}
+
+/* Sum per-writer stats and per-bucket counts into the caller's aggregates. */
+static void aggregate_wstats(struct wstats *all, int nw,
+			     uint64_t agg[HIST_BUCKETS],
+			     struct hist_totals *t)
+{
+	memset(t, 0, sizeof(*t));
+	for (int i = 0; i < nw; i++) {
+		t->writes += all[i].writes;
+		t->bytes += all[i].bytes;
+		t->lat_sum += all[i].lat_sum_ns;
+		if (all[i].lat_max_ns > t->lat_max)
+			t->lat_max = all[i].lat_max_ns;
+		for (int b = 0; b < HIST_BUCKETS; b++)
+			agg[b] += all[i].lat_hist[b];
+	}
+}
+
+/*
+ * Walk @agg in order, returning the inclusive upper bound (in ns) of the
+ * log2 bucket where the running sum first reaches @target.
+ *
+ * A percentile is undefined with zero samples, and with very low sample
+ * counts integer truncation could make @target zero -- then "cum >= 0"
+ * would latch on the first (possibly empty) bucket. Callers must pass
+ * @target >= 1.
+ */
+static uint64_t bucket_at(const uint64_t agg[HIST_BUCKETS], uint64_t target)
+{
+	uint64_t cum = 0;
+
+	for (int b = 0; b < HIST_BUCKETS; b++) {
+		/* HIST_BUCKETS <= 63, so (b + 1) is always a safe shift. */
+		uint64_t upper = (1ULL << (b + 1)) - 1;
+
+		cum += agg[b];
+		if (cum >= target)
+			return upper;
+	}
+	return 0;
+}
+
+static void compute_p50_p99(const uint64_t agg[HIST_BUCKETS], uint64_t writes,
+			    uint64_t *p50, uint64_t *p99)
+{
+	uint64_t p50_target, p99_target;
+
+	*p50 = *p99 = 0;
+	if (!writes)
+		return;
+
+	p50_target = writes * 50 / 100;
+	p99_target = writes * 99 / 100;
+	if (!p50_target)
+		p50_target = 1;
+	if (!p99_target)
+		p99_target = 1;
+
+	*p50 = bucket_at(agg, p50_target);
+	*p99 = bucket_at(agg, p99_target);
+}
+
+static void print_summary(int nw, int nr, const struct hist_totals *t,
+			  uint64_t p50, uint64_t p99)
+{
+	double sec = g_duration;
+	uint64_t avg_ns = t->writes ? t->lat_sum / t->writes : 0;
+
+	printf("config: writers=%d readers=%d msgsize=%zu duration=%d pipe_size=%d memory_pressure=%s\n",
+	       nw, nr, g_msgsize, g_duration, g_pipe_size,
+	       g_memory_pressure ? "yes" : "no");
+	printf("writes: total=%llu rate=%.0f/s\n",
+	       (unsigned long long)t->writes, (double)t->writes / sec);
+	printf("throughput_MBps: %.2f\n",
+	       ((double)t->bytes / sec) / (1024.0 * 1024.0));
+	printf("lat_avg_ns: %llu\n", (unsigned long long)avg_ns);
+	printf("lat_p50_ns_upper: %llu\n", (unsigned long long)p50);
+	printf("lat_p99_ns_upper: %llu\n", (unsigned long long)p99);
+	printf("lat_max_ns: %llu\n", (unsigned long long)t->lat_max);
+}
+
+static void summarize(struct wstats *all, int nw, int nr)
+{
+	uint64_t agg[HIST_BUCKETS] = {0};
+	struct hist_totals t;
+	uint64_t p50, p99;
+
+	aggregate_wstats(all, nw, agg, &t);
+	compute_p50_p99(agg, t.writes, &p50, &p99);
+	print_summary(nw, nr, &t, p50, p99);
+}
+
+/*
+ * Child branch of fork(): restore SIGPIPE to default (the parent ignores
+ * it), exec stress-ng, and on failure write the reason into @hs_wr before
+ * exiting. When exec succeeds, O_CLOEXEC closes @hs_wr and the parent
+ * sees EOF on the read end.
+ */
+static void stress_ng_child(int hs_wr) __attribute__((noreturn));
+static void stress_ng_child(int hs_wr)
+{
+	char errbuf[256];
+
+	signal(SIGPIPE, SIG_DFL);
+	execlp("stress-ng", "stress-ng",
+	       "--vm", "4", "--vm-bytes", "80%",
+	       "--vm-method", "all",
+	       (char *)NULL);
+	snprintf(errbuf, sizeof(errbuf),
+		 "exec stress-ng failed: %s\n", strerror(errno));
+	(void)!write(hs_wr, errbuf, strlen(errbuf));
+	_exit(127);
+}
+
+/*
+ * Read from the O_CLOEXEC handshake pipe. Anything readable means the
+ * child wrote an error before exec; EOF (n == 0) means the write end
+ * closed because exec succeeded. Returns 0 on exec success, -1 if the
+ * child failed and was reaped.
+ */
+static int stress_ng_wait_handshake(int hs_rd, pid_t pid)
+{
+	struct pollfd pfd = { .fd = hs_rd, .events = POLLIN };
+	char errbuf[256];
+	int status;
+	int ret;
+
+	ret = poll(&pfd, 1, 500);
+	if (ret <= 0)
+		return 0;
+
+	ssize_t n = read(hs_rd, errbuf, sizeof(errbuf) - 1);
+
+	if (n > 0) {
+		errbuf[n] = '\0';
+		fputs(errbuf, stderr);
+		waitpid(pid, &status, 0);
+		return -1;
+	}
+	return 0;
+}
+
+static pid_t spawn_stress_ng(void)
+{
+	int hs[2];
+	pid_t pid;
+
+	/*
+	 * Handshake pipe: on exec failure the child writes an error message
+	 * and _exit()s. On exec success the O_CLOEXEC flag closes the write
+	 * end, which the parent observes as EOF. This makes the "is
+	 * stress-ng on $PATH?" check fail fast rather than silently.
+	 */
+	if (pipe2(hs, O_CLOEXEC) < 0) {
+		perror("pipe2");
+		return -1;
+	}
+
+	pid = fork();
+	if (pid < 0) {
+		perror("fork");
+		close(hs[0]);
+		close(hs[1]);
+		return -1;
+	}
+	if (pid == 0) {
+		close(hs[0]);
+		stress_ng_child(hs[1]);
+	}
+
+	close(hs[1]);
+	if (stress_ng_wait_handshake(hs[0], pid) < 0) {
+		close(hs[0]);
+		return -1;
+	}
+	close(hs[0]);
+
+	/* Give stress-ng a moment to map its VM regions before measuring. */
+	sleep(1);
+	return pid;
+}
+
+static void kill_stress_ng(pid_t pid)
+{
+	int status;
+
+	if (pid <= 0)
+		return;
+	kill(pid, SIGTERM);
+	for (int i = 0; i < 20; i++) {
+		if (waitpid(pid, &status, WNOHANG) > 0)
+			return;
+		usleep(100 * 1000);
+	}
+	kill(pid, SIGKILL);
+	waitpid(pid, &status, 0);
+}
+
+/*
+ * Allocate per-thread page-aligned buffers in main so a failed
+ * aligned_alloc() aborts the run before any thread starts. Workers used
+ * to allocate their own buffer and return NULL on failure, which left
+ * peers blocked in write()/read() with nobody to unblock them.
+ */
+static int alloc_thread_bufs(struct wstats *ws, int nw,
+			     struct rstats *rs, int nr)
+{
+	for (int i = 0; i < nw; i++) {
+		ws[i].buf = aligned_alloc(4096, g_msgsize);
+		if (!ws[i].buf) {
+			fprintf(stderr, "writer %d: aligned_alloc(%zu) failed\n",
+				i, g_msgsize);
+			return -1;
+		}
+		memset(ws[i].buf, 0xAA, g_msgsize);
+	}
+	for (int i = 0; i < nr; i++) {
+		rs[i].buf = aligned_alloc(4096, g_msgsize);
+		if (!rs[i].buf) {
+			fprintf(stderr, "reader %d: aligned_alloc(%zu) failed\n",
+				i, g_msgsize);
+			return -1;
+		}
+	}
+	return 0;
+}
+
+static void free_thread_bufs(struct wstats *ws, int nw,
+			     struct rstats *rs, int nr)
+{
+	if (ws)
+		for (int i = 0; i < nw; i++)
+			free(ws[i].buf);
+	if (rs)
+		for (int i = 0; i < nr; i++)
+			free(rs[i].buf);
+}
+
+static int start_readers(pthread_t *rt, struct rstats *rs, int nr,
+			 int *created)
+{
+	for (int i = 0; i < nr; i++) {
+		int err = pthread_create(&rt[i], NULL, reader, &rs[i]);
+
+		if (err) {
+			fprintf(stderr, "pthread_create reader %d: %s\n",
+				i, strerror(err));
+			return -1;
+		}
+		(*created)++;
+	}
+	return 0;
+}
+
+static int start_writers(pthread_t *wt, struct wstats *ws, int nw,
+			 int *created)
+{
+	for (int i = 0; i < nw; i++) {
+		int err = pthread_create(&wt[i], NULL, writer, &ws[i]);
+
+		if (err) {
+			fprintf(stderr, "pthread_create writer %d: %s\n",
+				i, strerror(err));
+			return -1;
+		}
+		(*created)++;
+	}
+	return 0;
+}
+
+static int open_bench_pipe(void)
+{
+	if (pipe(g_pipe) < 0) {
+		perror("pipe");
+		return -1;
+	}
+	if (fcntl(g_pipe[1], F_SETPIPE_SZ, g_pipe_size) < 0)
+		perror("F_SETPIPE_SZ (continuing)");
+	return 0;
+}
+
+/*
+ * Normal termination: g_stop tells writers to leave the loop after the
+ * current write() returns. Closing the shared write-end fd means once
+ * the in-flight writes drain, readers see EOF and exit. Writers are not
+ * unblocked by EPIPE here -- g_pipe[0] stays open so readers can keep
+ * draining.
+ *
+ * Error path: some threads may have been created and others skipped, so
+ * writers could be blocked in write() with no reader making progress.
+ * Close both ends -- closing the read end is what delivers EPIPE to a
+ * blocked writer.
+ */
+static void stop_and_join(pthread_t *wt, int nw_created,
+			  pthread_t *rt, int nr_created, int rc)
+{
+	atomic_store(&g_stop, 1);
+	close(g_pipe[1]);
+	if (rc < 0)
+		close(g_pipe[0]);
+	for (int i = 0; i < nw_created; i++)
+		pthread_join(wt[i], NULL);
+	for (int i = 0; i < nr_created; i++)
+		pthread_join(rt[i], NULL);
+	if (rc == 0)
+		close(g_pipe[0]);
+}
+
+static int run_one(int nw, int nr)
+{
+	pthread_t *wt = NULL, *rt = NULL;
+	struct wstats *ws = NULL;
+	struct rstats *rs = NULL;
+	int nw_created = 0, nr_created = 0;
+	int rc = 0;
+
+	atomic_store(&g_stop, 0);
+
+	if (open_bench_pipe() < 0)
+		return -1;
+
+	wt = calloc((size_t)nw, sizeof(*wt));
+	rt = calloc((size_t)nr, sizeof(*rt));
+	ws = calloc((size_t)nw, sizeof(*ws));
+	rs = calloc((size_t)nr, sizeof(*rs));
+	if (!wt || !rt || !ws || !rs) {
+		fprintf(stderr, "alloc failed\n");
+		rc = -1;
+		goto teardown;
+	}
+
+	if (alloc_thread_bufs(ws, nw, rs, nr) < 0) {
+		rc = -1;
+		goto teardown;
+	}
+
+	if (start_readers(rt, rs, nr, &nr_created) < 0 ||
+	    start_writers(wt, ws, nw, &nw_created) < 0) {
+		rc = -1;
+		goto teardown;
+	}
+
+	sleep((unsigned int)g_duration);
+
+teardown:
+	stop_and_join(wt, nw_created, rt, nr_created, rc);
+
+	if (rc == 0) {
+		summarize(ws, nw, nr);
+		fflush(stdout);
+	}
+
+	free_thread_bufs(ws, nw, rs, nr);
+	free(wt);
+	free(rt);
+	free(ws);
+	free(rs);
+	return rc;
+}
+
+static void usage(const char *prog)
+{
+	fprintf(stderr,
+		"usage: %s [-w writers] [-r readers] [-s msgsize] [-d secs] [-p pipe_size] [--memory-pressure]\n"
+		" default: sweep writers={1,2,5} x readers={1,5,10}\n"
+		" --memory-pressure: spawn stress-ng (--vm 4 --vm-bytes 80%% --vm-method all) for the run\n",
+		prog);
+}
+
+static int parse_args(int argc, char **argv,
+		      int *writers_override, int *readers_override)
+{
+	static const struct option long_opts[] = {
+		{"memory-pressure", no_argument, NULL, 'M'},
+		{0, 0, 0, 0},
+	};
+	int opt;
+
+	while ((opt = getopt_long(argc, argv, "w:r:s:d:p:",
+				  long_opts, NULL)) != -1) {
+		switch (opt) {
+		case 'w':
+			*writers_override = atoi(optarg);
+			break;
+		case 'r':
+			*readers_override = atoi(optarg);
+			break;
+		case 's':
+			g_msgsize = (size_t)atol(optarg);
+			break;
+		case 'd':
+			g_duration = atoi(optarg);
+			break;
+		case 'p':
+			g_pipe_size = atoi(optarg);
+			break;
+		case 'M':
+			g_memory_pressure = 1;
+			break;
+		default:
+			usage(argv[0]);
+			return -1;
+		}
+	}
+	return 0;
+}
+
+/*
+ * C11 requires the size passed to aligned_alloc(4096, size) to be a
+ * multiple of the alignment, and a C library may return NULL otherwise,
+ * which would abort the run in alloc_thread_bufs(). Validate up front
+ * instead so the user gets a clear message.
+ */
+static int validate_args(void)
+{
+	if (g_msgsize == 0 || g_msgsize % 4096 != 0) {
+		fprintf(stderr,
+			"msgsize must be a positive multiple of 4096 (got %zu)\n",
+			g_msgsize);
+		return -1;
+	}
+	if (g_duration <= 0) {
+		fprintf(stderr, "duration must be > 0 seconds (got %d)\n",
+			g_duration);
+		return -1;
+	}
+	if (g_pipe_size <= 0) {
+		fprintf(stderr, "pipe_size must be > 0 bytes (got %d)\n",
+			g_pipe_size);
+		return -1;
+	}
+	return 0;
+}
+
+static int run_sweep(void)
+{
+	static const int writers_sweep[] = {1, 2, 5};
+	static const int readers_sweep[] = {1, 5, 10};
+
+	for (size_t i = 0; i < ARRAY_SIZE(writers_sweep); i++) {
+		for (size_t j = 0; j < ARRAY_SIZE(readers_sweep); j++) {
+			printf("---\n");
+			if (run_one(writers_sweep[i], readers_sweep[j]) < 0)
+				return -1;
+		}
+	}
+	return 0;
+}
+
+int main(int argc, char **argv)
+{
+	int writers_override = 0, readers_override = 0;
+	pid_t stress_pid = -1;
+	int rc = 0;
+
+	if (parse_args(argc, argv, &writers_override, &readers_override) < 0)
+		return 1;
+	if (validate_args() < 0)
+		return 1;
+
+	signal(SIGPIPE, SIG_IGN);
+	setvbuf(stdout, NULL, _IOLBF, 0);
+	setvbuf(stderr, NULL, _IOLBF, 0);
+
+	fprintf(stderr, "pid=%d\n", getpid());
+	fflush(stderr);
+
+	if (g_memory_pressure) {
+		stress_pid = spawn_stress_ng();
+		if (stress_pid < 0) {
+			fprintf(stderr,
+				"memory_pressure requested but stress-ng could not be spawned\n");
+			return 1;
+		}
+	}
+
+	if (writers_override > 0 || readers_override > 0) {
+		int nw = writers_override > 0 ? writers_override : 1;
+		int nr = readers_override > 0 ? readers_override : 1;
+
+		rc = run_one(nw, nr) < 0 ? 1 : 0;
+	} else {
+		rc = run_sweep() < 0 ? 1 : 0;
+	}
+
+	kill_stress_ng(stress_pid);
+	return rc;
+}
-- 
2.54.0