Leverage the hybrid helper to implement the receive queue and OoO queue
collapsing at ingress time when reaching memory bounds.
If the msk is owned by user space when the skb arrives, perform the
pruning in the release_cb. The prune check is additionally performed
when the skb reaches the msk-level queues.
Pruning is not needed for fallback sockets, as their MPTCP-level OoO queue
must always be empty: remove the ingress check for such a scenario and
rely on the TCP-level one.
Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
RFC -> v1:
- use data_seq only when available
- avoid ack_seq lockless access
- drop limit on fallback
- collapse rcvqueue, too
- drop only when pruning is not possible and over rcvbuf * 2
Notes:
- Similarly to patch 'mptcp: move checks vs rcvbuf size earlier in the RX
path', some cleanup/tuning in mptcp_over_limit() will be needed
- Pruning in the release_cb() is likely not needed and should probably be
removed (after more testing).
---
net/mptcp/mib.c | 3 ++
net/mptcp/mib.h | 3 ++
net/mptcp/options.c | 40 +++++++++++++++++++++++---
net/mptcp/protocol.c | 68 ++++++++++++++++++++++++++++++++++++++++++++
net/mptcp/protocol.h | 2 ++
5 files changed, 112 insertions(+), 4 deletions(-)
diff --git a/net/mptcp/mib.c b/net/mptcp/mib.c
index f23fda0c55a7..5128feec942c 100644
--- a/net/mptcp/mib.c
+++ b/net/mptcp/mib.c
@@ -85,6 +85,9 @@ static const struct snmp_mib mptcp_snmp_list[] = {
SNMP_MIB_ITEM("SimultConnectFallback", MPTCP_MIB_SIMULTCONNFALLBACK),
SNMP_MIB_ITEM("FallbackFailed", MPTCP_MIB_FALLBACKFAILED),
SNMP_MIB_ITEM("WinProbe", MPTCP_MIB_WINPROBE),
+ SNMP_MIB_ITEM("OfoPruned", MPTCP_MIB_OFO_PRUNED),
+ SNMP_MIB_ITEM("RcvPruned", MPTCP_MIB_RCVPRUNED),
+ SNMP_MIB_ITEM("RcvCollapsed", MPTCP_MIB_RCVCOLLAPSED),
};
/* mptcp_mib_alloc - allocate percpu mib counters
diff --git a/net/mptcp/mib.h b/net/mptcp/mib.h
index 812218b5ed2b..2f8f68e33ac5 100644
--- a/net/mptcp/mib.h
+++ b/net/mptcp/mib.h
@@ -88,6 +88,9 @@ enum linux_mptcp_mib_field {
MPTCP_MIB_SIMULTCONNFALLBACK, /* Simultaneous connect */
MPTCP_MIB_FALLBACKFAILED, /* Can't fallback due to msk status */
MPTCP_MIB_WINPROBE, /* MPTCP-level zero window probe */
+ MPTCP_MIB_OFO_PRUNED, /* MPTCP-level OoO queue pruned */
+ MPTCP_MIB_RCVPRUNED, /* Dropped due to memory constrains */
+ MPTCP_MIB_RCVCOLLAPSED, /* Collapsed due to memory pressure */
__MPTCP_MIB_MAX
};
diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index 14afeee8ca5f..a49cb03954e5 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -1158,12 +1158,40 @@ static bool add_addr_hmac_valid(struct mptcp_sock *msk,
return hmac == mp_opt->ahmac;
}
-static bool mptcp_over_limit(const struct sock *sk, struct sk_buff *skb)
+/* Check whether an incoming skb would push the msk over its receive buffer.
+ *
+ * Zero-length skbs (seq == end_seq) are never considered over limit.
+ * Otherwise, once receive memory reaches sk_rcvbuf:
+ * - if the msk is not owned by the user, collapse/prune the msk-level queues
+ *   right away via __mptcp_check_prune() and report an overflow only if
+ *   receive memory is still above sk_rcvbuf afterwards;
+ * - if the msk is owned by the user, defer pruning to mptcp_release_cb() by
+ *   setting MPTCP_PRUNE, and tolerate up to twice sk_rcvbuf meanwhile.
+ *
+ * @mp_opt: received MPTCP options; use_map, mpc_map and data_seq select the
+ * sequence used as the OoO pruning limit.
+ *
+ * Return: true if the caller should drop the skb.
+ */
+static bool mptcp_over_limit(struct sock *sk, struct sk_buff *skb,
+ const struct mptcp_options_received *mp_opt)
{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+ bool ret;
+
if (TCP_SKB_CB(skb)->seq == TCP_SKB_CB(skb)->end_seq)
return false;
- return sk_rmem_alloc_get(sk) > READ_ONCE(sk->sk_rcvbuf);
+ /* Still below the receive buffer size: accept the skb */
+ if (sk_rmem_alloc_get(sk) < READ_ONCE(sk->sk_rcvbuf))
+ return false;
+
+ /* msk->ack_seq and msk->cb_flags are accessed under the data lock */
+ mptcp_data_lock(sk);
+ if (!sock_owned_by_user(sk)) {
+ /* When the data sequence of the incoming skb is not (yet)
+ * available, use ack_seq as the limit: that allows pruning the
+ * whole OoO queue. seq is truncated to u32; the OoO pruning
+ * compares it with the 32-bit after() helper.
+ */
+ u32 seq = !mp_opt->use_map || mp_opt->mpc_map ? msk->ack_seq :
+ mp_opt->data_seq;
+
+ __mptcp_check_prune(sk, seq);
+ /* NOTE(review): __mptcp_check_prune() treats >= sk_rcvbuf as
+ * still over limit, while this check drops only above it;
+ * confirm the intended threshold.
+ */
+ ret = sk_rmem_alloc_get(sk) > READ_ONCE(sk->sk_rcvbuf);
+ } else {
+ u64 limit = ((u64)READ_ONCE(sk->sk_rcvbuf)) << 1;
+
+ /* Pruning will take place later in the RX path, allow
+ * some extra slack.
+ */
+ ret = sk_rmem_alloc_get(sk) > limit;
+ /* mptcp_release_cb() runs __mptcp_check_prune() on release */
+ __set_bit(MPTCP_PRUNE, &msk->cb_flags);
+ }
+ mptcp_data_unlock(sk);
+ return ret;
}
/* Return false when the caller must drop the packet, i.e. in case of error,
@@ -1194,7 +1222,11 @@ bool mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
__mptcp_data_acked(subflow->conn);
mptcp_data_unlock(subflow->conn);
- if (mptcp_over_limit(subflow->conn, skb))
+ /* Will use ack_seq as limit for OoO pruning; any value would do
+ * as OoO queue must be empty.
+ */
+ mp_opt.use_map = 0;
+ if (mptcp_over_limit(subflow->conn, skb, &mp_opt))
return false;
return true;
}
@@ -1274,7 +1306,7 @@ bool mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
return true;
}
- if (mptcp_over_limit(subflow->conn, skb))
+ if (mptcp_over_limit(subflow->conn, skb, &mp_opt))
return false;
mpext = skb_ext_add(skb, SKB_EXT_MPTCP);
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 49e62f817fd6..0c57561ee046 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -363,6 +363,66 @@ static void mptcp_init_skb(struct sock *ssk, struct sk_buff *skb, int offset)
skb_dst_drop(skb);
}
+/* Drop skbs from the tail of the msk-level OoO queue to reclaim receive
+ * memory. "Inspired" from the TCP version, tcp_prune_ofo_queue().
+ *
+ * @seq: 32-bit MPTCP-level sequence of the incoming data. Walking backwards
+ * from ooo_last_skb, pruning stops at the first skb whose map_seq is before
+ * @seq, i.e. once the incoming data would land last in the OoO queue.
+ *
+ * Pruning also stops as soon as sk_rmem_alloc drops below sk_rcvbuf, or when
+ * the queue has been emptied. MPTCP_MIB_OFO_PRUNED is bumped once per call
+ * that dropped at least one skb.
+ */
+static void mptcp_prune_ofo_queue(struct sock *sk, u32 seq)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+ struct rb_node *node, *prev;
+ bool pruned = false;
+
+ if (RB_EMPTY_ROOT(&msk->out_of_order_queue))
+ return;
+
+ /* start from ooo_last_skb and walk backwards with rb_prev() */
+ node = &msk->ooo_last_skb->rbnode;
+
+ do {
+ struct sk_buff *skb = rb_to_skb(node);
+
+ /* If incoming skb would land last in ofo queue, stop pruning. */
+ if (after(seq, MPTCP_SKB_CB(skb)->map_seq))
+ break;
+
+ pruned = true;
+ prev = rb_prev(node);
+ rb_erase(node, &msk->out_of_order_queue);
+ mptcp_drop(sk, skb);
+ msk->ooo_last_skb = rb_to_skb(prev);
+ if (atomic_read(&sk->sk_rmem_alloc) < sk->sk_rcvbuf)
+ break;
+
+ node = prev;
+ } while (node);
+
+ /* MPTCP_MIB_* are MPTCP-private MIB indexes: they must be bumped via
+ * MPTCP_INC_STATS(), as NET_INC_STATS() would increment the unrelated
+ * LINUX_MIB counter sharing the same index.
+ */
+ if (pruned)
+ MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_OFO_PRUNED);
+}
+
+/* Try to bring the msk receive memory back below sk_rcvbuf.
+ *
+ * Nothing is done while sk_rmem_alloc is below sk_rcvbuf. Otherwise the OoO
+ * queue and, if not empty, the receive queue are first collapsed with the
+ * xtcp_collapse* helpers, and their return value is added to
+ * MPTCP_MIB_RCVCOLLAPSED. If receive memory is still at or above sk_rcvbuf,
+ * the OoO queue tail is pruned via mptcp_prune_ofo_queue(), with @seq as
+ * the stop limit.
+ *
+ * NOTE(review): the xtcp_collapse* return value is assumed to be a count
+ * of collapsed skbs (it is stored in 'dropped' and reported as
+ * RcvCollapsed); confirm against the helper definitions.
+ *
+ * Return: true if receive memory is still at or above sk_rcvbuf.
+ */
+bool __mptcp_check_prune(struct sock *sk, u32 seq)
+{
+ struct mptcp_sock *msk = mptcp_sk(sk);
+ unsigned int dropped;
+
+ if (likely(atomic_read(&sk->sk_rmem_alloc) < sk->sk_rcvbuf))
+ return false;
+
+ dropped = xtcp_collapse_ofo_queue(sk, &msk->out_of_order_queue,
+ &msk->ooo_last_skb, msk->scaling_ratio);
+ /* NOTE(review): presumably collapses the in-order data between
+ * copied_seq and ack_seq, mirroring tcp_prune_queue()'s tcp_collapse()
+ * call with copied_seq/rcv_nxt; confirm against xtcp_collapse().
+ */
+ if (!skb_queue_empty(&sk->sk_receive_queue))
+ dropped += xtcp_collapse(sk, &sk->sk_receive_queue, NULL,
+ skb_peek(&sk->sk_receive_queue),
+ NULL,
+ msk->copied_seq, msk->ack_seq,
+ msk->scaling_ratio);
+
+ if (dropped)
+ MPTCP_ADD_STATS(sock_net(sk), MPTCP_MIB_RCVCOLLAPSED, dropped);
+ /* collapsing alone may have been enough */
+ if (likely(atomic_read(&sk->sk_rmem_alloc) < sk->sk_rcvbuf))
+ return false;
+
+ /* last resort: drop OoO data, then report whether still over limit */
+ mptcp_prune_ofo_queue(sk, seq);
+ return atomic_read(&sk->sk_rmem_alloc) >= sk->sk_rcvbuf;
+}
+
static bool __mptcp_move_skb(struct sock *sk, struct sk_buff *skb)
{
u32 copy_len = MPTCP_SKB_CB(skb)->end_seq - MPTCP_SKB_CB(skb)->map_seq;
@@ -372,6 +432,12 @@ static bool __mptcp_move_skb(struct sock *sk, struct sk_buff *skb)
mptcp_borrow_fwdmem(sk, skb);
+ if (__mptcp_check_prune(sk, MPTCP_SKB_CB(skb)->map_seq)) {
+ MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RCVPRUNED);
+ mptcp_drop(sk, skb);
+ return false;
+ }
+
if (MPTCP_SKB_CB(skb)->map_seq == ack_seq) {
/* in sequence */
msk->bytes_received += copy_len;
@@ -3679,6 +3745,8 @@ static void mptcp_release_cb(struct sock *sk)
__mptcp_error_report(sk);
if (__test_and_clear_bit(MPTCP_SYNC_SNDBUF, &msk->cb_flags))
__mptcp_sync_sndbuf(sk);
+ if (__test_and_clear_bit(MPTCP_PRUNE, &msk->cb_flags))
+ __mptcp_check_prune(sk, msk->ack_seq - 1);
}
}
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index e541f42fca25..a6b7eedf36cf 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -124,6 +124,7 @@
#define MPTCP_FLUSH_JOIN_LIST 5
#define MPTCP_SYNC_STATE 6
#define MPTCP_SYNC_SNDBUF 7
+#define MPTCP_PRUNE 8 /* set by mptcp_over_limit(), handled in mptcp_release_cb() */
struct mptcp_skb_cb {
u32 map_seq;
@@ -829,6 +830,7 @@ bool __mptcp_close(struct sock *sk, long timeout);
void mptcp_cancel_work(struct sock *sk);
void __mptcp_unaccepted_force_close(struct sock *sk);
void mptcp_set_state(struct sock *sk, int state);
+bool __mptcp_check_prune(struct sock *sk, u32 seq);
bool mptcp_addresses_equal(const struct mptcp_addr_info *a,
const struct mptcp_addr_info *b, bool use_port);
--
2.53.0
© 2016 - 2026 Red Hat, Inc.