Add a new buffer type (to `enum macb_tx_buff_type`). Near the end of
macb_tx_complete(), we read pending XSK buffers using
xsk_tx_peek_release_desc_batch() and append those buffers to our Tx
ring.

Additionally, in macb_tx_complete(), we signal to the XSK subsystem the
number of completed frames and conditionally set the need_wakeup flag.

Lastly, we update the XSK wakeup handler to write the TCOMP bit into
the per-queue IMR register, to ensure Tx NAPI scheduling takes place.
Signed-off-by: Théo Lebrun <theo.lebrun@bootlin.com>
---
drivers/net/ethernet/cadence/macb.h | 1 +
drivers/net/ethernet/cadence/macb_main.c | 91 +++++++++++++++++++++++++++++---
2 files changed, 86 insertions(+), 6 deletions(-)
diff --git a/drivers/net/ethernet/cadence/macb.h b/drivers/net/ethernet/cadence/macb.h
index a9e6f0289ecb..5700a285c08a 100644
--- a/drivers/net/ethernet/cadence/macb.h
+++ b/drivers/net/ethernet/cadence/macb.h
@@ -963,6 +963,7 @@ enum macb_tx_buff_type {
MACB_TYPE_SKB,
MACB_TYPE_XDP_TX,
MACB_TYPE_XDP_NDO,
+ MACB_TYPE_XSK,
};
/* struct macb_tx_buff - data about an skb or xdp frame which is being
diff --git a/drivers/net/ethernet/cadence/macb_main.c b/drivers/net/ethernet/cadence/macb_main.c
index ea1b0b8c4fab..fee1ebadcf20 100644
--- a/drivers/net/ethernet/cadence/macb_main.c
+++ b/drivers/net/ethernet/cadence/macb_main.c
@@ -986,21 +986,30 @@ static int macb_halt_tx(struct macb *bp)
static void macb_tx_release_buff(void *buff, enum macb_tx_buff_type type, int budget)
{
- if (type == MACB_TYPE_SKB) {
+ switch (type) {
+ case MACB_TYPE_SKB:
napi_consume_skb(buff, budget);
- } else if (type == MACB_TYPE_XDP_TX) {
- if (!budget)
- xdp_return_frame(buff);
- else
+ break;
+ case MACB_TYPE_XDP_TX:
+ if (budget)
xdp_return_frame_rx_napi(buff);
- } else {
+ else
+ xdp_return_frame(buff);
+ break;
+ case MACB_TYPE_XDP_NDO:
xdp_return_frame(buff);
+ break;
+ case MACB_TYPE_XSK:
+ break;
}
}
static void macb_tx_unmap(struct macb *bp, struct macb_tx_buff *tx_buff,
int budget)
{
+ if (tx_buff->type == MACB_TYPE_XSK)
+ return;
+
if (tx_buff->mapping) {
if (tx_buff->mapped_as_page)
dma_unmap_page(&bp->pdev->dev, tx_buff->mapping,
@@ -1255,6 +1264,57 @@ static void macb_xdp_submit_buff(struct macb *bp, unsigned int queue_index,
netif_stop_subqueue(netdev, queue_index);
}
+static void macb_xdp_xmit_zc(struct macb *bp, unsigned int queue_index, int budget)
+{
+ struct macb_queue *queue = &bp->queues[queue_index];
+ struct xsk_buff_pool *xsk = queue->xsk_pool;
+ dma_addr_t mapping;
+ u32 slot_available;
+ size_t bytes = 0;
+ u32 batch;
+
+ guard(spinlock_irqsave)(&queue->tx_ptr_lock);
+
+ /* This is a hard error, log it. */
+ slot_available = CIRC_SPACE(queue->tx_head, queue->tx_tail, bp->tx_ring_size);
+ if (slot_available < 1) {
+ netif_stop_subqueue(bp->dev, queue_index);
+ netdev_dbg(bp->dev, "tx_head = %u, tx_tail = %u\n",
+ queue->tx_head, queue->tx_tail);
+ return;
+ }
+
+ batch = min_t(u32, slot_available, budget);
+ batch = xsk_tx_peek_release_desc_batch(xsk, batch);
+ if (!batch)
+ return;
+
+ for (u32 i = 0; i < batch; i++) {
+ struct xdp_desc *desc = &xsk->tx_descs[i];
+
+ mapping = xsk_buff_raw_get_dma(xsk, desc->addr);
+ xsk_buff_raw_dma_sync_for_device(xsk, mapping, desc->len);
+
+ macb_xdp_submit_buff(bp, queue_index, (struct macb_tx_buff){
+ .ptr = NULL,
+ .mapping = mapping,
+ .size = desc->len,
+ .mapped_as_page = false,
+ .type = MACB_TYPE_XSK,
+ });
+
+ bytes += desc->len;
+ }
+
+ /* Make newly initialized descriptor visible to hardware */
+ wmb();
+ spin_lock(&bp->lock);
+ macb_writel(bp, NCR, macb_readl(bp, NCR) | MACB_BIT(TSTART));
+ spin_unlock(&bp->lock);
+
+ netdev_tx_sent_queue(netdev_get_tx_queue(bp->dev, queue_index), bytes);
+}
+
static int macb_tx_complete(struct macb_queue *queue, int budget)
{
struct macb *bp = queue->bp;
@@ -1316,6 +1376,11 @@ static int macb_tx_complete(struct macb_queue *queue, int budget)
case MACB_TYPE_XDP_NDO:
bytes += tx_buff->size;
break;
+
+ case MACB_TYPE_XSK:
+ bytes += tx_buff->size;
+ xsk_frames++;
+ break;
}
packets++;
@@ -1337,6 +1402,16 @@ static int macb_tx_complete(struct macb_queue *queue, int budget)
netif_wake_subqueue(bp->dev, queue_index);
spin_unlock_irqrestore(&queue->tx_ptr_lock, flags);
+ if (queue->xsk_pool) {
+ if (xsk_frames)
+ xsk_tx_completed(queue->xsk_pool, xsk_frames);
+
+ if (xsk_uses_need_wakeup(queue->xsk_pool))
+ xsk_set_tx_need_wakeup(queue->xsk_pool);
+
+ macb_xdp_xmit_zc(bp, queue_index, budget);
+ }
+
return packets;
}
@@ -1616,6 +1691,10 @@ static int gem_xsk_wakeup(struct net_device *dev, u32 qid, u32 flags)
!napi_if_scheduled_mark_missed(&queue->napi_rx))
irqs |= MACB_BIT(RCOMP);
+ if ((flags & XDP_WAKEUP_TX) &&
+ !napi_if_scheduled_mark_missed(&queue->napi_tx))
+ irqs |= MACB_BIT(TCOMP);
+
if (irqs)
queue_writel(queue, IMR, irqs);
--
2.53.0
Hi Théo,
On 04/03/2026 19:24, Théo Lebrun wrote:
> Add a new buffer type (to `enum macb_tx_buff_type`). Near the end of
> macb_tx_complete(), we go and read the XSK buffers using
> xsk_tx_peek_release_desc_batch() and append those buffers to our Tx
> ring.
>
> Additionally, in macb_tx_complete(), we signal to the XSK subsystem
> number of bytes completed and conditionally mark the need_wakeup
> flag.
>
> Lastly, we update XSK wakeup by writing the TCOMP bit in the per-queue
> IMR register, to ensure NAPI scheduling will take place.
>
> Signed-off-by: Théo Lebrun <theo.lebrun@bootlin.com>
> ---
[...]
> +static void macb_xdp_xmit_zc(struct macb *bp, unsigned int queue_index, int budget)
> +{
> + struct macb_queue *queue = &bp->queues[queue_index];
> + struct xsk_buff_pool *xsk = queue->xsk_pool;
> + dma_addr_t mapping;
> + u32 slot_available;
> + size_t bytes = 0;
> + u32 batch;
> +
> + guard(spinlock_irqsave)(&queue->tx_ptr_lock);
> +
> + /* This is a hard error, log it. */
> + slot_available = CIRC_SPACE(queue->tx_head, queue->tx_tail, bp->tx_ring_size);
> + if (slot_available < 1) {
> + netif_stop_subqueue(bp->dev, queue_index);
> + netdev_dbg(bp->dev, "tx_head = %u, tx_tail = %u\n",
> + queue->tx_head, queue->tx_tail);
> + return;
> + }
> +
> + batch = min_t(u32, slot_available, budget);
> + batch = xsk_tx_peek_release_desc_batch(xsk, batch);
> + if (!batch)
> + return;
> +
> + for (u32 i = 0; i < batch; i++) {
> + struct xdp_desc *desc = &xsk->tx_descs[i];
> +
> + mapping = xsk_buff_raw_get_dma(xsk, desc->addr);
> + xsk_buff_raw_dma_sync_for_device(xsk, mapping, desc->len);
> +
> + macb_xdp_submit_buff(bp, queue_index, (struct macb_tx_buff){
> + .ptr = NULL,
> + .mapping = mapping,
> + .size = desc->len,
> + .mapped_as_page = false,
> + .type = MACB_TYPE_XSK,
> + });
> +
> + bytes += desc->len;
> + }
> +
> + /* Make newly initialized descriptor visible to hardware */
> + wmb();
> + spin_lock(&bp->lock);
> + macb_writel(bp, NCR, macb_readl(bp, NCR) | MACB_BIT(TSTART));
> + spin_unlock(&bp->lock);
this lock is also taken in interrupt context, so this should probably
use an irqsave/restore variant. Now, there are a few other parts of
this driver that use a plain spin_lock() call, and except for the paths
that actually run in interrupt context, they don't seem correct to me :(
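
To spell out the hazard (a hypothetical interleaving, not a trace from
this driver):

	/* Process context, local IRQs still enabled: */
	spin_lock(&bp->lock);
	/* ... the MACB interrupt fires on this same CPU ... */
	/* macb_interrupt() then runs and does: */
	spin_lock(&bp->lock);	/* spins forever: self-deadlock */

spin_lock_irqsave() closes that window by disabling local IRQs before
taking the lock.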
Maxime
Hello!
On Fri Mar 6, 2026 at 1:48 PM CET, Maxime Chevallier wrote:
> On 04/03/2026 19:24, Théo Lebrun wrote:
>> Add a new buffer type (to `enum macb_tx_buff_type`). Near the end of
>> macb_tx_complete(), we go and read the XSK buffers using
>> xsk_tx_peek_release_desc_batch() and append those buffers to our Tx
>> ring.
>>
>> Additionally, in macb_tx_complete(), we signal to the XSK subsystem
>> number of bytes completed and conditionally mark the need_wakeup
>> flag.
>>
>> Lastly, we update XSK wakeup by writing the TCOMP bit in the per-queue
>> IMR register, to ensure NAPI scheduling will take place.
>>
>> Signed-off-by: Théo Lebrun <theo.lebrun@bootlin.com>
>> ---
>
> [...]
>
>> +static void macb_xdp_xmit_zc(struct macb *bp, unsigned int queue_index, int budget)
>> +{
>> + struct macb_queue *queue = &bp->queues[queue_index];
>> + struct xsk_buff_pool *xsk = queue->xsk_pool;
>> + dma_addr_t mapping;
>> + u32 slot_available;
>> + size_t bytes = 0;
>> + u32 batch;
>> +
>> + guard(spinlock_irqsave)(&queue->tx_ptr_lock);
>> +
>> + /* This is a hard error, log it. */
>> + slot_available = CIRC_SPACE(queue->tx_head, queue->tx_tail, bp->tx_ring_size);
>> + if (slot_available < 1) {
>> + netif_stop_subqueue(bp->dev, queue_index);
>> + netdev_dbg(bp->dev, "tx_head = %u, tx_tail = %u\n",
>> + queue->tx_head, queue->tx_tail);
>> + return;
>> + }
>> +
>> + batch = min_t(u32, slot_available, budget);
>> + batch = xsk_tx_peek_release_desc_batch(xsk, batch);
>> + if (!batch)
>> + return;
>> +
>> + for (u32 i = 0; i < batch; i++) {
>> + struct xdp_desc *desc = &xsk->tx_descs[i];
>> +
>> + mapping = xsk_buff_raw_get_dma(xsk, desc->addr);
>> + xsk_buff_raw_dma_sync_for_device(xsk, mapping, desc->len);
>> +
>> + macb_xdp_submit_buff(bp, queue_index, (struct macb_tx_buff){
>> + .ptr = NULL,
>> + .mapping = mapping,
>> + .size = desc->len,
>> + .mapped_as_page = false,
>> + .type = MACB_TYPE_XSK,
>> + });
>> +
>> + bytes += desc->len;
>> + }
>> +
>> + /* Make newly initialized descriptor visible to hardware */
>> + wmb();
>> + spin_lock(&bp->lock);
>> + macb_writel(bp, NCR, macb_readl(bp, NCR) | MACB_BIT(TSTART));
>> + spin_unlock(&bp->lock);
>
> this lock is also taken in interrupt context, this should probably use a
> irqsave/restore variant. Now, there are a few other parts of this driver
> that use a plain spin_lock() call and except for the paths that actually
> run in interrupt context, they don't seem correct to me :(
I almost sent a reply agreeing with you, but actually here is the
exhaustive `spin_lock(&bp->lock)` list:
# Function Context
------------------------------------------
1 gem_wol_interrupt() irq
2 macb_interrupt() irq
3 macb_wol_interrupt() irq
4 macb_tx_error_task() workqueue/user
5 macb_tx_restart() napi/softirq
6 macb_xdp_xmit_zc() napi/softirq
7 macb_start_xmit() user
8 macb_xdp_submit_frame() user
And all contexts are safe because non-IRQ contexts (#4-8) always follow
this sequence:
spin_lock_irqsave(&queue->tx_ptr_lock, flags);
spin_lock(&bp->lock);
spin_unlock(&bp->lock);
spin_unlock_irqrestore(&queue->tx_ptr_lock, flags);
So queue->tx_ptr_lock always wraps bp->lock and does the local CPU IRQ
disabling.

(I also checked we don't risk an ABBA deadlock, and we don't: all code
acquires queue->tx_ptr_lock THEN bp->lock.)
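
For reference, the ABBA pattern that this consistent ordering rules out
(an illustrative trace, not actual driver code):

	CPU0: spin_lock(A); spin_lock(B);
	CPU1: spin_lock(B); spin_lock(A);

If CPU0 holds A while CPU1 holds B, each spins on the other's lock
forever. With A = queue->tx_ptr_lock and B = bp->lock, every path takes
A then B, so this cycle cannot form.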
However, there is still a bug in the code you quoted: MACB_BIT(TSTART)
is set twice by macb_xdp_xmit_zc():
 - once in the helper function macb_xdp_submit_buff(), and
 - once in its own body (the code you quoted).
This is fixed for V2!
Thanks Maxime,
Have a nice weekend,
--
Théo Lebrun, Bootlin
Embedded Linux and Kernel engineering
https://bootlin.com
Hi,
On 06/03/2026 18:18, Théo Lebrun wrote:
> Hello!
>
> On Fri Mar 6, 2026 at 1:48 PM CET, Maxime Chevallier wrote:
>> On 04/03/2026 19:24, Théo Lebrun wrote:
>>> Add a new buffer type (to `enum macb_tx_buff_type`). Near the end of
>>> macb_tx_complete(), we go and read the XSK buffers using
>>> xsk_tx_peek_release_desc_batch() and append those buffers to our Tx
>>> ring.
>>>
>>> Additionally, in macb_tx_complete(), we signal to the XSK subsystem
>>> number of bytes completed and conditionally mark the need_wakeup
>>> flag.
>>>
>>> Lastly, we update XSK wakeup by writing the TCOMP bit in the per-queue
>>> IMR register, to ensure NAPI scheduling will take place.
>>>
>>> Signed-off-by: Théo Lebrun <theo.lebrun@bootlin.com>
>>> ---
>>
>> [...]
>>
>>> +static void macb_xdp_xmit_zc(struct macb *bp, unsigned int queue_index, int budget)
>>> +{
>>> + struct macb_queue *queue = &bp->queues[queue_index];
>>> + struct xsk_buff_pool *xsk = queue->xsk_pool;
>>> + dma_addr_t mapping;
>>> + u32 slot_available;
>>> + size_t bytes = 0;
>>> + u32 batch;
>>> +
>>> + guard(spinlock_irqsave)(&queue->tx_ptr_lock);
>>> +
>>> + /* This is a hard error, log it. */
>>> + slot_available = CIRC_SPACE(queue->tx_head, queue->tx_tail, bp->tx_ring_size);
>>> + if (slot_available < 1) {
>>> + netif_stop_subqueue(bp->dev, queue_index);
>>> + netdev_dbg(bp->dev, "tx_head = %u, tx_tail = %u\n",
>>> + queue->tx_head, queue->tx_tail);
>>> + return;
>>> + }
>>> +
>>> + batch = min_t(u32, slot_available, budget);
>>> + batch = xsk_tx_peek_release_desc_batch(xsk, batch);
>>> + if (!batch)
>>> + return;
>>> +
>>> + for (u32 i = 0; i < batch; i++) {
>>> + struct xdp_desc *desc = &xsk->tx_descs[i];
>>> +
>>> + mapping = xsk_buff_raw_get_dma(xsk, desc->addr);
>>> + xsk_buff_raw_dma_sync_for_device(xsk, mapping, desc->len);
>>> +
>>> + macb_xdp_submit_buff(bp, queue_index, (struct macb_tx_buff){
>>> + .ptr = NULL,
>>> + .mapping = mapping,
>>> + .size = desc->len,
>>> + .mapped_as_page = false,
>>> + .type = MACB_TYPE_XSK,
>>> + });
>>> +
>>> + bytes += desc->len;
>>> + }
>>> +
>>> + /* Make newly initialized descriptor visible to hardware */
>>> + wmb();
>>> + spin_lock(&bp->lock);
>>> + macb_writel(bp, NCR, macb_readl(bp, NCR) | MACB_BIT(TSTART));
>>> + spin_unlock(&bp->lock);
>>
>> this lock is also taken in interrupt context, this should probably use a
>> irqsave/restore variant. Now, there are a few other parts of this driver
>> that use a plain spin_lock() call and except for the paths that actually
>> run in interrupt context, they don't seem correct to me :(
>
> I almost sent a reply agreeing with you, but actually here is the
> exhaustive `spin_lock(&bp->lock)` list:
>
> # Function Context
> ------------------------------------------
> 1 gem_wol_interrupt() irq
> 2 macb_interrupt() irq
> 3 macb_wol_interrupt() irq
> 4 macb_tx_error_task() workqueue/user
> 5 macb_tx_restart() napi/softirq
> 6 macb_xdp_xmit_zc() napi/softirq
> 7 macb_start_xmit() user
> 8 macb_xdp_submit_frame() user
>
> And all contexts are safe because it always is this sequence in non-IRQ
> contexts (#4-8):
>
> spin_lock_irqsave(&queue->tx_ptr_lock, flags);
> spin_lock(&bp->lock);
> spin_unlock(&bp->lock);
> spin_unlock_irqrestore(&queue->tx_ptr_lock, flags);
Is it because of the guard statement?
guard(spinlock_irqsave)(&queue->tx_ptr_lock);
It really doesn't make it obvious that this is how it plays out :(
>
> So bp->tx_ptr_lock always wraps bp->lock and does the local CPU IRQ
> disabling.
>
> (I also checked we don't risk ABBA deadlock, and we don't: all code
> acquires bp->tx_ptr_lock THEN bp->lock.)
>
> However, there is still a bug in the code you quoted: setting
> BIT(TSTART) is done twice by macb_xdp_xmit_zc():
> - once in the helper function macb_xdp_submit_buff() and,
> - once in its own body (code you quoted)
> This is fixed for V2!
great :)
Maxime
Hello Maxime,
On Fri Mar 6, 2026 at 6:53 PM CET, Maxime Chevallier wrote:
> On 06/03/2026 18:18, Théo Lebrun wrote:
>> Hello!
>>
>> On Fri Mar 6, 2026 at 1:48 PM CET, Maxime Chevallier wrote:
>>> On 04/03/2026 19:24, Théo Lebrun wrote:
>>>> Add a new buffer type (to `enum macb_tx_buff_type`). Near the end of
>>>> macb_tx_complete(), we go and read the XSK buffers using
>>>> xsk_tx_peek_release_desc_batch() and append those buffers to our Tx
>>>> ring.
>>>>
>>>> Additionally, in macb_tx_complete(), we signal to the XSK subsystem
>>>> number of bytes completed and conditionally mark the need_wakeup
>>>> flag.
>>>>
>>>> Lastly, we update XSK wakeup by writing the TCOMP bit in the per-queue
>>>> IMR register, to ensure NAPI scheduling will take place.
>>>>
>>>> Signed-off-by: Théo Lebrun <theo.lebrun@bootlin.com>
>>>> ---
>>>
>>> [...]
>>>
>>>> +static void macb_xdp_xmit_zc(struct macb *bp, unsigned int queue_index, int budget)
>>>> +{
>>>> + struct macb_queue *queue = &bp->queues[queue_index];
>>>> + struct xsk_buff_pool *xsk = queue->xsk_pool;
>>>> + dma_addr_t mapping;
>>>> + u32 slot_available;
>>>> + size_t bytes = 0;
>>>> + u32 batch;
>>>> +
>>>> + guard(spinlock_irqsave)(&queue->tx_ptr_lock);
>>>> +
>>>> + /* This is a hard error, log it. */
>>>> + slot_available = CIRC_SPACE(queue->tx_head, queue->tx_tail, bp->tx_ring_size);
>>>> + if (slot_available < 1) {
>>>> + netif_stop_subqueue(bp->dev, queue_index);
>>>> + netdev_dbg(bp->dev, "tx_head = %u, tx_tail = %u\n",
>>>> + queue->tx_head, queue->tx_tail);
>>>> + return;
>>>> + }
>>>> +
>>>> + batch = min_t(u32, slot_available, budget);
>>>> + batch = xsk_tx_peek_release_desc_batch(xsk, batch);
>>>> + if (!batch)
>>>> + return;
>>>> +
>>>> + for (u32 i = 0; i < batch; i++) {
>>>> + struct xdp_desc *desc = &xsk->tx_descs[i];
>>>> +
>>>> + mapping = xsk_buff_raw_get_dma(xsk, desc->addr);
>>>> + xsk_buff_raw_dma_sync_for_device(xsk, mapping, desc->len);
>>>> +
>>>> + macb_xdp_submit_buff(bp, queue_index, (struct macb_tx_buff){
>>>> + .ptr = NULL,
>>>> + .mapping = mapping,
>>>> + .size = desc->len,
>>>> + .mapped_as_page = false,
>>>> + .type = MACB_TYPE_XSK,
>>>> + });
>>>> +
>>>> + bytes += desc->len;
>>>> + }
>>>> +
>>>> + /* Make newly initialized descriptor visible to hardware */
>>>> + wmb();
>>>> + spin_lock(&bp->lock);
>>>> + macb_writel(bp, NCR, macb_readl(bp, NCR) | MACB_BIT(TSTART));
>>>> + spin_unlock(&bp->lock);
>>>
>>> this lock is also taken in interrupt context, this should probably use a
>>> irqsave/restore variant. Now, there are a few other parts of this driver
>>> that use a plain spin_lock() call and except for the paths that actually
>>> run in interrupt context, they don't seem correct to me :(
>>
>> I almost sent a reply agreeing with you, but actually here is the
>> exhaustive `spin_lock(&bp->lock)` list:
>>
>> # Function Context
>> ------------------------------------------
>> 1 gem_wol_interrupt() irq
>> 2 macb_interrupt() irq
>> 3 macb_wol_interrupt() irq
>> 4 macb_tx_error_task() workqueue/user
>> 5 macb_tx_restart() napi/softirq
>> 6 macb_xdp_xmit_zc() napi/softirq
>> 7 macb_start_xmit() user
>> 8 macb_xdp_submit_frame() user
>>
>> And all contexts are safe because it always is this sequence in non-IRQ
>> contexts (#4-8):
>>
>> spin_lock_irqsave(&queue->tx_ptr_lock, flags);
>> spin_lock(&bp->lock);
>> spin_unlock(&bp->lock);
>> spin_unlock_irqrestore(&queue->tx_ptr_lock, flags);
>
> Is it because of the guard statement ?
>
> guard(spinlock_irqsave)(&queue->tx_ptr_lock);
>
> It really doesn't make it obvious that this is how it plays out :(
Yes! A guard does an operation when called and another at scope end (in
our case at the end of macb_xdp_xmit_zc()). That way we don't forget
the cleanup, and we can do early returns without a list of labels and
gotos (and without messing up along the way).

It uses the __attribute__((cleanup(cleanup_function))) compiler feature,
which is aliased to `__cleanup()` in the kernel.
https://gcc.gnu.org/onlinedocs/gcc/Common-Attributes.html#index-cleanup
https://elixir.bootlin.com/linux/v6.19.6/source/include/linux/compiler_attributes.h#L76
Guard definition for `spinlock_irqsave`:
https://elixir.bootlin.com/linux/v6.19.6/source/include/linux/spinlock.h#L585-L588
(delving into those macros is not recommended)
The code documentation is good:
https://elixir.bootlin.com/linux/v6.19.6/source/include/linux/cleanup.h#L10
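
If it helps, the attribute can be demoed in plain userspace C; names
below are made up for illustration, nothing kernel-specific:

	#include <stdio.h>
	#include <pthread.h>

	static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;

	/* Runs automatically whenever the annotated variable goes out
	 * of scope, whatever the exit path. */
	static void unlock_cleanup(pthread_mutex_t **mp)
	{
		pthread_mutex_unlock(*mp);
		printf("unlocked\n");
	}

	static int do_work(int fail)
	{
		__attribute__((cleanup(unlock_cleanup)))
		pthread_mutex_t *lock = &m;
		pthread_mutex_lock(lock);

		if (fail)
			return -1;	/* cleanup still runs */

		printf("worked\n");
		return 0;		/* cleanup runs here too */
	}

	int main(void)
	{
		do_work(0);
		do_work(1);
		return 0;
	}

Each call prints "unlocked" exactly once, whichever return path is
taken; guard() wraps this same mechanism in per-lock-type helpers.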
Thanks,
--
Théo Lebrun, Bootlin
Embedded Linux and Kernel engineering
https://bootlin.com