[PATCH 2/2] nfsd: allow for more callback session slots

Jeff Layton posted 2 patches 3 weeks, 6 days ago
There is a newer version of this series
[PATCH 2/2] nfsd: allow for more callback session slots
Posted by Jeff Layton 3 weeks, 6 days ago
nfsd currently only uses a single slot in the callback channel, which is
proving to be a bottleneck in some cases. Widen the callback channel to
a max of 32 slots (subject to the client's target_maxreqs value).

Change the cb_holds_slot boolean to an integer that tracks the current
slot number (with -1 meaning "unassigned").  Move the callback slot
tracking info into the session. Add a new u32 that acts as a bitmap to
track which slots are in use, and a u32 to track the latest callback
target_slotid that the client reports. While they are part of the
session, the fields are protected by the cl_lock.

Fix nfsd41_cb_get_slot to always search for the lowest slotid (using
ffs()), and change it to continually retry until there is a slot
available.

Finally, convert the session->se_cb_seq_nr field into an array of
counters and add the necessary handling to ensure that the seqids get
reset at the appropriate times.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 fs/nfsd/nfs4callback.c | 107 +++++++++++++++++++++++++++++++++++--------------
 fs/nfsd/nfs4state.c    |   7 +++-
 fs/nfsd/state.h        |  12 +++---
 fs/nfsd/trace.h        |   2 +-
 4 files changed, 89 insertions(+), 39 deletions(-)

diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
index e38fa834b3d91333acf1425eb14c644e5d5f2601..64b85b164125b244494f9805840a0d8a1ccb4c1b 100644
--- a/fs/nfsd/nfs4callback.c
+++ b/fs/nfsd/nfs4callback.c
@@ -406,6 +406,16 @@ encode_cb_getattr4args(struct xdr_stream *xdr, struct nfs4_cb_compound_hdr *hdr,
 	hdr->nops++;
 }
 
+static u32 highest_unset_index(u32 word)
+{
+	int i;
+
+	for (i = sizeof(word) * 8 - 1; i > 0; --i)
+		if (!(word & BIT(i)))
+			return i;
+	return 0;
+}
+
 /*
  * CB_SEQUENCE4args
  *
@@ -432,15 +442,38 @@ static void encode_cb_sequence4args(struct xdr_stream *xdr,
 	encode_sessionid4(xdr, session);
 
 	p = xdr_reserve_space(xdr, 4 + 4 + 4 + 4 + 4);
-	*p++ = cpu_to_be32(session->se_cb_seq_nr);	/* csa_sequenceid */
-	*p++ = xdr_zero;			/* csa_slotid */
-	*p++ = xdr_zero;			/* csa_highest_slotid */
+	*p++ = cpu_to_be32(session->se_cb_seq_nr[cb->cb_held_slot]);	/* csa_sequenceid */
+	*p++ = cpu_to_be32(cb->cb_held_slot);		/* csa_slotid */
+	*p++ = cpu_to_be32(max(highest_unset_index(session->se_cb_slot_avail),
+			       session->se_cb_highest_slot)); /* csa_highest_slotid */
 	*p++ = xdr_zero;			/* csa_cachethis */
 	xdr_encode_empty_array(p);		/* csa_referring_call_lists */
 
 	hdr->nops++;
 }
 
+static void update_cb_slot_table(struct nfsd4_session *ses, u32 highest)
+{
+	/* No need to do anything if nothing changed */
+	if (likely(highest == READ_ONCE(ses->se_cb_highest_slot)))
+		return;
+
+	spin_lock(&ses->se_client->cl_lock);
+	/* If growing the slot table, reset any new sequences to 1 */
+	if (highest > ses->se_cb_highest_slot) {
+		int i;
+
+		for (i = ses->se_cb_highest_slot; i <= highest; ++i) {
+			/* beyond the end of the array? */
+			if (i >= NFSD_BC_SLOT_TABLE_MAX)
+				break;
+			ses->se_cb_seq_nr[i] = 1;
+		}
+	}
+	ses->se_cb_highest_slot = highest;
+	spin_unlock(&ses->se_client->cl_lock);
+}
+
 /*
  * CB_SEQUENCE4resok
  *
@@ -485,7 +518,7 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
 	p += XDR_QUADLEN(NFS4_MAX_SESSIONID_LEN);
 
 	dummy = be32_to_cpup(p++);
-	if (dummy != session->se_cb_seq_nr) {
+	if (dummy != session->se_cb_seq_nr[cb->cb_held_slot]) {
 		dprintk("NFS: %s Invalid sequence number\n", __func__);
 		goto out;
 	}
@@ -496,9 +529,15 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
 		goto out;
 	}
 
-	/*
-	 * FIXME: process highest slotid and target highest slotid
-	 */
+	p++; // ignore current highest slot value
+
+	dummy = be32_to_cpup(p++);
+	if (dummy == 0) {
+		dprintk("NFS: %s Invalid target highest slotid\n", __func__);
+		goto out;
+	}
+
+	update_cb_slot_table(session, dummy);
 	status = 0;
 out:
 	cb->cb_seq_status = status;
@@ -1208,31 +1247,38 @@ void nfsd4_change_callback(struct nfs4_client *clp, struct nfs4_cb_conn *conn)
  * If the slot is available, then mark it busy.  Otherwise, set the
  * thread for sleeping on the callback RPC wait queue.
  */
-static bool nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
+static void nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
 {
 	struct nfs4_client *clp = cb->cb_clp;
+	struct nfsd4_session *ses = clp->cl_cb_session;
+	int idx;
 
-	if (!cb->cb_holds_slot &&
-	    test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
+	if (cb->cb_held_slot >= 0)
+		return;
+retry:
+	spin_lock(&clp->cl_lock);
+	idx = ffs(ses->se_cb_slot_avail) - 1;
+	if (idx < 0 || idx > ses->se_cb_highest_slot) {
+		spin_unlock(&clp->cl_lock);
 		rpc_sleep_on(&clp->cl_cb_waitq, task, NULL);
-		/* Race breaker */
-		if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
-			dprintk("%s slot is busy\n", __func__);
-			return false;
-		}
-		rpc_wake_up_queued_task(&clp->cl_cb_waitq, task);
+		goto retry;
 	}
-	cb->cb_holds_slot = true;
-	return true;
+	/* clear the bit for the slot */
+	ses->se_cb_slot_avail &= ~BIT(idx);
+	spin_unlock(&clp->cl_lock);
+	cb->cb_held_slot = idx;
 }
 
 static void nfsd41_cb_release_slot(struct nfsd4_callback *cb)
 {
 	struct nfs4_client *clp = cb->cb_clp;
+	struct nfsd4_session *ses = clp->cl_cb_session;
 
-	if (cb->cb_holds_slot) {
-		cb->cb_holds_slot = false;
-		clear_bit(0, &clp->cl_cb_slot_busy);
+	if (cb->cb_held_slot >= 0) {
+		spin_lock(&clp->cl_lock);
+		ses->se_cb_slot_avail |= BIT(cb->cb_held_slot);
+		spin_unlock(&clp->cl_lock);
+		cb->cb_held_slot = -1;
 		rpc_wake_up_next(&clp->cl_cb_waitq);
 	}
 }
@@ -1265,8 +1311,8 @@ static void nfsd4_cb_prepare(struct rpc_task *task, void *calldata)
 	trace_nfsd_cb_rpc_prepare(clp);
 	cb->cb_seq_status = 1;
 	cb->cb_status = 0;
-	if (minorversion && !nfsd41_cb_get_slot(cb, task))
-		return;
+	if (minorversion)
+		nfsd41_cb_get_slot(cb, task);
 	rpc_call_start(task);
 }
 
@@ -1292,7 +1338,7 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
 		return true;
 	}
 
-	if (!cb->cb_holds_slot)
+	if (cb->cb_held_slot < 0)
 		goto need_restart;
 
 	/* This is the operation status code for CB_SEQUENCE */
@@ -1306,10 +1352,10 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
 		 * If CB_SEQUENCE returns an error, then the state of the slot
 		 * (sequence ID, cached reply) MUST NOT change.
 		 */
-		++session->se_cb_seq_nr;
+		++session->se_cb_seq_nr[cb->cb_held_slot];
 		break;
 	case -ESERVERFAULT:
-		++session->se_cb_seq_nr;
+		++session->se_cb_seq_nr[cb->cb_held_slot];
 		nfsd4_mark_cb_fault(cb->cb_clp);
 		ret = false;
 		break;
@@ -1335,17 +1381,16 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
 	case -NFS4ERR_BADSLOT:
 		goto retry_nowait;
 	case -NFS4ERR_SEQ_MISORDERED:
-		if (session->se_cb_seq_nr != 1) {
-			session->se_cb_seq_nr = 1;
+		if (session->se_cb_seq_nr[cb->cb_held_slot] != 1) {
+			session->se_cb_seq_nr[cb->cb_held_slot] = 1;
 			goto retry_nowait;
 		}
 		break;
 	default:
 		nfsd4_mark_cb_fault(cb->cb_clp);
 	}
-	nfsd41_cb_release_slot(cb);
-
 	trace_nfsd_cb_free_slot(task, cb);
+	nfsd41_cb_release_slot(cb);
 
 	if (RPC_SIGNALLED(task))
 		goto need_restart;
@@ -1565,7 +1610,7 @@ void nfsd4_init_cb(struct nfsd4_callback *cb, struct nfs4_client *clp,
 	INIT_WORK(&cb->cb_work, nfsd4_run_cb_work);
 	cb->cb_status = 0;
 	cb->cb_need_restart = false;
-	cb->cb_holds_slot = false;
+	cb->cb_held_slot = -1;
 }
 
 /**
diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
index 5b718b349396f1aecd0ad4c63b2f43342841bbd4..20a0d40202e40eed1c84d5d6c0a85b908804a6ba 100644
--- a/fs/nfsd/nfs4state.c
+++ b/fs/nfsd/nfs4state.c
@@ -2002,6 +2002,8 @@ static struct nfsd4_session *alloc_session(struct nfsd4_channel_attrs *fattrs,
 	}
 
 	memcpy(&new->se_fchannel, fattrs, sizeof(struct nfsd4_channel_attrs));
+	new->se_cb_slot_avail = ~0U;
+	new->se_cb_highest_slot = battrs->maxreqs - 1;
 	return new;
 out_free:
 	while (i--)
@@ -2132,7 +2134,9 @@ static void init_session(struct svc_rqst *rqstp, struct nfsd4_session *new, stru
 
 	INIT_LIST_HEAD(&new->se_conns);
 
-	new->se_cb_seq_nr = 1;
+	for (idx = 0; idx < NFSD_BC_SLOT_TABLE_MAX; ++idx)
+		new->se_cb_seq_nr[idx] = 1;
+
 	new->se_flags = cses->flags;
 	new->se_cb_prog = cses->callback_prog;
 	new->se_cb_sec = cses->cb_sec;
@@ -3159,7 +3163,6 @@ static struct nfs4_client *create_client(struct xdr_netobj name,
 	kref_init(&clp->cl_nfsdfs.cl_ref);
 	nfsd4_init_cb(&clp->cl_cb_null, clp, NULL, NFSPROC4_CLNT_CB_NULL);
 	clp->cl_time = ktime_get_boottime_seconds();
-	clear_bit(0, &clp->cl_cb_slot_busy);
 	copy_verf(clp, verf);
 	memcpy(&clp->cl_addr, sa, sizeof(struct sockaddr_storage));
 	clp->cl_cb_session = NULL;
diff --git a/fs/nfsd/state.h b/fs/nfsd/state.h
index 41cda86fea1f6166a0fd0215d3d458c93ced3e6a..2987c362bdd56251e736879dc89302ada2259be8 100644
--- a/fs/nfsd/state.h
+++ b/fs/nfsd/state.h
@@ -71,8 +71,8 @@ struct nfsd4_callback {
 	struct work_struct cb_work;
 	int cb_seq_status;
 	int cb_status;
+	int cb_held_slot;
 	bool cb_need_restart;
-	bool cb_holds_slot;
 };
 
 struct nfsd4_callback_ops {
@@ -307,6 +307,9 @@ struct nfsd4_conn {
 	unsigned char cn_flags;
 };
 
+/* Max number of slots that the server will use in the backchannel */
+#define NFSD_BC_SLOT_TABLE_MAX	32
+
 /*
  * Representation of a v4.1+ session. These are refcounted in a similar fashion
  * to the nfs4_client. References are only taken when the server is actively
@@ -325,7 +328,9 @@ struct nfsd4_session {
 	struct nfsd4_cb_sec	se_cb_sec;
 	struct list_head	se_conns;
 	u32			se_cb_prog;
-	u32			se_cb_seq_nr;
+	u32			se_cb_slot_avail; /* bitmap of available slots */
+	u32			se_cb_highest_slot;	/* highest slot client wants */
+	u32			se_cb_seq_nr[NFSD_BC_SLOT_TABLE_MAX];
 	struct nfsd4_slot	*se_slots[];	/* forward channel slots */
 };
 
@@ -459,9 +464,6 @@ struct nfs4_client {
 	 */
 	struct dentry		*cl_nfsd_info_dentry;
 
-	/* for nfs41 callbacks */
-	/* We currently support a single back channel with a single slot */
-	unsigned long		cl_cb_slot_busy;
 	struct rpc_wait_queue	cl_cb_waitq;	/* backchannel callers may */
 						/* wait here for slots */
 	struct net		*net;
diff --git a/fs/nfsd/trace.h b/fs/nfsd/trace.h
index f318898cfc31614b5a84a4867e18c2b3a07122c9..a9c17186b6892f1df8d7f7b90e250c2913ab23fe 100644
--- a/fs/nfsd/trace.h
+++ b/fs/nfsd/trace.h
@@ -1697,7 +1697,7 @@ TRACE_EVENT(nfsd_cb_free_slot,
 		__entry->cl_id = sid->clientid.cl_id;
 		__entry->seqno = sid->sequence;
 		__entry->reserved = sid->reserved;
-		__entry->slot_seqno = session->se_cb_seq_nr;
+		__entry->slot_seqno = session->se_cb_seq_nr[cb->cb_held_slot];
 	),
 	TP_printk(SUNRPC_TRACE_TASK_SPECIFIER
 		" sessionid=%08x:%08x:%08x:%08x new slot seqno=%u",

-- 
2.47.0
Re: [PATCH 2/2] nfsd: allow for more callback session slots
Posted by Chuck Lever 3 weeks, 6 days ago
On Mon, Oct 28, 2024 at 10:26:27AM -0400, Jeff Layton wrote:
> nfsd currently only uses a single slot in the callback channel, which is
> proving to be a bottleneck in some cases. Widen the callback channel to
> a max of 32 slots (subject to the client's target_maxreqs value).
> 
> Change the cb_holds_slot boolean to an integer that tracks the current
> slot number (with -1 meaning "unassigned").  Move the callback slot
> tracking info into the session. Add a new u32 that acts as a bitmap to
> track which slots are in use, and a u32 to track the latest callback
> target_slotid that the client reports. While they are part of the
> session, the fields are protected by the cl_lock.
> 
> Fix nfsd41_cb_get_slot to always search for the lowest slotid (using
> ffs()), and change it to continually retry until there is a slot
> available.
> 
> Finally, convert the session->se_cb_seq_nr field into an array of
> counters and add the necessary handling to ensure that the seqids get
> reset at the appropriate times.
> 
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>  fs/nfsd/nfs4callback.c | 107 +++++++++++++++++++++++++++++++++++--------------
>  fs/nfsd/nfs4state.c    |   7 +++-
>  fs/nfsd/state.h        |  12 +++---
>  fs/nfsd/trace.h        |   2 +-
>  4 files changed, 89 insertions(+), 39 deletions(-)
> 
> diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
> index e38fa834b3d91333acf1425eb14c644e5d5f2601..64b85b164125b244494f9805840a0d8a1ccb4c1b 100644
> --- a/fs/nfsd/nfs4callback.c
> +++ b/fs/nfsd/nfs4callback.c
> @@ -406,6 +406,16 @@ encode_cb_getattr4args(struct xdr_stream *xdr, struct nfs4_cb_compound_hdr *hdr,
>  	hdr->nops++;
>  }
>  
> +static u32 highest_unset_index(u32 word)
> +{
> +	int i;
> +
> +	for (i = sizeof(word) * 8 - 1; i > 0; --i)
> +		if (!(word & BIT(i)))
> +			return i;
> +	return 0;
> +}
> +

Isn't this the same as ffz() or "ffs(~(x))" ?


>  /*
>   * CB_SEQUENCE4args
>   *
> @@ -432,15 +442,38 @@ static void encode_cb_sequence4args(struct xdr_stream *xdr,
>  	encode_sessionid4(xdr, session);
>  
>  	p = xdr_reserve_space(xdr, 4 + 4 + 4 + 4 + 4);
> -	*p++ = cpu_to_be32(session->se_cb_seq_nr);	/* csa_sequenceid */
> -	*p++ = xdr_zero;			/* csa_slotid */
> -	*p++ = xdr_zero;			/* csa_highest_slotid */
> +	*p++ = cpu_to_be32(session->se_cb_seq_nr[cb->cb_held_slot]);	/* csa_sequenceid */
> +	*p++ = cpu_to_be32(cb->cb_held_slot);		/* csa_slotid */
> +	*p++ = cpu_to_be32(max(highest_unset_index(session->se_cb_slot_avail),
> +			       session->se_cb_highest_slot)); /* csa_highest_slotid */

This encoder doesn't hold the client's cl_lock, but does reference
fields that are updated while that lock is held.

Also, should a per-session spin lock be used instead of cl_lock?

Is locking even necessary? All of these calls are prepared in a
single-threaded workqueue. Reply processing can only free slots, so
it doesn't need to exclude session slot selection.


>  	*p++ = xdr_zero;			/* csa_cachethis */
>  	xdr_encode_empty_array(p);		/* csa_referring_call_lists */
>  
>  	hdr->nops++;
>  }
>  
> +static void update_cb_slot_table(struct nfsd4_session *ses, u32 highest)

Nit: Can you use the name "target" instead of "highest" ?


> +{
> +	/* No need to do anything if nothing changed */
> +	if (likely(highest == READ_ONCE(ses->se_cb_highest_slot)))
> +		return;
> +
> +	spin_lock(&ses->se_client->cl_lock);
> +	/* If growing the slot table, reset any new sequences to 1 */
> +	if (highest > ses->se_cb_highest_slot) {
> +		int i;
> +
> +		for (i = ses->se_cb_highest_slot; i <= highest; ++i) {
> +			/* beyond the end of the array? */
> +			if (i >= NFSD_BC_SLOT_TABLE_MAX)
> +				break;

Nit: why not cap "highest" at NFSD_BC_SLOT_TABLE_MAX before starting
this for loop?


> +			ses->se_cb_seq_nr[i] = 1;
> +		}
> +	}
> +	ses->se_cb_highest_slot = highest;
> +	spin_unlock(&ses->se_client->cl_lock);
> +}
> +
>  /*
>   * CB_SEQUENCE4resok
>   *
> @@ -485,7 +518,7 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
>  	p += XDR_QUADLEN(NFS4_MAX_SESSIONID_LEN);
>  
>  	dummy = be32_to_cpup(p++);
> -	if (dummy != session->se_cb_seq_nr) {
> +	if (dummy != session->se_cb_seq_nr[cb->cb_held_slot]) {

Nit: Let's rename "dummy" as "seqid", and add a "highest" variable
for the next XDR field (not shown here).


>  		dprintk("NFS: %s Invalid sequence number\n", __func__);
>  		goto out;
>  	}
> @@ -496,9 +529,15 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
>  		goto out;
>  	}
>  
> -	/*
> -	 * FIXME: process highest slotid and target highest slotid
> -	 */
> +	p++; // ignore current highest slot value
> +
> +	dummy = be32_to_cpup(p++);

Nit: I prefer a name for this argument variable like "target".

The compiler should be able to combine the usage of these variables
into a single memory location.


> +	if (dummy == 0) {
> +		dprintk("NFS: %s Invalid target highest slotid\n", __func__);
> +		goto out;
> +	}
> +
> +	update_cb_slot_table(session, dummy);
>  	status = 0;
>  out:
>  	cb->cb_seq_status = status;
> @@ -1208,31 +1247,38 @@ void nfsd4_change_callback(struct nfs4_client *clp, struct nfs4_cb_conn *conn)
>   * If the slot is available, then mark it busy.  Otherwise, set the
>   * thread for sleeping on the callback RPC wait queue.
>   */
> -static bool nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
> +static void nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
>  {
>  	struct nfs4_client *clp = cb->cb_clp;
> +	struct nfsd4_session *ses = clp->cl_cb_session;
> +	int idx;
>  
> -	if (!cb->cb_holds_slot &&
> -	    test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
> +	if (cb->cb_held_slot >= 0)
> +		return;
> +retry:
> +	spin_lock(&clp->cl_lock);
> +	idx = ffs(ses->se_cb_slot_avail) - 1;
> +	if (idx < 0 || idx > ses->se_cb_highest_slot) {
> +		spin_unlock(&clp->cl_lock);
>  		rpc_sleep_on(&clp->cl_cb_waitq, task, NULL);
> -		/* Race breaker */
> -		if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
> -			dprintk("%s slot is busy\n", __func__);
> -			return false;
> -		}
> -		rpc_wake_up_queued_task(&clp->cl_cb_waitq, task);
> +		goto retry;
>  	}
> -	cb->cb_holds_slot = true;
> -	return true;
> +	/* clear the bit for the slot */
> +	ses->se_cb_slot_avail &= ~BIT(idx);
> +	spin_unlock(&clp->cl_lock);
> +	cb->cb_held_slot = idx;
>  }
>  
>  static void nfsd41_cb_release_slot(struct nfsd4_callback *cb)
>  {
>  	struct nfs4_client *clp = cb->cb_clp;
> +	struct nfsd4_session *ses = clp->cl_cb_session;
>  
> -	if (cb->cb_holds_slot) {
> -		cb->cb_holds_slot = false;
> -		clear_bit(0, &clp->cl_cb_slot_busy);
> +	if (cb->cb_held_slot >= 0) {
> +		spin_lock(&clp->cl_lock);
> +		ses->se_cb_slot_avail |= BIT(cb->cb_held_slot);
> +		spin_unlock(&clp->cl_lock);
> +		cb->cb_held_slot = -1;
>  		rpc_wake_up_next(&clp->cl_cb_waitq);
>  	}
>  }
> @@ -1265,8 +1311,8 @@ static void nfsd4_cb_prepare(struct rpc_task *task, void *calldata)

Nit: This patch should update the documenting comment before
nfsd4_cb_prepare() -- the patch implements "multiple slots".


>  	trace_nfsd_cb_rpc_prepare(clp);
>  	cb->cb_seq_status = 1;
>  	cb->cb_status = 0;
> -	if (minorversion && !nfsd41_cb_get_slot(cb, task))
> -		return;
> +	if (minorversion)
> +		nfsd41_cb_get_slot(cb, task);
>  	rpc_call_start(task);
>  }
>  
> @@ -1292,7 +1338,7 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
>  		return true;
>  	}
>  
> -	if (!cb->cb_holds_slot)
> +	if (cb->cb_held_slot < 0)
>  		goto need_restart;
>  
>  	/* This is the operation status code for CB_SEQUENCE */
> @@ -1306,10 +1352,10 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
>  		 * If CB_SEQUENCE returns an error, then the state of the slot
>  		 * (sequence ID, cached reply) MUST NOT change.
>  		 */
> -		++session->se_cb_seq_nr;
> +		++session->se_cb_seq_nr[cb->cb_held_slot];
>  		break;
>  	case -ESERVERFAULT:
> -		++session->se_cb_seq_nr;
> +		++session->se_cb_seq_nr[cb->cb_held_slot];
>  		nfsd4_mark_cb_fault(cb->cb_clp);
>  		ret = false;
>  		break;
> @@ -1335,17 +1381,16 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
>  	case -NFS4ERR_BADSLOT:
>  		goto retry_nowait;
>  	case -NFS4ERR_SEQ_MISORDERED:
> -		if (session->se_cb_seq_nr != 1) {
> -			session->se_cb_seq_nr = 1;
> +		if (session->se_cb_seq_nr[cb->cb_held_slot] != 1) {
> +			session->se_cb_seq_nr[cb->cb_held_slot] = 1;
>  			goto retry_nowait;
>  		}
>  		break;
>  	default:
>  		nfsd4_mark_cb_fault(cb->cb_clp);
>  	}
> -	nfsd41_cb_release_slot(cb);
> -
>  	trace_nfsd_cb_free_slot(task, cb);
> +	nfsd41_cb_release_slot(cb);
>  
>  	if (RPC_SIGNALLED(task))
>  		goto need_restart;
> @@ -1565,7 +1610,7 @@ void nfsd4_init_cb(struct nfsd4_callback *cb, struct nfs4_client *clp,
>  	INIT_WORK(&cb->cb_work, nfsd4_run_cb_work);
>  	cb->cb_status = 0;
>  	cb->cb_need_restart = false;
> -	cb->cb_holds_slot = false;
> +	cb->cb_held_slot = -1;
>  }
>  
>  /**
> diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
> index 5b718b349396f1aecd0ad4c63b2f43342841bbd4..20a0d40202e40eed1c84d5d6c0a85b908804a6ba 100644
> --- a/fs/nfsd/nfs4state.c
> +++ b/fs/nfsd/nfs4state.c
> @@ -2002,6 +2002,8 @@ static struct nfsd4_session *alloc_session(struct nfsd4_channel_attrs *fattrs,
>  	}
>  
>  	memcpy(&new->se_fchannel, fattrs, sizeof(struct nfsd4_channel_attrs));
> +	new->se_cb_slot_avail = ~0U;
> +	new->se_cb_highest_slot = battrs->maxreqs - 1;
>  	return new;
>  out_free:
>  	while (i--)
> @@ -2132,7 +2134,9 @@ static void init_session(struct svc_rqst *rqstp, struct nfsd4_session *new, stru
>  
>  	INIT_LIST_HEAD(&new->se_conns);
>  
> -	new->se_cb_seq_nr = 1;
> +	for (idx = 0; idx < NFSD_BC_SLOT_TABLE_MAX; ++idx)
> +		new->se_cb_seq_nr[idx] = 1;
> +
>  	new->se_flags = cses->flags;
>  	new->se_cb_prog = cses->callback_prog;
>  	new->se_cb_sec = cses->cb_sec;
> @@ -3159,7 +3163,6 @@ static struct nfs4_client *create_client(struct xdr_netobj name,
>  	kref_init(&clp->cl_nfsdfs.cl_ref);
>  	nfsd4_init_cb(&clp->cl_cb_null, clp, NULL, NFSPROC4_CLNT_CB_NULL);
>  	clp->cl_time = ktime_get_boottime_seconds();
> -	clear_bit(0, &clp->cl_cb_slot_busy);
>  	copy_verf(clp, verf);
>  	memcpy(&clp->cl_addr, sa, sizeof(struct sockaddr_storage));
>  	clp->cl_cb_session = NULL;
> diff --git a/fs/nfsd/state.h b/fs/nfsd/state.h
> index 41cda86fea1f6166a0fd0215d3d458c93ced3e6a..2987c362bdd56251e736879dc89302ada2259be8 100644
> --- a/fs/nfsd/state.h
> +++ b/fs/nfsd/state.h
> @@ -71,8 +71,8 @@ struct nfsd4_callback {
>  	struct work_struct cb_work;
>  	int cb_seq_status;
>  	int cb_status;
> +	int cb_held_slot;
>  	bool cb_need_restart;
> -	bool cb_holds_slot;
>  };
>  
>  struct nfsd4_callback_ops {
> @@ -307,6 +307,9 @@ struct nfsd4_conn {
>  	unsigned char cn_flags;
>  };
>  
> +/* Max number of slots that the server will use in the backchannel */
> +#define NFSD_BC_SLOT_TABLE_MAX	32
> +

The new comment is unclear about whether this is an implementation
limit or a protocol limit. I suggest:

/* Maximum number of slots that NFSD implements for NFSv4.1+ backchannel */

And make this "sizeof(u32) * 8" or something similar that documents
where the value of this limit comes from.

>  /*
>   * Representation of a v4.1+ session. These are refcounted in a similar fashion
>   * to the nfs4_client. References are only taken when the server is actively
> @@ -325,7 +328,9 @@ struct nfsd4_session {
>  	struct nfsd4_cb_sec	se_cb_sec;
>  	struct list_head	se_conns;
>  	u32			se_cb_prog;
> -	u32			se_cb_seq_nr;
> +	u32			se_cb_slot_avail; /* bitmap of available slots */
> +	u32			se_cb_highest_slot;	/* highest slot client wants */
> +	u32			se_cb_seq_nr[NFSD_BC_SLOT_TABLE_MAX];
>  	struct nfsd4_slot	*se_slots[];	/* forward channel slots */
>  };
>  
> @@ -459,9 +464,6 @@ struct nfs4_client {
>  	 */
>  	struct dentry		*cl_nfsd_info_dentry;
>  
> -	/* for nfs41 callbacks */
> -	/* We currently support a single back channel with a single slot */
> -	unsigned long		cl_cb_slot_busy;
>  	struct rpc_wait_queue	cl_cb_waitq;	/* backchannel callers may */
>  						/* wait here for slots */
>  	struct net		*net;
> diff --git a/fs/nfsd/trace.h b/fs/nfsd/trace.h
> index f318898cfc31614b5a84a4867e18c2b3a07122c9..a9c17186b6892f1df8d7f7b90e250c2913ab23fe 100644
> --- a/fs/nfsd/trace.h
> +++ b/fs/nfsd/trace.h
> @@ -1697,7 +1697,7 @@ TRACE_EVENT(nfsd_cb_free_slot,
>  		__entry->cl_id = sid->clientid.cl_id;
>  		__entry->seqno = sid->sequence;
>  		__entry->reserved = sid->reserved;
> -		__entry->slot_seqno = session->se_cb_seq_nr;
> +		__entry->slot_seqno = session->se_cb_seq_nr[cb->cb_held_slot];
>  	),
>  	TP_printk(SUNRPC_TRACE_TASK_SPECIFIER
>  		" sessionid=%08x:%08x:%08x:%08x new slot seqno=%u",
> 
> -- 
> 2.47.0
> 

-- 
Chuck Lever
Re: [PATCH 2/2] nfsd: allow for more callback session slots
Posted by Jeff Layton 3 weeks, 6 days ago
On Mon, 2024-10-28 at 15:05 -0400, Chuck Lever wrote:
> On Mon, Oct 28, 2024 at 10:26:27AM -0400, Jeff Layton wrote:
> > nfsd currently only uses a single slot in the callback channel, which is
> > proving to be a bottleneck in some cases. Widen the callback channel to
> > a max of 32 slots (subject to the client's target_maxreqs value).
> > 
> > Change the cb_holds_slot boolean to an integer that tracks the current
> > slot number (with -1 meaning "unassigned").  Move the callback slot
> > tracking info into the session. Add a new u32 that acts as a bitmap to
> > track which slots are in use, and a u32 to track the latest callback
> > target_slotid that the client reports. While they are part of the
> > session, the fields are protected by the cl_lock.
> > 
> > Fix nfsd41_cb_get_slot to always search for the lowest slotid (using
> > ffs()), and change it to continually retry until there is a slot
> > available.
> > 
> > Finally, convert the session->se_cb_seq_nr field into an array of
> > counters and add the necessary handling to ensure that the seqids get
> > reset at the appropriate times.
> > 
> > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > ---
> >  fs/nfsd/nfs4callback.c | 107 +++++++++++++++++++++++++++++++++++--------------
> >  fs/nfsd/nfs4state.c    |   7 +++-
> >  fs/nfsd/state.h        |  12 +++---
> >  fs/nfsd/trace.h        |   2 +-
> >  4 files changed, 89 insertions(+), 39 deletions(-)
> > 
> > diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
> > index e38fa834b3d91333acf1425eb14c644e5d5f2601..64b85b164125b244494f9805840a0d8a1ccb4c1b 100644
> > --- a/fs/nfsd/nfs4callback.c
> > +++ b/fs/nfsd/nfs4callback.c
> > @@ -406,6 +406,16 @@ encode_cb_getattr4args(struct xdr_stream *xdr, struct nfs4_cb_compound_hdr *hdr,
> >  	hdr->nops++;
> >  }
> >  
> > +static u32 highest_unset_index(u32 word)
> > +{
> > +	int i;
> > +
> > +	for (i = sizeof(word) * 8 - 1; i > 0; --i)
> > +		if (!(word & BIT(i)))
> > +			return i;
> > +	return 0;
> > +}
> > +
> 
> Isn't this the same as ffz() or "ffs(~(x))" ?
> 

No. I need the _last_ cleared bit in the word. It looks, though, like
there is an fls() function that might be usable, so I could maybe do
fls(~(x)). I'll experiment with that.

> 
> >  /*
> >   * CB_SEQUENCE4args
> >   *
> > @@ -432,15 +442,38 @@ static void encode_cb_sequence4args(struct xdr_stream *xdr,
> >  	encode_sessionid4(xdr, session);
> >  
> >  	p = xdr_reserve_space(xdr, 4 + 4 + 4 + 4 + 4);
> > -	*p++ = cpu_to_be32(session->se_cb_seq_nr);	/* csa_sequenceid */
> > -	*p++ = xdr_zero;			/* csa_slotid */
> > -	*p++ = xdr_zero;			/* csa_highest_slotid */
> > +	*p++ = cpu_to_be32(session->se_cb_seq_nr[cb->cb_held_slot]);	/* csa_sequenceid */
> > +	*p++ = cpu_to_be32(cb->cb_held_slot);		/* csa_slotid */
> > +	*p++ = cpu_to_be32(max(highest_unset_index(session->se_cb_slot_avail),
> > +			       session->se_cb_highest_slot)); /* csa_highest_slotid */
> 
> This encoder doesn't hold the client's cl_lock, but does reference
> fields that are updated while that lock is held.
> 
> Also, should a per-session spin lock be used instead of cl_lock?
> 
> Is locking even necessary? All of these calls are prepared in a
> single-threaded workqueue. Reply processing can only free slots, so
> it doesn't need to exclude session slot selection.
> 

NFSv4 callbacks use async RPCs. The encoding and transmission are done in
the context of the (single-threaded) workqueue, but the reply is
handled by rpciod. Since we release the slot after the reply in rpciod
context, I think we do require locking here. I also worry a bit that
relying on the single-threaded workqueue may end up being a bottleneck
at some point in the future, so I see having clear locking here as a
benefit.

I'm open to switching to a per-session lock of some sort, but I don't
see a real need here. Only one session will be used as the backchannel
at a time, so there shouldn't be competing access between different
sessions for the cl_lock. We are competing with the other uses of the
cl_lock, but this one should be pretty quick. My preference would be to
add extra locking only once it becomes clear that it's necessary.

> 
> >  	*p++ = xdr_zero;			/* csa_cachethis */
> >  	xdr_encode_empty_array(p);		/* csa_referring_call_lists */
> >  
> >  	hdr->nops++;
> >  }
> >  
> > +static void update_cb_slot_table(struct nfsd4_session *ses, u32 highest)
> 
> Nit: Can you use the name "target" instead of "highest" ?
> 

Will do.

> 
> > +{
> > +	/* No need to do anything if nothing changed */
> > +	if (likely(highest == READ_ONCE(ses->se_cb_highest_slot)))
> > +		return;
> > +
> > +	spin_lock(&ses->se_client->cl_lock);
> > +	/* If growing the slot table, reset any new sequences to 1 */
> > +	if (highest > ses->se_cb_highest_slot) {
> > +		int i;
> > +
> > +		for (i = ses->se_cb_highest_slot; i <= highest; ++i) {
> > +			/* beyond the end of the array? */
> > +			if (i >= NFSD_BC_SLOT_TABLE_MAX)
> > +				break;
> 
> Nit: why not cap "highest" at NFSD_BC_SLOT_TABLE_MAX before starting
> this for loop?
> 

Duh -- of course. Will fix.

> 
> > +			ses->se_cb_seq_nr[i] = 1;
> > +		}
> > +	}
> > +	ses->se_cb_highest_slot = highest;
> > +	spin_unlock(&ses->se_client->cl_lock);
> > +}
> > +
> >  /*
> >   * CB_SEQUENCE4resok
> >   *
> > @@ -485,7 +518,7 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
> >  	p += XDR_QUADLEN(NFS4_MAX_SESSIONID_LEN);
> >  
> >  	dummy = be32_to_cpup(p++);
> > -	if (dummy != session->se_cb_seq_nr) {
> > +	if (dummy != session->se_cb_seq_nr[cb->cb_held_slot]) {
> 
> Nit: Let's rename "dummy" as "seqid", and add a "highest" variable
> for the next XDR field (not shown here).
> 

Ok.

> 
> >  		dprintk("NFS: %s Invalid sequence number\n", __func__);
> >  		goto out;
> >  	}
> > @@ -496,9 +529,15 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
> >  		goto out;
> >  	}
> >  
> > -	/*
> > -	 * FIXME: process highest slotid and target highest slotid
> > -	 */
> > +	p++; // ignore current highest slot value
> > +
> > +	dummy = be32_to_cpup(p++);
> 
> Nit: I prefer a name for this argument variable like "target".
> 
> The compiler should be able to combine the usage of these variables
> into a single memory location.
> 

Ok.

> 
> > +	if (dummy == 0) {
> > +		dprintk("NFS: %s Invalid target highest slotid\n", __func__);
> > +		goto out;
> > +	}
> > +
> > +	update_cb_slot_table(session, dummy);
> >  	status = 0;
> >  out:
> >  	cb->cb_seq_status = status;
> > @@ -1208,31 +1247,38 @@ void nfsd4_change_callback(struct nfs4_client *clp, struct nfs4_cb_conn *conn)
> >   * If the slot is available, then mark it busy.  Otherwise, set the
> >   * thread for sleeping on the callback RPC wait queue.
> >   */
> > -static bool nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
> > +static void nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
> >  {
> >  	struct nfs4_client *clp = cb->cb_clp;
> > +	struct nfsd4_session *ses = clp->cl_cb_session;
> > +	int idx;
> >  
> > -	if (!cb->cb_holds_slot &&
> > -	    test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
> > +	if (cb->cb_held_slot >= 0)
> > +		return;
> > +retry:
> > +	spin_lock(&clp->cl_lock);
> > +	idx = ffs(ses->se_cb_slot_avail) - 1;
> > +	if (idx < 0 || idx > ses->se_cb_highest_slot) {
> > +		spin_unlock(&clp->cl_lock);
> >  		rpc_sleep_on(&clp->cl_cb_waitq, task, NULL);
> > -		/* Race breaker */
> > -		if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
> > -			dprintk("%s slot is busy\n", __func__);
> > -			return false;
> > -		}
> > -		rpc_wake_up_queued_task(&clp->cl_cb_waitq, task);
> > +		goto retry;
> >  	}
> > -	cb->cb_holds_slot = true;
> > -	return true;
> > +	/* clear the bit for the slot */
> > +	ses->se_cb_slot_avail &= ~BIT(idx);
> > +	spin_unlock(&clp->cl_lock);
> > +	cb->cb_held_slot = idx;
> >  }
> >  
> >  static void nfsd41_cb_release_slot(struct nfsd4_callback *cb)
> >  {
> >  	struct nfs4_client *clp = cb->cb_clp;
> > +	struct nfsd4_session *ses = clp->cl_cb_session;
> >  
> > -	if (cb->cb_holds_slot) {
> > -		cb->cb_holds_slot = false;
> > -		clear_bit(0, &clp->cl_cb_slot_busy);
> > +	if (cb->cb_held_slot >= 0) {
> > +		spin_lock(&clp->cl_lock);
> > +		ses->se_cb_slot_avail |= BIT(cb->cb_held_slot);
> > +		spin_unlock(&clp->cl_lock);
> > +		cb->cb_held_slot = -1;
> >  		rpc_wake_up_next(&clp->cl_cb_waitq);
> >  	}
> >  }
> > @@ -1265,8 +1311,8 @@ static void nfsd4_cb_prepare(struct rpc_task *task, void *calldata)
> 
> Nit: This patch should update the documenting comment before
> nfsd4_cb_prepare() -- the patch implements "multiple slots".
> 

+1

> 
> >  	trace_nfsd_cb_rpc_prepare(clp);
> >  	cb->cb_seq_status = 1;
> >  	cb->cb_status = 0;
> > -	if (minorversion && !nfsd41_cb_get_slot(cb, task))
> > -		return;
> > +	if (minorversion)
> > +		nfsd41_cb_get_slot(cb, task);
> >  	rpc_call_start(task);
> >  }
> >  
> > @@ -1292,7 +1338,7 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
> >  		return true;
> >  	}
> >  
> > -	if (!cb->cb_holds_slot)
> > +	if (cb->cb_held_slot < 0)
> >  		goto need_restart;
> >  
> >  	/* This is the operation status code for CB_SEQUENCE */
> > @@ -1306,10 +1352,10 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
> >  		 * If CB_SEQUENCE returns an error, then the state of the slot
> >  		 * (sequence ID, cached reply) MUST NOT change.
> >  		 */
> > -		++session->se_cb_seq_nr;
> > +		++session->se_cb_seq_nr[cb->cb_held_slot];
> >  		break;
> >  	case -ESERVERFAULT:
> > -		++session->se_cb_seq_nr;
> > +		++session->se_cb_seq_nr[cb->cb_held_slot];
> >  		nfsd4_mark_cb_fault(cb->cb_clp);
> >  		ret = false;
> >  		break;
> > @@ -1335,17 +1381,16 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
> >  	case -NFS4ERR_BADSLOT:
> >  		goto retry_nowait;
> >  	case -NFS4ERR_SEQ_MISORDERED:
> > -		if (session->se_cb_seq_nr != 1) {
> > -			session->se_cb_seq_nr = 1;
> > +		if (session->se_cb_seq_nr[cb->cb_held_slot] != 1) {
> > +			session->se_cb_seq_nr[cb->cb_held_slot] = 1;
> >  			goto retry_nowait;
> >  		}
> >  		break;
> >  	default:
> >  		nfsd4_mark_cb_fault(cb->cb_clp);
> >  	}
> > -	nfsd41_cb_release_slot(cb);
> > -
> >  	trace_nfsd_cb_free_slot(task, cb);
> > +	nfsd41_cb_release_slot(cb);
> >  
> >  	if (RPC_SIGNALLED(task))
> >  		goto need_restart;
> > @@ -1565,7 +1610,7 @@ void nfsd4_init_cb(struct nfsd4_callback *cb, struct nfs4_client *clp,
> >  	INIT_WORK(&cb->cb_work, nfsd4_run_cb_work);
> >  	cb->cb_status = 0;
> >  	cb->cb_need_restart = false;
> > -	cb->cb_holds_slot = false;
> > +	cb->cb_held_slot = -1;
> >  }
> >  
> >  /**
> > diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
> > index 5b718b349396f1aecd0ad4c63b2f43342841bbd4..20a0d40202e40eed1c84d5d6c0a85b908804a6ba 100644
> > --- a/fs/nfsd/nfs4state.c
> > +++ b/fs/nfsd/nfs4state.c
> > @@ -2002,6 +2002,8 @@ static struct nfsd4_session *alloc_session(struct nfsd4_channel_attrs *fattrs,
> >  	}
> >  
> >  	memcpy(&new->se_fchannel, fattrs, sizeof(struct nfsd4_channel_attrs));
> > +	new->se_cb_slot_avail = ~0U;
> > +	new->se_cb_highest_slot = battrs->maxreqs - 1;
> >  	return new;
> >  out_free:
> >  	while (i--)
> > @@ -2132,7 +2134,9 @@ static void init_session(struct svc_rqst *rqstp, struct nfsd4_session *new, stru
> >  
> >  	INIT_LIST_HEAD(&new->se_conns);
> >  
> > -	new->se_cb_seq_nr = 1;
> > +	for (idx = 0; idx < NFSD_BC_SLOT_TABLE_MAX; ++idx)
> > +		new->se_cb_seq_nr[idx] = 1;
> > +
> >  	new->se_flags = cses->flags;
> >  	new->se_cb_prog = cses->callback_prog;
> >  	new->se_cb_sec = cses->cb_sec;
> > @@ -3159,7 +3163,6 @@ static struct nfs4_client *create_client(struct xdr_netobj name,
> >  	kref_init(&clp->cl_nfsdfs.cl_ref);
> >  	nfsd4_init_cb(&clp->cl_cb_null, clp, NULL, NFSPROC4_CLNT_CB_NULL);
> >  	clp->cl_time = ktime_get_boottime_seconds();
> > -	clear_bit(0, &clp->cl_cb_slot_busy);
> >  	copy_verf(clp, verf);
> >  	memcpy(&clp->cl_addr, sa, sizeof(struct sockaddr_storage));
> >  	clp->cl_cb_session = NULL;
> > diff --git a/fs/nfsd/state.h b/fs/nfsd/state.h
> > index 41cda86fea1f6166a0fd0215d3d458c93ced3e6a..2987c362bdd56251e736879dc89302ada2259be8 100644
> > --- a/fs/nfsd/state.h
> > +++ b/fs/nfsd/state.h
> > @@ -71,8 +71,8 @@ struct nfsd4_callback {
> >  	struct work_struct cb_work;
> >  	int cb_seq_status;
> >  	int cb_status;
> > +	int cb_held_slot;
> >  	bool cb_need_restart;
> > -	bool cb_holds_slot;
> >  };
> >  
> >  struct nfsd4_callback_ops {
> > @@ -307,6 +307,9 @@ struct nfsd4_conn {
> >  	unsigned char cn_flags;
> >  };
> >  
> > +/* Max number of slots that the server will use in the backchannel */
> > +#define NFSD_BC_SLOT_TABLE_MAX	32
> > +
> 
> The new comment is unclear about whether this is an implementation
> limit or a protocol limit. I suggest:
> 
> /* Maximum number of slots that NFSD implements for NFSv4.1+ backchannel */
> 
> And make this "sizeof(u32) * 8" or something similar that documents
> where the value of this limit comes from.
> 

Ok.

> >  /*
> >   * Representation of a v4.1+ session. These are refcounted in a similar fashion
> >   * to the nfs4_client. References are only taken when the server is actively
> > @@ -325,7 +328,9 @@ struct nfsd4_session {
> >  	struct nfsd4_cb_sec	se_cb_sec;
> >  	struct list_head	se_conns;
> >  	u32			se_cb_prog;
> > -	u32			se_cb_seq_nr;
> > +	u32			se_cb_slot_avail; /* bitmap of available slots */
> > +	u32			se_cb_highest_slot;	/* highest slot client wants */
> > +	u32			se_cb_seq_nr[NFSD_BC_SLOT_TABLE_MAX];
> >  	struct nfsd4_slot	*se_slots[];	/* forward channel slots */
> >  };
> >  
> > @@ -459,9 +464,6 @@ struct nfs4_client {
> >  	 */
> >  	struct dentry		*cl_nfsd_info_dentry;
> >  
> > -	/* for nfs41 callbacks */
> > -	/* We currently support a single back channel with a single slot */
> > -	unsigned long		cl_cb_slot_busy;
> >  	struct rpc_wait_queue	cl_cb_waitq;	/* backchannel callers may */
> >  						/* wait here for slots */
> >  	struct net		*net;
> > diff --git a/fs/nfsd/trace.h b/fs/nfsd/trace.h
> > index f318898cfc31614b5a84a4867e18c2b3a07122c9..a9c17186b6892f1df8d7f7b90e250c2913ab23fe 100644
> > --- a/fs/nfsd/trace.h
> > +++ b/fs/nfsd/trace.h
> > @@ -1697,7 +1697,7 @@ TRACE_EVENT(nfsd_cb_free_slot,
> >  		__entry->cl_id = sid->clientid.cl_id;
> >  		__entry->seqno = sid->sequence;
> >  		__entry->reserved = sid->reserved;
> > -		__entry->slot_seqno = session->se_cb_seq_nr;
> > +		__entry->slot_seqno = session->se_cb_seq_nr[cb->cb_held_slot];
> >  	),
> >  	TP_printk(SUNRPC_TRACE_TASK_SPECIFIER
> >  		" sessionid=%08x:%08x:%08x:%08x new slot seqno=%u",
> > 
> > -- 
> > 2.47.0
> > 
> 

-- 
Jeff Layton <jlayton@kernel.org>
Re: [PATCH 2/2] nfsd: allow for more callback session slots
Posted by Tom Talpey 3 weeks, 5 days ago
On 10/29/2024 6:28 AM, Jeff Layton wrote:
> I'm open to switching to a per-session lock of some sort, but I don't
> see a real need here. Only one session will be used as the backchannel
> at a time, so there shouldn't be competing access between different
> sessions for the cl_lock. We are competing with the other uses of the
> cl_lock, but this one should be pretty quick. My preference would be to
> add extra locking only once it becomes clear that it's necessary.
I have a question on what you mean by "Only one session will be used
as the backchannel". Does this mean that the server ignores backchannels
for all but one random victim? That doesn't seem fair, or efficient.
And what happens with nconnect > 1?

Another question is, what clients are offering this many backchannel
slots?

Tom.
Re: [PATCH 2/2] nfsd: allow for more callback session slots
Posted by Jeff Layton 3 weeks, 5 days ago
On Tue, 2024-10-29 at 13:54 -0400, Tom Talpey wrote:
> On 10/29/2024 6:28 AM, Jeff Layton wrote:
> > I'm open to switching to a per-session lock of some sort, but I don't
> > see a real need here. Only one session will be used as the backchannel
> > at a time, so there shouldn't be competing access between different
> > sessions for the cl_lock. We are competing with the other uses of the
> > cl_lock, but this one should be pretty quick. My preference would be to
> > add extra locking only once it becomes clear that it's necessary.
>
> I have a question on what you mean by "Only one session will be used
> as the backchannel". Does this mean that the server ignores backchannels
> for all but one random victim? That doesn't seem fair, or efficient.
> And what happens with nconnect > 1?
> 

The server currently picks a single session that it designates as the
backchannel (the clp->cl_cb_session). All the callbacks go over that
session's connections until it's reevaluated in
nfsd4_process_cb_update.

I'm not sure what the server does with nconnect on the backchannel, but
that's a separate project anyway. Getting to the point where we can
send more than one v4.1+ callback at a time is the first step.

> Another question is, what clients are offering this many backchannel
> slots?
> 

The Linux NFS client offers 16 slots. I overshot that a little with
this implementation, but the extra memory cost is trivial (an extra 64
bytes per session: 16 additional u32 sequence counters).
-- 
Jeff Layton <jlayton@kernel.org>
Re: [PATCH 2/2] nfsd: allow for more callback session slots
Posted by Chuck Lever III 3 weeks, 5 days ago

> On Oct 29, 2024, at 6:28 AM, Jeff Layton <jlayton@kernel.org> wrote:
> 
> On Mon, 2024-10-28 at 15:05 -0400, Chuck Lever wrote:
>> On Mon, Oct 28, 2024 at 10:26:27AM -0400, Jeff Layton wrote:
>>> nfsd currently only uses a single slot in the callback channel, which is
>>> proving to be a bottleneck in some cases. Widen the callback channel to
>>> a max of 32 slots (subject to the client's target_maxreqs value).
>>> 
>>> Change the cb_holds_slot boolean to an integer that tracks the current
>>> slot number (with -1 meaning "unassigned").  Move the callback slot
>>> tracking info into the session. Add a new u32 that acts as a bitmap to
>>> track which slots are in use, and a u32 to track the latest callback
>>> target_slotid that the client reports. While they are part of the
>>> session, the fields are protected by the cl_lock.
>>> 
>>> Fix nfsd41_cb_get_slot to always search for the lowest slotid (using
>>> ffs()), and change it to continually retry until there is a slot
>>> available.
>>> 
>>> Finally, convert the session->se_cb_seq_nr field into an array of
>>> counters and add the necessary handling to ensure that the seqids get
>>> reset at the appropriate times.
>>> 
>>> Signed-off-by: Jeff Layton <jlayton@kernel.org>
>>> ---
>>> fs/nfsd/nfs4callback.c | 107 +++++++++++++++++++++++++++++++++++--------------
>>> fs/nfsd/nfs4state.c    |   7 +++-
>>> fs/nfsd/state.h        |  12 +++---
>>> fs/nfsd/trace.h        |   2 +-
>>> 4 files changed, 89 insertions(+), 39 deletions(-)
>>> 
>>> diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
>>> index e38fa834b3d91333acf1425eb14c644e5d5f2601..64b85b164125b244494f9805840a0d8a1ccb4c1b 100644
>>> --- a/fs/nfsd/nfs4callback.c
>>> +++ b/fs/nfsd/nfs4callback.c
>>> @@ -406,6 +406,16 @@ encode_cb_getattr4args(struct xdr_stream *xdr, struct nfs4_cb_compound_hdr *hdr,
>>> hdr->nops++;
>>> }
>>> 
>>> +static u32 highest_unset_index(u32 word)
>>> +{
>>> + int i;
>>> +
>>> + for (i = sizeof(word) * 8 - 1; i > 0; --i)
>>> + if (!(word & BIT(i)))
>>> + return i;
>>> + return 0;
>>> +}
>>> +
>> 
>> Isn't this the same as ffz() or "ffs(~(x))" ?
>> 
> 
> No. I need the _last_ cleared bit in the word. It looks, though, like
> there is an fls() function that might be usable, so I could maybe do
> fls(~(x)). I'll experiment with that.
> 
>> 
>>> /*
>>>  * CB_SEQUENCE4args
>>>  *
>>> @@ -432,15 +442,38 @@ static void encode_cb_sequence4args(struct xdr_stream *xdr,
>>> encode_sessionid4(xdr, session);
>>> 
>>> p = xdr_reserve_space(xdr, 4 + 4 + 4 + 4 + 4);
>>> - *p++ = cpu_to_be32(session->se_cb_seq_nr); /* csa_sequenceid */
>>> - *p++ = xdr_zero; /* csa_slotid */
>>> - *p++ = xdr_zero; /* csa_highest_slotid */
>>> + *p++ = cpu_to_be32(session->se_cb_seq_nr[cb->cb_held_slot]); /* csa_sequenceid */
>>> + *p++ = cpu_to_be32(cb->cb_held_slot); /* csa_slotid */
>>> + *p++ = cpu_to_be32(max(highest_unset_index(session->se_cb_slot_avail),
>>> +        session->se_cb_highest_slot)); /* csa_highest_slotid */
>> 
>> This encoder doesn't hold the client's cl_lock, but does reference
>> fields that are updated while that lock is held.
>> 
>> Also, should a per-session spin lock be used instead of cl_lock?
>> 
>> Is locking even necessary? All of these calls are prepared in a
>> single-threaded workqueue. Reply processing can only free slots, so
>> it doesn't need to exclude session slot selection.
>> 
> 
> NFSv4 callbacks use async RPCs. The encoding and transmit is done in
> the context of the (single-threaded) workqueue, but the reply is
> handled by rpciod. Since we release the slot after the reply in rpciod
> context, I think we do require locking here.

Fair enough, there are some shared data items that have RMW
accesses in both the Call and Reply path.


> I also worry a bit that
> relying on the single-threaded workqueue may end up being a bottleneck
> at some point in the future, so I see having clear locking here as a
> benefit.
> 
> I'm open to switching to a per-session lock of some sort, but I don't
> see a real need here. Only one session will be used as the backchannel
> at a time, so there shouldn't be competing access between different
> sessions for the cl_lock. We are competing with the other uses of the
> cl_lock, but this one should be pretty quick. My preference would be to
> add extra locking only once it becomes clear that it's necessary.

I agree that some day we want to escape from the single-thread
workqueue paradigm.

I don't think this usage will generate a lot of contention now,
but it will pull the lock's cache lines away from the hotter
users on occasion.

Also, I think a per-session lock is more self-documenting to
show precisely what fields are being protected. I've often
had to look at breaking up a larger lock and found it hard
to figure out exactly what needs protection and why.

Thus I like per-session locking a little better, with a brief
comment that explains the locking design. (It's not a strong
preference, but I think we will be better served going forward).


>>> *p++ = xdr_zero; /* csa_cachethis */
>>> xdr_encode_empty_array(p); /* csa_referring_call_lists */
>>> 
>>> hdr->nops++;
>>> }
>>> 
>>> +static void update_cb_slot_table(struct nfsd4_session *ses, u32 highest)
>> 
>> Nit: Can you use the name "target" instead of "highest" ?
>> 
> 
> Will do.
> 
>> 
>>> +{
>>> + /* No need to do anything if nothing changed */
>>> + if (likely(highest == READ_ONCE(ses->se_cb_highest_slot)))
>>> + return;
>>> +
>>> + spin_lock(&ses->se_client->cl_lock);
>>> + /* If growing the slot table, reset any new sequences to 1 */
>>> + if (highest > ses->se_cb_highest_slot) {
>>> + int i;
>>> +
>>> + for (i = ses->se_cb_highest_slot; i <= highest; ++i) {
>>> + /* beyond the end of the array? */
>>> + if (i >= NFSD_BC_SLOT_TABLE_MAX)
>>> + break;
>> 
>> Nit: why not cap "highest" at NFSD_BC_SLOT_TABLE_MAX before starting
>> this for loop?
>> 
> 
> Duh -- of course. Will fix.
> 
>> 
>>> + ses->se_cb_seq_nr[i] = 1;
>>> + }
>>> + }
>>> + ses->se_cb_highest_slot = highest;
>>> + spin_unlock(&ses->se_client->cl_lock);
>>> +}
>>> +
>>> /*
>>>  * CB_SEQUENCE4resok
>>>  *
>>> @@ -485,7 +518,7 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
>>> p += XDR_QUADLEN(NFS4_MAX_SESSIONID_LEN);
>>> 
>>> dummy = be32_to_cpup(p++);
>>> - if (dummy != session->se_cb_seq_nr) {
>>> + if (dummy != session->se_cb_seq_nr[cb->cb_held_slot]) {
>> 
>> Nit: Let's rename "dummy" as "seqid", and add a "highest" variable
>> for the next XDR field (not shown here).
>> 
> 
> Ok.
> 
>> 
>>> dprintk("NFS: %s Invalid sequence number\n", __func__);
>>> goto out;
>>> }
>>> @@ -496,9 +529,15 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
>>> goto out;
>>> }
>>> 
>>> - /*
>>> -  * FIXME: process highest slotid and target highest slotid
>>> -  */
>>> + p++; // ignore current highest slot value
>>> +
>>> + dummy = be32_to_cpup(p++);
>> 
>> Nit: I prefer a name for this argument variable like "target".
>> 
>> The compiler should be able to combine the usage of these variables
>> into a single memory location.
>> 
> 
> Ok.
> 
>> 
>>> + if (dummy == 0) {
>>> + dprintk("NFS: %s Invalid target highest slotid\n", __func__);
>>> + goto out;
>>> + }
>>> +
>>> + update_cb_slot_table(session, dummy);
>>> status = 0;
>>> out:
>>> cb->cb_seq_status = status;
>>> @@ -1208,31 +1247,38 @@ void nfsd4_change_callback(struct nfs4_client *clp, struct nfs4_cb_conn *conn)
>>>  * If the slot is available, then mark it busy.  Otherwise, set the
>>>  * thread for sleeping on the callback RPC wait queue.
>>>  */
>>> -static bool nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
>>> +static void nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
>>> {
>>> struct nfs4_client *clp = cb->cb_clp;
>>> + struct nfsd4_session *ses = clp->cl_cb_session;
>>> + int idx;
>>> 
>>> - if (!cb->cb_holds_slot &&
>>> -     test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
>>> + if (cb->cb_held_slot >= 0)
>>> + return;
>>> +retry:
>>> + spin_lock(&clp->cl_lock);
>>> + idx = ffs(ses->se_cb_slot_avail) - 1;
>>> + if (idx < 0 || idx > ses->se_cb_highest_slot) {
>>> + spin_unlock(&clp->cl_lock);
>>> rpc_sleep_on(&clp->cl_cb_waitq, task, NULL);
>>> - /* Race breaker */
>>> - if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
>>> - dprintk("%s slot is busy\n", __func__);
>>> - return false;
>>> - }
>>> - rpc_wake_up_queued_task(&clp->cl_cb_waitq, task);
>>> + goto retry;
>>> }
>>> - cb->cb_holds_slot = true;
>>> - return true;
>>> + /* clear the bit for the slot */
>>> + ses->se_cb_slot_avail &= ~BIT(idx);
>>> + spin_unlock(&clp->cl_lock);
>>> + cb->cb_held_slot = idx;
>>> }
>>> 
>>> static void nfsd41_cb_release_slot(struct nfsd4_callback *cb)
>>> {
>>> struct nfs4_client *clp = cb->cb_clp;
>>> + struct nfsd4_session *ses = clp->cl_cb_session;
>>> 
>>> - if (cb->cb_holds_slot) {
>>> - cb->cb_holds_slot = false;
>>> - clear_bit(0, &clp->cl_cb_slot_busy);
>>> + if (cb->cb_held_slot >= 0) {
>>> + spin_lock(&clp->cl_lock);
>>> + ses->se_cb_slot_avail |= BIT(cb->cb_held_slot);
>>> + spin_unlock(&clp->cl_lock);
>>> + cb->cb_held_slot = -1;
>>> rpc_wake_up_next(&clp->cl_cb_waitq);
>>> }
>>> }
>>> @@ -1265,8 +1311,8 @@ static void nfsd4_cb_prepare(struct rpc_task *task, void *calldata)
>> 
>> Nit: This patch should update the documenting comment before
>> nfsd4_cb_prepare() -- the patch implements "multiple slots".
>> 
> 
> +1
> 
>> 
>>> trace_nfsd_cb_rpc_prepare(clp);
>>> cb->cb_seq_status = 1;
>>> cb->cb_status = 0;
>>> - if (minorversion && !nfsd41_cb_get_slot(cb, task))
>>> - return;
>>> + if (minorversion)
>>> + nfsd41_cb_get_slot(cb, task);
>>> rpc_call_start(task);
>>> }
>>> 
>>> @@ -1292,7 +1338,7 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
>>> return true;
>>> }
>>> 
>>> - if (!cb->cb_holds_slot)
>>> + if (cb->cb_held_slot < 0)
>>> goto need_restart;
>>> 
>>> /* This is the operation status code for CB_SEQUENCE */
>>> @@ -1306,10 +1352,10 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
>>>  * If CB_SEQUENCE returns an error, then the state of the slot
>>>  * (sequence ID, cached reply) MUST NOT change.
>>>  */
>>> - ++session->se_cb_seq_nr;
>>> + ++session->se_cb_seq_nr[cb->cb_held_slot];
>>> break;
>>> case -ESERVERFAULT:
>>> - ++session->se_cb_seq_nr;
>>> + ++session->se_cb_seq_nr[cb->cb_held_slot];
>>> nfsd4_mark_cb_fault(cb->cb_clp);
>>> ret = false;
>>> break;
>>> @@ -1335,17 +1381,16 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
>>> case -NFS4ERR_BADSLOT:
>>> goto retry_nowait;
>>> case -NFS4ERR_SEQ_MISORDERED:
>>> - if (session->se_cb_seq_nr != 1) {
>>> - session->se_cb_seq_nr = 1;
>>> + if (session->se_cb_seq_nr[cb->cb_held_slot] != 1) {
>>> + session->se_cb_seq_nr[cb->cb_held_slot] = 1;
>>> goto retry_nowait;
>>> }
>>> break;
>>> default:
>>> nfsd4_mark_cb_fault(cb->cb_clp);
>>> }
>>> - nfsd41_cb_release_slot(cb);
>>> -
>>> trace_nfsd_cb_free_slot(task, cb);
>>> + nfsd41_cb_release_slot(cb);
>>> 
>>> if (RPC_SIGNALLED(task))
>>> goto need_restart;
>>> @@ -1565,7 +1610,7 @@ void nfsd4_init_cb(struct nfsd4_callback *cb, struct nfs4_client *clp,
>>> INIT_WORK(&cb->cb_work, nfsd4_run_cb_work);
>>> cb->cb_status = 0;
>>> cb->cb_need_restart = false;
>>> - cb->cb_holds_slot = false;
>>> + cb->cb_held_slot = -1;
>>> }
>>> 
>>> /**
>>> diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
>>> index 5b718b349396f1aecd0ad4c63b2f43342841bbd4..20a0d40202e40eed1c84d5d6c0a85b908804a6ba 100644
>>> --- a/fs/nfsd/nfs4state.c
>>> +++ b/fs/nfsd/nfs4state.c
>>> @@ -2002,6 +2002,8 @@ static struct nfsd4_session *alloc_session(struct nfsd4_channel_attrs *fattrs,
>>> }
>>> 
>>> memcpy(&new->se_fchannel, fattrs, sizeof(struct nfsd4_channel_attrs));
>>> + new->se_cb_slot_avail = ~0U;
>>> + new->se_cb_highest_slot = battrs->maxreqs - 1;
>>> return new;
>>> out_free:
>>> while (i--)
>>> @@ -2132,7 +2134,9 @@ static void init_session(struct svc_rqst *rqstp, struct nfsd4_session *new, stru
>>> 
>>> INIT_LIST_HEAD(&new->se_conns);
>>> 
>>> - new->se_cb_seq_nr = 1;
>>> + for (idx = 0; idx < NFSD_BC_SLOT_TABLE_MAX; ++idx)
>>> + new->se_cb_seq_nr[idx] = 1;
>>> +
>>> new->se_flags = cses->flags;
>>> new->se_cb_prog = cses->callback_prog;
>>> new->se_cb_sec = cses->cb_sec;
>>> @@ -3159,7 +3163,6 @@ static struct nfs4_client *create_client(struct xdr_netobj name,
>>> kref_init(&clp->cl_nfsdfs.cl_ref);
>>> nfsd4_init_cb(&clp->cl_cb_null, clp, NULL, NFSPROC4_CLNT_CB_NULL);
>>> clp->cl_time = ktime_get_boottime_seconds();
>>> - clear_bit(0, &clp->cl_cb_slot_busy);
>>> copy_verf(clp, verf);
>>> memcpy(&clp->cl_addr, sa, sizeof(struct sockaddr_storage));
>>> clp->cl_cb_session = NULL;
>>> diff --git a/fs/nfsd/state.h b/fs/nfsd/state.h
>>> index 41cda86fea1f6166a0fd0215d3d458c93ced3e6a..2987c362bdd56251e736879dc89302ada2259be8 100644
>>> --- a/fs/nfsd/state.h
>>> +++ b/fs/nfsd/state.h
>>> @@ -71,8 +71,8 @@ struct nfsd4_callback {
>>> struct work_struct cb_work;
>>> int cb_seq_status;
>>> int cb_status;
>>> + int cb_held_slot;
>>> bool cb_need_restart;
>>> - bool cb_holds_slot;
>>> };
>>> 
>>> struct nfsd4_callback_ops {
>>> @@ -307,6 +307,9 @@ struct nfsd4_conn {
>>> unsigned char cn_flags;
>>> };
>>> 
>>> +/* Max number of slots that the server will use in the backchannel */
>>> +#define NFSD_BC_SLOT_TABLE_MAX 32
>>> +
>> 
>> The new comment is unclear about whether this is an implementation
>> limit or a protocol limit. I suggest:
>> 
>> /* Maximum number of slots that NFSD implements for NFSv4.1+ backchannel */
>> 
>> And make this "sizeof(u32) * 8" or something similar that documents
>> where the value of this limit comes from.
>> 
> 
> Ok.
> 
>>> /*
>>>  * Representation of a v4.1+ session. These are refcounted in a similar fashion
>>>  * to the nfs4_client. References are only taken when the server is actively
>>> @@ -325,7 +328,9 @@ struct nfsd4_session {
>>> struct nfsd4_cb_sec se_cb_sec;
>>> struct list_head se_conns;
>>> u32 se_cb_prog;
>>> - u32 se_cb_seq_nr;
>>> + u32 se_cb_slot_avail; /* bitmap of available slots */
>>> + u32 se_cb_highest_slot; /* highest slot client wants */
>>> + u32 se_cb_seq_nr[NFSD_BC_SLOT_TABLE_MAX];
>>> struct nfsd4_slot *se_slots[]; /* forward channel slots */
>>> };
>>> 
>>> @@ -459,9 +464,6 @@ struct nfs4_client {
>>>  */
>>> struct dentry *cl_nfsd_info_dentry;
>>> 
>>> - /* for nfs41 callbacks */
>>> - /* We currently support a single back channel with a single slot */
>>> - unsigned long cl_cb_slot_busy;
>>> struct rpc_wait_queue cl_cb_waitq; /* backchannel callers may */
>>> /* wait here for slots */
>>> struct net *net;
>>> diff --git a/fs/nfsd/trace.h b/fs/nfsd/trace.h
>>> index f318898cfc31614b5a84a4867e18c2b3a07122c9..a9c17186b6892f1df8d7f7b90e250c2913ab23fe 100644
>>> --- a/fs/nfsd/trace.h
>>> +++ b/fs/nfsd/trace.h
>>> @@ -1697,7 +1697,7 @@ TRACE_EVENT(nfsd_cb_free_slot,
>>> __entry->cl_id = sid->clientid.cl_id;
>>> __entry->seqno = sid->sequence;
>>> __entry->reserved = sid->reserved;
>>> - __entry->slot_seqno = session->se_cb_seq_nr;
>>> + __entry->slot_seqno = session->se_cb_seq_nr[cb->cb_held_slot];
>>> ),
>>> TP_printk(SUNRPC_TRACE_TASK_SPECIFIER
>>> " sessionid=%08x:%08x:%08x:%08x new slot seqno=%u",
>>> 
>>> -- 
>>> 2.47.0
>>> 
>> 
> 
> -- 
> Jeff Layton <jlayton@kernel.org>


--
Chuck Lever


Re: [PATCH 2/2] nfsd: allow for more callback session slots
Posted by Jeff Layton 3 weeks, 6 days ago
On Mon, 2024-10-28 at 10:26 -0400, Jeff Layton wrote:
> nfsd currently only uses a single slot in the callback channel, which is
> proving to be a bottleneck in some cases. Widen the callback channel to
> a max of 32 slots (subject to the client's target_maxreqs value).
> 
> Change the cb_holds_slot boolean to an integer that tracks the current
> slot number (with -1 meaning "unassigned").  Move the callback slot
> tracking info into the session. Add a new u32 that acts as a bitmap to
> track which slots are in use, and a u32 to track the latest callback
> target_slotid that the client reports. While they are part of the
> session, the fields are protected by the cl_lock.
> 
> Fix nfsd41_cb_get_slot to always search for the lowest slotid (using
> ffs()), and change it to continually retry until there is a slot
> available.
> 
> Finally, convert the session->se_cb_seq_nr field into an array of
> counters and add the necessary handling to ensure that the seqids get
> reset at the appropriate times.
> 
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>  fs/nfsd/nfs4callback.c | 107 +++++++++++++++++++++++++++++++++++--------------
>  fs/nfsd/nfs4state.c    |   7 +++-
>  fs/nfsd/state.h        |  12 +++---
>  fs/nfsd/trace.h        |   2 +-
>  4 files changed, 89 insertions(+), 39 deletions(-)
> 
> diff --git a/fs/nfsd/nfs4callback.c b/fs/nfsd/nfs4callback.c
> index e38fa834b3d91333acf1425eb14c644e5d5f2601..64b85b164125b244494f9805840a0d8a1ccb4c1b 100644
> --- a/fs/nfsd/nfs4callback.c
> +++ b/fs/nfsd/nfs4callback.c
> @@ -406,6 +406,16 @@ encode_cb_getattr4args(struct xdr_stream *xdr, struct nfs4_cb_compound_hdr *hdr,
>  	hdr->nops++;
>  }
>  
> +static u32 highest_unset_index(u32 word)
> +{
> +	int i;
> +
> +	for (i = sizeof(word) * 8 - 1; i > 0; --i)
> +		if (!(word & BIT(i)))
> +			return i;
> +	return 0;
> +}
> +
>  /*
>   * CB_SEQUENCE4args
>   *
> @@ -432,15 +442,38 @@ static void encode_cb_sequence4args(struct xdr_stream *xdr,
>  	encode_sessionid4(xdr, session);
>  
>  	p = xdr_reserve_space(xdr, 4 + 4 + 4 + 4 + 4);
> -	*p++ = cpu_to_be32(session->se_cb_seq_nr);	/* csa_sequenceid */
> -	*p++ = xdr_zero;			/* csa_slotid */
> -	*p++ = xdr_zero;			/* csa_highest_slotid */
> +	*p++ = cpu_to_be32(session->se_cb_seq_nr[cb->cb_held_slot]);	/* csa_sequenceid */
> +	*p++ = cpu_to_be32(cb->cb_held_slot);		/* csa_slotid */
> +	*p++ = cpu_to_be32(max(highest_unset_index(session->se_cb_slot_avail),
> +			       session->se_cb_highest_slot)); /* csa_highest_slotid */
>  	*p++ = xdr_zero;			/* csa_cachethis */
>  	xdr_encode_empty_array(p);		/* csa_referring_call_lists */
>  
>  	hdr->nops++;
>  }
>  
> +static void update_cb_slot_table(struct nfsd4_session *ses, u32 highest)
> +{
> +	/* No need to do anything if nothing changed */
> +	if (likely(highest == READ_ONCE(ses->se_cb_highest_slot)))
> +		return;
> +
> +	spin_lock(&ses->se_client->cl_lock);
> +	/* If growing the slot table, reset any new sequences to 1 */
> +	if (highest > ses->se_cb_highest_slot) {
> +		int i;
> +
> +		for (i = ses->se_cb_highest_slot; i <= highest; ++i) {

That should be:

	for (i = ses->se_cb_highest_slot + 1; i <= highest; ++i) {

In other words, we only want to reset the newly added slots, not the
current highest slot, which could still be in use. Fixed in my tree.

> +			/* beyond the end of the array? */
> +			if (i >= NFSD_BC_SLOT_TABLE_MAX)
> +				break;
> +			ses->se_cb_seq_nr[i] = 1;
> +		}
> +	}
> +	ses->se_cb_highest_slot = highest;
> +	spin_unlock(&ses->se_client->cl_lock);
> +}
> +
>  /*
>   * CB_SEQUENCE4resok
>   *
> @@ -485,7 +518,7 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
>  	p += XDR_QUADLEN(NFS4_MAX_SESSIONID_LEN);
>  
>  	dummy = be32_to_cpup(p++);
> -	if (dummy != session->se_cb_seq_nr) {
> +	if (dummy != session->se_cb_seq_nr[cb->cb_held_slot]) {
>  		dprintk("NFS: %s Invalid sequence number\n", __func__);
>  		goto out;
>  	}
> @@ -496,9 +529,15 @@ static int decode_cb_sequence4resok(struct xdr_stream *xdr,
>  		goto out;
>  	}
>  
> -	/*
> -	 * FIXME: process highest slotid and target highest slotid
> -	 */
> +	p++; // ignore current highest slot value
> +
> +	dummy = be32_to_cpup(p++);
> +	if (dummy == 0) {
> +		dprintk("NFS: %s Invalid target highest slotid\n", __func__);
> +		goto out;
> +	}
> +
> +	update_cb_slot_table(session, dummy);
>  	status = 0;
>  out:
>  	cb->cb_seq_status = status;
> @@ -1208,31 +1247,38 @@ void nfsd4_change_callback(struct nfs4_client *clp, struct nfs4_cb_conn *conn)
>   * If the slot is available, then mark it busy.  Otherwise, set the
>   * thread for sleeping on the callback RPC wait queue.
>   */
> -static bool nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
> +static void nfsd41_cb_get_slot(struct nfsd4_callback *cb, struct rpc_task *task)
>  {
>  	struct nfs4_client *clp = cb->cb_clp;
> +	struct nfsd4_session *ses = clp->cl_cb_session;
> +	int idx;
>  
> -	if (!cb->cb_holds_slot &&
> -	    test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
> +	if (cb->cb_held_slot >= 0)
> +		return;
> +retry:
> +	spin_lock(&clp->cl_lock);
> +	idx = ffs(ses->se_cb_slot_avail) - 1;
> +	if (idx < 0 || idx > ses->se_cb_highest_slot) {
> +		spin_unlock(&clp->cl_lock);
>  		rpc_sleep_on(&clp->cl_cb_waitq, task, NULL);
> -		/* Race breaker */
> -		if (test_and_set_bit(0, &clp->cl_cb_slot_busy) != 0) {
> -			dprintk("%s slot is busy\n", __func__);
> -			return false;
> -		}
> -		rpc_wake_up_queued_task(&clp->cl_cb_waitq, task);
> +		goto retry;
>  	}
> -	cb->cb_holds_slot = true;
> -	return true;
> +	/* clear the bit for the slot */
> +	ses->se_cb_slot_avail &= ~BIT(idx);
> +	spin_unlock(&clp->cl_lock);
> +	cb->cb_held_slot = idx;
>  }
>  
>  static void nfsd41_cb_release_slot(struct nfsd4_callback *cb)
>  {
>  	struct nfs4_client *clp = cb->cb_clp;
> +	struct nfsd4_session *ses = clp->cl_cb_session;
>  
> -	if (cb->cb_holds_slot) {
> -		cb->cb_holds_slot = false;
> -		clear_bit(0, &clp->cl_cb_slot_busy);
> +	if (cb->cb_held_slot >= 0) {
> +		spin_lock(&clp->cl_lock);
> +		ses->se_cb_slot_avail |= BIT(cb->cb_held_slot);
> +		spin_unlock(&clp->cl_lock);
> +		cb->cb_held_slot = -1;
>  		rpc_wake_up_next(&clp->cl_cb_waitq);
>  	}
>  }
> @@ -1265,8 +1311,8 @@ static void nfsd4_cb_prepare(struct rpc_task *task, void *calldata)
>  	trace_nfsd_cb_rpc_prepare(clp);
>  	cb->cb_seq_status = 1;
>  	cb->cb_status = 0;
> -	if (minorversion && !nfsd41_cb_get_slot(cb, task))
> -		return;
> +	if (minorversion)
> +		nfsd41_cb_get_slot(cb, task);
>  	rpc_call_start(task);
>  }
>  
> @@ -1292,7 +1338,7 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
>  		return true;
>  	}
>  
> -	if (!cb->cb_holds_slot)
> +	if (cb->cb_held_slot < 0)
>  		goto need_restart;
>  
>  	/* This is the operation status code for CB_SEQUENCE */
> @@ -1306,10 +1352,10 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
>  		 * If CB_SEQUENCE returns an error, then the state of the slot
>  		 * (sequence ID, cached reply) MUST NOT change.
>  		 */
> -		++session->se_cb_seq_nr;
> +		++session->se_cb_seq_nr[cb->cb_held_slot];
>  		break;
>  	case -ESERVERFAULT:
> -		++session->se_cb_seq_nr;
> +		++session->se_cb_seq_nr[cb->cb_held_slot];
>  		nfsd4_mark_cb_fault(cb->cb_clp);
>  		ret = false;
>  		break;
> @@ -1335,17 +1381,16 @@ static bool nfsd4_cb_sequence_done(struct rpc_task *task, struct nfsd4_callback
>  	case -NFS4ERR_BADSLOT:
>  		goto retry_nowait;
>  	case -NFS4ERR_SEQ_MISORDERED:
> -		if (session->se_cb_seq_nr != 1) {
> -			session->se_cb_seq_nr = 1;
> +		if (session->se_cb_seq_nr[cb->cb_held_slot] != 1) {
> +			session->se_cb_seq_nr[cb->cb_held_slot] = 1;
>  			goto retry_nowait;
>  		}
>  		break;
>  	default:
>  		nfsd4_mark_cb_fault(cb->cb_clp);
>  	}
> -	nfsd41_cb_release_slot(cb);
> -
>  	trace_nfsd_cb_free_slot(task, cb);
> +	nfsd41_cb_release_slot(cb);
>  
>  	if (RPC_SIGNALLED(task))
>  		goto need_restart;
> @@ -1565,7 +1610,7 @@ void nfsd4_init_cb(struct nfsd4_callback *cb, struct nfs4_client *clp,
>  	INIT_WORK(&cb->cb_work, nfsd4_run_cb_work);
>  	cb->cb_status = 0;
>  	cb->cb_need_restart = false;
> -	cb->cb_holds_slot = false;
> +	cb->cb_held_slot = -1;
>  }
>  
>  /**
> diff --git a/fs/nfsd/nfs4state.c b/fs/nfsd/nfs4state.c
> index 5b718b349396f1aecd0ad4c63b2f43342841bbd4..20a0d40202e40eed1c84d5d6c0a85b908804a6ba 100644
> --- a/fs/nfsd/nfs4state.c
> +++ b/fs/nfsd/nfs4state.c
> @@ -2002,6 +2002,8 @@ static struct nfsd4_session *alloc_session(struct nfsd4_channel_attrs *fattrs,
>  	}
>  
>  	memcpy(&new->se_fchannel, fattrs, sizeof(struct nfsd4_channel_attrs));
> +	new->se_cb_slot_avail = ~0U;
> +	new->se_cb_highest_slot = battrs->maxreqs - 1;
>  	return new;
>  out_free:
>  	while (i--)
> @@ -2132,7 +2134,9 @@ static void init_session(struct svc_rqst *rqstp, struct nfsd4_session *new, stru
>  
>  	INIT_LIST_HEAD(&new->se_conns);
>  
> -	new->se_cb_seq_nr = 1;
> +	for (idx = 0; idx < NFSD_BC_SLOT_TABLE_MAX; ++idx)
> +		new->se_cb_seq_nr[idx] = 1;
> +
>  	new->se_flags = cses->flags;
>  	new->se_cb_prog = cses->callback_prog;
>  	new->se_cb_sec = cses->cb_sec;
> @@ -3159,7 +3163,6 @@ static struct nfs4_client *create_client(struct xdr_netobj name,
>  	kref_init(&clp->cl_nfsdfs.cl_ref);
>  	nfsd4_init_cb(&clp->cl_cb_null, clp, NULL, NFSPROC4_CLNT_CB_NULL);
>  	clp->cl_time = ktime_get_boottime_seconds();
> -	clear_bit(0, &clp->cl_cb_slot_busy);
>  	copy_verf(clp, verf);
>  	memcpy(&clp->cl_addr, sa, sizeof(struct sockaddr_storage));
>  	clp->cl_cb_session = NULL;
> diff --git a/fs/nfsd/state.h b/fs/nfsd/state.h
> index 41cda86fea1f6166a0fd0215d3d458c93ced3e6a..2987c362bdd56251e736879dc89302ada2259be8 100644
> --- a/fs/nfsd/state.h
> +++ b/fs/nfsd/state.h
> @@ -71,8 +71,8 @@ struct nfsd4_callback {
>  	struct work_struct cb_work;
>  	int cb_seq_status;
>  	int cb_status;
> +	int cb_held_slot;
>  	bool cb_need_restart;
> -	bool cb_holds_slot;
>  };
>  
>  struct nfsd4_callback_ops {
> @@ -307,6 +307,9 @@ struct nfsd4_conn {
>  	unsigned char cn_flags;
>  };
>  
> +/* Max number of slots that the server will use in the backchannel */
> +#define NFSD_BC_SLOT_TABLE_MAX	32
> +
>  /*
>   * Representation of a v4.1+ session. These are refcounted in a similar fashion
>   * to the nfs4_client. References are only taken when the server is actively
> @@ -325,7 +328,9 @@ struct nfsd4_session {
>  	struct nfsd4_cb_sec	se_cb_sec;
>  	struct list_head	se_conns;
>  	u32			se_cb_prog;
> -	u32			se_cb_seq_nr;
> +	u32			se_cb_slot_avail; /* bitmap of available slots */
> +	u32			se_cb_highest_slot;	/* highest slot client wants */
> +	u32			se_cb_seq_nr[NFSD_BC_SLOT_TABLE_MAX];
>  	struct nfsd4_slot	*se_slots[];	/* forward channel slots */
>  };
>  
> @@ -459,9 +464,6 @@ struct nfs4_client {
>  	 */
>  	struct dentry		*cl_nfsd_info_dentry;
>  
> -	/* for nfs41 callbacks */
> -	/* We currently support a single back channel with a single slot */
> -	unsigned long		cl_cb_slot_busy;
>  	struct rpc_wait_queue	cl_cb_waitq;	/* backchannel callers may */
>  						/* wait here for slots */
>  	struct net		*net;
> diff --git a/fs/nfsd/trace.h b/fs/nfsd/trace.h
> index f318898cfc31614b5a84a4867e18c2b3a07122c9..a9c17186b6892f1df8d7f7b90e250c2913ab23fe 100644
> --- a/fs/nfsd/trace.h
> +++ b/fs/nfsd/trace.h
> @@ -1697,7 +1697,7 @@ TRACE_EVENT(nfsd_cb_free_slot,
>  		__entry->cl_id = sid->clientid.cl_id;
>  		__entry->seqno = sid->sequence;
>  		__entry->reserved = sid->reserved;
> -		__entry->slot_seqno = session->se_cb_seq_nr;
> +		__entry->slot_seqno = session->se_cb_seq_nr[cb->cb_held_slot];
>  	),
>  	TP_printk(SUNRPC_TRACE_TASK_SPECIFIER
>  		" sessionid=%08x:%08x:%08x:%08x new slot seqno=%u",
> 

-- 
Jeff Layton <jlayton@kernel.org>