[PATCH 3/3] sunrpc: split cache_detail queue into request and reader lists

Jeff Layton posted 3 patches 1 month, 2 weeks ago
There is a newer version of this series
[PATCH 3/3] sunrpc: split cache_detail queue into request and reader lists
Posted by Jeff Layton 1 month, 2 weeks ago
Replace the single interleaved queue (which mixed cache_request and
cache_reader entries distinguished by a ->reader flag) with two
dedicated lists: cd->requests for upcall requests and cd->readers
for open file handles.

Readers now track their position via a monotonically increasing
sequence number (next_seqno) rather than by their position in the
shared list. Each cache_request is assigned a seqno when enqueued,
and a new cache_next_request() helper finds the next request at or
after a given seqno.

This eliminates the cache_queue wrapper struct entirely, simplifies
the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
cache_release, and makes the data flow easier to reason about.

Also, remove an obsolete comment. CACHE_UPCALLING hasn't existed
since before the git era started.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/sunrpc/cache.h |   4 +-
 net/sunrpc/cache.c           | 125 ++++++++++++++++++-------------------------
 2 files changed, 56 insertions(+), 73 deletions(-)

diff --git a/include/linux/sunrpc/cache.h b/include/linux/sunrpc/cache.h
index 031379efba24d40f64ce346cf1032261d4b98d05..b1e595c2615bd4be4d9ad19f71a8f4d08bd74a9b 100644
--- a/include/linux/sunrpc/cache.h
+++ b/include/linux/sunrpc/cache.h
@@ -113,9 +113,11 @@ struct cache_detail {
 	int			entries;
 
 	/* fields for communication over channel */
-	struct list_head	queue;
+	struct list_head	requests;
+	struct list_head	readers;
 	spinlock_t		queue_lock;
 	wait_queue_head_t	queue_wait;
+	u64			next_seqno;
 
 	atomic_t		writers;		/* how many time is /channel open */
 	time64_t		last_close;		/* if no writers, when did last close */
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index aef2607b3d7ffb61a42b9ea2ec17947465c026dc..09389ce8b961fe0cb5a472bcf2d3dd0b3faa13a6 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
 void sunrpc_init_cache_detail(struct cache_detail *cd)
 {
 	spin_lock_init(&cd->hash_lock);
-	INIT_LIST_HEAD(&cd->queue);
+	INIT_LIST_HEAD(&cd->requests);
+	INIT_LIST_HEAD(&cd->readers);
 	spin_lock_init(&cd->queue_lock);
 	init_waitqueue_head(&cd->queue_wait);
+	cd->next_seqno = 0;
 	spin_lock(&cache_list_lock);
 	cd->nextcheck = 0;
 	cd->entries = 0;
@@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
  * On read, you get a full request, or block.
  * On write, an update request is processed.
  * Poll works if anything to read, and always allows write.
- *
- * Implemented by linked list of requests.  Each open file has
- * a ->private that also exists in this list.  New requests are added
- * to the end and may wakeup and preceding readers.
- * New readers are added to the head.  If, on read, an item is found with
- * CACHE_UPCALLING clear, we free it from the list.
- *
  */
 
-struct cache_queue {
-	struct list_head	list;
-	int			reader;	/* if 0, then request */
-};
 struct cache_request {
-	struct cache_queue	q;
+	struct list_head	list;
 	struct cache_head	*item;
-	char			* buf;
+	char			*buf;
 	int			len;
 	int			readers;
+	u64			seqno;
 };
 struct cache_reader {
-	struct cache_queue	q;
+	struct list_head	list;
 	int			offset;	/* if non-0, we have a refcnt on next request */
+	u64			next_seqno;
 };
 
 static int cache_request(struct cache_detail *detail,
@@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
 	return PAGE_SIZE - len;
 }
 
+static struct cache_request *
+cache_next_request(struct cache_detail *cd, u64 seqno)
+{
+	struct cache_request *rq;
+
+	list_for_each_entry(rq, &cd->requests, list)
+		if (rq->seqno >= seqno)
+			return rq;
+	return NULL;
+}
+
 static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 			  loff_t *ppos, struct cache_detail *cd)
 {
@@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
  again:
 	spin_lock(&cd->queue_lock);
 	/* need to find next request */
-	while (rp->q.list.next != &cd->queue &&
-	       list_entry(rp->q.list.next, struct cache_queue, list)
-	       ->reader) {
-		struct list_head *next = rp->q.list.next;
-		list_move(&rp->q.list, next);
-	}
-	if (rp->q.list.next == &cd->queue) {
+	rq = cache_next_request(cd, rp->next_seqno);
+	if (!rq) {
 		spin_unlock(&cd->queue_lock);
 		inode_unlock(inode);
 		WARN_ON_ONCE(rp->offset);
 		return 0;
 	}
-	rq = container_of(rp->q.list.next, struct cache_request, q.list);
-	WARN_ON_ONCE(rq->q.reader);
 	if (rp->offset == 0)
 		rq->readers++;
 	spin_unlock(&cd->queue_lock);
@@ -877,7 +874,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 	if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
 		err = -EAGAIN;
 		spin_lock(&cd->queue_lock);
-		list_move(&rp->q.list, &rq->q.list);
+		rp->next_seqno = rq->seqno + 1;
 		spin_unlock(&cd->queue_lock);
 	} else {
 		if (rp->offset + count > rq->len)
@@ -889,7 +886,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 		if (rp->offset >= rq->len) {
 			rp->offset = 0;
 			spin_lock(&cd->queue_lock);
-			list_move(&rp->q.list, &rq->q.list);
+			rp->next_seqno = rq->seqno + 1;
 			spin_unlock(&cd->queue_lock);
 		}
 		err = 0;
@@ -901,7 +898,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 		rq->readers--;
 		if (rq->readers == 0 &&
 		    !test_bit(CACHE_PENDING, &rq->item->flags)) {
-			list_del(&rq->q.list);
+			list_del(&rq->list);
 			spin_unlock(&cd->queue_lock);
 			cache_put(rq->item, cd);
 			kfree(rq->buf);
@@ -976,7 +973,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
 {
 	__poll_t mask;
 	struct cache_reader *rp = filp->private_data;
-	struct cache_queue *cq;
 
 	poll_wait(filp, &cd->queue_wait, wait);
 
@@ -988,12 +984,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
 
 	spin_lock(&cd->queue_lock);
 
-	for (cq= &rp->q; &cq->list != &cd->queue;
-	     cq = list_entry(cq->list.next, struct cache_queue, list))
-		if (!cq->reader) {
-			mask |= EPOLLIN | EPOLLRDNORM;
-			break;
-		}
+	if (cache_next_request(cd, rp->next_seqno))
+		mask |= EPOLLIN | EPOLLRDNORM;
 	spin_unlock(&cd->queue_lock);
 	return mask;
 }
@@ -1004,7 +996,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
 {
 	int len = 0;
 	struct cache_reader *rp = filp->private_data;
-	struct cache_queue *cq;
+	struct cache_request *rq;
 
 	if (cmd != FIONREAD || !rp)
 		return -EINVAL;
@@ -1014,14 +1006,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
 	/* only find the length remaining in current request,
 	 * or the length of the next request
 	 */
-	for (cq= &rp->q; &cq->list != &cd->queue;
-	     cq = list_entry(cq->list.next, struct cache_queue, list))
-		if (!cq->reader) {
-			struct cache_request *cr =
-				container_of(cq, struct cache_request, q);
-			len = cr->len - rp->offset;
-			break;
-		}
+	rq = cache_next_request(cd, rp->next_seqno);
+	if (rq)
+		len = rq->len - rp->offset;
 	spin_unlock(&cd->queue_lock);
 
 	return put_user(len, (int __user *)arg);
@@ -1042,10 +1029,10 @@ static int cache_open(struct inode *inode, struct file *filp,
 			return -ENOMEM;
 		}
 		rp->offset = 0;
-		rp->q.reader = 1;
+		rp->next_seqno = 0;
 
 		spin_lock(&cd->queue_lock);
-		list_add(&rp->q.list, &cd->queue);
+		list_add(&rp->list, &cd->readers);
 		spin_unlock(&cd->queue_lock);
 	}
 	if (filp->f_mode & FMODE_WRITE)
@@ -1062,17 +1049,14 @@ static int cache_release(struct inode *inode, struct file *filp,
 	if (rp) {
 		spin_lock(&cd->queue_lock);
 		if (rp->offset) {
-			struct cache_queue *cq;
-			for (cq= &rp->q; &cq->list != &cd->queue;
-			     cq = list_entry(cq->list.next, struct cache_queue, list))
-				if (!cq->reader) {
-					container_of(cq, struct cache_request, q)
-						->readers--;
-					break;
-				}
+			struct cache_request *rq;
+
+			rq = cache_next_request(cd, rp->next_seqno);
+			if (rq)
+				rq->readers--;
 			rp->offset = 0;
 		}
-		list_del(&rp->q.list);
+		list_del(&rp->list);
 		spin_unlock(&cd->queue_lock);
 
 		filp->private_data = NULL;
@@ -1091,27 +1075,24 @@ static int cache_release(struct inode *inode, struct file *filp,
 
 static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
 {
-	struct cache_queue *cq, *tmp;
-	struct cache_request *cr;
+	struct cache_request *cr, *tmp;
 	LIST_HEAD(dequeued);
 
 	spin_lock(&detail->queue_lock);
-	list_for_each_entry_safe(cq, tmp, &detail->queue, list)
-		if (!cq->reader) {
-			cr = container_of(cq, struct cache_request, q);
-			if (cr->item != ch)
-				continue;
-			if (test_bit(CACHE_PENDING, &ch->flags))
-				/* Lost a race and it is pending again */
-				break;
-			if (cr->readers != 0)
-				continue;
-			list_move(&cr->q.list, &dequeued);
-		}
+	list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
+		if (cr->item != ch)
+			continue;
+		if (test_bit(CACHE_PENDING, &ch->flags))
+			/* Lost a race and it is pending again */
+			break;
+		if (cr->readers != 0)
+			continue;
+		list_move(&cr->list, &dequeued);
+	}
 	spin_unlock(&detail->queue_lock);
 	while (!list_empty(&dequeued)) {
-		cr = list_entry(dequeued.next, struct cache_request, q.list);
-		list_del(&cr->q.list);
+		cr = list_entry(dequeued.next, struct cache_request, list);
+		list_del(&cr->list);
 		cache_put(cr->item, detail);
 		kfree(cr->buf);
 		kfree(cr);
@@ -1229,14 +1210,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
 		return -EAGAIN;
 	}
 
-	crq->q.reader = 0;
 	crq->buf = buf;
 	crq->len = 0;
 	crq->readers = 0;
 	spin_lock(&detail->queue_lock);
 	if (test_bit(CACHE_PENDING, &h->flags)) {
 		crq->item = cache_get(h);
-		list_add_tail(&crq->q.list, &detail->queue);
+		crq->seqno = detail->next_seqno++;
+		list_add_tail(&crq->list, &detail->requests);
 		trace_cache_entry_upcall(detail, h);
 	} else
 		/* Lost a race, no longer PENDING, so don't enqueue */

-- 
2.53.0
Re: [PATCH 3/3] sunrpc: split cache_detail queue into request and reader lists
Posted by NeilBrown 1 month, 1 week ago
On Fri, 20 Feb 2026, Jeff Layton wrote:
> Replace the single interleaved queue (which mixed cache_request and
> cache_reader entries distinguished by a ->reader flag) with two
> dedicated lists: cd->requests for upcall requests and cd->readers
> for open file handles.
> 
> Readers now track their position via a monotonically increasing
> sequence number (next_seqno) rather than by their position in the
> shared list. Each cache_request is assigned a seqno when enqueued,
> and a new cache_next_request() helper finds the next request at or
> after a given seqno.
> 
> This eliminates the cache_queue wrapper struct entirely, simplifies
> the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
> cache_release, and makes the data flow easier to reason about.
> 
> Also, remove an obsolete comment. CACHE_UPCALLING hasn't existed
> since before the git era started.
> 
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
>  include/linux/sunrpc/cache.h |   4 +-
>  net/sunrpc/cache.c           | 125 ++++++++++++++++++-------------------------
>  2 files changed, 56 insertions(+), 73 deletions(-)
> 
> diff --git a/include/linux/sunrpc/cache.h b/include/linux/sunrpc/cache.h
> index 031379efba24d40f64ce346cf1032261d4b98d05..b1e595c2615bd4be4d9ad19f71a8f4d08bd74a9b 100644
> --- a/include/linux/sunrpc/cache.h
> +++ b/include/linux/sunrpc/cache.h
> @@ -113,9 +113,11 @@ struct cache_detail {
>  	int			entries;
>  
>  	/* fields for communication over channel */
> -	struct list_head	queue;
> +	struct list_head	requests;
> +	struct list_head	readers;
>  	spinlock_t		queue_lock;
>  	wait_queue_head_t	queue_wait;
> +	u64			next_seqno;
>  
>  	atomic_t		writers;		/* how many time is /channel open */
>  	time64_t		last_close;		/* if no writers, when did last close */
> diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
> index aef2607b3d7ffb61a42b9ea2ec17947465c026dc..09389ce8b961fe0cb5a472bcf2d3dd0b3faa13a6 100644
> --- a/net/sunrpc/cache.c
> +++ b/net/sunrpc/cache.c
> @@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
>  void sunrpc_init_cache_detail(struct cache_detail *cd)
>  {
>  	spin_lock_init(&cd->hash_lock);
> -	INIT_LIST_HEAD(&cd->queue);
> +	INIT_LIST_HEAD(&cd->requests);
> +	INIT_LIST_HEAD(&cd->readers);
>  	spin_lock_init(&cd->queue_lock);
>  	init_waitqueue_head(&cd->queue_wait);
> +	cd->next_seqno = 0;
>  	spin_lock(&cache_list_lock);
>  	cd->nextcheck = 0;
>  	cd->entries = 0;
> @@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
>   * On read, you get a full request, or block.
>   * On write, an update request is processed.
>   * Poll works if anything to read, and always allows write.
> - *
> - * Implemented by linked list of requests.  Each open file has
> - * a ->private that also exists in this list.  New requests are added
> - * to the end and may wakeup and preceding readers.
> - * New readers are added to the head.  If, on read, an item is found with
> - * CACHE_UPCALLING clear, we free it from the list.
> - *
>   */
>  
> -struct cache_queue {
> -	struct list_head	list;
> -	int			reader;	/* if 0, then request */
> -};
>  struct cache_request {
> -	struct cache_queue	q;
> +	struct list_head	list;
>  	struct cache_head	*item;
> -	char			* buf;
> +	char			*buf;
>  	int			len;
>  	int			readers;
> +	u64			seqno;
>  };
>  struct cache_reader {
> -	struct cache_queue	q;
> +	struct list_head	list;
>  	int			offset;	/* if non-0, we have a refcnt on next request */
> +	u64			next_seqno;
>  };
>  
>  static int cache_request(struct cache_detail *detail,
> @@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
>  	return PAGE_SIZE - len;
>  }
>  
> +static struct cache_request *
> +cache_next_request(struct cache_detail *cd, u64 seqno)
> +{
> +	struct cache_request *rq;
> +
> +	list_for_each_entry(rq, &cd->requests, list)
> +		if (rq->seqno >= seqno)
> +			return rq;
> +	return NULL;
> +}
> +
>  static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
>  			  loff_t *ppos, struct cache_detail *cd)
>  {
> @@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
>   again:
>  	spin_lock(&cd->queue_lock);
>  	/* need to find next request */
> -	while (rp->q.list.next != &cd->queue &&
> -	       list_entry(rp->q.list.next, struct cache_queue, list)
> -	       ->reader) {
> -		struct list_head *next = rp->q.list.next;
> -		list_move(&rp->q.list, next);
> -	}
> -	if (rp->q.list.next == &cd->queue) {
> +	rq = cache_next_request(cd, rp->next_seqno);
> +	if (!rq) {
>  		spin_unlock(&cd->queue_lock);
>  		inode_unlock(inode);
>  		WARN_ON_ONCE(rp->offset);
>  		return 0;
>  	}
> -	rq = container_of(rp->q.list.next, struct cache_request, q.list);
> -	WARN_ON_ONCE(rq->q.reader);
>  	if (rp->offset == 0)
>  		rq->readers++;
>  	spin_unlock(&cd->queue_lock);
> @@ -877,7 +874,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
>  	if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
>  		err = -EAGAIN;
>  		spin_lock(&cd->queue_lock);
> -		list_move(&rp->q.list, &rq->q.list);
> +		rp->next_seqno = rq->seqno + 1;
>  		spin_unlock(&cd->queue_lock);

I don't think we need the spin_lock here any more.


>  	} else {
>  		if (rp->offset + count > rq->len)
> @@ -889,7 +886,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
>  		if (rp->offset >= rq->len) {
>  			rp->offset = 0;
>  			spin_lock(&cd->queue_lock);
> -			list_move(&rp->q.list, &rq->q.list);
> +			rp->next_seqno = rq->seqno + 1;
>  			spin_unlock(&cd->queue_lock);

Nor here.


>  		}
>  		err = 0;
> @@ -901,7 +898,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
>  		rq->readers--;
>  		if (rq->readers == 0 &&
>  		    !test_bit(CACHE_PENDING, &rq->item->flags)) {
> -			list_del(&rq->q.list);
> +			list_del(&rq->list);
>  			spin_unlock(&cd->queue_lock);
>  			cache_put(rq->item, cd);
>  			kfree(rq->buf);
> @@ -976,7 +973,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
>  {
>  	__poll_t mask;
>  	struct cache_reader *rp = filp->private_data;
> -	struct cache_queue *cq;
>  
>  	poll_wait(filp, &cd->queue_wait, wait);
>  
> @@ -988,12 +984,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
>  
>  	spin_lock(&cd->queue_lock);
>  
> -	for (cq= &rp->q; &cq->list != &cd->queue;
> -	     cq = list_entry(cq->list.next, struct cache_queue, list))
> -		if (!cq->reader) {
> -			mask |= EPOLLIN | EPOLLRDNORM;
> -			break;
> -		}
> +	if (cache_next_request(cd, rp->next_seqno))
> +		mask |= EPOLLIN | EPOLLRDNORM;
>  	spin_unlock(&cd->queue_lock);
>  	return mask;
>  }
> @@ -1004,7 +996,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
>  {
>  	int len = 0;
>  	struct cache_reader *rp = filp->private_data;
> -	struct cache_queue *cq;
> +	struct cache_request *rq;
>  
>  	if (cmd != FIONREAD || !rp)
>  		return -EINVAL;
> @@ -1014,14 +1006,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
>  	/* only find the length remaining in current request,
>  	 * or the length of the next request
>  	 */
> -	for (cq= &rp->q; &cq->list != &cd->queue;
> -	     cq = list_entry(cq->list.next, struct cache_queue, list))
> -		if (!cq->reader) {
> -			struct cache_request *cr =
> -				container_of(cq, struct cache_request, q);
> -			len = cr->len - rp->offset;
> -			break;
> -		}
> +	rq = cache_next_request(cd, rp->next_seqno);
> +	if (rq)
> +		len = rq->len - rp->offset;
>  	spin_unlock(&cd->queue_lock);
>  
>  	return put_user(len, (int __user *)arg);
> @@ -1042,10 +1029,10 @@ static int cache_open(struct inode *inode, struct file *filp,
>  			return -ENOMEM;
>  		}
>  		rp->offset = 0;
> -		rp->q.reader = 1;
> +		rp->next_seqno = 0;
>  
>  		spin_lock(&cd->queue_lock);
> -		list_add(&rp->q.list, &cd->queue);
> +		list_add(&rp->list, &cd->readers);
>  		spin_unlock(&cd->queue_lock);
>  	}
>  	if (filp->f_mode & FMODE_WRITE)
> @@ -1062,17 +1049,14 @@ static int cache_release(struct inode *inode, struct file *filp,
>  	if (rp) {
>  		spin_lock(&cd->queue_lock);
>  		if (rp->offset) {
> -			struct cache_queue *cq;
> -			for (cq= &rp->q; &cq->list != &cd->queue;
> -			     cq = list_entry(cq->list.next, struct cache_queue, list))
> -				if (!cq->reader) {
> -					container_of(cq, struct cache_request, q)
> -						->readers--;
> -					break;
> -				}
> +			struct cache_request *rq;
> +
> +			rq = cache_next_request(cd, rp->next_seqno);
> +			if (rq)
> +				rq->readers--;

Hmm..  The other place where we decrement ->readers we then check if it
is zero and if CACHE_PENDING is clear - and do something.
I suspect we should do that here.
This bug (if I'm right and it is a bug) if there before you patch, but
now might be a good time to fix it?

Thanks.  Nice cleanups.

NeilBrown


>  			rp->offset = 0;
>  		}
> -		list_del(&rp->q.list);
> +		list_del(&rp->list);
>  		spin_unlock(&cd->queue_lock);
>  
>  		filp->private_data = NULL;
> @@ -1091,27 +1075,24 @@ static int cache_release(struct inode *inode, struct file *filp,
>  
>  static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
>  {
> -	struct cache_queue *cq, *tmp;
> -	struct cache_request *cr;
> +	struct cache_request *cr, *tmp;
>  	LIST_HEAD(dequeued);
>  
>  	spin_lock(&detail->queue_lock);
> -	list_for_each_entry_safe(cq, tmp, &detail->queue, list)
> -		if (!cq->reader) {
> -			cr = container_of(cq, struct cache_request, q);
> -			if (cr->item != ch)
> -				continue;
> -			if (test_bit(CACHE_PENDING, &ch->flags))
> -				/* Lost a race and it is pending again */
> -				break;
> -			if (cr->readers != 0)
> -				continue;
> -			list_move(&cr->q.list, &dequeued);
> -		}
> +	list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
> +		if (cr->item != ch)
> +			continue;
> +		if (test_bit(CACHE_PENDING, &ch->flags))
> +			/* Lost a race and it is pending again */
> +			break;
> +		if (cr->readers != 0)
> +			continue;
> +		list_move(&cr->list, &dequeued);
> +	}
>  	spin_unlock(&detail->queue_lock);
>  	while (!list_empty(&dequeued)) {
> -		cr = list_entry(dequeued.next, struct cache_request, q.list);
> -		list_del(&cr->q.list);
> +		cr = list_entry(dequeued.next, struct cache_request, list);
> +		list_del(&cr->list);
>  		cache_put(cr->item, detail);
>  		kfree(cr->buf);
>  		kfree(cr);
> @@ -1229,14 +1210,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
>  		return -EAGAIN;
>  	}
>  
> -	crq->q.reader = 0;
>  	crq->buf = buf;
>  	crq->len = 0;
>  	crq->readers = 0;
>  	spin_lock(&detail->queue_lock);
>  	if (test_bit(CACHE_PENDING, &h->flags)) {
>  		crq->item = cache_get(h);
> -		list_add_tail(&crq->q.list, &detail->queue);
> +		crq->seqno = detail->next_seqno++;
> +		list_add_tail(&crq->list, &detail->requests);
>  		trace_cache_entry_upcall(detail, h);
>  	} else
>  		/* Lost a race, no longer PENDING, so don't enqueue */
> 
> -- 
> 2.53.0
> 
> 
Re: [PATCH 3/3] sunrpc: split cache_detail queue into request and reader lists
Posted by Jeff Layton 1 month, 1 week ago
On Sun, 2026-02-22 at 09:41 +1100, NeilBrown wrote:
> On Fri, 20 Feb 2026, Jeff Layton wrote:
> > Replace the single interleaved queue (which mixed cache_request and
> > cache_reader entries distinguished by a ->reader flag) with two
> > dedicated lists: cd->requests for upcall requests and cd->readers
> > for open file handles.
> > 
> > Readers now track their position via a monotonically increasing
> > sequence number (next_seqno) rather than by their position in the
> > shared list. Each cache_request is assigned a seqno when enqueued,
> > and a new cache_next_request() helper finds the next request at or
> > after a given seqno.
> > 
> > This eliminates the cache_queue wrapper struct entirely, simplifies
> > the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
> > cache_release, and makes the data flow easier to reason about.
> > 
> > Also, remove an obsolete comment. CACHE_UPCALLING hasn't existed
> > since before the git era started.
> > 
> > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > ---
> >  include/linux/sunrpc/cache.h |   4 +-
> >  net/sunrpc/cache.c           | 125 ++++++++++++++++++-------------------------
> >  2 files changed, 56 insertions(+), 73 deletions(-)
> > 
> > diff --git a/include/linux/sunrpc/cache.h b/include/linux/sunrpc/cache.h
> > index 031379efba24d40f64ce346cf1032261d4b98d05..b1e595c2615bd4be4d9ad19f71a8f4d08bd74a9b 100644
> > --- a/include/linux/sunrpc/cache.h
> > +++ b/include/linux/sunrpc/cache.h
> > @@ -113,9 +113,11 @@ struct cache_detail {
> >  	int			entries;
> >  
> >  	/* fields for communication over channel */
> > -	struct list_head	queue;
> > +	struct list_head	requests;
> > +	struct list_head	readers;
> >  	spinlock_t		queue_lock;
> >  	wait_queue_head_t	queue_wait;
> > +	u64			next_seqno;
> >  
> >  	atomic_t		writers;		/* how many time is /channel open */
> >  	time64_t		last_close;		/* if no writers, when did last close */
> > diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
> > index aef2607b3d7ffb61a42b9ea2ec17947465c026dc..09389ce8b961fe0cb5a472bcf2d3dd0b3faa13a6 100644
> > --- a/net/sunrpc/cache.c
> > +++ b/net/sunrpc/cache.c
> > @@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
> >  void sunrpc_init_cache_detail(struct cache_detail *cd)
> >  {
> >  	spin_lock_init(&cd->hash_lock);
> > -	INIT_LIST_HEAD(&cd->queue);
> > +	INIT_LIST_HEAD(&cd->requests);
> > +	INIT_LIST_HEAD(&cd->readers);
> >  	spin_lock_init(&cd->queue_lock);
> >  	init_waitqueue_head(&cd->queue_wait);
> > +	cd->next_seqno = 0;
> >  	spin_lock(&cache_list_lock);
> >  	cd->nextcheck = 0;
> >  	cd->entries = 0;
> > @@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
> >   * On read, you get a full request, or block.
> >   * On write, an update request is processed.
> >   * Poll works if anything to read, and always allows write.
> > - *
> > - * Implemented by linked list of requests.  Each open file has
> > - * a ->private that also exists in this list.  New requests are added
> > - * to the end and may wakeup and preceding readers.
> > - * New readers are added to the head.  If, on read, an item is found with
> > - * CACHE_UPCALLING clear, we free it from the list.
> > - *
> >   */
> >  
> > -struct cache_queue {
> > -	struct list_head	list;
> > -	int			reader;	/* if 0, then request */
> > -};
> >  struct cache_request {
> > -	struct cache_queue	q;
> > +	struct list_head	list;
> >  	struct cache_head	*item;
> > -	char			* buf;
> > +	char			*buf;
> >  	int			len;
> >  	int			readers;
> > +	u64			seqno;
> >  };
> >  struct cache_reader {
> > -	struct cache_queue	q;
> > +	struct list_head	list;
> >  	int			offset;	/* if non-0, we have a refcnt on next request */
> > +	u64			next_seqno;
> >  };
> >  
> >  static int cache_request(struct cache_detail *detail,
> > @@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
> >  	return PAGE_SIZE - len;
> >  }
> >  
> > +static struct cache_request *
> > +cache_next_request(struct cache_detail *cd, u64 seqno)
> > +{
> > +	struct cache_request *rq;
> > +
> > +	list_for_each_entry(rq, &cd->requests, list)
> > +		if (rq->seqno >= seqno)
> > +			return rq;
> > +	return NULL;
> > +}
> > +
> >  static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> >  			  loff_t *ppos, struct cache_detail *cd)
> >  {
> > @@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> >   again:
> >  	spin_lock(&cd->queue_lock);
> >  	/* need to find next request */
> > -	while (rp->q.list.next != &cd->queue &&
> > -	       list_entry(rp->q.list.next, struct cache_queue, list)
> > -	       ->reader) {
> > -		struct list_head *next = rp->q.list.next;
> > -		list_move(&rp->q.list, next);
> > -	}
> > -	if (rp->q.list.next == &cd->queue) {
> > +	rq = cache_next_request(cd, rp->next_seqno);
> > +	if (!rq) {
> >  		spin_unlock(&cd->queue_lock);
> >  		inode_unlock(inode);
> >  		WARN_ON_ONCE(rp->offset);
> >  		return 0;
> >  	}
> > -	rq = container_of(rp->q.list.next, struct cache_request, q.list);
> > -	WARN_ON_ONCE(rq->q.reader);
> >  	if (rp->offset == 0)
> >  		rq->readers++;
> >  	spin_unlock(&cd->queue_lock);
> > @@ -877,7 +874,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> >  	if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
> >  		err = -EAGAIN;
> >  		spin_lock(&cd->queue_lock);
> > -		list_move(&rp->q.list, &rq->q.list);
> > +		rp->next_seqno = rq->seqno + 1;
> >  		spin_unlock(&cd->queue_lock);
> 
> I don't think we need the spin_lock here any more.
> 

Good point in both places.

> 
> >  	} else {
> >  		if (rp->offset + count > rq->len)
> > @@ -889,7 +886,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> >  		if (rp->offset >= rq->len) {
> >  			rp->offset = 0;
> >  			spin_lock(&cd->queue_lock);
> > -			list_move(&rp->q.list, &rq->q.list);
> > +			rp->next_seqno = rq->seqno + 1;
> >  			spin_unlock(&cd->queue_lock);
> 
> Nor here.
> 
> 
> >  		}
> >  		err = 0;
> > @@ -901,7 +898,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> >  		rq->readers--;
> >  		if (rq->readers == 0 &&
> >  		    !test_bit(CACHE_PENDING, &rq->item->flags)) {
> > -			list_del(&rq->q.list);
> > +			list_del(&rq->list);
> >  			spin_unlock(&cd->queue_lock);
> >  			cache_put(rq->item, cd);
> >  			kfree(rq->buf);
> > @@ -976,7 +973,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
> >  {
> >  	__poll_t mask;
> >  	struct cache_reader *rp = filp->private_data;
> > -	struct cache_queue *cq;
> >  
> >  	poll_wait(filp, &cd->queue_wait, wait);
> >  
> > @@ -988,12 +984,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
> >  
> >  	spin_lock(&cd->queue_lock);
> >  
> > -	for (cq= &rp->q; &cq->list != &cd->queue;
> > -	     cq = list_entry(cq->list.next, struct cache_queue, list))
> > -		if (!cq->reader) {
> > -			mask |= EPOLLIN | EPOLLRDNORM;
> > -			break;
> > -		}
> > +	if (cache_next_request(cd, rp->next_seqno))
> > +		mask |= EPOLLIN | EPOLLRDNORM;
> >  	spin_unlock(&cd->queue_lock);
> >  	return mask;
> >  }
> > @@ -1004,7 +996,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
> >  {
> >  	int len = 0;
> >  	struct cache_reader *rp = filp->private_data;
> > -	struct cache_queue *cq;
> > +	struct cache_request *rq;
> >  
> >  	if (cmd != FIONREAD || !rp)
> >  		return -EINVAL;
> > @@ -1014,14 +1006,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
> >  	/* only find the length remaining in current request,
> >  	 * or the length of the next request
> >  	 */
> > -	for (cq= &rp->q; &cq->list != &cd->queue;
> > -	     cq = list_entry(cq->list.next, struct cache_queue, list))
> > -		if (!cq->reader) {
> > -			struct cache_request *cr =
> > -				container_of(cq, struct cache_request, q);
> > -			len = cr->len - rp->offset;
> > -			break;
> > -		}
> > +	rq = cache_next_request(cd, rp->next_seqno);
> > +	if (rq)
> > +		len = rq->len - rp->offset;
> >  	spin_unlock(&cd->queue_lock);
> >  
> >  	return put_user(len, (int __user *)arg);
> > @@ -1042,10 +1029,10 @@ static int cache_open(struct inode *inode, struct file *filp,
> >  			return -ENOMEM;
> >  		}
> >  		rp->offset = 0;
> > -		rp->q.reader = 1;
> > +		rp->next_seqno = 0;
> >  
> >  		spin_lock(&cd->queue_lock);
> > -		list_add(&rp->q.list, &cd->queue);
> > +		list_add(&rp->list, &cd->readers);
> >  		spin_unlock(&cd->queue_lock);
> >  	}
> >  	if (filp->f_mode & FMODE_WRITE)
> > @@ -1062,17 +1049,14 @@ static int cache_release(struct inode *inode, struct file *filp,
> >  	if (rp) {
> >  		spin_lock(&cd->queue_lock);
> >  		if (rp->offset) {
> > -			struct cache_queue *cq;
> > -			for (cq= &rp->q; &cq->list != &cd->queue;
> > -			     cq = list_entry(cq->list.next, struct cache_queue, list))
> > -				if (!cq->reader) {
> > -					container_of(cq, struct cache_request, q)
> > -						->readers--;
> > -					break;
> > -				}
> > +			struct cache_request *rq;
> > +
> > +			rq = cache_next_request(cd, rp->next_seqno);
> > +			if (rq)
> > +				rq->readers--;
> 
> Hmm..  The other place where we decrement ->readers we then check if it
> is zero and if CACHE_PENDING is clear - and do something.
> I suspect we should do that here.
> This bug (if I'm right and it is a bug) if there before you patch, but
> now might be a good time to fix it?
> 

Good catch. I'll add a patch to fix that preceding these patches, so it
can go to stable (if we think that's worthwhile).
 
> Thanks.  Nice cleanups.
> 


Thanks for the review! I'll send a v2 with the spinlock fixes and the
above bugfix.



> NeilBrown
> 
> 
> >  			rp->offset = 0;
> >  		}
> > -		list_del(&rp->q.list);
> > +		list_del(&rp->list);
> >  		spin_unlock(&cd->queue_lock);
> >  
> >  		filp->private_data = NULL;
> > @@ -1091,27 +1075,24 @@ static int cache_release(struct inode *inode, struct file *filp,
> >  
> >  static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
> >  {
> > -	struct cache_queue *cq, *tmp;
> > -	struct cache_request *cr;
> > +	struct cache_request *cr, *tmp;
> >  	LIST_HEAD(dequeued);
> >  
> >  	spin_lock(&detail->queue_lock);
> > -	list_for_each_entry_safe(cq, tmp, &detail->queue, list)
> > -		if (!cq->reader) {
> > -			cr = container_of(cq, struct cache_request, q);
> > -			if (cr->item != ch)
> > -				continue;
> > -			if (test_bit(CACHE_PENDING, &ch->flags))
> > -				/* Lost a race and it is pending again */
> > -				break;
> > -			if (cr->readers != 0)
> > -				continue;
> > -			list_move(&cr->q.list, &dequeued);
> > -		}
> > +	list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
> > +		if (cr->item != ch)
> > +			continue;
> > +		if (test_bit(CACHE_PENDING, &ch->flags))
> > +			/* Lost a race and it is pending again */
> > +			break;
> > +		if (cr->readers != 0)
> > +			continue;
> > +		list_move(&cr->list, &dequeued);
> > +	}
> >  	spin_unlock(&detail->queue_lock);
> >  	while (!list_empty(&dequeued)) {
> > -		cr = list_entry(dequeued.next, struct cache_request, q.list);
> > -		list_del(&cr->q.list);
> > +		cr = list_entry(dequeued.next, struct cache_request, list);
> > +		list_del(&cr->list);
> >  		cache_put(cr->item, detail);
> >  		kfree(cr->buf);
> >  		kfree(cr);
> > @@ -1229,14 +1210,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
> >  		return -EAGAIN;
> >  	}
> >  
> > -	crq->q.reader = 0;
> >  	crq->buf = buf;
> >  	crq->len = 0;
> >  	crq->readers = 0;
> >  	spin_lock(&detail->queue_lock);
> >  	if (test_bit(CACHE_PENDING, &h->flags)) {
> >  		crq->item = cache_get(h);
> > -		list_add_tail(&crq->q.list, &detail->queue);
> > +		crq->seqno = detail->next_seqno++;
> > +		list_add_tail(&crq->list, &detail->requests);
> >  		trace_cache_entry_upcall(detail, h);
> >  	} else
> >  		/* Lost a race, no longer PENDING, so don't enqueue */
> > 
> > -- 
> > 2.53.0
> > 
> > 

-- 
Jeff Layton <jlayton@kernel.org>