[PATCH v2 4/4] sunrpc: split cache_detail queue into request and reader lists

Jeff Layton posted 4 patches 1 month, 1 week ago
[PATCH v2 4/4] sunrpc: split cache_detail queue into request and reader lists
Posted by Jeff Layton 1 month, 1 week ago
Replace the single interleaved queue (which mixed cache_request and
cache_reader entries distinguished by a ->reader flag) with two
dedicated lists: cd->requests for upcall requests and cd->readers
for open file handles.

Readers now track their position via a monotonically increasing
sequence number (next_seqno) rather than by their position in the
shared list. Each cache_request is assigned a seqno when enqueued,
and a new cache_next_request() helper finds the next request at or
after a given seqno.

This eliminates the cache_queue wrapper struct entirely, simplifies
the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
cache_release, and makes the data flow easier to reason about.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
 include/linux/sunrpc/cache.h |   4 +-
 net/sunrpc/cache.c           | 143 ++++++++++++++++++-------------------------
 2 files changed, 62 insertions(+), 85 deletions(-)

diff --git a/include/linux/sunrpc/cache.h b/include/linux/sunrpc/cache.h
index 031379efba24d40f64ce346cf1032261d4b98d05..b1e595c2615bd4be4d9ad19f71a8f4d08bd74a9b 100644
--- a/include/linux/sunrpc/cache.h
+++ b/include/linux/sunrpc/cache.h
@@ -113,9 +113,11 @@ struct cache_detail {
 	int			entries;
 
 	/* fields for communication over channel */
-	struct list_head	queue;
+	struct list_head	requests;
+	struct list_head	readers;
 	spinlock_t		queue_lock;
 	wait_queue_head_t	queue_wait;
+	u64			next_seqno;
 
 	atomic_t		writers;		/* how many time is /channel open */
 	time64_t		last_close;		/* if no writers, when did last close */
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index fd02dca1f07afec2f09c591037bac3ea3e8d7e17..7081c1214e6c3226f8ac82c8bc7ff6c36f598744 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
 void sunrpc_init_cache_detail(struct cache_detail *cd)
 {
 	spin_lock_init(&cd->hash_lock);
-	INIT_LIST_HEAD(&cd->queue);
+	INIT_LIST_HEAD(&cd->requests);
+	INIT_LIST_HEAD(&cd->readers);
 	spin_lock_init(&cd->queue_lock);
 	init_waitqueue_head(&cd->queue_wait);
+	cd->next_seqno = 0;
 	spin_lock(&cache_list_lock);
 	cd->nextcheck = 0;
 	cd->entries = 0;
@@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
  * On read, you get a full request, or block.
  * On write, an update request is processed.
  * Poll works if anything to read, and always allows write.
- *
- * Implemented by linked list of requests.  Each open file has
- * a ->private that also exists in this list.  New requests are added
- * to the end and may wakeup and preceding readers.
- * New readers are added to the head.  If, on read, an item is found with
- * CACHE_UPCALLING clear, we free it from the list.
- *
  */
 
-struct cache_queue {
-	struct list_head	list;
-	int			reader;	/* if 0, then request */
-};
 struct cache_request {
-	struct cache_queue	q;
+	struct list_head	list;
 	struct cache_head	*item;
-	char			* buf;
+	char			*buf;
 	int			len;
 	int			readers;
+	u64			seqno;
 };
 struct cache_reader {
-	struct cache_queue	q;
+	struct list_head	list;
 	int			offset;	/* if non-0, we have a refcnt on next request */
+	u64			next_seqno;
 };
 
 static int cache_request(struct cache_detail *detail,
@@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
 	return PAGE_SIZE - len;
 }
 
+static struct cache_request *
+cache_next_request(struct cache_detail *cd, u64 seqno)
+{
+	struct cache_request *rq;
+
+	list_for_each_entry(rq, &cd->requests, list)
+		if (rq->seqno >= seqno)
+			return rq;
+	return NULL;
+}
+
 static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 			  loff_t *ppos, struct cache_detail *cd)
 {
@@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
  again:
 	spin_lock(&cd->queue_lock);
 	/* need to find next request */
-	while (rp->q.list.next != &cd->queue &&
-	       list_entry(rp->q.list.next, struct cache_queue, list)
-	       ->reader) {
-		struct list_head *next = rp->q.list.next;
-		list_move(&rp->q.list, next);
-	}
-	if (rp->q.list.next == &cd->queue) {
+	rq = cache_next_request(cd, rp->next_seqno);
+	if (!rq) {
 		spin_unlock(&cd->queue_lock);
 		inode_unlock(inode);
 		WARN_ON_ONCE(rp->offset);
 		return 0;
 	}
-	rq = container_of(rp->q.list.next, struct cache_request, q.list);
-	WARN_ON_ONCE(rq->q.reader);
 	if (rp->offset == 0)
 		rq->readers++;
 	spin_unlock(&cd->queue_lock);
@@ -876,9 +873,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 
 	if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
 		err = -EAGAIN;
-		spin_lock(&cd->queue_lock);
-		list_move(&rp->q.list, &rq->q.list);
-		spin_unlock(&cd->queue_lock);
+		rp->next_seqno = rq->seqno + 1;
 	} else {
 		if (rp->offset + count > rq->len)
 			count = rq->len - rp->offset;
@@ -888,9 +883,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 		rp->offset += count;
 		if (rp->offset >= rq->len) {
 			rp->offset = 0;
-			spin_lock(&cd->queue_lock);
-			list_move(&rp->q.list, &rq->q.list);
-			spin_unlock(&cd->queue_lock);
+			rp->next_seqno = rq->seqno + 1;
 		}
 		err = 0;
 	}
@@ -901,7 +894,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
 		rq->readers--;
 		if (rq->readers == 0 &&
 		    !test_bit(CACHE_PENDING, &rq->item->flags)) {
-			list_del(&rq->q.list);
+			list_del(&rq->list);
 			spin_unlock(&cd->queue_lock);
 			cache_put(rq->item, cd);
 			kfree(rq->buf);
@@ -976,7 +969,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
 {
 	__poll_t mask;
 	struct cache_reader *rp = filp->private_data;
-	struct cache_queue *cq;
 
 	poll_wait(filp, &cd->queue_wait, wait);
 
@@ -988,12 +980,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
 
 	spin_lock(&cd->queue_lock);
 
-	for (cq= &rp->q; &cq->list != &cd->queue;
-	     cq = list_entry(cq->list.next, struct cache_queue, list))
-		if (!cq->reader) {
-			mask |= EPOLLIN | EPOLLRDNORM;
-			break;
-		}
+	if (cache_next_request(cd, rp->next_seqno))
+		mask |= EPOLLIN | EPOLLRDNORM;
 	spin_unlock(&cd->queue_lock);
 	return mask;
 }
@@ -1004,7 +992,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
 {
 	int len = 0;
 	struct cache_reader *rp = filp->private_data;
-	struct cache_queue *cq;
+	struct cache_request *rq;
 
 	if (cmd != FIONREAD || !rp)
 		return -EINVAL;
@@ -1014,14 +1002,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
 	/* only find the length remaining in current request,
 	 * or the length of the next request
 	 */
-	for (cq= &rp->q; &cq->list != &cd->queue;
-	     cq = list_entry(cq->list.next, struct cache_queue, list))
-		if (!cq->reader) {
-			struct cache_request *cr =
-				container_of(cq, struct cache_request, q);
-			len = cr->len - rp->offset;
-			break;
-		}
+	rq = cache_next_request(cd, rp->next_seqno);
+	if (rq)
+		len = rq->len - rp->offset;
 	spin_unlock(&cd->queue_lock);
 
 	return put_user(len, (int __user *)arg);
@@ -1042,10 +1025,10 @@ static int cache_open(struct inode *inode, struct file *filp,
 			return -ENOMEM;
 		}
 		rp->offset = 0;
-		rp->q.reader = 1;
+		rp->next_seqno = 0;
 
 		spin_lock(&cd->queue_lock);
-		list_add(&rp->q.list, &cd->queue);
+		list_add(&rp->list, &cd->readers);
 		spin_unlock(&cd->queue_lock);
 	}
 	if (filp->f_mode & FMODE_WRITE)
@@ -1064,26 +1047,21 @@ static int cache_release(struct inode *inode, struct file *filp,
 
 		spin_lock(&cd->queue_lock);
 		if (rp->offset) {
-			struct cache_queue *cq;
-			for (cq = &rp->q; &cq->list != &cd->queue;
-			     cq = list_entry(cq->list.next,
-					     struct cache_queue, list))
-				if (!cq->reader) {
-					struct cache_request *cr =
-						container_of(cq,
-						struct cache_request, q);
-					cr->readers--;
-					if (cr->readers == 0 &&
-					    !test_bit(CACHE_PENDING,
-						      &cr->item->flags)) {
-						list_del(&cr->q.list);
-						rq = cr;
-					}
-					break;
+			struct cache_request *cr;
+
+			cr = cache_next_request(cd, rp->next_seqno);
+			if (cr) {
+				cr->readers--;
+				if (cr->readers == 0 &&
+				    !test_bit(CACHE_PENDING,
+					      &cr->item->flags)) {
+					list_del(&cr->list);
+					rq = cr;
 				}
+			}
 			rp->offset = 0;
 		}
-		list_del(&rp->q.list);
+		list_del(&rp->list);
 		spin_unlock(&cd->queue_lock);
 
 		if (rq) {
@@ -1107,27 +1085,24 @@ static int cache_release(struct inode *inode, struct file *filp,
 
 static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
 {
-	struct cache_queue *cq, *tmp;
-	struct cache_request *cr;
+	struct cache_request *cr, *tmp;
 	LIST_HEAD(dequeued);
 
 	spin_lock(&detail->queue_lock);
-	list_for_each_entry_safe(cq, tmp, &detail->queue, list)
-		if (!cq->reader) {
-			cr = container_of(cq, struct cache_request, q);
-			if (cr->item != ch)
-				continue;
-			if (test_bit(CACHE_PENDING, &ch->flags))
-				/* Lost a race and it is pending again */
-				break;
-			if (cr->readers != 0)
-				continue;
-			list_move(&cr->q.list, &dequeued);
-		}
+	list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
+		if (cr->item != ch)
+			continue;
+		if (test_bit(CACHE_PENDING, &ch->flags))
+			/* Lost a race and it is pending again */
+			break;
+		if (cr->readers != 0)
+			continue;
+		list_move(&cr->list, &dequeued);
+	}
 	spin_unlock(&detail->queue_lock);
 	while (!list_empty(&dequeued)) {
-		cr = list_entry(dequeued.next, struct cache_request, q.list);
-		list_del(&cr->q.list);
+		cr = list_entry(dequeued.next, struct cache_request, list);
+		list_del(&cr->list);
 		cache_put(cr->item, detail);
 		kfree(cr->buf);
 		kfree(cr);
@@ -1245,14 +1220,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
 		return -EAGAIN;
 	}
 
-	crq->q.reader = 0;
 	crq->buf = buf;
 	crq->len = 0;
 	crq->readers = 0;
 	spin_lock(&detail->queue_lock);
 	if (test_bit(CACHE_PENDING, &h->flags)) {
 		crq->item = cache_get(h);
-		list_add_tail(&crq->q.list, &detail->queue);
+		crq->seqno = detail->next_seqno++;
+		list_add_tail(&crq->list, &detail->requests);
 		trace_cache_entry_upcall(detail, h);
 	} else
 		/* Lost a race, no longer PENDING, so don't enqueue */

-- 
2.53.0