[PATCH mptcp-next v1 6/9] mptcp: implemented OoO queue pruning

Paolo Abeni posted 9 patches 1 month, 2 weeks ago
There is a newer version of this series
[PATCH mptcp-next v1 6/9] mptcp: implemented OoO queue pruning
Posted by Paolo Abeni 1 month, 2 weeks ago
Leverage the hybrid helper to implement the receive queue and OoO queue
collapsing at ingress time when reaching memory bounds.

If the msk is owned by the user-space at incoming skb time, perform the
pruning in the release_cb. The prune check is additionally performed
when the skb reaches the msk-level queues.

Pruning is not needed for fallback sockets, as their MPTCP-level OoO queue
must always be empty: remove the ingress check for such scenario and
rely on the TCP-level one.

Signed-off-by: Paolo Abeni <pabeni@redhat.com>
---
RFC -> v1:
 - use data_seq only when available
 - avoid ack_seq lockless access
 - drop limit on fallback
 - collapse rcvqueue, too
 - drop only when pruning is not possible and over rcvbuf * 2

Notes:
 - Similarly to patch 'mptcp: move checks vs rcvbuf size earlier in the RX
   path', some cleanup/tuning in mptcp_over_limit() will be needed
 - Pruning in the release_cb() is likely not needed, should probably be
   removed (after more testing).
---
 net/mptcp/mib.c      |  3 ++
 net/mptcp/mib.h      |  3 ++
 net/mptcp/options.c  | 40 +++++++++++++++++++++++---
 net/mptcp/protocol.c | 68 ++++++++++++++++++++++++++++++++++++++++++++
 net/mptcp/protocol.h |  2 ++
 5 files changed, 112 insertions(+), 4 deletions(-)

diff --git a/net/mptcp/mib.c b/net/mptcp/mib.c
index f23fda0c55a7..5128feec942c 100644
--- a/net/mptcp/mib.c
+++ b/net/mptcp/mib.c
@@ -85,6 +85,9 @@ static const struct snmp_mib mptcp_snmp_list[] = {
 	SNMP_MIB_ITEM("SimultConnectFallback", MPTCP_MIB_SIMULTCONNFALLBACK),
 	SNMP_MIB_ITEM("FallbackFailed", MPTCP_MIB_FALLBACKFAILED),
 	SNMP_MIB_ITEM("WinProbe", MPTCP_MIB_WINPROBE),
+	SNMP_MIB_ITEM("OfoPruned", MPTCP_MIB_OFO_PRUNED),
+	SNMP_MIB_ITEM("RcvPruned", MPTCP_MIB_RCVPRUNED),
+	SNMP_MIB_ITEM("RcvCollapsed", MPTCP_MIB_RCVCOLLAPSED),
 };
 
 /* mptcp_mib_alloc - allocate percpu mib counters
diff --git a/net/mptcp/mib.h b/net/mptcp/mib.h
index 812218b5ed2b..2f8f68e33ac5 100644
--- a/net/mptcp/mib.h
+++ b/net/mptcp/mib.h
@@ -88,6 +88,9 @@ enum linux_mptcp_mib_field {
 	MPTCP_MIB_SIMULTCONNFALLBACK,	/* Simultaneous connect */
 	MPTCP_MIB_FALLBACKFAILED,	/* Can't fallback due to msk status */
 	MPTCP_MIB_WINPROBE,		/* MPTCP-level zero window probe */
+	MPTCP_MIB_OFO_PRUNED,		/* MPTCP-level OoO queue pruned */
+	MPTCP_MIB_RCVPRUNED,		/* Dropped due to memory constraints */
+	MPTCP_MIB_RCVCOLLAPSED,		/* Collapsed due to memory pressure */
 	__MPTCP_MIB_MAX
 };
 
diff --git a/net/mptcp/options.c b/net/mptcp/options.c
index 14afeee8ca5f..a49cb03954e5 100644
--- a/net/mptcp/options.c
+++ b/net/mptcp/options.c
@@ -1158,12 +1158,40 @@ static bool add_addr_hmac_valid(struct mptcp_sock *msk,
 	return hmac == mp_opt->ahmac;
 }
 
-static bool mptcp_over_limit(const struct sock *sk, struct sk_buff *skb)
+static bool mptcp_over_limit(struct sock *sk, struct sk_buff *skb,
+			     const struct mptcp_options_received *mp_opt)
 {
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	bool ret;
+
 	if (TCP_SKB_CB(skb)->seq == TCP_SKB_CB(skb)->end_seq)
 		return false;
 
-	return sk_rmem_alloc_get(sk) > READ_ONCE(sk->sk_rcvbuf);
+	/* Allow some slack for backlog processing */
+	if (sk_rmem_alloc_get(sk) < READ_ONCE(sk->sk_rcvbuf))
+		return false;
+
+	mptcp_data_lock(sk);
+	if (!sock_owned_by_user(sk)) {
+		/* When the data sequence is not (yet) available for the
+		 * incoming skb, allow pruning the whole OoO queue
+		 */
+		u32 seq = !mp_opt->use_map || mp_opt->mpc_map ? msk->ack_seq :
+			  mp_opt->data_seq;
+
+		__mptcp_check_prune(sk, seq);
+		ret = sk_rmem_alloc_get(sk) > READ_ONCE(sk->sk_rcvbuf);
+	} else {
+		u64 limit = ((u64)READ_ONCE(sk->sk_rcvbuf)) << 1;
+
+		/* The msk is owned by the user: defer pruning via MPTCP_PRUNE
+		 * and meanwhile tolerate up to twice the rcvbuf.
+		 */
+		ret = sk_rmem_alloc_get(sk) > limit;
+		__set_bit(MPTCP_PRUNE, &msk->cb_flags);
+	}
+	mptcp_data_unlock(sk);
+	return ret;
 }
 
 /* Return false when the caller must drop the packet, i.e. in case of error,
@@ -1194,7 +1222,11 @@ bool mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
 		__mptcp_data_acked(subflow->conn);
 		mptcp_data_unlock(subflow->conn);
 
-		if (mptcp_over_limit(subflow->conn, skb))
+		/* Will use ack_seq as limit for OoO pruning; any value would do
+		 * as OoO queue must be empty.
+		 */
+		mp_opt.use_map = 0;
+		if (mptcp_over_limit(subflow->conn, skb, &mp_opt))
 			return false;
 		return true;
 	}
@@ -1274,7 +1306,7 @@ bool mptcp_incoming_options(struct sock *sk, struct sk_buff *skb)
 		return true;
 	}
 
-	if (mptcp_over_limit(subflow->conn, skb))
+	if (mptcp_over_limit(subflow->conn, skb, &mp_opt))
 		return false;
 
 	mpext = skb_ext_add(skb, SKB_EXT_MPTCP);
diff --git a/net/mptcp/protocol.c b/net/mptcp/protocol.c
index 49e62f817fd6..0c57561ee046 100644
--- a/net/mptcp/protocol.c
+++ b/net/mptcp/protocol.c
@@ -363,6 +363,66 @@ static void mptcp_init_skb(struct sock *ssk, struct sk_buff *skb, int offset)
 	skb_dst_drop(skb);
 }
 
+/* Drop OoO skbs tail-first until below rcvbuf; modeled on the TCP version */
+static void mptcp_prune_ofo_queue(struct sock *sk, u32 seq)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	struct rb_node *node, *prev;
+	bool pruned = false;
+
+	if (RB_EMPTY_ROOT(&msk->out_of_order_queue))
+		return;
+
+	node = &msk->ooo_last_skb->rbnode;
+
+	do {
+		struct sk_buff *skb = rb_to_skb(node);
+
+		/* Stop pruning once @seq is after this skb's map_seq. */
+		if (after(seq, MPTCP_SKB_CB(skb)->map_seq))
+			break;
+
+		pruned = true;
+		prev = rb_prev(node);
+		rb_erase(node, &msk->out_of_order_queue);
+		mptcp_drop(sk, skb);
+		msk->ooo_last_skb = rb_to_skb(prev);
+		if (atomic_read(&sk->sk_rmem_alloc) < sk->sk_rcvbuf)
+			break;
+
+		node = prev;
+	} while (node);
+
+	if (pruned)
+		NET_INC_STATS(sock_net(sk), MPTCP_MIB_OFO_PRUNED);
+}
+
+bool __mptcp_check_prune(struct sock *sk, u32 seq)
+{
+	struct mptcp_sock *msk = mptcp_sk(sk);
+	unsigned int dropped;
+
+	if (likely(atomic_read(&sk->sk_rmem_alloc) < sk->sk_rcvbuf))
+		return false;	/* below rcvbuf: nothing to reclaim */
+
+	dropped = xtcp_collapse_ofo_queue(sk, &msk->out_of_order_queue,
+					  &msk->ooo_last_skb, msk->scaling_ratio);
+	if (!skb_queue_empty(&sk->sk_receive_queue))
+		dropped += xtcp_collapse(sk, &sk->sk_receive_queue, NULL,
+					 skb_peek(&sk->sk_receive_queue),
+					 NULL,
+					 msk->copied_seq, msk->ack_seq,
+					 msk->scaling_ratio);
+
+	if (dropped)
+		MPTCP_ADD_STATS(sock_net(sk), MPTCP_MIB_RCVCOLLAPSED, dropped);
+	if (likely(atomic_read(&sk->sk_rmem_alloc) < sk->sk_rcvbuf))
+		return false;	/* back below rcvbuf after collapsing */
+
+	mptcp_prune_ofo_queue(sk, seq);	/* still over rcvbuf: drop OoO skbs */
+	return atomic_read(&sk->sk_rmem_alloc) >= sk->sk_rcvbuf;
+}
+
 static bool __mptcp_move_skb(struct sock *sk, struct sk_buff *skb)
 {
 	u32 copy_len = MPTCP_SKB_CB(skb)->end_seq - MPTCP_SKB_CB(skb)->map_seq;
@@ -372,6 +432,12 @@ static bool __mptcp_move_skb(struct sock *sk, struct sk_buff *skb)
 
 	mptcp_borrow_fwdmem(sk, skb);
 
+	if (__mptcp_check_prune(sk, MPTCP_SKB_CB(skb)->map_seq)) {
+		MPTCP_INC_STATS(sock_net(sk), MPTCP_MIB_RCVPRUNED);
+		mptcp_drop(sk, skb);
+		return false;
+	}
+
 	if (MPTCP_SKB_CB(skb)->map_seq == ack_seq) {
 		/* in sequence */
 		msk->bytes_received += copy_len;
@@ -3679,6 +3745,8 @@ static void mptcp_release_cb(struct sock *sk)
 			__mptcp_error_report(sk);
 		if (__test_and_clear_bit(MPTCP_SYNC_SNDBUF, &msk->cb_flags))
 			__mptcp_sync_sndbuf(sk);
+		if (__test_and_clear_bit(MPTCP_PRUNE, &msk->cb_flags))
+			__mptcp_check_prune(sk, msk->ack_seq - 1);
 	}
 }
 
diff --git a/net/mptcp/protocol.h b/net/mptcp/protocol.h
index e541f42fca25..a6b7eedf36cf 100644
--- a/net/mptcp/protocol.h
+++ b/net/mptcp/protocol.h
@@ -124,6 +124,7 @@
 #define MPTCP_FLUSH_JOIN_LIST	5
 #define MPTCP_SYNC_STATE	6
 #define MPTCP_SYNC_SNDBUF	7
+#define MPTCP_PRUNE		8
 
 struct mptcp_skb_cb {
 	u32 map_seq;
@@ -829,6 +830,7 @@ bool __mptcp_close(struct sock *sk, long timeout);
 void mptcp_cancel_work(struct sock *sk);
 void __mptcp_unaccepted_force_close(struct sock *sk);
 void mptcp_set_state(struct sock *sk, int state);
+bool __mptcp_check_prune(struct sock *sk, u32 seq);
 
 bool mptcp_addresses_equal(const struct mptcp_addr_info *a,
 			   const struct mptcp_addr_info *b, bool use_port);
-- 
2.53.0