[PATCH 2/4] netdev queue flow control for TUN

Simon Schippers posted 4 patches 1 month ago
There is a newer version of this series
[PATCH 2/4] netdev queue flow control for TUN
Posted by Simon Schippers 1 month ago
The netdev queue is stopped in tun_net_xmit after inserting an SKB into
the ring buffer if the ring buffer became full because of that. If the
insertion into the ptr_ring fails, the netdev queue is also stopped and
the SKB is dropped. However, this never happened in my testing. To ensure
that the ptr_ring change is available to the consumer before the netdev
queue stop, an smp_wmb() is used.

Then in tun_ring_recv, the new helper wake_netdev_queue is called in the
blocking wait queue and after consuming an SKB from the ptr_ring. This
helper first checks if the netdev queue has stopped. Then with the paired
smp_rmb() it is known that tun_net_xmit will not produce SKBs anymore.
With that knowledge, the helper can then wake the netdev queue if there is
at least a single spare slot in the ptr_ring by calling ptr_ring_spare
with cnt=1.

Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
---
 drivers/net/tun.c | 33 ++++++++++++++++++++++++++++++---
 1 file changed, 30 insertions(+), 3 deletions(-)

diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index cc6c50180663..735498e221d8 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -1060,13 +1060,21 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
 
 	nf_reset_ct(skb);
 
-	if (ptr_ring_produce(&tfile->tx_ring, skb)) {
+	queue = netdev_get_tx_queue(dev, txq);
+	if (unlikely(ptr_ring_produce(&tfile->tx_ring, skb))) {
+		/* Paired with smp_rmb() in wake_netdev_queue. */
+		smp_wmb();
+		netif_tx_stop_queue(queue);
 		drop_reason = SKB_DROP_REASON_FULL_RING;
 		goto drop;
 	}
+	if (ptr_ring_full(&tfile->tx_ring)) {
+		/* Paired with smp_rmb() in wake_netdev_queue. */
+		smp_wmb();
+		netif_tx_stop_queue(queue);
+	}
 
 	/* dev->lltx requires to do our own update of trans_start */
-	queue = netdev_get_tx_queue(dev, txq);
 	txq_trans_cond_update(queue);
 
 	/* Notify and wake up reader process */
@@ -2110,6 +2118,24 @@ static ssize_t tun_put_user(struct tun_struct *tun,
 	return total;
 }
 
+static inline void wake_netdev_queue(struct tun_file *tfile)
+{
+	struct netdev_queue *txq;
+	struct net_device *dev;
+
+	rcu_read_lock();
+	dev = rcu_dereference(tfile->tun)->dev;
+	txq = netdev_get_tx_queue(dev, tfile->queue_index);
+
+	if (netif_tx_queue_stopped(txq)) {
+		/* Paired with smp_wmb() in tun_net_xmit. */
+		smp_rmb();
+		if (ptr_ring_spare(&tfile->tx_ring, 1))
+			netif_tx_wake_queue(txq);
+	}
+	rcu_read_unlock();
+}
+
 static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
 {
 	DECLARE_WAITQUEUE(wait, current);
@@ -2139,7 +2165,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
 			error = -EFAULT;
 			break;
 		}
-
+		wake_netdev_queue(tfile);
 		schedule();
 	}
 
@@ -2147,6 +2173,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
 	remove_wait_queue(&tfile->socket.wq.wait, &wait);
 
 out:
+	wake_netdev_queue(tfile);
 	*err = error;
 	return ptr;
 }
-- 
2.43.0
Re: [PATCH 2/4] netdev queue flow control for TUN
Posted by Michael S. Tsirkin 4 weeks, 1 day ago
On Tue, Sep 02, 2025 at 10:09:55AM +0200, Simon Schippers wrote:
> The netdev queue is stopped in tun_net_xmit after inserting an SKB into
> the ring buffer if the ring buffer became full because of that. If the
> insertion into the ptr_ring fails, the netdev queue is also stopped and
> the SKB is dropped. However, this never happened in my testing. To ensure
> that the ptr_ring change is available to the consumer before the netdev
> queue stop, an smp_wmb() is used.

I think the stop -> wake bounce involves enough barriers already,
no need for us to get cute.


> Then in tun_ring_recv, the new helper wake_netdev_queue is called in the
> blocking wait queue and after consuming an SKB from the ptr_ring. This
> helper first checks if the netdev queue has stopped. Then with the paired
> smp_rmb() it is known that tun_net_xmit will not produce SKBs anymore.
> With that knowledge, the helper can then wake the netdev queue if there is
> at least a single spare slot in the ptr_ring by calling ptr_ring_spare
> with cnt=1.
> 
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> ---
>  drivers/net/tun.c | 33 ++++++++++++++++++++++++++++++---
>  1 file changed, 30 insertions(+), 3 deletions(-)
> 
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index cc6c50180663..735498e221d8 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -1060,13 +1060,21 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>  
>  	nf_reset_ct(skb);
>  
> -	if (ptr_ring_produce(&tfile->tx_ring, skb)) {
> +	queue = netdev_get_tx_queue(dev, txq);
> +	if (unlikely(ptr_ring_produce(&tfile->tx_ring, skb))) {
> +		/* Paired with smp_rmb() in wake_netdev_queue. */
> +		smp_wmb();
> +		netif_tx_stop_queue(queue);
>  		drop_reason = SKB_DROP_REASON_FULL_RING;
>  		goto drop;
>  	}
> +	if (ptr_ring_full(&tfile->tx_ring)) {
> +		/* Paired with smp_rmb() in wake_netdev_queue. */
> +		smp_wmb();
> +		netif_tx_stop_queue(queue);
> +	}
>  
>  	/* dev->lltx requires to do our own update of trans_start */
> -	queue = netdev_get_tx_queue(dev, txq);
>  	txq_trans_cond_update(queue);
>  
>  	/* Notify and wake up reader process */
> @@ -2110,6 +2118,24 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>  	return total;
>  }
>  
> +static inline void wake_netdev_queue(struct tun_file *tfile)
> +{
> +	struct netdev_queue *txq;
> +	struct net_device *dev;
> +
> +	rcu_read_lock();
> +	dev = rcu_dereference(tfile->tun)->dev;
> +	txq = netdev_get_tx_queue(dev, tfile->queue_index);
> +
> +	if (netif_tx_queue_stopped(txq)) {
> +		/* Paired with smp_wmb() in tun_net_xmit. */
> +		smp_rmb();
> +		if (ptr_ring_spare(&tfile->tx_ring, 1))
> +			netif_tx_wake_queue(txq);
> +	}
> +	rcu_read_unlock();
> +}
> +
>  static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>  {
>  	DECLARE_WAITQUEUE(wait, current);
> @@ -2139,7 +2165,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>  			error = -EFAULT;
>  			break;
>  		}
> -
> +		wake_netdev_queue(tfile);
>  		schedule();
>  	}
>  
> @@ -2147,6 +2173,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>  	remove_wait_queue(&tfile->socket.wq.wait, &wait);
>  
>  out:
> +	wake_netdev_queue(tfile);
>  	*err = error;
>  	return ptr;
>  }
> -- 
> 2.43.0
[PATCH 2/4] netdev queue flow control for TUN
Posted by Simon Schippers 4 weeks, 1 day ago
Michael S. Tsirkin wrote:
> On Tue, Sep 02, 2025 at 10:09:55AM +0200, Simon Schippers wrote:
>> The netdev queue is stopped in tun_net_xmit after inserting an SKB into
>> the ring buffer if the ring buffer became full because of that. If the
>> insertion into the ptr_ring fails, the netdev queue is also stopped and
>> the SKB is dropped. However, this never happened in my testing. To ensure
>> that the ptr_ring change is available to the consumer before the netdev
>> queue stop, an smp_wmb() is used.
> 
> I think the stop -> wake bounce involves enough barriers already,
> no need for us to get cute.
>

Yes, you and Jason are correct, it seems to be unnecessary. I just removed
the barriers, and it tests fine!
Re: [PATCH 2/4] netdev queue flow control for TUN
Posted by Michael S. Tsirkin 4 weeks, 1 day ago
On Tue, Sep 02, 2025 at 10:09:55AM +0200, Simon Schippers wrote:
> The netdev queue is stopped in tun_net_xmit after inserting an SKB into
> the ring buffer if the ring buffer became full because of that. If the
> insertion into the ptr_ring fails, the netdev queue is also stopped and
> the SKB is dropped. However, this never happened in my testing. To ensure
> that the ptr_ring change is available to the consumer before the netdev
> queue stop, an smp_wmb() is used.
> 
> Then in tun_ring_recv, the new helper wake_netdev_queue is called in the
> blocking wait queue and after consuming an SKB from the ptr_ring. This
> helper first checks if the netdev queue has stopped. Then with the paired
> smp_rmb() it is known that tun_net_xmit will not produce SKBs anymore.
> With that knowledge, the helper can then wake the netdev queue if there is
> at least a single spare slot in the ptr_ring by calling ptr_ring_spare
> with cnt=1.
> 
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>


Oh you just want to know if produce will succeed?
Kind of a version of peek but for producer?

So all this cuteness of looking at the consumer is actually not necessary,
and bad for cache.

You just want this:


Signed-off-by: Michael S. Tsirkin <mst@redhat.com>

diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 551329220e4f..de25fe81dd4e 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -96,6 +96,14 @@ static inline bool ptr_ring_full_bh(struct ptr_ring *r)
 	return ret;
 }
 
+static inline int __ptr_ring_produce_peek(struct ptr_ring *r)
+{
+	if (unlikely(!r->size) || r->queue[r->producer])
+		return -ENOSPC;
+
+	return 0;
+}
+
 /* Note: callers invoking this in a loop must use a compiler barrier,
  * for example cpu_relax(). Callers must hold producer_lock.
  * Callers are responsible for making sure pointer that is being queued
@@ -103,8 +111,10 @@ static inline bool ptr_ring_full_bh(struct ptr_ring *r)
  */
 static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
 {
-	if (unlikely(!r->size) || r->queue[r->producer])
-		return -ENOSPC;
+	int r = __ptr_ring_produce_peek(r);
+
+	if (r)
+		return r;
 
 	/* Make sure the pointer we are storing points to a valid data. */
 	/* Pairs with the dependency ordering in __ptr_ring_consume. */



Add some docs, and call this, then wake.  No?

-- 
MST
Re: [PATCH 2/4] netdev queue flow control for TUN
Posted by Simon Schippers 4 weeks, 1 day ago
Michael S. Tsirkin wrote:
> On Tue, Sep 02, 2025 at 10:09:55AM +0200, Simon Schippers wrote:
>> The netdev queue is stopped in tun_net_xmit after inserting an SKB into
>> the ring buffer if the ring buffer became full because of that. If the
>> insertion into the ptr_ring fails, the netdev queue is also stopped and
>> the SKB is dropped. However, this never happened in my testing. To ensure
>> that the ptr_ring change is available to the consumer before the netdev
>> queue stop, an smp_wmb() is used.
>>
>> Then in tun_ring_recv, the new helper wake_netdev_queue is called in the
>> blocking wait queue and after consuming an SKB from the ptr_ring. This
>> helper first checks if the netdev queue has stopped. Then with the paired
>> smp_rmb() it is known that tun_net_xmit will not produce SKBs anymore.
>> With that knowledge, the helper can then wake the netdev queue if there is
>> at least a single spare slot in the ptr_ring by calling ptr_ring_spare
>> with cnt=1.
>>
>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> 
> 
> Oh you just want to know if produce will succeed?
> Kind of a version of peek but for producer?
> 
> So all this cuteness of looking at the consumer is actually not necessary,
> and bad for cache.
> 
> You just want this:
> 
> 
> Signed-off-by: Michael S. Tsirkin <mst@redhat.com>
> 
> diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
> index 551329220e4f..de25fe81dd4e 100644
> --- a/include/linux/ptr_ring.h
> +++ b/include/linux/ptr_ring.h
> @@ -96,6 +96,14 @@ static inline bool ptr_ring_full_bh(struct ptr_ring *r)
>  	return ret;
>  }
>  
> +static inline int __ptr_ring_produce_peek(struct ptr_ring *r)
> +{
> +	if (unlikely(!r->size) || r->queue[r->producer])
> +		return -ENOSPC;
> +
> +	return 0;
> +}
> +
>  /* Note: callers invoking this in a loop must use a compiler barrier,
>   * for example cpu_relax(). Callers must hold producer_lock.
>   * Callers are responsible for making sure pointer that is being queued
> @@ -103,8 +111,10 @@ static inline bool ptr_ring_full_bh(struct ptr_ring *r)
>   */
>  static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
>  {
> -	if (unlikely(!r->size) || r->queue[r->producer])
> -		return -ENOSPC;
> +	int r = __ptr_ring_produce_peek(r);
> +
> +	if (r)
> +		return r;
>  
>  	/* Make sure the pointer we are storing points to a valid data. */
>  	/* Pairs with the dependency ordering in __ptr_ring_consume. */
> 
> 
> 
> Add some docs, and call this, then wake.  No?
>

Yes, this looks great! I like that it does not need any further logic :)
I will just call this method instead of my approach in wake_netdev_queue
without taking any locks. It should be just fine since at this moment it
is known that the producer stopped due to the stopped netdev queue.
Re: [PATCH 2/4] netdev queue flow control for TUN
Posted by Jason Wang 1 month ago
On Tue, Sep 2, 2025 at 4:10 PM Simon Schippers
<simon.schippers@tu-dortmund.de> wrote:
>
> The netdev queue is stopped in tun_net_xmit after inserting an SKB into
> the ring buffer if the ring buffer became full because of that. If the
> insertion into the ptr_ring fails, the netdev queue is also stopped and
> the SKB is dropped. However, this never happened in my testing.

You can reach this by using pktgen on TUN.

> To ensure
> that the ptr_ring change is available to the consumer before the netdev
> queue stop, an smp_wmb() is used.
>
> Then in tun_ring_recv, the new helper wake_netdev_queue is called in the
> blocking wait queue and after consuming an SKB from the ptr_ring. This
> helper first checks if the netdev queue has stopped. Then with the paired
> smp_rmb() it is known that tun_net_xmit will not produce SKBs anymore.
> With that knowledge, the helper can then wake the netdev queue if there is
> at least a single spare slot in the ptr_ring by calling ptr_ring_spare
> with cnt=1.
>
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> ---
>  drivers/net/tun.c | 33 ++++++++++++++++++++++++++++++---
>  1 file changed, 30 insertions(+), 3 deletions(-)
>
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index cc6c50180663..735498e221d8 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -1060,13 +1060,21 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>
>         nf_reset_ct(skb);
>
> -       if (ptr_ring_produce(&tfile->tx_ring, skb)) {
> +       queue = netdev_get_tx_queue(dev, txq);
> +       if (unlikely(ptr_ring_produce(&tfile->tx_ring, skb))) {
> +               /* Paired with smp_rmb() in wake_netdev_queue. */
> +               smp_wmb();
> +               netif_tx_stop_queue(queue);

The barrier looks odd since it requires the driver to care about the
ordering, can you elaborate more on this?

There's a WRITE_ONCE + mb() in netif_tx_stop_queue already:

static __always_inline void netif_tx_stop_queue(struct netdev_queue *dev_queue)
{
        /* Paired with READ_ONCE() from dev_watchdog() */
        WRITE_ONCE(dev_queue->trans_start, jiffies);

        /* This barrier is paired with smp_mb() from dev_watchdog() */
        smp_mb__before_atomic();

        /* Must be an atomic op see netif_txq_try_stop() */
        set_bit(__QUEUE_STATE_DRV_XOFF, &dev_queue->state);
}

>                 drop_reason = SKB_DROP_REASON_FULL_RING;
>                 goto drop;
>         }
> +       if (ptr_ring_full(&tfile->tx_ring)) {
> +               /* Paired with smp_rmb() in wake_netdev_queue. */
> +               smp_wmb();
> +               netif_tx_stop_queue(queue);
> +       }
>
>         /* dev->lltx requires to do our own update of trans_start */
> -       queue = netdev_get_tx_queue(dev, txq);
>         txq_trans_cond_update(queue);
>
>         /* Notify and wake up reader process */
> @@ -2110,6 +2118,24 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>         return total;
>  }
>
> +static inline void wake_netdev_queue(struct tun_file *tfile)

Let's rename this to tun_wake_xxx.

> +{
> +       struct netdev_queue *txq;
> +       struct net_device *dev;
> +
> +       rcu_read_lock();
> +       dev = rcu_dereference(tfile->tun)->dev;
> +       txq = netdev_get_tx_queue(dev, tfile->queue_index);
> +
> +       if (netif_tx_queue_stopped(txq)) {
> +               /* Paired with smp_wmb() in tun_net_xmit. */
> +               smp_rmb();
> +               if (ptr_ring_spare(&tfile->tx_ring, 1))

I wonder if there would be a case that will use cnt > 1. If not a
ptr_ring_can_produce() should be sufficient.

> +                       netif_tx_wake_queue(txq);
> +       }
> +       rcu_read_unlock();
> +}
> +
>  static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>  {
>         DECLARE_WAITQUEUE(wait, current);
> @@ -2139,7 +2165,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>                         error = -EFAULT;
>                         break;
>                 }
> -
> +               wake_netdev_queue(tfile);
>                 schedule();
>         }
>
> @@ -2147,6 +2173,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>         remove_wait_queue(&tfile->socket.wq.wait, &wait);
>
>  out:
> +       wake_netdev_queue(tfile);
>         *err = error;
>         return ptr;
>  }
> --
> 2.43.0
>

Thanks
[PATCH 2/4] netdev queue flow control for TUN
Posted by Simon Schippers 4 weeks, 1 day ago
Jason Wang wrote:
> On Tue, Sep 2, 2025 at 4:10 PM Simon Schippers
> <simon.schippers@tu-dortmund.de> wrote:
>>
>> The netdev queue is stopped in tun_net_xmit after inserting an SKB into
>> the ring buffer if the ring buffer became full because of that. If the
>> insertion into the ptr_ring fails, the netdev queue is also stopped and
>> the SKB is dropped. However, this never happened in my testing.
> 
> You can reach this by using pktgen on TUN.
> 

Yes, and I think it could also be reached after ptr_ring_unconsume is
called in vhost_net.

>> To ensure
>> that the ptr_ring change is available to the consumer before the netdev
>> queue stop, an smp_wmb() is used.
>>
>> Then in tun_ring_recv, the new helper wake_netdev_queue is called in the
>> blocking wait queue and after consuming an SKB from the ptr_ring. This
>> helper first checks if the netdev queue has stopped. Then with the paired
>> smp_rmb() it is known that tun_net_xmit will not produce SKBs anymore.
>> With that knowledge, the helper can then wake the netdev queue if there is
>> at least a single spare slot in the ptr_ring by calling ptr_ring_spare
>> with cnt=1.
>>
>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
>> ---
>>  drivers/net/tun.c | 33 ++++++++++++++++++++++++++++++---
>>  1 file changed, 30 insertions(+), 3 deletions(-)
>>
>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>> index cc6c50180663..735498e221d8 100644
>> --- a/drivers/net/tun.c
>> +++ b/drivers/net/tun.c
>> @@ -1060,13 +1060,21 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>
>>         nf_reset_ct(skb);
>>
>> -       if (ptr_ring_produce(&tfile->tx_ring, skb)) {
>> +       queue = netdev_get_tx_queue(dev, txq);
>> +       if (unlikely(ptr_ring_produce(&tfile->tx_ring, skb))) {
>> +               /* Paired with smp_rmb() in wake_netdev_queue. */
>> +               smp_wmb();
>> +               netif_tx_stop_queue(queue);
> 
> The barrier looks odd since it requires the driver to care about the
> ordering, can you elaborate more on this?
> 
> There's a WRITE_ONCE + mb() in netif_tx_stop_queue already:
> 
> static __always_inline void netif_tx_stop_queue(struct netdev_queue *dev_queue)
> {
>         /* Paired with READ_ONCE() from dev_watchdog() */
>         WRITE_ONCE(dev_queue->trans_start, jiffies);
> 
>         /* This barrier is paired with smp_mb() from dev_watchdog() */
>         smp_mb__before_atomic();
> 
>         /* Must be an atomic op see netif_txq_try_stop() */
>         set_bit(__QUEUE_STATE_DRV_XOFF, &dev_queue->state);
> }
> 
>>                 drop_reason = SKB_DROP_REASON_FULL_RING;
>>                 goto drop;
>>         }
>> +       if (ptr_ring_full(&tfile->tx_ring)) {
>> +               /* Paired with smp_rmb() in wake_netdev_queue. */
>> +               smp_wmb();
>> +               netif_tx_stop_queue(queue);
>> +       }
>>
>>         /* dev->lltx requires to do our own update of trans_start */
>> -       queue = netdev_get_tx_queue(dev, txq);
>>         txq_trans_cond_update(queue);
>>
>>         /* Notify and wake up reader process */
>> @@ -2110,6 +2118,24 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>>         return total;
>>  }
>>
>> +static inline void wake_netdev_queue(struct tun_file *tfile)
> 
> Let's rename this to tun_wake_xxx.
>

Okay. I will rename wake_netdev_queue to tun_wake_netdev_queue and
tap_wake_netdev_queue respectively and remove inline. Then vhost_net just
calls these methods in vhost_net_buf_produce with file->private_data.

>> +{
>> +       struct netdev_queue *txq;
>> +       struct net_device *dev;
>> +
>> +       rcu_read_lock();
>> +       dev = rcu_dereference(tfile->tun)->dev;
>> +       txq = netdev_get_tx_queue(dev, tfile->queue_index);
>> +
>> +       if (netif_tx_queue_stopped(txq)) {
>> +               /* Paired with smp_wmb() in tun_net_xmit. */
>> +               smp_rmb();
>> +               if (ptr_ring_spare(&tfile->tx_ring, 1))
> 
> I wonder if there would be a case that will use cnt > 1. If not a
> ptr_ring_can_produce() should be sufficient.
> 
>> +                       netif_tx_wake_queue(txq);
>> +       }
>> +       rcu_read_unlock();
>> +}
>> +
>>  static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>  {
>>         DECLARE_WAITQUEUE(wait, current);
>> @@ -2139,7 +2165,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>                         error = -EFAULT;
>>                         break;
>>                 }
>> -
>> +               wake_netdev_queue(tfile);
>>                 schedule();
>>         }
>>
>> @@ -2147,6 +2173,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>         remove_wait_queue(&tfile->socket.wq.wait, &wait);
>>
>>  out:
>> +       wake_netdev_queue(tfile);
>>         *err = error;
>>         return ptr;
>>  }
>> --
>> 2.43.0
>>
> 
> Thanks
> 
Re: [PATCH 2/4] netdev queue flow control for TUN
Posted by Willem de Bruijn 1 month ago
Simon Schippers wrote:
> The netdev queue is stopped in tun_net_xmit after inserting an SKB into
> the ring buffer if the ring buffer became full because of that. If the
> insertion into the ptr_ring fails, the netdev queue is also stopped and
> the SKB is dropped. However, this never happened in my testing. To ensure
> that the ptr_ring change is available to the consumer before the netdev
> queue stop, an smp_wmb() is used.
> 
> Then in tun_ring_recv, the new helper wake_netdev_queue is called in the
> blocking wait queue and after consuming an SKB from the ptr_ring. This
> helper first checks if the netdev queue has stopped. Then with the paired
> smp_rmb() it is known that tun_net_xmit will not produce SKBs anymore.
> With that knowledge, the helper can then wake the netdev queue if there is
> at least a single spare slot in the ptr_ring by calling ptr_ring_spare
> with cnt=1.
> 
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> ---
>  drivers/net/tun.c | 33 ++++++++++++++++++++++++++++++---
>  1 file changed, 30 insertions(+), 3 deletions(-)
> 
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index cc6c50180663..735498e221d8 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -1060,13 +1060,21 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>  
>  	nf_reset_ct(skb);
>  
> -	if (ptr_ring_produce(&tfile->tx_ring, skb)) {
> +	queue = netdev_get_tx_queue(dev, txq);
> +	if (unlikely(ptr_ring_produce(&tfile->tx_ring, skb))) {
> +		/* Paired with smp_rmb() in wake_netdev_queue. */
> +		smp_wmb();
> +		netif_tx_stop_queue(queue);
>  		drop_reason = SKB_DROP_REASON_FULL_RING;
>  		goto drop;
>  	}
> +	if (ptr_ring_full(&tfile->tx_ring)) {
> +		/* Paired with smp_rmb() in wake_netdev_queue. */
> +		smp_wmb();
> +		netif_tx_stop_queue(queue);
> +	}
>  
>  	/* dev->lltx requires to do our own update of trans_start */
> -	queue = netdev_get_tx_queue(dev, txq);
>  	txq_trans_cond_update(queue);
>  
>  	/* Notify and wake up reader process */
> @@ -2110,6 +2118,24 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>  	return total;
>  }
>  
> +static inline void wake_netdev_queue(struct tun_file *tfile)

no inline keyword in .c files, let the compiler decide.

> +{
> +	struct netdev_queue *txq;
> +	struct net_device *dev;
> +
> +	rcu_read_lock();
> +	dev = rcu_dereference(tfile->tun)->dev;
> +	txq = netdev_get_tx_queue(dev, tfile->queue_index);
> +
> +	if (netif_tx_queue_stopped(txq)) {
> +		/* Paired with smp_wmb() in tun_net_xmit. */
> +		smp_rmb();
> +		if (ptr_ring_spare(&tfile->tx_ring, 1))
> +			netif_tx_wake_queue(txq);
> +	}
> +	rcu_read_unlock();
> +}
Re: [PATCH 2/4] netdev queue flow control for TUN
Posted by Willem de Bruijn 1 month ago
Simon Schippers wrote:
> The netdev queue is stopped in tun_net_xmit after inserting an SKB into
> the ring buffer if the ring buffer became full because of that. If the
> insertion into the ptr_ring fails, the netdev queue is also stopped and
> the SKB is dropped. However, this never happened in my testing.

Indeed, since the last successful insertion will always pause the
queue before this can happen. Since this cannot be reached, no need
to add the code defensively. If in doubt, maybe add a
NET_DEBUG_WARN_ON_ONCE.

> To ensure
> that the ptr_ring change is available to the consumer before the netdev
> queue stop, an smp_wmb() is used.
> 
> Then in tun_ring_recv, the new helper wake_netdev_queue is called in the
> blocking wait queue and after consuming an SKB from the ptr_ring. This
> helper first checks if the netdev queue has stopped. Then with the paired
> smp_rmb() it is known that tun_net_xmit will not produce SKBs anymore.
> With that knowledge, the helper can then wake the netdev queue if there is
> at least a single spare slot in the ptr_ring by calling ptr_ring_spare
> with cnt=1.
> 
> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
> ---
>  drivers/net/tun.c | 33 ++++++++++++++++++++++++++++++---
>  1 file changed, 30 insertions(+), 3 deletions(-)
> 
> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
> index cc6c50180663..735498e221d8 100644
> --- a/drivers/net/tun.c
> +++ b/drivers/net/tun.c
> @@ -1060,13 +1060,21 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>  
>  	nf_reset_ct(skb);
>  
> -	if (ptr_ring_produce(&tfile->tx_ring, skb)) {
> +	queue = netdev_get_tx_queue(dev, txq);
> +	if (unlikely(ptr_ring_produce(&tfile->tx_ring, skb))) {
> +		/* Paired with smp_rmb() in wake_netdev_queue. */
> +		smp_wmb();
> +		netif_tx_stop_queue(queue);
>  		drop_reason = SKB_DROP_REASON_FULL_RING;
>  		goto drop;
>  	}
> +	if (ptr_ring_full(&tfile->tx_ring)) {
> +		/* Paired with smp_rmb() in wake_netdev_queue. */
> +		smp_wmb();
> +		netif_tx_stop_queue(queue);
> +	}
>  
>  	/* dev->lltx requires to do our own update of trans_start */
> -	queue = netdev_get_tx_queue(dev, txq);
>  	txq_trans_cond_update(queue);
>  
>  	/* Notify and wake up reader process */
> @@ -2110,6 +2118,24 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>  	return total;
>  }
>  
> +static inline void wake_netdev_queue(struct tun_file *tfile)
> +{
> +	struct netdev_queue *txq;
> +	struct net_device *dev;
> +
> +	rcu_read_lock();
> +	dev = rcu_dereference(tfile->tun)->dev;
> +	txq = netdev_get_tx_queue(dev, tfile->queue_index);
> +
> +	if (netif_tx_queue_stopped(txq)) {
> +		/* Paired with smp_wmb() in tun_net_xmit. */
> +		smp_rmb();
> +		if (ptr_ring_spare(&tfile->tx_ring, 1))
> +			netif_tx_wake_queue(txq);
> +	}
> +	rcu_read_unlock();
> +}
> +
>  static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>  {
>  	DECLARE_WAITQUEUE(wait, current);
> @@ -2139,7 +2165,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>  			error = -EFAULT;
>  			break;
>  		}
> -
> +		wake_netdev_queue(tfile);

Why wake when no entry was consumed?

Also keep the empty line.

>  		schedule();
>  	}
>  
> @@ -2147,6 +2173,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>  	remove_wait_queue(&tfile->socket.wq.wait, &wait);
>  
>  out:
> +	wake_netdev_queue(tfile);
>  	*err = error;
>  	return ptr;
>  }
> -- 
> 2.43.0
>
[PATCH 2/4] netdev queue flow control for TUN
Posted by Simon Schippers 4 weeks, 1 day ago
Willem de Bruijn wrote:
> Simon Schippers wrote:
>> The netdev queue is stopped in tun_net_xmit after inserting an SKB into
>> the ring buffer if the ring buffer became full because of that. If the
>> insertion into the ptr_ring fails, the netdev queue is also stopped and
>> the SKB is dropped. However, this never happened in my testing.
> 
> Indeed, since the last successful insertion will always pause the
> queue before this can happen. Since this cannot be reached, no need
> to add the code defensively. If in doubt, maybe add a
> NET_DEBUG_WARN_ON_ONCE.
> 
>> To ensure
>> that the ptr_ring change is available to the consumer before the netdev
>> queue stop, an smp_wmb() is used.
>>
>> Then in tun_ring_recv, the new helper wake_netdev_queue is called in the
>> blocking wait queue and after consuming an SKB from the ptr_ring. This
>> helper first checks if the netdev queue has stopped. Then with the paired
>> smp_rmb() it is known that tun_net_xmit will not produce SKBs anymore.
>> With that knowledge, the helper can then wake the netdev queue if there is
>> at least a single spare slot in the ptr_ring by calling ptr_ring_spare
>> with cnt=1.
>>
>> Co-developed-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Tim Gebauer <tim.gebauer@tu-dortmund.de>
>> Signed-off-by: Simon Schippers <simon.schippers@tu-dortmund.de>
>> ---
>>  drivers/net/tun.c | 33 ++++++++++++++++++++++++++++++---
>>  1 file changed, 30 insertions(+), 3 deletions(-)
>>
>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>> index cc6c50180663..735498e221d8 100644
>> --- a/drivers/net/tun.c
>> +++ b/drivers/net/tun.c
>> @@ -1060,13 +1060,21 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>  
>>  	nf_reset_ct(skb);
>>  
>> -	if (ptr_ring_produce(&tfile->tx_ring, skb)) {
>> +	queue = netdev_get_tx_queue(dev, txq);
>> +	if (unlikely(ptr_ring_produce(&tfile->tx_ring, skb))) {
>> +		/* Paired with smp_rmb() in wake_netdev_queue. */
>> +		smp_wmb();
>> +		netif_tx_stop_queue(queue);
>>  		drop_reason = SKB_DROP_REASON_FULL_RING;
>>  		goto drop;
>>  	}
>> +	if (ptr_ring_full(&tfile->tx_ring)) {
>> +		/* Paired with smp_rmb() in wake_netdev_queue. */
>> +		smp_wmb();
>> +		netif_tx_stop_queue(queue);
>> +	}
>>  
>>  	/* dev->lltx requires to do our own update of trans_start */
>> -	queue = netdev_get_tx_queue(dev, txq);
>>  	txq_trans_cond_update(queue);
>>  
>>  	/* Notify and wake up reader process */
>> @@ -2110,6 +2118,24 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>>  	return total;
>>  }
>>  
>> +static inline void wake_netdev_queue(struct tun_file *tfile)
>> +{
>> +	struct netdev_queue *txq;
>> +	struct net_device *dev;
>> +
>> +	rcu_read_lock();
>> +	dev = rcu_dereference(tfile->tun)->dev;
>> +	txq = netdev_get_tx_queue(dev, tfile->queue_index);
>> +
>> +	if (netif_tx_queue_stopped(txq)) {
>> +		/* Paired with smp_wmb() in tun_net_xmit. */
>> +		smp_rmb();
>> +		if (ptr_ring_spare(&tfile->tx_ring, 1))
>> +			netif_tx_wake_queue(txq);
>> +	}
>> +	rcu_read_unlock();
>> +}
>> +
>>  static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>  {
>>  	DECLARE_WAITQUEUE(wait, current);
>> @@ -2139,7 +2165,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>  			error = -EFAULT;
>>  			break;
>>  		}
>> -
>> +		wake_netdev_queue(tfile);
> 
> Why wake when no entry was consumed?

I do it because the queue may not have been woken the last time after
consuming an SKB. However, I am not sure if it is still absolutely
necessary after all the changes in the code. Still, I think it is wise to
do it to avoid being stuck in the wait queue under any circumstances.

> 
> Also keep the empty line.
>

Okay :)

>>  		schedule();
>>  	}
>>  
>> @@ -2147,6 +2173,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>  	remove_wait_queue(&tfile->socket.wq.wait, &wait);
>>  
>>  out:
>> +	wake_netdev_queue(tfile);
>>  	*err = error;
>>  	return ptr;
>>  }
>> -- 
>> 2.43.0
>>
> 
>