From: David Howells
To: netdev@vger.kernel.org
Cc: David Howells, Marc Dionne, Yunsheng Lin, "David S. Miller",
    Eric Dumazet, Jakub Kicinski, Paolo Abeni,
    linux-afs@lists.infradead.org, linux-kernel@vger.kernel.org
Miller" , Eric Dumazet , Jakub Kicinski , Paolo Abeni , linux-afs@lists.infradead.org, linux-kernel@vger.kernel.org Subject: [PATCH net-next v2 18/39] rxrpc: Implement progressive transmission queue struct Date: Wed, 4 Dec 2024 07:46:46 +0000 Message-ID: <20241204074710.990092-19-dhowells@redhat.com> In-Reply-To: <20241204074710.990092-1-dhowells@redhat.com> References: <20241204074710.990092-1-dhowells@redhat.com> Precedence: bulk X-Mailing-List: linux-kernel@vger.kernel.org List-Id: List-Subscribe: List-Unsubscribe: MIME-Version: 1.0 Content-Transfer-Encoding: quoted-printable X-Scanned-By: MIMEDefang 3.4.1 on 10.30.177.4 Content-Type: text/plain; charset="utf-8" We need to scan the buffers in the transmission queue occasionally when processing ACKs, but the transmission queue is currently a linked list of transmission buffers which, when we eventually expand the Tx window to 8192 packets will be very slow to walk. Instead, pull the fields we need to examine a lot (last sent time, retransmitted flag) into a new struct rxrpc_txqueue and make each one hold an array of 32 or 64 packets. The transmission queue is then a list of these structs, each pointing to a contiguous set of packets. Scanning is then a lot faster as the flags and timestamps are concentrated in the CPU dcache. The transmission timestamps are stored as a number of microseconds from a base ktime to reduce memory requirements. This should be fine provided we manage to transmit an entire buffer within an hour. This will make implementing RACK-TLP [RFC8985] easier as it will be less costly to scan the transmission buffers. Signed-off-by: David Howells cc: Marc Dionne cc: "David S. Miller" cc: Eric Dumazet cc: Jakub Kicinski cc: Paolo Abeni cc: linux-afs@lists.infradead.org cc: netdev@vger.kernel.org --- include/trace/events/rxrpc.h | 98 ++++++++++++++--- net/rxrpc/ar-internal.h | 47 ++++++-- net/rxrpc/call_event.c | 204 ++++++++++++++++++++++------------- net/rxrpc/call_object.c | 38 ++++--- net/rxrpc/input.c | 72 ++++++++++--- net/rxrpc/output.c | 163 ++++++++++++++-------------- net/rxrpc/sendmsg.c | 69 +++++++++--- net/rxrpc/txbuf.c | 41 +------ 8 files changed, 467 insertions(+), 265 deletions(-) diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h index 28fa7be31ff8..e6cf9ec940aa 100644 --- a/include/trace/events/rxrpc.h +++ b/include/trace/events/rxrpc.h @@ -297,7 +297,6 @@ =20 #define rxrpc_txqueue_traces \ EM(rxrpc_txqueue_await_reply, "AWR") \ - EM(rxrpc_txqueue_dequeue, "DEQ") \ EM(rxrpc_txqueue_end, "END") \ EM(rxrpc_txqueue_queue, "QUE") \ EM(rxrpc_txqueue_queue_last, "QLS") \ @@ -482,6 +481,19 @@ EM(rxrpc_txbuf_see_send_more, "SEE SEND+ ") \ E_(rxrpc_txbuf_see_unacked, "SEE UNACKED") =20 +#define rxrpc_tq_traces \ + EM(rxrpc_tq_alloc, "ALLOC") \ + EM(rxrpc_tq_cleaned, "CLEAN") \ + EM(rxrpc_tq_decant, "DCNT ") \ + EM(rxrpc_tq_decant_advance, "DCNT>") \ + EM(rxrpc_tq_queue, "QUEUE") \ + EM(rxrpc_tq_queue_dup, "QUE!!") \ + EM(rxrpc_tq_rotate, "ROT ") \ + EM(rxrpc_tq_rotate_and_free, "ROT-F") \ + EM(rxrpc_tq_rotate_and_keep, "ROT-K") \ + EM(rxrpc_tq_transmit, "XMIT ") \ + E_(rxrpc_tq_transmit_advance, "XMIT>") + #define rxrpc_pmtud_reduce_traces \ EM(rxrpc_pmtud_reduce_ack, "Ack ") \ EM(rxrpc_pmtud_reduce_icmp, "Icmp ") \ @@ -518,6 +530,7 @@ enum rxrpc_rtt_tx_trace { rxrpc_rtt_tx_traces } __mode= (byte); enum rxrpc_sack_trace { rxrpc_sack_traces } __mode(byte); enum rxrpc_skb_trace { rxrpc_skb_traces } __mode(byte); enum rxrpc_timer_trace { rxrpc_timer_traces } __mode(byte); +enum rxrpc_tq_trace { 
 include/trace/events/rxrpc.h |  98 ++++++++++++++---
 net/rxrpc/ar-internal.h      |  47 ++++++--
 net/rxrpc/call_event.c       | 204 ++++++++++++++++++++++-------------
 net/rxrpc/call_object.c      |  38 ++++---
 net/rxrpc/input.c            |  72 ++++++++++---
 net/rxrpc/output.c           | 163 ++++++++++++++--------------
 net/rxrpc/sendmsg.c          |  69 +++++++++---
 net/rxrpc/txbuf.c            |  41 +------
 8 files changed, 467 insertions(+), 265 deletions(-)

diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index 28fa7be31ff8..e6cf9ec940aa 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -297,7 +297,6 @@
 
 #define rxrpc_txqueue_traces \
 	EM(rxrpc_txqueue_await_reply, "AWR") \
-	EM(rxrpc_txqueue_dequeue, "DEQ") \
 	EM(rxrpc_txqueue_end, "END") \
 	EM(rxrpc_txqueue_queue, "QUE") \
 	EM(rxrpc_txqueue_queue_last, "QLS") \
@@ -482,6 +481,19 @@
 	EM(rxrpc_txbuf_see_send_more, "SEE SEND+ ") \
 	E_(rxrpc_txbuf_see_unacked, "SEE UNACKED")
 
+#define rxrpc_tq_traces \
+	EM(rxrpc_tq_alloc, "ALLOC") \
+	EM(rxrpc_tq_cleaned, "CLEAN") \
+	EM(rxrpc_tq_decant, "DCNT ") \
+	EM(rxrpc_tq_decant_advance, "DCNT>") \
+	EM(rxrpc_tq_queue, "QUEUE") \
+	EM(rxrpc_tq_queue_dup, "QUE!!") \
+	EM(rxrpc_tq_rotate, "ROT ") \
+	EM(rxrpc_tq_rotate_and_free, "ROT-F") \
+	EM(rxrpc_tq_rotate_and_keep, "ROT-K") \
+	EM(rxrpc_tq_transmit, "XMIT ") \
+	E_(rxrpc_tq_transmit_advance, "XMIT>")
+
 #define rxrpc_pmtud_reduce_traces \
 	EM(rxrpc_pmtud_reduce_ack, "Ack ") \
 	EM(rxrpc_pmtud_reduce_icmp, "Icmp ") \
@@ -518,6 +530,7 @@ enum rxrpc_rtt_tx_trace { rxrpc_rtt_tx_traces } __mode(byte);
 enum rxrpc_sack_trace { rxrpc_sack_traces } __mode(byte);
 enum rxrpc_skb_trace { rxrpc_skb_traces } __mode(byte);
 enum rxrpc_timer_trace { rxrpc_timer_traces } __mode(byte);
+enum rxrpc_tq_trace { rxrpc_tq_traces } __mode(byte);
 enum rxrpc_tx_point { rxrpc_tx_points } __mode(byte);
 enum rxrpc_txbuf_trace { rxrpc_txbuf_traces } __mode(byte);
 enum rxrpc_txqueue_trace { rxrpc_txqueue_traces } __mode(byte);
@@ -554,6 +567,7 @@ rxrpc_rtt_tx_traces;
 rxrpc_sack_traces;
 rxrpc_skb_traces;
 rxrpc_timer_traces;
+rxrpc_tq_traces;
 rxrpc_tx_points;
 rxrpc_txbuf_traces;
 rxrpc_txqueue_traces;
@@ -881,7 +895,7 @@ TRACE_EVENT(rxrpc_txqueue,
 		    __field(rxrpc_seq_t, acks_hard_ack)
 		    __field(rxrpc_seq_t, tx_bottom)
 		    __field(rxrpc_seq_t, tx_top)
-		    __field(rxrpc_seq_t, tx_prepared)
+		    __field(rxrpc_seq_t, send_top)
 		    __field(int, tx_winsize)
 	    ),
 
 	    TP_fast_assign(
 		    __entry->call = call->debug_id;
 		    __entry->acks_hard_ack = call->acks_hard_ack;
 		    __entry->tx_bottom = call->tx_bottom;
 		    __entry->tx_top = call->tx_top;
-		    __entry->tx_prepared = call->tx_prepared;
+		    __entry->send_top = call->send_top;
 		    __entry->tx_winsize = call->tx_winsize;
 	    ),
 
@@ -902,14 +916,14 @@ TRACE_EVENT(rxrpc_txqueue,
 		      __entry->acks_hard_ack,
 		      __entry->tx_top - __entry->tx_bottom,
 		      __entry->tx_top - __entry->acks_hard_ack,
-		      __entry->tx_prepared - __entry->tx_bottom,
+		      __entry->send_top - __entry->tx_top,
 		      __entry->tx_winsize)
 	    );
 
 TRACE_EVENT(rxrpc_transmit,
-	    TP_PROTO(struct rxrpc_call *call, int space),
+	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t send_top, int space),
 
-	    TP_ARGS(call, space),
+	    TP_ARGS(call, send_top, space),
 
 	    TP_STRUCT__entry(
 		    __field(unsigned int, call)
@@ -925,12 +939,12 @@ TRACE_EVENT(rxrpc_transmit,
 
 	    TP_fast_assign(
 		    __entry->call = call->debug_id;
-		    __entry->seq = call->tx_bottom;
+		    __entry->seq = call->tx_top + 1;
 		    __entry->space = space;
 		    __entry->tx_winsize = call->tx_winsize;
 		    __entry->cong_cwnd = call->cong_cwnd;
 		    __entry->cong_extra = call->cong_extra;
-		    __entry->prepared = call->tx_prepared - call->tx_bottom;
+		    __entry->prepared = send_top - call->tx_bottom;
 		    __entry->in_flight = call->tx_top - call->acks_hard_ack;
 		    __entry->pmtud_jumbo = call->peer->pmtud_jumbo;
 	    ),
@@ -947,6 +961,32 @@ TRACE_EVENT(rxrpc_transmit,
 		      __entry->pmtud_jumbo)
 	    );
 
+TRACE_EVENT(rxrpc_tx_rotate,
+	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, rxrpc_seq_t to),
+
+	    TP_ARGS(call, seq, to),
+
+	    TP_STRUCT__entry(
+		    __field(unsigned int, call)
+		    __field(rxrpc_seq_t, seq)
+		    __field(rxrpc_seq_t, to)
+		    __field(rxrpc_seq_t, top)
+	    ),
+
+	    TP_fast_assign(
+		    __entry->call = call->debug_id;
+		    __entry->seq = seq;
+		    __entry->to = to;
+		    __entry->top = call->tx_top;
+	    ),
+
+	    TP_printk("c=%08x q=%08x-%08x-%08x",
+		      __entry->call,
+		      __entry->seq,
+		      __entry->to,
+		      __entry->top)
+	    );
+
 TRACE_EVENT(rxrpc_rx_data,
 	    TP_PROTO(unsigned int call, rxrpc_seq_t seq,
 		     rxrpc_serial_t serial, u8 flags),
@@ -1621,10 +1661,11 @@ TRACE_EVENT(rxrpc_drop_ack,
 	    );
 
 TRACE_EVENT(rxrpc_retransmit,
-	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq,
-		     rxrpc_serial_t serial, ktime_t expiry),
+	    TP_PROTO(struct rxrpc_call *call,
+		     struct rxrpc_send_data_req *req,
+		     struct rxrpc_txbuf *txb, ktime_t expiry),
 
-	    TP_ARGS(call, seq, serial, expiry),
+	    TP_ARGS(call, req, txb, expiry),
 
 	    TP_STRUCT__entry(
 		    __field(unsigned int, call)
@@ -1635,8 +1676,8 @@ TRACE_EVENT(rxrpc_retransmit,
 
 	    TP_fast_assign(
 		    __entry->call = call->debug_id;
-		    __entry->seq = seq;
-		    __entry->serial = serial;
+		    __entry->seq = req->seq;
+		    __entry->serial = txb->serial;
 		    __entry->expiry = expiry;
 	    ),
 
@@ -1714,9 +1755,9 @@ TRACE_EVENT(rxrpc_reset_cwnd,
 		    __entry->cwnd = call->cong_cwnd;
 		    __entry->extra = call->cong_extra;
 		    __entry->hard_ack = call->acks_hard_ack;
-		    __entry->prepared = call->tx_prepared - call->tx_bottom;
+		    __entry->prepared = call->send_top - call->tx_bottom;
 		    __entry->since_last_tx = ktime_sub(now, call->tx_last_sent);
-		    __entry->has_data = !list_empty(&call->tx_sendmsg);
+		    __entry->has_data = call->tx_bottom != call->tx_top;
 	    ),
 
 	    TP_printk("c=%08x q=%08x %s cw=%u+%u pr=%u tm=%llu d=%u",
@@ -2024,6 +2065,33 @@ TRACE_EVENT(rxrpc_txbuf,
 		      __entry->ref)
 	    );
 
+TRACE_EVENT(rxrpc_tq,
+	    TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq,
+		     rxrpc_seq_t seq, enum rxrpc_tq_trace trace),
+
+	    TP_ARGS(call, tq, seq, trace),
+
+	    TP_STRUCT__entry(
+		    __field(unsigned int, call_debug_id)
+		    __field(rxrpc_seq_t, qbase)
+		    __field(rxrpc_seq_t, seq)
+		    __field(enum rxrpc_tq_trace, trace)
+	    ),
+
+	    TP_fast_assign(
+		    __entry->call_debug_id = call->debug_id;
+		    __entry->qbase = tq ? tq->qbase : call->tx_qbase;
+		    __entry->seq = seq;
+		    __entry->trace = trace;
+	    ),
+
+	    TP_printk("c=%08x bq=%08x q=%08x %s",
+		      __entry->call_debug_id,
+		      __entry->qbase,
+		      __entry->seq,
+		      __print_symbolic(__entry->trace, rxrpc_tq_traces))
+	    );
+
 TRACE_EVENT(rxrpc_poke_call,
 	    TP_PROTO(struct rxrpc_call *call, bool busy,
 		     enum rxrpc_call_poke_trace what),
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 84efa21f176c..bcce4862b0b7 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -30,6 +30,7 @@ struct rxrpc_crypt {
 struct key_preparsed_payload;
 struct rxrpc_connection;
 struct rxrpc_txbuf;
+struct rxrpc_txqueue;
 
 /*
  * Mark applied to socket buffers in skb->mark.  skb->priority is used
@@ -691,13 +692,17 @@ struct rxrpc_call {
 	unsigned short rx_pkt_offset;	/* Current recvmsg packet offset */
 	unsigned short rx_pkt_len;	/* Current recvmsg packet len */
 
+	/* Sendmsg data tracking. */
+	rxrpc_seq_t send_top;		/* Highest Tx slot filled by sendmsg. */
+	struct rxrpc_txqueue *send_queue; /* Queue that sendmsg is writing into */
+
 	/* Transmitted data tracking. */
 	spinlock_t tx_lock;		/* Transmit queue lock */
-	struct list_head tx_sendmsg;	/* Sendmsg prepared packets */
-	struct list_head tx_buffer;	/* Buffer of transmissible packets */
+	struct rxrpc_txqueue *tx_queue;	/* Start of transmission buffers */
+	struct rxrpc_txqueue *tx_qtail;	/* End of transmission buffers */
+	rxrpc_seq_t tx_qbase;		/* First slot in tx_queue */
 	rxrpc_seq_t tx_bottom;		/* First packet in buffer */
 	rxrpc_seq_t tx_transmitted;	/* Highest packet transmitted */
-	rxrpc_seq_t tx_prepared;	/* Highest Tx slot prepared. */
 	rxrpc_seq_t tx_top;		/* Highest Tx slot allocated. */
 	u16 tx_backoff;			/* Delay to insert due to Tx failure (ms) */
 	u8 tx_winsize;			/* Maximum size of Tx window */
@@ -815,9 +820,6 @@ struct rxrpc_send_params {
  * Buffer of data to be output as a packet.
  */
 struct rxrpc_txbuf {
-	struct list_head call_link;	/* Link in call->tx_sendmsg/tx_buffer */
-	struct list_head tx_link;	/* Link in live Enc queue or Tx queue */
-	ktime_t last_sent;		/* Time at which last transmitted */
 	refcount_t ref;
 	rxrpc_seq_t seq;		/* Sequence number of this packet */
 	rxrpc_serial_t serial;		/* Last serial number transmitted with */
@@ -849,6 +851,36 @@ static inline bool rxrpc_sending_to_client(const struct rxrpc_txbuf *txb)
 	return !rxrpc_sending_to_server(txb);
 }
 
+/*
+ * Transmit queue element, including RACK [RFC8985] per-segment metadata.  The
+ * transmission timestamp is in usec from the base.
+ */
+struct rxrpc_txqueue {
+	/* Start with the members we want to prefetch. */
+	struct rxrpc_txqueue *next;
+	ktime_t xmit_ts_base;
+	rxrpc_seq_t qbase;
+
+	/* The arrays we want to pack into as few cache lines as possible. */
+	struct {
+#define RXRPC_NR_TXQUEUE BITS_PER_LONG
+#define RXRPC_TXQ_MASK (RXRPC_NR_TXQUEUE - 1)
+		struct rxrpc_txbuf *bufs[RXRPC_NR_TXQUEUE];
+		unsigned int segment_xmit_ts[RXRPC_NR_TXQUEUE];
+	} ____cacheline_aligned;
+};
+
+/*
+ * Data transmission request.
+ */
+struct rxrpc_send_data_req {
+	ktime_t now;			/* Current time */
+	struct rxrpc_txqueue *tq;	/* Tx queue segment holding first DATA */
+	rxrpc_seq_t seq;		/* Sequence of first data */
+	int n;				/* Number of DATA packets to glue into jumbo */
+	bool did_send;			/* T if did actually send */
+};
+
 #include <trace/events/rxrpc.h>
 
 /*
@@ -905,7 +937,6 @@ void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
 			enum rxrpc_propose_ack_trace why);
 void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t,
 			     enum rxrpc_propose_ack_trace);
-void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *);
 void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb);
 
 bool rxrpc_input_call_event(struct rxrpc_call *call);
@@ -1191,10 +1222,10 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
 		    rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why);
 void rxrpc_send_probe_for_pmtud(struct rxrpc_call *call);
 int rxrpc_send_abort_packet(struct rxrpc_call *);
+void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req);
 void rxrpc_send_conn_abort(struct rxrpc_connection *conn);
 void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb);
 void rxrpc_send_keepalive(struct rxrpc_peer *);
-void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n);
 
 /*
  * peer_event.c
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index ef47de3f41c6..90e3d9395675 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -62,57 +62,85 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call)
 	set_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags);
 }
 
+/*
+ * Retransmit one or more packets.
+ */
+static void rxrpc_retransmit_data(struct rxrpc_call *call,
+				  struct rxrpc_send_data_req *req,
+				  ktime_t rto)
+{
+	struct rxrpc_txqueue *tq = req->tq;
+	unsigned int ix = req->seq & RXRPC_TXQ_MASK;
+	struct rxrpc_txbuf *txb = tq->bufs[ix];
+	ktime_t xmit_ts, resend_at;
+
+	_enter("%x,%x,%x,%x", tq->qbase, req->seq, ix, txb->debug_id);
+
+	xmit_ts = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[ix]);
+	resend_at = ktime_add(xmit_ts, rto);
+	trace_rxrpc_retransmit(call, req, txb,
+			       ktime_sub(resend_at, req->now));
+
+	txb->flags |= RXRPC_TXBUF_RESENT;
+	rxrpc_send_data_packet(call, req);
+	rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
+
+	req->tq = NULL;
+	req->n = 0;
+	req->did_send = true;
+	req->now = ktime_get_real();
+}
+
 /*
  * Perform retransmission of NAK'd and unack'd packets.
  */
 void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 {
+	struct rxrpc_send_data_req req = {
+		.now = ktime_get_real(),
+	};
 	struct rxrpc_ackpacket *ack = NULL;
 	struct rxrpc_skb_priv *sp;
+	struct rxrpc_txqueue *tq;
 	struct rxrpc_txbuf *txb;
-	rxrpc_seq_t transmitted = call->tx_transmitted;
+	rxrpc_seq_t transmitted = call->tx_transmitted, seq;
 	ktime_t next_resend = KTIME_MAX, rto = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
-	ktime_t resend_at = KTIME_MAX, now, delay;
+	ktime_t resend_at = KTIME_MAX, delay;
 	bool unacked = false, did_send = false;
-	unsigned int i;
+	unsigned int qix;
 
 	_enter("{%d,%d}", call->acks_hard_ack, call->tx_top);
 
-	now = ktime_get_real();
-
-	if (list_empty(&call->tx_buffer))
+	if (call->tx_bottom == call->tx_top)
 		goto no_resend;
 
 	trace_rxrpc_resend(call, ack_skb);
-	txb = list_first_entry(&call->tx_buffer, struct rxrpc_txbuf, call_link);
+	tq = call->tx_queue;
+	seq = call->tx_bottom;
 
-	/* Scan the soft ACK table without dropping the lock and resend any
-	 * explicitly NAK'd packets.
-	 */
+	/* Scan the soft ACK table and resend any explicitly NAK'd packets. */
 	if (ack_skb) {
 		sp = rxrpc_skb(ack_skb);
 		ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header);
 
-		for (i = 0; i < sp->ack.nr_acks; i++) {
-			rxrpc_seq_t seq;
+		for (int i = 0; i < sp->ack.nr_acks; i++) {
+			rxrpc_seq_t aseq;
 
 			if (ack->acks[i] & 1)
 				continue;
-			seq = sp->ack.first_ack + i;
-			if (after(txb->seq, transmitted))
-				break;
-			if (after(txb->seq, seq))
-				continue; /* A new hard ACK probably came in */
-			list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
-				if (txb->seq == seq)
-					goto found_txb;
-			}
-			goto no_further_resend;
+			aseq = sp->ack.first_ack + i;
+			while (after_eq(aseq, tq->qbase + RXRPC_NR_TXQUEUE))
+				tq = tq->next;
+			seq = aseq;
+			qix = seq - tq->qbase;
+			txb = tq->bufs[qix];
+			if (after(seq, transmitted))
+				goto no_further_resend;
 
-		found_txb:
-			resend_at = ktime_add(txb->last_sent, rto);
+			resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
+			resend_at = ktime_add(resend_at, rto);
 			if (after(txb->serial, call->acks_highest_serial)) {
-				if (ktime_after(resend_at, now) &&
+				if (ktime_after(resend_at, req.now) &&
 				    ktime_before(resend_at, next_resend))
 					next_resend = resend_at;
 				continue; /* Ack point not yet reached */
@@ -120,17 +148,13 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 
 			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked);
 
-			trace_rxrpc_retransmit(call, txb->seq, txb->serial,
-					       ktime_sub(resend_at, now));
-
-			txb->flags |= RXRPC_TXBUF_RESENT;
-			rxrpc_transmit_data(call, txb, 1);
-			did_send = true;
-			now = ktime_get_real();
+			req.tq = tq;
+			req.seq = seq;
+			req.n = 1;
+			rxrpc_retransmit_data(call, &req, rto);
 
-			if (list_is_last(&txb->call_link, &call->tx_buffer))
+			if (after_eq(seq, call->tx_top))
 				goto no_further_resend;
-			txb = list_next_entry(txb, call_link);
 		}
 	}
 
@@ -139,35 +163,43 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 	 * ACK'd or NACK'd in due course, so don't worry about it here; here we
 	 * need to consider retransmitting anything beyond that point.
 	 */
-	if (after_eq(call->acks_prev_seq, call->tx_transmitted))
+	seq = call->acks_prev_seq;
+	if (after_eq(seq, call->tx_transmitted))
 		goto no_further_resend;
+	seq++;
 
-	list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
-		resend_at = ktime_add(txb->last_sent, rto);
+	while (after_eq(seq, tq->qbase + RXRPC_NR_TXQUEUE))
+		tq = tq->next;
 
-		if (before_eq(txb->seq, call->acks_prev_seq))
+	while (before_eq(seq, call->tx_transmitted)) {
+		qix = seq - tq->qbase;
+		if (qix >= RXRPC_NR_TXQUEUE) {
+			tq = tq->next;
 			continue;
-		if (after(txb->seq, call->tx_transmitted))
-			break; /* Not transmitted yet */
+		}
+		txb = tq->bufs[qix];
+		resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
+		resend_at = ktime_add(resend_at, rto);
 
 		if (ack && ack->reason == RXRPC_ACK_PING_RESPONSE &&
 		    before(txb->serial, ntohl(ack->serial)))
 			goto do_resend; /* Wasn't accounted for by a more recent ping. */
 
-		if (ktime_after(resend_at, now)) {
+		if (ktime_after(resend_at, req.now)) {
 			if (ktime_before(resend_at, next_resend))
 				next_resend = resend_at;
+			seq++;
 			continue;
 		}
 
 	do_resend:
 		unacked = true;
 
-		txb->flags |= RXRPC_TXBUF_RESENT;
-		rxrpc_transmit_data(call, txb, 1);
-		did_send = true;
-		rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
-		now = ktime_get_real();
+		req.tq = tq;
+		req.seq = seq;
+		req.n = 1;
+		rxrpc_retransmit_data(call, &req, rto);
+		seq++;
 	}
 
 no_further_resend:
@@ -175,7 +207,8 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 	if (resend_at < KTIME_MAX) {
 		delay = rxrpc_get_rto_backoff(call->peer, did_send);
 		resend_at = ktime_add(resend_at, delay);
-		trace_rxrpc_timer_set(call, resend_at - now, rxrpc_timer_trace_resend_reset);
+		trace_rxrpc_timer_set(call, resend_at - req.now,
+				      rxrpc_timer_trace_resend_reset);
 	}
 	call->resend_at = resend_at;
 
@@ -186,11 +219,11 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 	 * that an ACK got lost somewhere.  Send a ping to find out instead of
 	 * retransmitting data.
 	 */
-	if (!did_send) {
+	if (!req.did_send) {
 		ktime_t next_ping = ktime_add_us(call->acks_latest_ts,
 						 call->peer->srtt_us >> 3);
 
-		if (ktime_sub(next_ping, now) <= 0)
+		if (ktime_sub(next_ping, req.now) <= 0)
 			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
 				       rxrpc_propose_ack_ping_for_0_retrans);
 	}
@@ -240,47 +273,68 @@ static unsigned int rxrpc_tx_window_space(struct rxrpc_call *call)
 }
 
 /*
- * Decant some if the sendmsg prepared queue into the transmission buffer.
+ * Transmit some as-yet untransmitted data.
  */
-static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
+static void rxrpc_transmit_fresh_data(struct rxrpc_call *call)
 {
 	int space = rxrpc_tx_window_space(call);
 
 	if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
-		if (list_empty(&call->tx_sendmsg))
+		if (call->send_top == call->tx_top)
 			return;
 		rxrpc_expose_client_call(call);
 	}
 
 	while (space > 0) {
-		struct rxrpc_txbuf *head = NULL, *txb;
-		int count = 0, limit = min(space, 1);
-
-		if (list_empty(&call->tx_sendmsg))
+		struct rxrpc_send_data_req req = {
+			.now = ktime_get_real(),
+			.seq = call->tx_transmitted + 1,
+			.n = 0,
+		};
+		struct rxrpc_txqueue *tq;
+		struct rxrpc_txbuf *txb;
+		rxrpc_seq_t send_top, seq;
+		int limit = min(space, 1);
+
+		/* Order send_top before the contents of the new txbufs and
+		 * txqueue pointers
+		 */
+		send_top = smp_load_acquire(&call->send_top);
+		if (call->tx_top == send_top)
 			break;
 
-		trace_rxrpc_transmit(call, space);
+		trace_rxrpc_transmit(call, send_top, space);
+
+		tq = call->tx_qtail;
+		seq = call->tx_top;
+		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant);
 
-		spin_lock(&call->tx_lock);
 		do {
-			txb = list_first_entry(&call->tx_sendmsg,
-					       struct rxrpc_txbuf, call_link);
-			if (!head)
-				head = txb;
-			list_move_tail(&txb->call_link, &call->tx_buffer);
-			count++;
+			int ix;
+
+			seq++;
+			ix = seq & RXRPC_TXQ_MASK;
+			if (!ix) {
+				tq = tq->next;
+				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant_advance);
+			}
+			if (!req.tq)
+				req.tq = tq;
+			txb = tq->bufs[ix];
+			req.n++;
 			if (!txb->jumboable)
 				break;
-		} while (count < limit && !list_empty(&call->tx_sendmsg));
+		} while (req.n < limit && before(seq, send_top));
 
-		spin_unlock(&call->tx_lock);
-
-		call->tx_top = txb->seq;
-		if (txb->flags & RXRPC_LAST_PACKET)
+		if (txb->flags & RXRPC_LAST_PACKET) {
 			rxrpc_close_tx_phase(call);
+			tq = NULL;
+		}
+		call->tx_qtail = tq;
+		call->tx_top = seq;
 
-		space -= count;
-		rxrpc_transmit_data(call, head, count);
+		space -= req.n;
+		rxrpc_send_data_packet(call, &req);
 	}
 }
 
@@ -288,7 +342,7 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
 {
 	switch (__rxrpc_call_state(call)) {
 	case RXRPC_CALL_SERVER_ACK_REQUEST:
-		if (list_empty(&call->tx_sendmsg))
+		if (call->tx_bottom == READ_ONCE(call->send_top))
 			return;
 		rxrpc_begin_service_reply(call);
 		fallthrough;
@@ -297,11 +351,11 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
 		if (!rxrpc_tx_window_space(call))
 			return;
-		if (list_empty(&call->tx_sendmsg)) {
+		if (call->tx_bottom == READ_ONCE(call->send_top)) {
 			rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
 			return;
 		}
-		rxrpc_decant_prepared_tx(call);
+		rxrpc_transmit_fresh_data(call);
 		break;
 	default:
 		return;
@@ -503,8 +557,6 @@ bool rxrpc_input_call_event(struct rxrpc_call *call)
 		    call->peer->pmtud_pending)
 			rxrpc_send_probe_for_pmtud(call);
 	}
-	if (call->acks_hard_ack != call->tx_bottom)
-		rxrpc_shrink_call_tx_buffer(call);
 	_leave("");
 	return true;
 
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index c026f16f891e..a9682b31a4f9 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -146,8 +146,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	INIT_LIST_HEAD(&call->recvmsg_link);
 	INIT_LIST_HEAD(&call->sock_link);
 	INIT_LIST_HEAD(&call->attend_link);
-	INIT_LIST_HEAD(&call->tx_sendmsg);
-	INIT_LIST_HEAD(&call->tx_buffer);
 	skb_queue_head_init(&call->rx_queue);
 	skb_queue_head_init(&call->recvmsg_queue);
 	skb_queue_head_init(&call->rx_oos_queue);
@@ -532,9 +530,26 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 }
 
 /*
- * Clean up the Rx skb ring.
+ * Clean up the transmission buffers.
  */
-static void rxrpc_cleanup_ring(struct rxrpc_call *call)
+static void rxrpc_cleanup_tx_buffers(struct rxrpc_call *call)
+{
+	struct rxrpc_txqueue *tq, *next;
+
+	for (tq = call->tx_queue; tq; tq = next) {
+		next = tq->next;
+		for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
+			if (tq->bufs[i])
+				rxrpc_put_txbuf(tq->bufs[i], rxrpc_txbuf_put_cleaned);
+		trace_rxrpc_tq(call, tq, 0, rxrpc_tq_cleaned);
+		kfree(tq);
+	}
+}
+
+/*
+ * Clean up the receive buffers.
+ */
+static void rxrpc_cleanup_rx_buffers(struct rxrpc_call *call)
 {
 	rxrpc_purge_queue(&call->recvmsg_queue);
 	rxrpc_purge_queue(&call->rx_queue);
@@ -673,23 +688,12 @@ static void rxrpc_rcu_free_call(struct rcu_head *rcu)
 static void rxrpc_destroy_call(struct work_struct *work)
 {
 	struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
-	struct rxrpc_txbuf *txb;
 
 	del_timer_sync(&call->timer);
 
 	rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
-	rxrpc_cleanup_ring(call);
-	while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
-					       struct rxrpc_txbuf, call_link))) {
-		list_del(&txb->call_link);
-		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
-	}
-	while ((txb = list_first_entry_or_null(&call->tx_buffer,
-					       struct rxrpc_txbuf, call_link))) {
-		list_del(&txb->call_link);
-		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
-	}
-
+	rxrpc_cleanup_tx_buffers(call);
+	rxrpc_cleanup_rx_buffers(call);
 	rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
 	rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
 	rxrpc_deactivate_bundle(call->bundle);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 96fe005c5e81..cfdd23042d4c 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -214,24 +214,71 @@ void rxrpc_congestion_degrade(struct rxrpc_call *call)
 static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
 				   struct rxrpc_ack_summary *summary)
 {
-	struct rxrpc_txbuf *txb;
+	struct rxrpc_txqueue *tq = call->tx_queue;
+	rxrpc_seq_t seq = call->tx_bottom + 1;
 	bool rot_last = false;
 
-	list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
-		if (before_eq(txb->seq, call->acks_hard_ack))
-			continue;
-		if (txb->flags & RXRPC_LAST_PACKET) {
+	_enter("%x,%x,%x", call->tx_bottom, call->acks_hard_ack, to);
+
+	trace_rxrpc_tx_rotate(call, seq, to);
+	trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate);
+
+	/* We may have a left over fully-consumed buffer at the front that we
+	 * couldn't drop before (rotate_and_keep below).
+	 */
+	if (seq == call->tx_qbase + RXRPC_NR_TXQUEUE) {
+		call->tx_qbase += RXRPC_NR_TXQUEUE;
+		call->tx_queue = tq->next;
+		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+		kfree(tq);
+		tq = call->tx_queue;
+	}
+
+	do {
+		unsigned int ix = seq - call->tx_qbase;
+
+		_debug("tq=%x seq=%x i=%d f=%x", tq->qbase, seq, ix, tq->bufs[ix]->flags);
+		if (tq->bufs[ix]->flags & RXRPC_LAST_PACKET) {
 			set_bit(RXRPC_CALL_TX_LAST, &call->flags);
 			rot_last = true;
 		}
-		if (txb->seq == to)
-			break;
-	}
+		rxrpc_put_txbuf(tq->bufs[ix], rxrpc_txbuf_put_rotated);
+		tq->bufs[ix] = NULL;
+
+		WRITE_ONCE(call->tx_bottom, seq);
+		WRITE_ONCE(call->acks_hard_ack, seq);
+		trace_rxrpc_txqueue(call, (rot_last ?
+					   rxrpc_txqueue_rotate_last :
+					   rxrpc_txqueue_rotate));
 
-	if (rot_last)
+		seq++;
+		if (!(seq & RXRPC_TXQ_MASK)) {
+			prefetch(tq->next);
+			if (tq != call->tx_qtail) {
+				call->tx_qbase += RXRPC_NR_TXQUEUE;
+				call->tx_queue = tq->next;
+				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+				kfree(tq);
+				tq = call->tx_queue;
+			} else {
+				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_keep);
+				tq = NULL;
+				break;
+			}
+		}
+
+	} while (before_eq(seq, to));
+
+	if (rot_last) {
 		set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);
+		if (tq) {
+			trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+			kfree(tq);
+			call->tx_queue = NULL;
+		}
+	}
 
-	_enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);
+	_debug("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);
 
 	if (call->acks_lowest_nak == call->acks_hard_ack) {
 		call->acks_lowest_nak = to;
@@ -240,11 +287,6 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
 		call->acks_lowest_nak = to;
 	}
 
-	smp_store_release(&call->acks_hard_ack, to);
-
-	trace_rxrpc_txqueue(call, (rot_last ?
-				   rxrpc_txqueue_rotate_last :
-				   rxrpc_txqueue_rotate));
 	wake_up(&call->waitq);
 	return rot_last;
 }
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 400c3389d492..c2044d593237 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -375,10 +375,10 @@ int rxrpc_send_abort_packet(struct rxrpc_call *call)
 /*
  * Prepare a (sub)packet for transmission.
  */
-static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc_txbuf *txb,
-					   rxrpc_serial_t serial,
-					   int subpkt, int nr_subpkts,
-					   ktime_t now)
+static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
+					   struct rxrpc_send_data_req *req,
+					   struct rxrpc_txbuf *txb,
+					   rxrpc_serial_t serial, int subpkt)
 {
 	struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
 	struct rxrpc_jumbo_header *jumbo = (void *)(whdr + 1) - sizeof(*jumbo);
 	struct rxrpc_connection *conn = call->conn;
 	struct kvec *kv = &call->local->kvec[subpkt];
 	size_t len = txb->pkt_len;
-	bool last, more;
+	bool last;
 	u8 flags;
 
 	_enter("%x,%zd", txb->seq, len);
@@ -401,14 +401,11 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
 	flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;
 	last = txb->flags & RXRPC_LAST_PACKET;
 
-	if (subpkt < nr_subpkts - 1) {
+	if (subpkt < req->n - 1) {
 		len = RXRPC_JUMBO_DATALEN;
 		goto dont_set_request_ack;
 	}
 
-	more = (!list_is_last(&txb->call_link, &call->tx_buffer) ||
-		!list_empty(&call->tx_sendmsg));
-
 	/* If our RTT cache needs working on, request an ACK.  Also request
 	 * ACKs if a DATA packet appears to have been lost.
 	 *
@@ -430,7 +427,7 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
 		why = rxrpc_reqack_more_rtt;
 	else if (ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), ktime_get_real()))
 		why = rxrpc_reqack_old_rtt;
-	else if (!last && !more)
+	else if (!last && !after(READ_ONCE(call->send_top), txb->seq))
 		why = rxrpc_reqack_app_stall;
 	else
 		goto dont_set_request_ack;
@@ -439,13 +436,13 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
 	trace_rxrpc_req_ack(call->debug_id, txb->seq, why);
 	if (why != rxrpc_reqack_no_srv_last) {
 		flags |= RXRPC_REQUEST_ACK;
-		rxrpc_begin_rtt_probe(call, serial, now, rxrpc_rtt_tx_data);
-		call->peer->rtt_last_req = now;
+		rxrpc_begin_rtt_probe(call, serial, req->now, rxrpc_rtt_tx_data);
+		call->peer->rtt_last_req = req->now;
 	}
 dont_set_request_ack:
 
 	/* The jumbo header overlays the wire header in the txbuf. */
-	if (subpkt < nr_subpkts - 1)
+	if (subpkt < req->n - 1)
 		flags |= RXRPC_JUMBO_PACKET;
 	else
 		flags &= ~RXRPC_JUMBO_PACKET;
@@ -469,62 +466,100 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
 	return len;
 }
 
+/*
+ * Prepare a transmission queue object for initial transmission.  Returns the
+ * number of microseconds since the transmission queue base timestamp.
+ */
+static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq,
+					  struct rxrpc_send_data_req *req)
+{
+	if (!tq)
+		return 0;
+	if (tq->xmit_ts_base == KTIME_MIN) {
+		tq->xmit_ts_base = req->now;
+		return 0;
+	}
+	return ktime_to_us(ktime_sub(req->now, tq->xmit_ts_base));
+}
+
 /*
  * Prepare a (jumbo) packet for transmission.
  */
-static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *head, int n)
+static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
 {
-	struct rxrpc_txbuf *txb = head;
+	struct rxrpc_txqueue *tq = req->tq;
 	rxrpc_serial_t serial;
-	ktime_t now = ktime_get_real();
+	unsigned int xmit_ts;
+	rxrpc_seq_t seq = req->seq;
 	size_t len = 0;
 
+	trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit);
+
 	/* Each transmission of a Tx packet needs a new serial number */
-	serial = rxrpc_get_next_serials(call->conn, n);
+	serial = rxrpc_get_next_serials(call->conn, req->n);
 
-	for (int i = 0; i < n; i++) {
-		txb->last_sent = now;
-		len += rxrpc_prepare_data_subpacket(call, txb, serial, i, n, now);
-		serial++;
-		txb = list_next_entry(txb, call_link);
-	}
+	call->tx_last_sent = req->now;
+	xmit_ts = rxrpc_prepare_txqueue(tq, req);
+	prefetch(tq->next);
 
-	/* Set timeouts */
-	if (call->peer->rtt_count > 1) {
-		ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);
+	for (int i = 0;;) {
+		int ix = seq & RXRPC_TXQ_MASK;
+		struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK];
 
-		call->ack_lost_at = ktime_add(now, delay);
-		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_lost_ack);
+		_debug("prep[%u] tq=%x q=%x", i, tq->qbase, seq);
+		tq->segment_xmit_ts[ix] = xmit_ts;
+		len += rxrpc_prepare_data_subpacket(call, req, txb, serial, i);
+		serial++;
+		seq++;
+		i++;
+		if (i >= req->n)
+			break;
+		if (!(seq & RXRPC_TXQ_MASK)) {
+			tq = tq->next;
+			trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit_advance);
+			xmit_ts = rxrpc_prepare_txqueue(tq, req);
+		}
 	}
 
+	/* Set timeouts */
 	if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) {
 		ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo));
 
-		call->expect_rx_by = ktime_add(now, delay);
+		call->expect_rx_by = ktime_add(req->now, delay);
 		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
 	}
+	if (call->resend_at == KTIME_MAX) {
+		ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);
+
+		call->resend_at = ktime_add(req->now, delay);
+		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_resend);
+	}
 
-	rxrpc_set_keepalive(call, now);
+	rxrpc_set_keepalive(call, req->now);
 	return len;
 }
 
 /*
- * send a packet through the transport endpoint
+ * Send one or more packets through the transport endpoint
  */
-static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n)
+void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
 {
 	struct rxrpc_connection *conn = call->conn;
 	enum rxrpc_tx_point frag;
+	struct rxrpc_txqueue *tq = req->tq;
+	struct rxrpc_txbuf *txb;
 	struct msghdr msg;
+	rxrpc_seq_t seq = req->seq;
 	size_t len;
 	bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags);
 	int ret;
 
-	_enter("%x,{%d}", txb->seq, txb->pkt_len);
+	_enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1);
 
-	len = rxrpc_prepare_data_packet(call, txb, n);
+	len = rxrpc_prepare_data_packet(call, req);
+	txb = tq->bufs[seq & RXRPC_TXQ_MASK];
 
-	iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, n, len);
+	iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, req->n, len);
 
 	msg.msg_name = &call->peer->srx.transport;
 	msg.msg_namelen = call->peer->srx.transport_len;
@@ -535,7 +570,7 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
 	/* Send the packet with the don't fragment bit set unless we think it's
 	 * too big or if this is a retransmission.
 	 */
-	if (txb->seq == call->tx_transmitted + 1 &&
+	if (seq == call->tx_transmitted + 1 &&
 	    len >= sizeof(struct rxrpc_wire_header) + call->peer->max_data) {
 		rxrpc_local_dont_fragment(conn->local, false);
 		frag = rxrpc_tx_point_call_data_frag;
@@ -548,8 +583,8 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
 	 * retransmission algorithm doesn't try to resend what we haven't sent
 	 * yet.
 	 */
-	if (txb->seq == call->tx_transmitted + 1)
-		call->tx_transmitted = txb->seq + n - 1;
+	if (seq == call->tx_transmitted + 1)
+		call->tx_transmitted = seq + req->n - 1;
 
 	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
 		static int lose;
@@ -586,19 +621,21 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
 	rxrpc_tx_backoff(call, ret);
 
 	if (ret < 0) {
-		/* Cancel the call if the initial transmission fails,
-		 * particularly if that's due to network routing issues that
-		 * aren't going away anytime soon.  The layer above can arrange
-		 * the retransmission.
+		/* Cancel the call if the initial transmission fails or if we
+		 * hit due to network routing issues that aren't going away
+		 * anytime soon.  The layer above can arrange the
+		 * retransmission.
 		 */
-		if (new_call)
+		if (new_call ||
+		    ret == -ENETUNREACH ||
+		    ret == -EHOSTUNREACH ||
+		    ret == -ECONNREFUSED)
 			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
 						  RX_USER_ABORT, ret);
 	}
 
 done:
 	_leave(" = %d [%u]", ret, call->peer->max_data);
-	return ret;
 }
 
 /*
@@ -773,41 +810,3 @@ void rxrpc_send_keepalive(struct rxrpc_peer *peer)
 	peer->last_tx_at = ktime_get_seconds();
 	_leave("");
 }
-
-/*
- * Schedule an instant Tx resend.
- */
-static inline void rxrpc_instant_resend(struct rxrpc_call *call,
-					struct rxrpc_txbuf *txb)
-{
-	if (!__rxrpc_call_is_complete(call))
-		kdebug("resend");
-}
-
-/*
- * Transmit a packet, possibly gluing several subpackets together.
- */
-void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n)
-{
-	int ret;
-
-	ret = rxrpc_send_data_packet(call, txb, n);
-	if (ret < 0) {
-		switch (ret) {
-		case -ENETUNREACH:
-		case -EHOSTUNREACH:
-		case -ECONNREFUSED:
-			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
-						  0, ret);
-			break;
-		default:
-			_debug("need instant resend %d", ret);
-			rxrpc_instant_resend(call, txb);
-		}
-	} else {
-		ktime_t delay = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
-
-		call->resend_at = ktime_add(ktime_get_real(), delay);
-		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_resend_tx);
-	}
-}
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 467c9402882e..85b35b11755d 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -98,7 +98,7 @@ static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
 
 	if (_tx_win)
 		*_tx_win = tx_bottom;
-	return call->tx_prepared - tx_bottom < 256;
+	return call->send_top - tx_bottom < 256;
 }
 
 /*
@@ -242,36 +242,74 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
 			       struct rxrpc_txbuf *txb,
 			       rxrpc_notify_end_tx_t notify_end_tx)
 {
+	struct rxrpc_txqueue *sq = call->send_queue;
 	rxrpc_seq_t seq = txb->seq;
 	bool poke, last = txb->flags & RXRPC_LAST_PACKET;
-
+	int ix = seq & RXRPC_TXQ_MASK;
 	rxrpc_inc_stat(call->rxnet, stat_tx_data);
 
-	ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
-
-	/* We have to set the timestamp before queueing as the retransmit
-	 * algorithm can see the packet as soon as we queue it.
-	 */
-	txb->last_sent = ktime_get_real();
+	ASSERTCMP(txb->seq, ==, call->send_top + 1);
 
 	if (last)
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
 	else
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
 
+	if (WARN_ON_ONCE(sq->bufs[ix]))
+		trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue_dup);
+	else
+		trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue);
+
 	/* Add the packet to the call's output buffer */
 	spin_lock(&call->tx_lock);
-	poke = list_empty(&call->tx_sendmsg);
-	list_add_tail(&txb->call_link, &call->tx_sendmsg);
-	call->tx_prepared = seq;
-	if (last)
+	poke = (READ_ONCE(call->tx_bottom) == call->send_top);
+	sq->bufs[ix] = txb;
+	/* Order send_top after the queue->next pointer and txb content. */
+	smp_store_release(&call->send_top, seq);
+	if (last) {
 		rxrpc_notify_end_tx(rx, call, notify_end_tx);
+		call->send_queue = NULL;
+	}
 	spin_unlock(&call->tx_lock);
 
 	if (poke)
 		rxrpc_poke_call(call, rxrpc_call_poke_start);
 }
 
+/*
+ * Allocate a new txqueue unit and add it to the transmission queue.
+ */
+static int rxrpc_alloc_txqueue(struct sock *sk, struct rxrpc_call *call)
+{
+	struct rxrpc_txqueue *tq;
+
+	tq = kzalloc(sizeof(*tq), sk->sk_allocation);
+	if (!tq)
+		return -ENOMEM;
+
+	tq->xmit_ts_base = KTIME_MIN;
+	for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
+		tq->segment_xmit_ts[i] = UINT_MAX;
+
+	if (call->send_queue) {
+		tq->qbase = call->send_top + 1;
+		call->send_queue->next = tq;
+		call->send_queue = tq;
+	} else if (WARN_ON(call->tx_queue)) {
+		kfree(tq);
+		return -ENOMEM;
+	} else {
+		tq->qbase = 0;
+		call->tx_qbase = 0;
+		call->send_queue = tq;
+		call->tx_qtail = tq;
+		call->tx_queue = tq;
+	}
+
+	trace_rxrpc_tq(call, tq, call->send_top, rxrpc_tq_alloc);
+	return 0;
+}
+
 /*
  * send data through a socket
  * - must be called in process context
@@ -346,6 +384,13 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 		if (!rxrpc_check_tx_space(call, NULL))
 			goto wait_for_space;
 
+		/* See if we need to begin/extend the Tx queue. */
+		if (!call->send_queue || !((call->send_top + 1) & RXRPC_TXQ_MASK)) {
+			ret = rxrpc_alloc_txqueue(sk, call);
+			if (ret < 0)
+				goto maybe_error;
+		}
+
 		/* Work out the maximum size of a packet.  Assume that
 		 * the security header is going to be in the padded
 		 * region (enc blocksize), but the trailer is not.
diff --git a/net/rxrpc/txbuf.c b/net/rxrpc/txbuf.c
index 0cc8f49d69a9..067223c8c35f 100644
--- a/net/rxrpc/txbuf.c
+++ b/net/rxrpc/txbuf.c
@@ -43,17 +43,14 @@ struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_
 
 	whdr = buf + hoff;
 
-	INIT_LIST_HEAD(&txb->call_link);
-	INIT_LIST_HEAD(&txb->tx_link);
 	refcount_set(&txb->ref, 1);
-	txb->last_sent = KTIME_MIN;
 	txb->call_debug_id = call->debug_id;
 	txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids);
 	txb->alloc_size = data_size;
 	txb->space = data_size;
 	txb->offset = sizeof(*whdr);
 	txb->flags = call->conn->out_clientflag;
-	txb->seq = call->tx_prepared + 1;
+	txb->seq = call->send_top + 1;
 	txb->nr_kvec = 1;
 	txb->kvec[0].iov_base = whdr;
 	txb->kvec[0].iov_len = sizeof(*whdr);
@@ -114,8 +111,6 @@ struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_s
 	filler = buf + sizeof(*whdr) + sizeof(*ack) + 1;
 	trailer = buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;
 
-	INIT_LIST_HEAD(&txb->call_link);
-	INIT_LIST_HEAD(&txb->tx_link);
 	refcount_set(&txb->ref, 1);
 	txb->call_debug_id = call->debug_id;
 	txb->debug_id = atomic_inc_return(&rxrpc_txbuf_debug_ids);
@@ -200,37 +195,3 @@ void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
 		rxrpc_free_txbuf(txb);
 	}
 }
-
-/*
- * Shrink the transmit buffer.
- */
-void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
-{
-	struct rxrpc_txbuf *txb;
-	rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
-	bool wake = false;
-
-	_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
-
-	while ((txb = list_first_entry_or_null(&call->tx_buffer,
-					       struct rxrpc_txbuf, call_link))) {
-		hard_ack = call->acks_hard_ack;
-		if (before(hard_ack, txb->seq))
-			break;
-
-		if (txb->seq != call->tx_bottom + 1)
-			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
-		ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
-		WRITE_ONCE(call->tx_bottom, call->tx_bottom + 1);
-		list_del_rcu(&txb->call_link);
-
-		trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
-
-		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
-		if (after(call->acks_hard_ack, call->tx_bottom + 128))
-			wake = true;
-	}
-
-	if (wake)
-		wake_up(&call->waitq);
-}
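
Postscript for readers: a further illustrative sketch (not taken from the
patch) of why ACK-time scanning gets cheaper with this layout.  Looking up
a packet by sequence number becomes a stride over 64-packet blocks plus a
direct array index, rather than a per-packet list walk.  The seg_queue_list
type here is a hypothetical stand-in mirroring rxrpc_txqueue; it assumes,
as the patch arranges, that qbase advances in whole NR_SEGS strides.

#include <stddef.h>
#include <stdint.h>

#define NR_SEGS 64			/* packets per queue block */

struct seg_queue_list {
	struct seg_queue_list *next;
	uint32_t qbase;			/* sequence number of bufs[0] */
	void *bufs[NR_SEGS];
};

/* Find the buffer slot for sequence number "seq", or NULL if absent. */
static void *find_seg(struct seg_queue_list *q, uint32_t seq)
{
	/* Skip whole 64-packet blocks without touching per-packet data;
	 * the flags/timestamps of a block share a few cache lines. */
	while (q && seq - q->qbase >= NR_SEGS)
		q = q->next;
	/* Then index directly instead of chasing per-packet list links. */
	return q ? q->bufs[seq - q->qbase] : NULL;
}

With an 8192-packet window this is at most 128 pointer hops plus one index,
versus up to 8192 list-node dereferences for a linked list of txbufs.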