The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
entry of the ptr_ring and then waking the netdev queue when entries have
been invalidated and can be used again by the producer.
To avoid waking the netdev queue while the ptr_ring is still full, it is
checked whether the netdev queue is stopped before invalidating entries.
That way the netdev queue can be safely woken after invalidating entries.
The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
__ptr_ring_produce within tun_net_xmit, guarantees that the information
about the netdev queue being stopped is visible after __ptr_ring_peek is
called.
The netdev queue is also woken after resizing the ptr_ring.
Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
2 files changed, 88 insertions(+), 3 deletions(-)
diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index 1197f245e873..f8292721a9d6 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
return ret ? ret : total;
}
+static struct sk_buff *tap_ring_consume(struct tap_queue *q)
+{
+ struct netdev_queue *txq;
+ struct net_device *dev;
+ bool will_invalidate;
+ bool stopped;
+ void *ptr;
+
+ spin_lock(&q->ring.consumer_lock);
+ ptr = __ptr_ring_peek(&q->ring);
+ if (!ptr) {
+ spin_unlock(&q->ring.consumer_lock);
+ return ptr;
+ }
+
+ /* Check if the queue is stopped before zeroing out, so no ptr gets
+ * produced in the meantime, because this could result in waking
+ * even though the ptr_ring is full. The order of the operations
+ * is ensured by barrier().
+ */
+ will_invalidate = __ptr_ring_will_invalidate(&q->ring);
+ if (unlikely(will_invalidate)) {
+ rcu_read_lock();
+ dev = rcu_dereference(q->tap)->dev;
+ txq = netdev_get_tx_queue(dev, q->queue_index);
+ stopped = netif_tx_queue_stopped(txq);
+ }
+ barrier();
+ __ptr_ring_discard_one(&q->ring, will_invalidate);
+
+ if (unlikely(will_invalidate)) {
+ if (stopped)
+ netif_tx_wake_queue(txq);
+ rcu_read_unlock();
+ }
+ spin_unlock(&q->ring.consumer_lock);
+
+ return ptr;
+}
+
static ssize_t tap_do_read(struct tap_queue *q,
struct iov_iter *to,
int noblock, struct sk_buff *skb)
@@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
TASK_INTERRUPTIBLE);
/* Read frames from the queue */
- skb = ptr_ring_consume(&q->ring);
+ skb = tap_ring_consume(q);
if (skb)
break;
if (noblock) {
@@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
ret = ptr_ring_resize_multiple_bh(rings, n,
dev->tx_queue_len, GFP_KERNEL,
__skb_array_destroy_skb);
+ if (netif_running(dev))
+ netif_tx_wake_all_queues(dev);
kfree(rings);
return ret;
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index c6b22af9bae8..682df8157b55 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
return total;
}
+static void *tun_ring_consume(struct tun_file *tfile)
+{
+ struct netdev_queue *txq;
+ struct net_device *dev;
+ bool will_invalidate;
+ bool stopped;
+ void *ptr;
+
+ spin_lock(&tfile->tx_ring.consumer_lock);
+ ptr = __ptr_ring_peek(&tfile->tx_ring);
+ if (!ptr) {
+ spin_unlock(&tfile->tx_ring.consumer_lock);
+ return ptr;
+ }
+
+ /* Check if the queue is stopped before zeroing out, so no ptr gets
+ * produced in the meantime, because this could result in waking
+ * even though the ptr_ring is full. The order of the operations
+ * is ensured by barrier().
+ */
+ will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
+ if (unlikely(will_invalidate)) {
+ rcu_read_lock();
+ dev = rcu_dereference(tfile->tun)->dev;
+ txq = netdev_get_tx_queue(dev, tfile->queue_index);
+ stopped = netif_tx_queue_stopped(txq);
+ }
+ barrier();
+ __ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
+
+ if (unlikely(will_invalidate)) {
+ if (stopped)
+ netif_tx_wake_queue(txq);
+ rcu_read_unlock();
+ }
+ spin_unlock(&tfile->tx_ring.consumer_lock);
+
+ return ptr;
+}
+
static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
{
DECLARE_WAITQUEUE(wait, current);
void *ptr = NULL;
int error = 0;
- ptr = ptr_ring_consume(&tfile->tx_ring);
+ ptr = tun_ring_consume(tfile);
if (ptr)
goto out;
if (noblock) {
@@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
while (1) {
set_current_state(TASK_INTERRUPTIBLE);
- ptr = ptr_ring_consume(&tfile->tx_ring);
+ ptr = tun_ring_consume(tfile);
if (ptr)
break;
if (signal_pending(current)) {
@@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
dev->tx_queue_len, GFP_KERNEL,
tun_ptr_free);
+ if (netif_running(dev))
+ netif_tx_wake_all_queues(dev);
+
kfree(rings);
return ret;
}
--
2.43.0
On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
> entry of the ptr_ring and then waking the netdev queue when entries have
> been invalidated and can be used again by the producer.
[...]
> +	/* Check if the queue is stopped before zeroing out, so no ptr gets
> +	 * produced in the meantime, because this could result in waking
> +	 * even though the ptr_ring is full.

So what? Maybe it would be a bit suboptimal? But with your design, I do
not get what prevents this:

	stopped? -> No
	ring is stopped
	discard

and queue stays stopped forever

[...]
> +	barrier();
> +	__ptr_ring_discard_one(&q->ring, will_invalidate);
> +
> +	if (unlikely(will_invalidate)) {
> +		if (stopped)
> +			netif_tx_wake_queue(txq);
> +		rcu_read_unlock();
> +	}

After an entry is consumed, you can detect this by checking

	r->consumer_head >= r->consumer_tail

so it seems you could keep calling regular ptr_ring_consume
and check afterwards?

[...]
On 23.09.25 18:36, Michael S. Tsirkin wrote:
> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
[...]
>> +	/* Check if the queue is stopped before zeroing out, so no ptr gets
>> +	 * produced in the meantime, because this could result in waking
>> +	 * even though the ptr_ring is full.
>
> So what? Maybe it would be a bit suboptimal? But with your design, I do
> not get what prevents this:
>
> 	stopped? -> No
> 	ring is stopped
> 	discard
>
> and queue stays stopped forever
>

I think I found a solution to this problem, see below:

[...]
>> +	barrier();
>> +	__ptr_ring_discard_one(&q->ring, will_invalidate);
>> +
>> +	if (unlikely(will_invalidate)) {

Here I just check for

	if (will_invalidate || __ptr_ring_empty(&q->ring)) {

instead because, if the ptr_ring is empty and the netdev queue stopped,
the race must have occurred. Then it is safe to wake the netdev queue,
because it is known that space in the ptr_ring was freed when the race
occurred. Also, it is guaranteed that tap_ring_consume is called at least
once after the race, because a new entry is generated by the producer at
the race.
In my adjusted implementation, it tests fine with pktgen without any lost
packets.

Generally now I think that the whole implementation can be fine without
using spinlocks at all. I am currently adjusting the implementation
regarding SMP memory barrier pairings, and I have a question:
In the v4 you mentioned "the stop -> wake bounce involves enough barriers
already". Does it, for instance, mean that netif_tx_wake_queue already
ensures memory ordering, and I do not have to use an smp_wmb() in front of
netif_tx_wake_queue() and smp_rmb() in front of the ptr_ring operations
in tun_net_xmit?
I dug through net/core/netdevice.h and dev.c but could not really
answer this question by myself...
Thanks :)

>> +		if (stopped)
>> +			netif_tx_wake_queue(txq);
>> +		rcu_read_unlock();
>> +	}
>
> After an entry is consumed, you can detect this by checking
>
> 	r->consumer_head >= r->consumer_tail
>
> so it seems you could keep calling regular ptr_ring_consume
> and check afterwards?

[...]
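For illustration only, here is one way the adjusted check described above could be wired into the consumer from the posted patch. This is a reconstruction, not the author's adjusted code: it reuses the __ptr_ring_will_invalidate/__ptr_ring_discard_one helpers introduced by this series, and it hoists the txq lookup and stopped check out of the will_invalidate branch (an assumption) so the wake can also fire when the ring has just become empty.

static struct sk_buff *tap_ring_consume(struct tap_queue *q)
{
	struct netdev_queue *txq;
	struct net_device *dev;
	bool will_invalidate;
	bool stopped;
	void *ptr;

	spin_lock(&q->ring.consumer_lock);
	ptr = __ptr_ring_peek(&q->ring);
	if (!ptr) {
		spin_unlock(&q->ring.consumer_lock);
		return ptr;
	}

	will_invalidate = __ptr_ring_will_invalidate(&q->ring);

	/* Sample the stopped state before discarding, so it refers to a
	 * moment when no space had been freed yet.
	 */
	rcu_read_lock();
	dev = rcu_dereference(q->tap)->dev;
	txq = netdev_get_tx_queue(dev, q->queue_index);
	stopped = netif_tx_queue_stopped(txq);

	barrier();
	__ptr_ring_discard_one(&q->ring, will_invalidate);

	/* Wake not only when this consume invalidated entries, but also
	 * when the ring is now empty: an empty ring plus a stopped queue
	 * means the race described above happened, so space was already
	 * freed and waking is safe.
	 */
	if ((will_invalidate || __ptr_ring_empty(&q->ring)) && stopped)
		netif_tx_wake_queue(txq);

	rcu_read_unlock();
	spin_unlock(&q->ring.consumer_lock);

	return ptr;
}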
On Sun, Sep 28, 2025 at 11:27:25PM +0200, Simon Schippers wrote:
> On 23.09.25 18:36, Michael S. Tsirkin wrote:
[...]
> Here I just check for
>
> 	if (will_invalidate || __ptr_ring_empty(&q->ring)) {
>
> instead because, if the ptr_ring is empty and the netdev queue stopped,
> the race must have occurred. Then it is safe to wake the netdev queue,
> because it is known that space in the ptr_ring was freed when the race
> occurred. Also, it is guaranteed that tap_ring_consume is called at least
> once after the race, because a new entry is generated by the producer at
> the race.
> In my adjusted implementation, it tests fine with pktgen without any lost
> packets.

what if it is not empty and ring is stopped?

> Generally now I think that the whole implementation can be fine without
> using spinlocks at all. I am currently adjusting the implementation
> regarding SMP memory barrier pairings, and I have a question:
> In the v4 you mentioned "the stop -> wake bounce involves enough barriers
> already". Does it, for instance, mean that netif_tx_wake_queue already
> ensures memory ordering, and I do not have to use an smp_wmb() in front of
> netif_tx_wake_queue() and smp_rmb() in front of the ptr_ring operations
> in tun_net_xmit?
> I dug through net/core/netdevice.h and dev.c but could not really
> answer this question by myself...
> Thanks :)

Only if it wakes up something, I think.

Read:

	SLEEP AND WAKE-UP FUNCTIONS

in Documentation/memory-barriers.txt

IIUC this is the same.

[...]
On 29.09.25 00:33, Michael S. Tsirkin wrote:
> On Sun, Sep 28, 2025 at 11:27:25PM +0200, Simon Schippers wrote:
[...]
>> In my adjusted implementation, it tests fine with pktgen without any lost
>> packets.
>
> what if it is not empty and ring is stopped?
>

Then it can not be assumed that there is free space in the ptr_ring,
because __ptr_ring_discard_one may only create space after one of the
upcoming entries that it will consume. Only if the ptr_ring is empty
(which will obviously happen after some time) it is guaranteed that there
is free space in the ptr_ring, either because the race occurred previously
or __ptr_ring_discard_one freed entries right before.

[...]
>> I dug through net/core/netdevice.h and dev.c but could not really
>> answer this question by myself...
>> Thanks :)
>
> Only if it wakes up something, I think.
>
> Read:
>
> 	SLEEP AND WAKE-UP FUNCTIONS
>
> in Documentation/memory-barriers.txt
>
> IIUC this is the same.
>

Thanks, I will look into it! :)

[...]
On 23.09.25 18:36, Michael S. Tsirkin wrote:
> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
[...]
> So what? Maybe it would be a bit suboptimal? But with your design, I do
> not get what prevents this:
>
> 	stopped? -> No
> 	ring is stopped
> 	discard
>
> and queue stays stopped forever
>

I totally missed this (but I am not sure why it did not happen in my
testing with different ptr_ring sizes..).

I guess you are right, there must be some type of locking.
It probably makes sense to lock the netdev txq->_xmit_lock whenever the
consumer invalidates old ptr_ring entries (so when r->consumer_head >=
r->consumer_tail). The producer holds this lock with dev->lltx=false. Then
the consumer is able to wake the queue safely.

So I would now just change the implementation to:

tun_net_xmit:
	...
	if ptr_ring_produce
		// Could happen because of unproduce in vhost_net..
		netif_tx_stop_queue
		...
		goto drop

	if ptr_ring_full
		netif_tx_stop_queue
		...

tun_ring_recv/tap_do_read (the implementation for the batched methods
would be done in the similar way):
	...
	ptr_ring_consume
	if r->consumer_head >= r->consumer_tail
		__netif_tx_lock_bh
		netif_tx_wake_queue
		__netif_tx_unlock_bh

This implementation does not need any new ptr_ring helpers and no fancy
ordering tricks.
Would this implementation be sufficient in your opinion?

[...]
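Rendered as C, the consumer-side half of this proposal might look roughly like the sketch below. It is a reconstruction of the pseudocode above, not code from the series: the function name and the dev parameter are made up for illustration, and the comparison is written as '==' since the follow-up discussion points out that is what the check actually means.

/* Sketch of the proposed consumer path: consume as usual, then wake the
 * txq under the tx lock whenever the consume invalidated a batch of
 * entries (i.e. freed space for the producer). The producer stops the
 * queue under the same lock (dev->lltx is false), so stop and wake do
 * not race here. The follow-up review suggests hiding the field check
 * behind a ptr_ring helper instead of poking at the internals directly.
 */
static void *tun_ring_consume_and_wake(struct tun_file *tfile,
				       struct net_device *dev)
{
	struct netdev_queue *txq;
	void *ptr;

	ptr = ptr_ring_consume(&tfile->tx_ring);

	/* consumer_tail only catches up with consumer_head when
	 * __ptr_ring_discard_one() has just invalidated a batch of
	 * consumed entries, making their slots reusable.
	 */
	if (ptr && tfile->tx_ring.consumer_head == tfile->tx_ring.consumer_tail) {
		txq = netdev_get_tx_queue(dev, tfile->queue_index);
		__netif_tx_lock_bh(txq);
		netif_tx_wake_queue(txq);
		__netif_tx_unlock_bh(txq);
	}

	return ptr;
}

As the next reply notes, this takes the tx lock on every batch boundary even when the queue is not stopped, which may be heavyweight.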
On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
> On 23.09.25 18:36, Michael S. Tsirkin wrote:
[...]
> 	ptr_ring_consume
> 	if r->consumer_head >= r->consumer_tail
> 		__netif_tx_lock_bh
> 		netif_tx_wake_queue
> 		__netif_tx_unlock_bh
>
> This implementation does not need any new ptr_ring helpers and no fancy
> ordering tricks.
> Would this implementation be sufficient in your opinion?

Maybe you mean == ?

Pls don't poke at ptr ring internals though. What are we testing for here?
I think the point is that a batch of entries was consumed?
Maybe __ptr_ring_consumed_batch ? and a comment explaining
this returns true when last successful call to consume
freed up a batch of space in the ring for producer to make
progress.

consumer_head == consumer_tail also happens rather a lot,
though thankfully not on every entry.
So taking tx lock each time this happens, even if queue
is not stopped, seems heavyweight.

[...]
On 24.09.25 08:55, Michael S. Tsirkin wrote:
> On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
[...]
>> This implementation does not need any new ptr_ring helpers and no fancy
>> ordering tricks.
>> Would this implementation be sufficient in your opinion?
>
> Maybe you mean == ? Pls don't poke at ptr ring internals though.
> What are we testing for here?
> I think the point is that a batch of entries was consumed?
> Maybe __ptr_ring_consumed_batch ? and a comment explaining
> this returns true when last successful call to consume
> freed up a batch of space in the ring for producer to make
> progress.
>

Yes, I mean ==.

Having a dedicated helper for this purpose makes sense. I just find
the name __ptr_ring_consumed_batch a bit confusing next to
__ptr_ring_consume_batched, since they both refer to different kinds of
batches.

> consumer_head == consumer_tail also happens rather a lot,
> though thankfully not on every entry.
> So taking tx lock each time this happens, even if queue
> is not stopped, seems heavyweight.
>

Yes, I agree — but avoiding locking probably requires some fancy
ordering tricks again..

[...]
On Wed, Sep 24, 2025 at 09:42:45AM +0200, Simon Schippers wrote:
> On 24.09.25 08:55, Michael S. Tsirkin wrote:
> > On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
> >> On 23.09.25 18:36, Michael S. Tsirkin wrote:
> >>> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
[...]
> >>>> + /* Check if the queue stopped before zeroing out, so no ptr get
> >>>> + * produced in the meantime, because this could result in waking
> >>>> + * even though the ptr_ring is full.
> >>>
> >>> So what? Maybe it would be a bit suboptimal? But with your design, I do
> >>> not get what prevents this:
> >>>
> >>>
> >>> stopped? -> No
> >>> ring is stopped
> >>> discard
> >>>
> >>> and queue stays stopped forever
> >>>
> >>>
> >>
> >> I totally missed this (but I am not sure why it did not happen in my
> >> testing with different ptr_ring sizes..).
> >>
> >> I guess you are right, there must be some type of locking.
> >> It probably makes sense to lock the netdev txq->_xmit_lock whenever the
> >> consumer invalidates old ptr_ring entries (so when r->consumer_head >=
> >> r->consumer_tail). The producer holds this lock with dev->lltx=false. Then
> >> the consumer is able to wake the queue safely.
> >>
> >> So I would now just change the implementation to:
> >> tun_net_xmit:
> >> ...
> >> if ptr_ring_produce
> >> // Could happen because of unproduce in vhost_net..
> >> netif_tx_stop_queue
> >> ...
> >> goto drop
> >>
> >> if ptr_ring_full
> >> netif_tx_stop_queue
> >> ...
> >>
> >> tun_ring_recv/tap_do_read (the implementation for the batched methods
> >> would be done in the similar way):
> >> ...
> >> ptr_ring_consume
> >> if r->consumer_head >= r->consumer_tail
> >> __netif_tx_lock_bh
> >> netif_tx_wake_queue
> >> __netif_tx_unlock_bh
> >>
> >> This implementation does not need any new ptr_ring helpers and no fancy
> >> ordering tricks.
> >> Would this implementation be sufficient in your opinion?
> >
> >
> > Maybe you mean == ? Pls don't poke at ptr ring internals though.
> > What are we testing for here?
> > I think the point is that a batch of entries was consumed?
> > Maybe __ptr_ring_consumed_batch ? and a comment explaining
> > this returns true when last successful call to consume
> > freed up a batch of space in the ring for producer to make
> > progress.
> >
>
> Yes, I mean ==.
>
> Having a dedicated helper for this purpose makes sense. I just find
> the name __ptr_ring_consumed_batch a bit confusing next to
> __ptr_ring_consume_batched, since they both refer to different kinds of
> batches.

__ptr_ring_consume_created_space ?

/* Previous call to ptr_ring_consume created some space.
 *
 * NB: only refers to the last call to __ptr_ring_consume,
 * if you are calling ptr_ring_consume multiple times, you
 * have to check this multiple times.
 * Accordingly, do not use this after __ptr_ring_consume_batched.
 */

> >
> > consumer_head == consumer_tail also happens rather a lot,
> > though thankfully not on every entry.
> > So taking tx lock each time this happens, even if queue
> > is not stopped, seems heavyweight.
> >
> >
>
> Yes, I agree — but avoiding locking probably requires some fancy
> ordering tricks again..
>
[...]
> >>>
> >>> After an entry is consumed, you can detect this by checking
> >>>
> >>> r->consumer_head >= r->consumer_tail
> >>>
> >>>
> >>> so it seems you could keep calling regular ptr_ring_consume
> >>> and check afterwards?
[...]
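For reference, a minimal sketch of what such a helper could look like if
it lived in include/linux/ptr_ring.h, assuming the current mainline
behaviour of __ptr_ring_discard_one(), which sets consumer_tail equal to
consumer_head whenever it invalidates a batch of entries (including at
the wrap back to the start of the ring). The name is only the one
proposed in this thread, not an existing API.

/* Hypothetical helper (proposed above, not in mainline ptr_ring.h).
 * Call with the consumer lock held, directly after a successful
 * __ptr_ring_consume(): if that consume invalidated entries,
 * __ptr_ring_discard_one() advanced consumer_tail up to consumer_head,
 * so equality means space was just handed back to the producer.
 */
static inline bool __ptr_ring_consume_created_space(struct ptr_ring *r)
{
	return r->consumer_head == r->consumer_tail;
}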
On 24.09.25 09:49, Michael S. Tsirkin wrote:
> On Wed, Sep 24, 2025 at 09:42:45AM +0200, Simon Schippers wrote:
>> On 24.09.25 08:55, Michael S. Tsirkin wrote:
>>> On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
>>>> On 23.09.25 18:36, Michael S. Tsirkin wrote:
>>>>> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
[...]
>> Yes, I mean ==.
>>
>> Having a dedicated helper for this purpose makes sense. I just find
>> the name __ptr_ring_consumed_batch a bit confusing next to
>> __ptr_ring_consume_batched, since they both refer to different kinds of
>> batches.
>
> __ptr_ring_consume_created_space ?
>
> /* Previous call to ptr_ring_consume created some space.
> *
> * NB: only refers to the last call to __ptr_ring_consume,
> * if you are calling ptr_ring_consume multiple times, you
> * have to check this multiple times.
> * Accordingly, do not use this after __ptr_ring_consume_batched.
> */
>

Sounds good.

Regarding __ptr_ring_consume_batched:
Theoretically the consumer_tail before and after calling the method could
be compared to avoid calling __ptr_ring_consume_created_space at each
iteration. But I guess it is also fine calling it at each iteration.

[...]
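To make the proposal concrete, here is a rough sketch of the consumer
side for tun, assuming the hypothetical __ptr_ring_consume_created_space()
helper from above and a producer that stops the queue under the per-queue
tx lock (dev->lltx false, as assumed in the proposal). The function name
is made up for illustration; it is not the tun_ring_consume() from the
posted patch. As noted above, it takes the tx lock whenever a batch was
invalidated, which is the overhead Michael is worried about.

/* Illustration of the proposed consumer path (not the posted patch).
 * Assumes the hypothetical __ptr_ring_consume_created_space() helper and
 * that tun_net_xmit() stops the txq under the tx lock when the ring
 * becomes full, so the wake below cannot race with a concurrent stop.
 */
static void *tun_ring_consume_locked_wake(struct tun_file *tfile)
{
	struct ptr_ring *ring = &tfile->tx_ring;
	void *ptr;

	spin_lock(&ring->consumer_lock);
	ptr = __ptr_ring_consume(ring);
	if (ptr && __ptr_ring_consume_created_space(ring)) {
		struct tun_struct *tun;

		rcu_read_lock();
		tun = rcu_dereference(tfile->tun);
		if (tun) {
			struct netdev_queue *txq;

			txq = netdev_get_tx_queue(tun->dev,
						  tfile->queue_index);
			__netif_tx_lock_bh(txq);
			if (netif_tx_queue_stopped(txq))
				netif_tx_wake_queue(txq);
			__netif_tx_unlock_bh(txq);
		}
		rcu_read_unlock();
	}
	spin_unlock(&ring->consumer_lock);

	return ptr;
}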
On Wed, Sep 24, 2025 at 10:40:04AM +0200, Simon Schippers wrote:
> On 24.09.25 09:49, Michael S. Tsirkin wrote:
> > On Wed, Sep 24, 2025 at 09:42:45AM +0200, Simon Schippers wrote:
> >> On 24.09.25 08:55, Michael S. Tsirkin wrote:
> >>> On Wed, Sep 24, 2025 at 07:56:33AM +0200, Simon Schippers wrote:
> >>>> On 23.09.25 18:36, Michael S. Tsirkin wrote:
> >>>>> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
[...]
> > __ptr_ring_consume_created_space ?
> >
> > /* Previous call to ptr_ring_consume created some space.
> > *
> > * NB: only refers to the last call to __ptr_ring_consume,
> > * if you are calling ptr_ring_consume multiple times, you
> > * have to check this multiple times.
> > * Accordingly, do not use this after __ptr_ring_consume_batched.
> > */
> >
>
> Sounds good.
>
> Regarding __ptr_ring_consume_batched:
> Theoretically the consumer_tail before and after calling the method could
> be compared to avoid calling __ptr_ring_consume_created_space at each
> iteration. But I guess it is also fine calling it at each iteration.

Hmm good point, though I worry about wrap-around a bit.

[...]
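Below is a rough illustration of the before/after comparison being
discussed, with a made-up function name; note Michael's earlier point
that drivers should not poke at ptr_ring internals, so in practice this
would have to live behind a ptr_ring helper. The wrap-around caveat:
__ptr_ring_discard_one() resets consumer_tail to 0 once the consumer
passes the end of the ring, so "the tail changed" and "the tail advanced"
are not the same thing, and the comparison has to be chosen with that in
mind.

/* Illustration only (not the posted patch): snapshot consumer_tail
 * around a batched consume and do the wake work only when it moved,
 * instead of checking after every single entry.
 */
static int tun_consume_batched_sketch(struct tun_file *tfile,
				      void **array, int n)
{
	struct ptr_ring *ring = &tfile->tx_ring;
	int tail_before, tail_after;
	int consumed;

	spin_lock(&ring->consumer_lock);
	tail_before = ring->consumer_tail;
	consumed = __ptr_ring_consume_batched(ring, array, n);
	tail_after = ring->consumer_tail;
	spin_unlock(&ring->consumer_lock);

	if (consumed && tail_after != tail_before) {
		/* Entries were invalidated: wake the stopped txq here,
		 * e.g. under __netif_tx_lock_bh() as sketched earlier.
		 */
	}

	return consumed;
}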
On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
> entry of the ptr_ring and then waking the netdev queue when entries got
> invalidated to be used again by the producer.
> To avoid waking the netdev queue when the ptr_ring is full, it is checked
> if the netdev queue is stopped before invalidating entries. Like that the
> netdev queue can be safely woken after invalidating entries.
>
> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
> __ptr_ring_produce within tun_net_xmit guarantees that the information
> about the netdev queue being stopped is visible after __ptr_ring_peek is
> called.
>
> The netdev queue is also woken after resizing the ptr_ring.
>
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>

Sounds like there is subtle interplay here between queue stopped bit and
ring full status, all lockless with fancy ordering tricks. I have to say
this is fragile. I'd like to see much more documentation.

Or alternatively, I ask myself if, after detecting flow control issues,
it is possible to just use a spinlock to synchronize, somehow.

[...]
> + /* Check if the queue stopped before zeroing out, so no ptr get
> + * produced in the meantime, because this could result in waking
> + * even though the ptr_ring is full. The order of the operations

which operations, I don't get it?

it's unusual for barrier() to be effective.

are you trying to order the read in netif_tx_queue_stopped
versus the read in __ptr_ring_discard_one?
Then that would seem to need smp_rmb and accordingly
smp_wmb in the producing side.

> + * is ensured by barrier().
[...]
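To make the lock-based alternative concrete, a sketch of the producer
side follows, assuming tun_net_xmit() runs under the per-queue _xmit_lock
(i.e. dev->lltx is false, as assumed in the proposal discussed elsewhere
in this thread) so that stopping the queue here cannot race with a locked
wake on the consumer side. The function name is made up, and the drop and
statistics handling of the real tun_net_xmit() is omitted.

/* Illustration of the producer side of the lock-based scheme (not the
 * posted patch). The stack holds the txq _xmit_lock around this call
 * when dev->lltx is false, which serializes the stop against a wake
 * done under __netif_tx_lock_bh() by the consumer.
 */
static netdev_tx_t tun_net_xmit_sketch(struct sk_buff *skb,
				       struct net_device *dev)
{
	struct tun_struct *tun = netdev_priv(dev);
	int index = skb_get_queue_mapping(skb);
	struct netdev_queue *txq = netdev_get_tx_queue(dev, index);
	struct tun_file *tfile;

	rcu_read_lock();
	tfile = rcu_dereference(tun->tfiles[index]);

	if (ptr_ring_produce(&tfile->tx_ring, skb)) {
		/* Could still happen, e.g. because of unproduce in
		 * vhost_net, as noted in the proposal above.
		 */
		netif_tx_stop_queue(txq);
		rcu_read_unlock();
		kfree_skb(skb);
		return NET_XMIT_DROP;
	}

	/* Stop once the ring has filled up; the consumer wakes the queue
	 * again after it invalidates entries.
	 */
	if (ptr_ring_full(&tfile->tx_ring))
		netif_tx_stop_queue(txq);

	rcu_read_unlock();
	return NETDEV_TX_OK;
}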