In some cases, instead of always consuming all items from ring buffers
in a greedy way, we may want to consume up to a certain amount of items,
for example when we need to copy items from the BPF ring buffer to a
limited user buffer.
This change allows to set an upper limit to the amount of items consumed
from one or more ring buffers.
Link: https://lore.kernel.org/lkml/20240310154726.734289-1-andrea.righi@canonical.com/T
Signed-off-by: Andrea Righi <andrea.righi@canonical.com>
---
tools/lib/bpf/ringbuf.c | 29 +++++++++++++++++------------
1 file changed, 17 insertions(+), 12 deletions(-)
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index aacb64278a01..81df535040d1 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -231,7 +231,7 @@ static inline int roundup_len(__u32 len)
return (len + 7) / 8 * 8;
}
-static int64_t ringbuf_process_ring(struct ring *r)
+static int64_t ringbuf_process_ring(struct ring *r, int64_t max_items)
{
int *len_ptr, len, err;
/* 64-bit to avoid overflow in case of extreme application behavior */
@@ -264,7 +264,14 @@ static int64_t ringbuf_process_ring(struct ring *r)
cons_pos);
return err;
}
- cnt++;
+ if (++cnt >= max_items) {
+ /* update consumer pos and return the
+ * total amount of items consumed.
+ */
+ smp_store_release(r->consumer_pos,
+ cons_pos);
+ goto done;
+ }
}
smp_store_release(r->consumer_pos, cons_pos);
@@ -281,19 +288,18 @@ static int64_t ringbuf_process_ring(struct ring *r)
*/
int ring_buffer__consume(struct ring_buffer *rb)
{
- int64_t err, res = 0;
+ int64_t err, res = 0, max_items = INT_MAX;
int i;
for (i = 0; i < rb->ring_cnt; i++) {
struct ring *ring = rb->rings[i];
- err = ringbuf_process_ring(ring);
+ err = ringbuf_process_ring(ring, max_items);
if (err < 0)
return libbpf_err(err);
res += err;
+ max_items -= err;
}
- if (res > INT_MAX)
- return INT_MAX;
return res;
}
@@ -304,7 +310,7 @@ int ring_buffer__consume(struct ring_buffer *rb)
int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
{
int i, cnt;
- int64_t err, res = 0;
+ int64_t err, res = 0, max_items = INT_MAX;
cnt = epoll_wait(rb->epoll_fd, rb->events, rb->ring_cnt, timeout_ms);
if (cnt < 0)
@@ -314,13 +320,12 @@ int ring_buffer__poll(struct ring_buffer *rb, int timeout_ms)
__u32 ring_id = rb->events[i].data.fd;
struct ring *ring = rb->rings[ring_id];
- err = ringbuf_process_ring(ring);
+ err = ringbuf_process_ring(ring, max_items);
if (err < 0)
return libbpf_err(err);
res += err;
+ max_items -= err;
}
- if (res > INT_MAX)
- return INT_MAX;
return res;
}
@@ -375,11 +380,11 @@ int ring__consume(struct ring *r)
{
int64_t res;
- res = ringbuf_process_ring(r);
+ res = ringbuf_process_ring(r, INT_MAX);
if (res < 0)
return libbpf_err(res);
- return res > INT_MAX ? INT_MAX : res;
+ return res;
}
static void user_ringbuf_unmap_ring(struct user_ring_buffer *rb)
--
2.43.0
On Mon, Apr 1, 2024 at 12:32 AM Andrea Righi <andrea.righi@canonical.com> wrote:
>
> In some cases, instead of always consuming all items from ring buffers
> in a greedy way, we may want to consume up to a certain amount of items,
> for example when we need to copy items from the BPF ring buffer to a
> limited user buffer.
>
> This change allows to set an upper limit to the amount of items consumed
> from one or more ring buffers.
>
> Link: https://lore.kernel.org/lkml/20240310154726.734289-1-andrea.righi@canonical.com/T
> Signed-off-by: Andrea Righi <andrea.righi@canonical.com>
> ---
> tools/lib/bpf/ringbuf.c | 29 +++++++++++++++++------------
> 1 file changed, 17 insertions(+), 12 deletions(-)
>
> diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> index aacb64278a01..81df535040d1 100644
> --- a/tools/lib/bpf/ringbuf.c
> +++ b/tools/lib/bpf/ringbuf.c
> @@ -231,7 +231,7 @@ static inline int roundup_len(__u32 len)
> return (len + 7) / 8 * 8;
> }
>
> -static int64_t ringbuf_process_ring(struct ring *r)
> +static int64_t ringbuf_process_ring(struct ring *r, int64_t max_items)
> {
> int *len_ptr, len, err;
> /* 64-bit to avoid overflow in case of extreme application behavior */
> @@ -264,7 +264,14 @@ static int64_t ringbuf_process_ring(struct ring *r)
> cons_pos);
> return err;
> }
> - cnt++;
> + if (++cnt >= max_items) {
> + /* update consumer pos and return the
> + * total amount of items consumed.
> + */
> + smp_store_release(r->consumer_pos,
> + cons_pos);
Does this fit on a single line under 100 characters? If yes, please
keep it as a single line
but actually it seems cleaner to keep cnt++ intact, let
smp_store_release() below happen, and then check the exit condition.
Were you afraid to do unnecessary checks on discarded samples? I
wouldn't worry about that.
> + goto done;
> + }
> }
>
> smp_store_release(r->consumer_pos, cons_pos);
> @@ -281,19 +288,18 @@ static int64_t ringbuf_process_ring(struct ring *r)
> */
> int ring_buffer__consume(struct ring_buffer *rb)
> {
> - int64_t err, res = 0;
> + int64_t err, res = 0, max_items = INT_MAX;
I'm wondering if it might be better to have a convention that zero
means "no limit", which might allow the compiler to eliminate the
condition in ringbuf_process_ring altogether due to constant
propagation. WDYT?
> int i;
>
> for (i = 0; i < rb->ring_cnt; i++) {
> struct ring *ring = rb->rings[i];
>
> - err = ringbuf_process_ring(ring);
> + err = ringbuf_process_ring(ring, max_items);
> if (err < 0)
> return libbpf_err(err);
> res += err;
> + max_items -= err;
> }
> - if (res > INT_MAX)
> - return INT_MAX;
> return res;
> }
[...]
On Tue, Apr 02, 2024 at 10:58:33AM -0700, Andrii Nakryiko wrote:
> On Mon, Apr 1, 2024 at 12:32 AM Andrea Righi <andrea.righi@canonical.com> wrote:
> >
> > In some cases, instead of always consuming all items from ring buffers
> > in a greedy way, we may want to consume up to a certain amount of items,
> > for example when we need to copy items from the BPF ring buffer to a
> > limited user buffer.
> >
> > This change allows to set an upper limit to the amount of items consumed
> > from one or more ring buffers.
> >
> > Link: https://lore.kernel.org/lkml/20240310154726.734289-1-andrea.righi@canonical.com/T
> > Signed-off-by: Andrea Righi <andrea.righi@canonical.com>
> > ---
> > tools/lib/bpf/ringbuf.c | 29 +++++++++++++++++------------
> > 1 file changed, 17 insertions(+), 12 deletions(-)
> >
> > diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
> > index aacb64278a01..81df535040d1 100644
> > --- a/tools/lib/bpf/ringbuf.c
> > +++ b/tools/lib/bpf/ringbuf.c
> > @@ -231,7 +231,7 @@ static inline int roundup_len(__u32 len)
> > return (len + 7) / 8 * 8;
> > }
> >
> > -static int64_t ringbuf_process_ring(struct ring *r)
> > +static int64_t ringbuf_process_ring(struct ring *r, int64_t max_items)
> > {
> > int *len_ptr, len, err;
> > /* 64-bit to avoid overflow in case of extreme application behavior */
> > @@ -264,7 +264,14 @@ static int64_t ringbuf_process_ring(struct ring *r)
> > cons_pos);
> > return err;
> > }
> > - cnt++;
> > + if (++cnt >= max_items) {
> > + /* update consumer pos and return the
> > + * total amount of items consumed.
> > + */
> > + smp_store_release(r->consumer_pos,
> > + cons_pos);
>
> Does this fit on a single line under 100 characters? If yes, please
> keep it as a single line
>
> but actually it seems cleaner to keep cnt++ intact, let
> smp_store_release() below happen, and then check the exit condition.
> Were you afraid to do unnecessary checks on discarded samples? I
> wouldn't worry about that.
Ok, it makes sense, I'll change it.
>
> > + goto done;
> > + }
> > }
> >
> > smp_store_release(r->consumer_pos, cons_pos);
> > @@ -281,19 +288,18 @@ static int64_t ringbuf_process_ring(struct ring *r)
> > */
> > int ring_buffer__consume(struct ring_buffer *rb)
> > {
> > - int64_t err, res = 0;
> > + int64_t err, res = 0, max_items = INT_MAX;
>
> I'm wondering if it might be better to have a convention that zero
> means "no limit", which might allow the compiler to eliminate the
> condition in ringbuf_process_ring altogether due to constant
> propagation. WDYT?
Indeed, in this way we won't add any potential overhead to the existing
code that doesn't care about max_items. Will add that.
-Andrea
© 2016 - 2026 Red Hat, Inc.