From nobody Thu Sep 18 23:36:27 2025 Return-Path: X-Spam-Checker-Version: SpamAssassin 3.4.0 (2014-02-07) on aws-us-west-2-korg-lkml-1.web.codeaurora.org Received: from vger.kernel.org (vger.kernel.org [23.128.96.18]) by smtp.lore.kernel.org (Postfix) with ESMTP id 815E7C3A5A7 for ; Fri, 2 Dec 2022 00:23:50 +0000 (UTC) Received: (majordomo@vger.kernel.org) by vger.kernel.org via listexpand id S232184AbiLBAXt (ORCPT ); Thu, 1 Dec 2022 19:23:49 -0500 Received: from lindbergh.monkeyblade.net ([23.128.96.19]:34126 "EHLO lindbergh.monkeyblade.net" rhost-flags-OK-OK-OK-OK) by vger.kernel.org with ESMTP id S231201AbiLBAXQ (ORCPT ); Thu, 1 Dec 2022 19:23:16 -0500 Received: from us-smtp-delivery-124.mimecast.com (us-smtp-delivery-124.mimecast.com [170.10.129.124]) by lindbergh.monkeyblade.net (Postfix) with ESMTPS id 9678DD15A6 for ; Thu, 1 Dec 2022 16:19:09 -0800 (PST) DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=redhat.com; s=mimecast20190719; t=1669940348; h=from:from:reply-to:subject:subject:date:date:message-id:message-id: to:to:cc:cc:mime-version:mime-version:content-type:content-type: content-transfer-encoding:content-transfer-encoding: in-reply-to:in-reply-to:references:references; bh=U8gv5FwlfBLk2h9FYCfGchxbHxe+R+fxfyI8Nu42mz8=; b=Iiffkll0XCSy4z4XcOkPDseOiiGfCmbpB6YHw15jEGJ1kNf3CiHcYlEnF7hjzFkphBS0ne V8MfbgL2EN7U4FfOlJf+gg6LF0G3UGL1naH40F5wToZRlCbMoRzbpe2WwAi6MtOB3MyH4e ax7dMPDT6yd1bNgcfVk4Gt8r5Cr7TD4= Received: from mimecast-mx02.redhat.com (mimecast-mx02.redhat.com [66.187.233.88]) by relay.mimecast.com with ESMTP with STARTTLS (version=TLSv1.2, cipher=TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384) id us-mta-563-2-_i_3ckNdeQJxEoROQu5Q-1; Thu, 01 Dec 2022 19:19:05 -0500 X-MC-Unique: 2-_i_3ckNdeQJxEoROQu5Q-1 Received: from smtp.corp.redhat.com (int-mx05.intmail.prod.int.rdu2.redhat.com [10.11.54.5]) (using TLSv1.2 with cipher AECDH-AES256-SHA (256/256 bits)) (No client certificate requested) by mimecast-mx02.redhat.com (Postfix) with ESMTPS id 26357185A7A8; Fri, 
2 Dec 2022 00:19:05 +0000 (UTC) Received: from warthog.procyon.org.uk (unknown [10.33.36.36]) by smtp.corp.redhat.com (Postfix) with ESMTP id 50C6D7AE5; Fri, 2 Dec 2022 00:19:04 +0000 (UTC) Organization: Red Hat UK Ltd. Registered Address: Red Hat UK Ltd, Amberley Place, 107-111 Peascod Street, Windsor, Berkshire, SI4 1TE, United Kingdom. Registered in England and Wales under Company Registration No. 3798903 Subject: [PATCH net-next 28/36] rxrpc: Simplify skbuff accounting in receive path From: David Howells To: netdev@vger.kernel.org Cc: Marc Dionne , linux-afs@lists.infradead.org, dhowells@redhat.com, linux-afs@lists.infradead.org, linux-kernel@vger.kernel.org Date: Fri, 02 Dec 2022 00:19:01 +0000 Message-ID: <166994034178.1732290.4349128141687751353.stgit@warthog.procyon.org.uk> In-Reply-To: <166994010342.1732290.13771061038178613124.stgit@warthog.procyon.org.uk> References: <166994010342.1732290.13771061038178613124.stgit@warthog.procyon.org.uk> User-Agent: StGit/1.5 MIME-Version: 1.0 Content-Type: text/plain; charset="utf-8" Content-Transfer-Encoding: quoted-printable X-Scanned-By: MIMEDefang 3.1 on 10.11.54.5 Precedence: bulk List-ID: X-Mailing-List: linux-kernel@vger.kernel.org A received skbuff needs a ref while it sits on a call data queue or conn packet queue. At the moment no extra ref is taken when a packet is queued; instead, the ref held by the receive path is handed over to the queue, and rxrpc_input_packet() and co. jump through a lot of hoops to avoid double-dropping it. Change this so that the skbuff ref is unconditionally dropped by the caller of rxrpc_input_packet(). An additional ref is then taken on the packet if it is pushed onto a queue. 
Signed-off-by: David Howells cc: Marc Dionne cc: linux-afs@lists.infradead.org --- include/trace/events/rxrpc.h | 3 +- net/rxrpc/input.c | 45 +++++++++++++-------------- net/rxrpc/io_thread.c | 70 +++++++++++++++++++-------------------= ---- 3 files changed, 56 insertions(+), 62 deletions(-) diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h index c3043fbea0e6..82b1327c2ba6 100644 --- a/include/trace/events/rxrpc.h +++ b/include/trace/events/rxrpc.h @@ -28,6 +28,8 @@ EM(rxrpc_skb_eaten_by_unshare_nomem, "ETN unshar-nm") \ EM(rxrpc_skb_get_ack, "GET ack ") \ EM(rxrpc_skb_get_conn_work, "GET conn-work") \ + EM(rxrpc_skb_get_local_work, "GET locl-work") \ + EM(rxrpc_skb_get_reject_work, "GET rej-work ") \ EM(rxrpc_skb_get_to_recvmsg, "GET to-recv ") \ EM(rxrpc_skb_get_to_recvmsg_oos, "GET to-recv-o") \ EM(rxrpc_skb_new_encap_rcv, "NEW encap-rcv") \ @@ -39,7 +41,6 @@ EM(rxrpc_skb_put_error_report, "PUT error-rep") \ EM(rxrpc_skb_put_input, "PUT input ") \ EM(rxrpc_skb_put_jumbo_subpacket, "PUT jumbo-sub") \ - EM(rxrpc_skb_put_lose, "PUT lose ") \ EM(rxrpc_skb_put_purge, "PUT purge ") \ EM(rxrpc_skb_put_rotate, "PUT rotate ") \ EM(rxrpc_skb_put_unknown, "PUT unknown ") \ diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c index 036f02371051..42addbcf59f9 100644 --- a/net/rxrpc/input.c +++ b/net/rxrpc/input.c @@ -338,7 +338,8 @@ static void rxrpc_input_queue_data(struct rxrpc_call *c= all, struct sk_buff *skb, /* * Process a DATA packet. 
*/ -static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *= skb) +static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *= skb, + bool *_notify) { struct rxrpc_skb_priv *sp =3D rxrpc_skb(skb); struct sk_buff *oos; @@ -361,7 +362,7 @@ static void rxrpc_input_data_one(struct rxrpc_call *cal= l, struct sk_buff *skb) if (test_and_set_bit(RXRPC_CALL_RX_LAST, &call->flags) && seq + 1 !=3D wtop) { rxrpc_proto_abort("LSN", call, seq); - goto err_free; + return; } } else { if (test_bit(RXRPC_CALL_RX_LAST, &call->flags) && @@ -369,7 +370,7 @@ static void rxrpc_input_data_one(struct rxrpc_call *cal= l, struct sk_buff *skb) pr_warn("Packet beyond last: c=3D%x q=3D%x window=3D%x-%x wlimit=3D%x\n= ", call->debug_id, seq, window, wtop, wlimit); rxrpc_proto_abort("LSA", call, seq); - goto err_free; + return; } } =20 @@ -402,9 +403,11 @@ static void rxrpc_input_data_one(struct rxrpc_call *ca= ll, struct sk_buff *skb) if (after(window, wtop)) wtop =3D window; =20 + rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg); + spin_lock(&call->recvmsg_queue.lock); rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue); - skb =3D NULL; + *_notify =3D true; =20 while ((oos =3D skb_peek(&call->rx_oos_queue))) { struct rxrpc_skb_priv *osp =3D rxrpc_skb(oos); @@ -456,16 +459,17 @@ static void rxrpc_input_data_one(struct rxrpc_call *c= all, struct sk_buff *skb) struct rxrpc_skb_priv *osp =3D rxrpc_skb(oos); =20 if (after(osp->hdr.seq, seq)) { + rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos); __skb_queue_before(&call->rx_oos_queue, oos, skb); goto oos_queued; } } =20 + rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg_oos); __skb_queue_tail(&call->rx_oos_queue, skb); oos_queued: trace_rxrpc_receive(call, last ? 
rxrpc_receive_oos_last : rxrpc_receive_= oos, sp->hdr.serial, sp->hdr.seq); - skb =3D NULL; } =20 send_ack: @@ -483,9 +487,6 @@ static void rxrpc_input_data_one(struct rxrpc_call *cal= l, struct sk_buff *skb) else rxrpc_propose_delay_ACK(call, serial, rxrpc_propose_ack_input_data); - -err_free: - rxrpc_free_skb(skb, rxrpc_skb_put_input); } =20 /* @@ -498,6 +499,7 @@ static bool rxrpc_input_split_jumbo(struct rxrpc_call *= call, struct sk_buff *skb struct sk_buff *jskb; unsigned int offset =3D sizeof(struct rxrpc_wire_header); unsigned int len =3D skb->len - offset; + bool notify =3D false; =20 while (sp->hdr.flags & RXRPC_JUMBO_PACKET) { if (len < RXRPC_JUMBO_SUBPKTLEN) @@ -517,7 +519,8 @@ static bool rxrpc_input_split_jumbo(struct rxrpc_call *= call, struct sk_buff *skb jsp =3D rxrpc_skb(jskb); jsp->offset =3D offset; jsp->len =3D RXRPC_JUMBO_DATALEN; - rxrpc_input_data_one(call, jskb); + rxrpc_input_data_one(call, jskb, ¬ify); + rxrpc_free_skb(jskb, rxrpc_skb_put_jumbo_subpacket); =20 sp->hdr.flags =3D jhdr.flags; sp->hdr._rsvd =3D ntohs(jhdr._rsvd); @@ -529,7 +532,11 @@ static bool rxrpc_input_split_jumbo(struct rxrpc_call = *call, struct sk_buff *skb =20 sp->offset =3D offset; sp->len =3D len; - rxrpc_input_data_one(call, skb); + rxrpc_input_data_one(call, skb, ¬ify); + if (notify) { + trace_rxrpc_notify_socket(call->debug_id, sp->hdr.serial); + rxrpc_notify_socket(call); + } return true; =20 protocol_error: @@ -552,10 +559,8 @@ static void rxrpc_input_data(struct rxrpc_call *call, = struct sk_buff *skb) skb->len, seq0); =20 state =3D READ_ONCE(call->state); - if (state >=3D RXRPC_CALL_COMPLETE) { - rxrpc_free_skb(skb, rxrpc_skb_put_input); + if (state >=3D RXRPC_CALL_COMPLETE) return; - } =20 /* Unshare the packet so that it can be modified for in-place * decryption. 
@@ -605,7 +610,6 @@ static void rxrpc_input_data(struct rxrpc_call *call, s= truct sk_buff *skb) out: trace_rxrpc_notify_socket(call->debug_id, serial); rxrpc_notify_socket(call); - rxrpc_free_skb(skb, rxrpc_skb_put_input); _leave(" [queued]"); } =20 @@ -797,7 +801,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, st= ruct sk_buff *skb) struct rxrpc_ackpacket ack; struct rxrpc_skb_priv *sp =3D rxrpc_skb(skb); struct rxrpc_ackinfo info; - struct sk_buff *skb_old =3D NULL, *skb_put =3D skb; + struct sk_buff *skb_old =3D NULL; rxrpc_serial_t ack_serial, acked_serial; rxrpc_seq_t first_soft_ack, hard_ack, prev_pkt; int nr_acks, offset, ioffset; @@ -963,6 +967,7 @@ static void rxrpc_input_ack(struct rxrpc_call *call, st= ruct sk_buff *skb) goto out; } =20 + rxrpc_get_skb(skb, rxrpc_skb_get_ack); spin_lock(&call->acks_ack_lock); skb_old =3D call->acks_soft_tbl; call->acks_soft_tbl =3D skb; @@ -970,7 +975,6 @@ static void rxrpc_input_ack(struct rxrpc_call *call, st= ruct sk_buff *skb) =20 rxrpc_input_soft_acks(call, skb->data + offset, first_soft_ack, nr_acks, &summary); - skb_put =3D NULL; } else if (call->acks_soft_tbl) { spin_lock(&call->acks_ack_lock); skb_old =3D call->acks_soft_tbl; @@ -986,7 +990,6 @@ static void rxrpc_input_ack(struct rxrpc_call *call, st= ruct sk_buff *skb) =20 rxrpc_congestion_management(call, skb, &summary, acked_serial); out: - rxrpc_free_skb(skb_put, rxrpc_skb_put_input); rxrpc_free_skb(skb_old, rxrpc_skb_put_ack); } =20 @@ -1037,11 +1040,11 @@ void rxrpc_input_call_event(struct rxrpc_call *call= , struct sk_buff *skb) switch (sp->hdr.type) { case RXRPC_PACKET_TYPE_DATA: rxrpc_input_data(call, skb); - goto no_free; + break; =20 case RXRPC_PACKET_TYPE_ACK: rxrpc_input_ack(call, skb); - goto no_free; + break; =20 case RXRPC_PACKET_TYPE_BUSY: /* Just ignore BUSY packets from the server; the retry and @@ -1061,10 +1064,6 @@ void rxrpc_input_call_event(struct rxrpc_call *call,= struct sk_buff *skb) default: break; } - - rxrpc_free_skb(skb, 
rxrpc_skb_put_input); -no_free: - _leave(""); } =20 /* diff --git a/net/rxrpc/io_thread.c b/net/rxrpc/io_thread.c index 2119941b6d6c..91b8ba5b90db 100644 --- a/net/rxrpc/io_thread.c +++ b/net/rxrpc/io_thread.c @@ -72,6 +72,7 @@ static void rxrpc_post_packet_to_conn(struct rxrpc_connec= tion *conn, { _enter("%p,%p", conn, skb); =20 + rxrpc_get_skb(skb, rxrpc_skb_get_conn_work); skb_queue_tail(&conn->rx_queue, skb); rxrpc_queue_conn(conn, rxrpc_conn_queue_rx_work); } @@ -86,10 +87,9 @@ static void rxrpc_post_packet_to_local(struct rxrpc_loca= l *local, _enter("%p,%p", local, skb); =20 if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) { + rxrpc_get_skb(skb, rxrpc_skb_get_local_work); skb_queue_tail(&local->event_queue, skb); rxrpc_queue_local(local); - } else { - rxrpc_free_skb(skb, rxrpc_skb_put_input); } } =20 @@ -99,10 +99,9 @@ static void rxrpc_post_packet_to_local(struct rxrpc_loca= l *local, static void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff = *skb) { if (rxrpc_get_local_maybe(local, rxrpc_local_get_queue)) { + rxrpc_get_skb(skb, rxrpc_skb_get_reject_work); skb_queue_tail(&local->reject_queue, skb); rxrpc_queue_local(local); - } else { - rxrpc_free_skb(skb, rxrpc_skb_put_input); } } =20 @@ -153,7 +152,7 @@ static bool rxrpc_extract_abort(struct sk_buff *skb) /* * Process packets received on the local endpoint */ -static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff *s= kb) +static int rxrpc_input_packet(struct rxrpc_local *local, struct sk_buff **= _skb) { struct rxrpc_connection *conn; struct rxrpc_channel *chan; @@ -161,6 +160,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local= , struct sk_buff *skb) struct rxrpc_skb_priv *sp; struct rxrpc_peer *peer =3D NULL; struct rxrpc_sock *rx =3D NULL; + struct sk_buff *skb =3D *_skb; unsigned int channel; =20 if (skb->tstamp =3D=3D 0) @@ -181,7 +181,6 @@ static int rxrpc_input_packet(struct rxrpc_local *local= , struct sk_buff *skb) static int lose; if ((lose++ 
& 7) =3D=3D 7) { trace_rxrpc_rx_lose(sp); - rxrpc_free_skb(skb, rxrpc_skb_put_lose); return 0; } } @@ -193,13 +192,13 @@ static int rxrpc_input_packet(struct rxrpc_local *loc= al, struct sk_buff *skb) switch (sp->hdr.type) { case RXRPC_PACKET_TYPE_VERSION: if (rxrpc_to_client(sp)) - goto discard; + return 0; rxrpc_post_packet_to_local(local, skb); - goto out; + return 0; =20 case RXRPC_PACKET_TYPE_BUSY: if (rxrpc_to_server(sp)) - goto discard; + return 0; fallthrough; case RXRPC_PACKET_TYPE_ACK: case RXRPC_PACKET_TYPE_ACKALL: @@ -208,7 +207,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local= , struct sk_buff *skb) break; case RXRPC_PACKET_TYPE_ABORT: if (!rxrpc_extract_abort(skb)) - return true; /* Just discard if malformed */ + return 0; /* Just discard if malformed */ break; =20 case RXRPC_PACKET_TYPE_DATA: @@ -220,15 +219,16 @@ static int rxrpc_input_packet(struct rxrpc_local *loc= al, struct sk_buff *skb) * decryption. */ if (sp->hdr.securityIndex !=3D 0) { - struct sk_buff *nskb =3D skb_unshare(skb, GFP_ATOMIC); - if (!nskb) { - rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare_nomem); - goto out; + skb =3D skb_unshare(skb, GFP_ATOMIC); + if (!skb) { + rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare_nomem); + *_skb =3D NULL; + return 0; } =20 - if (nskb !=3D skb) { - rxrpc_eaten_skb(skb, rxrpc_skb_eaten_by_unshare); - skb =3D nskb; + if (skb !=3D *_skb) { + rxrpc_eaten_skb(*_skb, rxrpc_skb_eaten_by_unshare); + *_skb =3D skb; rxrpc_new_skb(skb, rxrpc_skb_new_unshared); sp =3D rxrpc_skb(skb); } @@ -237,18 +237,18 @@ static int rxrpc_input_packet(struct rxrpc_local *loc= al, struct sk_buff *skb) =20 case RXRPC_PACKET_TYPE_CHALLENGE: if (rxrpc_to_server(sp)) - goto discard; + return 0; break; case RXRPC_PACKET_TYPE_RESPONSE: if (rxrpc_to_client(sp)) - goto discard; + return 0; break; =20 /* Packet types 9-11 should just be ignored. 
*/ case RXRPC_PACKET_TYPE_PARAMS: case RXRPC_PACKET_TYPE_10: case RXRPC_PACKET_TYPE_11: - goto discard; + return 0; =20 default: goto bad_message; @@ -268,7 +268,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local= , struct sk_buff *skb) if (sp->hdr.type =3D=3D RXRPC_PACKET_TYPE_DATA && sp->hdr.seq =3D=3D 1) goto unsupported_service; - goto discard; + return 0; } } =20 @@ -294,7 +294,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local= , struct sk_buff *skb) /* Connection-level packet */ _debug("CONN %p {%d}", conn, conn->debug_id); rxrpc_post_packet_to_conn(conn, skb); - goto out; + return 0; } =20 if ((int)sp->hdr.serial - (int)conn->hi_serial > 0) @@ -306,19 +306,19 @@ static int rxrpc_input_packet(struct rxrpc_local *loc= al, struct sk_buff *skb) =20 /* Ignore really old calls */ if (sp->hdr.callNumber < chan->last_call) - goto discard; + return 0; =20 if (sp->hdr.callNumber =3D=3D chan->last_call) { if (chan->call || sp->hdr.type =3D=3D RXRPC_PACKET_TYPE_ABORT) - goto discard; + return 0; =20 /* For the previous service call, if completed * successfully, we discard all further packets. */ if (rxrpc_conn_is_service(conn) && chan->last_type =3D=3D RXRPC_PACKET_TYPE_ACK) - goto discard; + return 0; =20 /* But otherwise we need to retransmit the final packet * from data cached in the connection record. @@ -329,7 +329,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local= , struct sk_buff *skb) sp->hdr.serial, sp->hdr.flags); rxrpc_post_packet_to_conn(conn, skb); - goto out; + return 0; } =20 call =3D rcu_dereference(chan->call); @@ -357,21 +357,14 @@ static int rxrpc_input_packet(struct rxrpc_local *loc= al, struct sk_buff *skb) sp->hdr.type !=3D RXRPC_PACKET_TYPE_DATA) goto bad_message; if (sp->hdr.seq !=3D 1) - goto discard; + return 0; call =3D rxrpc_new_incoming_call(local, rx, skb); if (!call) goto reject_packet; } =20 - /* Process a call packet; this either discards or passes on the ref - * elsewhere. - */ + /* Process a call packet. 
*/ rxrpc_input_call_event(call, skb); - goto out; - -discard: - rxrpc_free_skb(skb, rxrpc_skb_put_input); -out: trace_rxrpc_rx_done(0, 0); return 0; =20 @@ -400,9 +393,7 @@ static int rxrpc_input_packet(struct rxrpc_local *local= , struct sk_buff *skb) post_abort: skb->mark =3D RXRPC_SKB_MARK_REJECT_ABORT; reject_packet: - trace_rxrpc_rx_done(skb->mark, skb->priority); rxrpc_reject_packet(local, skb); - _leave(" [badmsg]"); return 0; } =20 @@ -441,9 +432,12 @@ int rxrpc_io_thread(void *data) if ((skb =3D __skb_dequeue(&rx_queue))) { switch (skb->mark) { case RXRPC_SKB_MARK_PACKET: + skb->priority =3D 0; rcu_read_lock(); - rxrpc_input_packet(local, skb); + rxrpc_input_packet(local, &skb); rcu_read_unlock(); + trace_rxrpc_rx_done(skb->mark, skb->priority); + rxrpc_free_skb(skb, rxrpc_skb_put_input); break; case RXRPC_SKB_MARK_ERROR: rxrpc_input_error(local, skb);