Replace the single interleaved queue (which mixed cache_request and
cache_reader entries distinguished by a ->reader flag) with two
dedicated lists: cd->requests for upcall requests and cd->readers
for open file handles.
Readers now track their position via a monotonically increasing
sequence number (next_seqno) rather than by their position in the
shared list. Each cache_request is assigned a seqno when enqueued,
and a new cache_next_request() helper finds the next request at or
after a given seqno.
This eliminates the cache_queue wrapper struct entirely, simplifies
the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
cache_release, and makes the data flow easier to reason about.
Also, remove an obsolete comment. CACHE_UPCALLING hasn't existed
since before the git era started.
Signed-off-by: Jeff Layton <jlayton@kernel.org>
---
include/linux/sunrpc/cache.h | 4 +-
net/sunrpc/cache.c | 125 ++++++++++++++++++-------------------------
2 files changed, 56 insertions(+), 73 deletions(-)
diff --git a/include/linux/sunrpc/cache.h b/include/linux/sunrpc/cache.h
index 031379efba24d40f64ce346cf1032261d4b98d05..b1e595c2615bd4be4d9ad19f71a8f4d08bd74a9b 100644
--- a/include/linux/sunrpc/cache.h
+++ b/include/linux/sunrpc/cache.h
@@ -113,9 +113,11 @@ struct cache_detail {
int entries;
/* fields for communication over channel */
- struct list_head queue;
+ struct list_head requests;
+ struct list_head readers;
spinlock_t queue_lock;
wait_queue_head_t queue_wait;
+ u64 next_seqno;
atomic_t writers; /* how many time is /channel open */
time64_t last_close; /* if no writers, when did last close */
diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
index aef2607b3d7ffb61a42b9ea2ec17947465c026dc..09389ce8b961fe0cb5a472bcf2d3dd0b3faa13a6 100644
--- a/net/sunrpc/cache.c
+++ b/net/sunrpc/cache.c
@@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
void sunrpc_init_cache_detail(struct cache_detail *cd)
{
spin_lock_init(&cd->hash_lock);
- INIT_LIST_HEAD(&cd->queue);
+ INIT_LIST_HEAD(&cd->requests);
+ INIT_LIST_HEAD(&cd->readers);
spin_lock_init(&cd->queue_lock);
init_waitqueue_head(&cd->queue_wait);
+ cd->next_seqno = 0;
spin_lock(&cache_list_lock);
cd->nextcheck = 0;
cd->entries = 0;
@@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
* On read, you get a full request, or block.
* On write, an update request is processed.
* Poll works if anything to read, and always allows write.
- *
- * Implemented by linked list of requests. Each open file has
- * a ->private that also exists in this list. New requests are added
- * to the end and may wakeup and preceding readers.
- * New readers are added to the head. If, on read, an item is found with
- * CACHE_UPCALLING clear, we free it from the list.
- *
*/
-struct cache_queue {
- struct list_head list;
- int reader; /* if 0, then request */
-};
struct cache_request {
- struct cache_queue q;
+ struct list_head list;
struct cache_head *item;
- char * buf;
+ char *buf;
int len;
int readers;
+ u64 seqno;
};
struct cache_reader {
- struct cache_queue q;
+ struct list_head list;
int offset; /* if non-0, we have a refcnt on next request */
+ u64 next_seqno;
};
static int cache_request(struct cache_detail *detail,
@@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
return PAGE_SIZE - len;
}
+static struct cache_request *
+cache_next_request(struct cache_detail *cd, u64 seqno)
+{
+ struct cache_request *rq;
+
+ list_for_each_entry(rq, &cd->requests, list)
+ if (rq->seqno >= seqno)
+ return rq;
+ return NULL;
+}
+
static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
loff_t *ppos, struct cache_detail *cd)
{
@@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
again:
spin_lock(&cd->queue_lock);
/* need to find next request */
- while (rp->q.list.next != &cd->queue &&
- list_entry(rp->q.list.next, struct cache_queue, list)
- ->reader) {
- struct list_head *next = rp->q.list.next;
- list_move(&rp->q.list, next);
- }
- if (rp->q.list.next == &cd->queue) {
+ rq = cache_next_request(cd, rp->next_seqno);
+ if (!rq) {
spin_unlock(&cd->queue_lock);
inode_unlock(inode);
WARN_ON_ONCE(rp->offset);
return 0;
}
- rq = container_of(rp->q.list.next, struct cache_request, q.list);
- WARN_ON_ONCE(rq->q.reader);
if (rp->offset == 0)
rq->readers++;
spin_unlock(&cd->queue_lock);
@@ -877,7 +874,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
err = -EAGAIN;
spin_lock(&cd->queue_lock);
- list_move(&rp->q.list, &rq->q.list);
+ rp->next_seqno = rq->seqno + 1;
spin_unlock(&cd->queue_lock);
} else {
if (rp->offset + count > rq->len)
@@ -889,7 +886,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
if (rp->offset >= rq->len) {
rp->offset = 0;
spin_lock(&cd->queue_lock);
- list_move(&rp->q.list, &rq->q.list);
+ rp->next_seqno = rq->seqno + 1;
spin_unlock(&cd->queue_lock);
}
err = 0;
@@ -901,7 +898,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
rq->readers--;
if (rq->readers == 0 &&
!test_bit(CACHE_PENDING, &rq->item->flags)) {
- list_del(&rq->q.list);
+ list_del(&rq->list);
spin_unlock(&cd->queue_lock);
cache_put(rq->item, cd);
kfree(rq->buf);
@@ -976,7 +973,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
{
__poll_t mask;
struct cache_reader *rp = filp->private_data;
- struct cache_queue *cq;
poll_wait(filp, &cd->queue_wait, wait);
@@ -988,12 +984,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
spin_lock(&cd->queue_lock);
- for (cq= &rp->q; &cq->list != &cd->queue;
- cq = list_entry(cq->list.next, struct cache_queue, list))
- if (!cq->reader) {
- mask |= EPOLLIN | EPOLLRDNORM;
- break;
- }
+ if (cache_next_request(cd, rp->next_seqno))
+ mask |= EPOLLIN | EPOLLRDNORM;
spin_unlock(&cd->queue_lock);
return mask;
}
@@ -1004,7 +996,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
{
int len = 0;
struct cache_reader *rp = filp->private_data;
- struct cache_queue *cq;
+ struct cache_request *rq;
if (cmd != FIONREAD || !rp)
return -EINVAL;
@@ -1014,14 +1006,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
/* only find the length remaining in current request,
* or the length of the next request
*/
- for (cq= &rp->q; &cq->list != &cd->queue;
- cq = list_entry(cq->list.next, struct cache_queue, list))
- if (!cq->reader) {
- struct cache_request *cr =
- container_of(cq, struct cache_request, q);
- len = cr->len - rp->offset;
- break;
- }
+ rq = cache_next_request(cd, rp->next_seqno);
+ if (rq)
+ len = rq->len - rp->offset;
spin_unlock(&cd->queue_lock);
return put_user(len, (int __user *)arg);
@@ -1042,10 +1029,10 @@ static int cache_open(struct inode *inode, struct file *filp,
return -ENOMEM;
}
rp->offset = 0;
- rp->q.reader = 1;
+ rp->next_seqno = 0;
spin_lock(&cd->queue_lock);
- list_add(&rp->q.list, &cd->queue);
+ list_add(&rp->list, &cd->readers);
spin_unlock(&cd->queue_lock);
}
if (filp->f_mode & FMODE_WRITE)
@@ -1062,17 +1049,14 @@ static int cache_release(struct inode *inode, struct file *filp,
if (rp) {
spin_lock(&cd->queue_lock);
if (rp->offset) {
- struct cache_queue *cq;
- for (cq= &rp->q; &cq->list != &cd->queue;
- cq = list_entry(cq->list.next, struct cache_queue, list))
- if (!cq->reader) {
- container_of(cq, struct cache_request, q)
- ->readers--;
- break;
- }
+ struct cache_request *rq;
+
+ rq = cache_next_request(cd, rp->next_seqno);
+ if (rq)
+ rq->readers--;
rp->offset = 0;
}
- list_del(&rp->q.list);
+ list_del(&rp->list);
spin_unlock(&cd->queue_lock);
filp->private_data = NULL;
@@ -1091,27 +1075,24 @@ static int cache_release(struct inode *inode, struct file *filp,
static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
{
- struct cache_queue *cq, *tmp;
- struct cache_request *cr;
+ struct cache_request *cr, *tmp;
LIST_HEAD(dequeued);
spin_lock(&detail->queue_lock);
- list_for_each_entry_safe(cq, tmp, &detail->queue, list)
- if (!cq->reader) {
- cr = container_of(cq, struct cache_request, q);
- if (cr->item != ch)
- continue;
- if (test_bit(CACHE_PENDING, &ch->flags))
- /* Lost a race and it is pending again */
- break;
- if (cr->readers != 0)
- continue;
- list_move(&cr->q.list, &dequeued);
- }
+ list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
+ if (cr->item != ch)
+ continue;
+ if (test_bit(CACHE_PENDING, &ch->flags))
+ /* Lost a race and it is pending again */
+ break;
+ if (cr->readers != 0)
+ continue;
+ list_move(&cr->list, &dequeued);
+ }
spin_unlock(&detail->queue_lock);
while (!list_empty(&dequeued)) {
- cr = list_entry(dequeued.next, struct cache_request, q.list);
- list_del(&cr->q.list);
+ cr = list_entry(dequeued.next, struct cache_request, list);
+ list_del(&cr->list);
cache_put(cr->item, detail);
kfree(cr->buf);
kfree(cr);
@@ -1229,14 +1210,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
return -EAGAIN;
}
- crq->q.reader = 0;
crq->buf = buf;
crq->len = 0;
crq->readers = 0;
spin_lock(&detail->queue_lock);
if (test_bit(CACHE_PENDING, &h->flags)) {
crq->item = cache_get(h);
- list_add_tail(&crq->q.list, &detail->queue);
+ crq->seqno = detail->next_seqno++;
+ list_add_tail(&crq->list, &detail->requests);
trace_cache_entry_upcall(detail, h);
} else
/* Lost a race, no longer PENDING, so don't enqueue */
--
2.53.0
On Fri, 20 Feb 2026, Jeff Layton wrote:
> Replace the single interleaved queue (which mixed cache_request and
> cache_reader entries distinguished by a ->reader flag) with two
> dedicated lists: cd->requests for upcall requests and cd->readers
> for open file handles.
>
> Readers now track their position via a monotonically increasing
> sequence number (next_seqno) rather than by their position in the
> shared list. Each cache_request is assigned a seqno when enqueued,
> and a new cache_next_request() helper finds the next request at or
> after a given seqno.
>
> This eliminates the cache_queue wrapper struct entirely, simplifies
> the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
> cache_release, and makes the data flow easier to reason about.
>
> Also, remove an obsolete comment. CACHE_UPCALLING hasn't existed
> since before the git era started.
>
> Signed-off-by: Jeff Layton <jlayton@kernel.org>
> ---
> include/linux/sunrpc/cache.h | 4 +-
> net/sunrpc/cache.c | 125 ++++++++++++++++++-------------------------
> 2 files changed, 56 insertions(+), 73 deletions(-)
>
> diff --git a/include/linux/sunrpc/cache.h b/include/linux/sunrpc/cache.h
> index 031379efba24d40f64ce346cf1032261d4b98d05..b1e595c2615bd4be4d9ad19f71a8f4d08bd74a9b 100644
> --- a/include/linux/sunrpc/cache.h
> +++ b/include/linux/sunrpc/cache.h
> @@ -113,9 +113,11 @@ struct cache_detail {
> int entries;
>
> /* fields for communication over channel */
> - struct list_head queue;
> + struct list_head requests;
> + struct list_head readers;
> spinlock_t queue_lock;
> wait_queue_head_t queue_wait;
> + u64 next_seqno;
>
> atomic_t writers; /* how many time is /channel open */
> time64_t last_close; /* if no writers, when did last close */
> diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
> index aef2607b3d7ffb61a42b9ea2ec17947465c026dc..09389ce8b961fe0cb5a472bcf2d3dd0b3faa13a6 100644
> --- a/net/sunrpc/cache.c
> +++ b/net/sunrpc/cache.c
> @@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
> void sunrpc_init_cache_detail(struct cache_detail *cd)
> {
> spin_lock_init(&cd->hash_lock);
> - INIT_LIST_HEAD(&cd->queue);
> + INIT_LIST_HEAD(&cd->requests);
> + INIT_LIST_HEAD(&cd->readers);
> spin_lock_init(&cd->queue_lock);
> init_waitqueue_head(&cd->queue_wait);
> + cd->next_seqno = 0;
> spin_lock(&cache_list_lock);
> cd->nextcheck = 0;
> cd->entries = 0;
> @@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
> * On read, you get a full request, or block.
> * On write, an update request is processed.
> * Poll works if anything to read, and always allows write.
> - *
> - * Implemented by linked list of requests. Each open file has
> - * a ->private that also exists in this list. New requests are added
> - * to the end and may wakeup and preceding readers.
> - * New readers are added to the head. If, on read, an item is found with
> - * CACHE_UPCALLING clear, we free it from the list.
> - *
> */
>
> -struct cache_queue {
> - struct list_head list;
> - int reader; /* if 0, then request */
> -};
> struct cache_request {
> - struct cache_queue q;
> + struct list_head list;
> struct cache_head *item;
> - char * buf;
> + char *buf;
> int len;
> int readers;
> + u64 seqno;
> };
> struct cache_reader {
> - struct cache_queue q;
> + struct list_head list;
> int offset; /* if non-0, we have a refcnt on next request */
> + u64 next_seqno;
> };
>
> static int cache_request(struct cache_detail *detail,
> @@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
> return PAGE_SIZE - len;
> }
>
> +static struct cache_request *
> +cache_next_request(struct cache_detail *cd, u64 seqno)
> +{
> + struct cache_request *rq;
> +
> + list_for_each_entry(rq, &cd->requests, list)
> + if (rq->seqno >= seqno)
> + return rq;
> + return NULL;
> +}
> +
> static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> loff_t *ppos, struct cache_detail *cd)
> {
> @@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> again:
> spin_lock(&cd->queue_lock);
> /* need to find next request */
> - while (rp->q.list.next != &cd->queue &&
> - list_entry(rp->q.list.next, struct cache_queue, list)
> - ->reader) {
> - struct list_head *next = rp->q.list.next;
> - list_move(&rp->q.list, next);
> - }
> - if (rp->q.list.next == &cd->queue) {
> + rq = cache_next_request(cd, rp->next_seqno);
> + if (!rq) {
> spin_unlock(&cd->queue_lock);
> inode_unlock(inode);
> WARN_ON_ONCE(rp->offset);
> return 0;
> }
> - rq = container_of(rp->q.list.next, struct cache_request, q.list);
> - WARN_ON_ONCE(rq->q.reader);
> if (rp->offset == 0)
> rq->readers++;
> spin_unlock(&cd->queue_lock);
> @@ -877,7 +874,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
> err = -EAGAIN;
> spin_lock(&cd->queue_lock);
> - list_move(&rp->q.list, &rq->q.list);
> + rp->next_seqno = rq->seqno + 1;
> spin_unlock(&cd->queue_lock);
I don't think we need the spin_lock here any more.
> } else {
> if (rp->offset + count > rq->len)
> @@ -889,7 +886,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> if (rp->offset >= rq->len) {
> rp->offset = 0;
> spin_lock(&cd->queue_lock);
> - list_move(&rp->q.list, &rq->q.list);
> + rp->next_seqno = rq->seqno + 1;
> spin_unlock(&cd->queue_lock);
Nor here.
> }
> err = 0;
> @@ -901,7 +898,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> rq->readers--;
> if (rq->readers == 0 &&
> !test_bit(CACHE_PENDING, &rq->item->flags)) {
> - list_del(&rq->q.list);
> + list_del(&rq->list);
> spin_unlock(&cd->queue_lock);
> cache_put(rq->item, cd);
> kfree(rq->buf);
> @@ -976,7 +973,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
> {
> __poll_t mask;
> struct cache_reader *rp = filp->private_data;
> - struct cache_queue *cq;
>
> poll_wait(filp, &cd->queue_wait, wait);
>
> @@ -988,12 +984,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
>
> spin_lock(&cd->queue_lock);
>
> - for (cq= &rp->q; &cq->list != &cd->queue;
> - cq = list_entry(cq->list.next, struct cache_queue, list))
> - if (!cq->reader) {
> - mask |= EPOLLIN | EPOLLRDNORM;
> - break;
> - }
> + if (cache_next_request(cd, rp->next_seqno))
> + mask |= EPOLLIN | EPOLLRDNORM;
> spin_unlock(&cd->queue_lock);
> return mask;
> }
> @@ -1004,7 +996,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
> {
> int len = 0;
> struct cache_reader *rp = filp->private_data;
> - struct cache_queue *cq;
> + struct cache_request *rq;
>
> if (cmd != FIONREAD || !rp)
> return -EINVAL;
> @@ -1014,14 +1006,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
> /* only find the length remaining in current request,
> * or the length of the next request
> */
> - for (cq= &rp->q; &cq->list != &cd->queue;
> - cq = list_entry(cq->list.next, struct cache_queue, list))
> - if (!cq->reader) {
> - struct cache_request *cr =
> - container_of(cq, struct cache_request, q);
> - len = cr->len - rp->offset;
> - break;
> - }
> + rq = cache_next_request(cd, rp->next_seqno);
> + if (rq)
> + len = rq->len - rp->offset;
> spin_unlock(&cd->queue_lock);
>
> return put_user(len, (int __user *)arg);
> @@ -1042,10 +1029,10 @@ static int cache_open(struct inode *inode, struct file *filp,
> return -ENOMEM;
> }
> rp->offset = 0;
> - rp->q.reader = 1;
> + rp->next_seqno = 0;
>
> spin_lock(&cd->queue_lock);
> - list_add(&rp->q.list, &cd->queue);
> + list_add(&rp->list, &cd->readers);
> spin_unlock(&cd->queue_lock);
> }
> if (filp->f_mode & FMODE_WRITE)
> @@ -1062,17 +1049,14 @@ static int cache_release(struct inode *inode, struct file *filp,
> if (rp) {
> spin_lock(&cd->queue_lock);
> if (rp->offset) {
> - struct cache_queue *cq;
> - for (cq= &rp->q; &cq->list != &cd->queue;
> - cq = list_entry(cq->list.next, struct cache_queue, list))
> - if (!cq->reader) {
> - container_of(cq, struct cache_request, q)
> - ->readers--;
> - break;
> - }
> + struct cache_request *rq;
> +
> + rq = cache_next_request(cd, rp->next_seqno);
> + if (rq)
> + rq->readers--;
Hmm.. The other place where we decrement ->readers we then check if it
is zero and if CACHE_PENDING is clear - and do something.
I suspect we should do that here.
This bug (if I'm right and it is a bug) is there before your patch, but
now might be a good time to fix it?
Thanks. Nice cleanups.
NeilBrown
> rp->offset = 0;
> }
> - list_del(&rp->q.list);
> + list_del(&rp->list);
> spin_unlock(&cd->queue_lock);
>
> filp->private_data = NULL;
> @@ -1091,27 +1075,24 @@ static int cache_release(struct inode *inode, struct file *filp,
>
> static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
> {
> - struct cache_queue *cq, *tmp;
> - struct cache_request *cr;
> + struct cache_request *cr, *tmp;
> LIST_HEAD(dequeued);
>
> spin_lock(&detail->queue_lock);
> - list_for_each_entry_safe(cq, tmp, &detail->queue, list)
> - if (!cq->reader) {
> - cr = container_of(cq, struct cache_request, q);
> - if (cr->item != ch)
> - continue;
> - if (test_bit(CACHE_PENDING, &ch->flags))
> - /* Lost a race and it is pending again */
> - break;
> - if (cr->readers != 0)
> - continue;
> - list_move(&cr->q.list, &dequeued);
> - }
> + list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
> + if (cr->item != ch)
> + continue;
> + if (test_bit(CACHE_PENDING, &ch->flags))
> + /* Lost a race and it is pending again */
> + break;
> + if (cr->readers != 0)
> + continue;
> + list_move(&cr->list, &dequeued);
> + }
> spin_unlock(&detail->queue_lock);
> while (!list_empty(&dequeued)) {
> - cr = list_entry(dequeued.next, struct cache_request, q.list);
> - list_del(&cr->q.list);
> + cr = list_entry(dequeued.next, struct cache_request, list);
> + list_del(&cr->list);
> cache_put(cr->item, detail);
> kfree(cr->buf);
> kfree(cr);
> @@ -1229,14 +1210,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
> return -EAGAIN;
> }
>
> - crq->q.reader = 0;
> crq->buf = buf;
> crq->len = 0;
> crq->readers = 0;
> spin_lock(&detail->queue_lock);
> if (test_bit(CACHE_PENDING, &h->flags)) {
> crq->item = cache_get(h);
> - list_add_tail(&crq->q.list, &detail->queue);
> + crq->seqno = detail->next_seqno++;
> + list_add_tail(&crq->list, &detail->requests);
> trace_cache_entry_upcall(detail, h);
> } else
> /* Lost a race, no longer PENDING, so don't enqueue */
>
> --
> 2.53.0
>
>
On Sun, 2026-02-22 at 09:41 +1100, NeilBrown wrote:
> On Fri, 20 Feb 2026, Jeff Layton wrote:
> > Replace the single interleaved queue (which mixed cache_request and
> > cache_reader entries distinguished by a ->reader flag) with two
> > dedicated lists: cd->requests for upcall requests and cd->readers
> > for open file handles.
> >
> > Readers now track their position via a monotonically increasing
> > sequence number (next_seqno) rather than by their position in the
> > shared list. Each cache_request is assigned a seqno when enqueued,
> > and a new cache_next_request() helper finds the next request at or
> > after a given seqno.
> >
> > This eliminates the cache_queue wrapper struct entirely, simplifies
> > the reader-skipping loops in cache_read/cache_poll/cache_ioctl/
> > cache_release, and makes the data flow easier to reason about.
> >
> > Also, remove an obsolete comment. CACHE_UPCALLING hasn't existed
> > since before the git era started.
> >
> > Signed-off-by: Jeff Layton <jlayton@kernel.org>
> > ---
> > include/linux/sunrpc/cache.h | 4 +-
> > net/sunrpc/cache.c | 125 ++++++++++++++++++-------------------------
> > 2 files changed, 56 insertions(+), 73 deletions(-)
> >
> > diff --git a/include/linux/sunrpc/cache.h b/include/linux/sunrpc/cache.h
> > index 031379efba24d40f64ce346cf1032261d4b98d05..b1e595c2615bd4be4d9ad19f71a8f4d08bd74a9b 100644
> > --- a/include/linux/sunrpc/cache.h
> > +++ b/include/linux/sunrpc/cache.h
> > @@ -113,9 +113,11 @@ struct cache_detail {
> > int entries;
> >
> > /* fields for communication over channel */
> > - struct list_head queue;
> > + struct list_head requests;
> > + struct list_head readers;
> > spinlock_t queue_lock;
> > wait_queue_head_t queue_wait;
> > + u64 next_seqno;
> >
> > atomic_t writers; /* how many time is /channel open */
> > time64_t last_close; /* if no writers, when did last close */
> > diff --git a/net/sunrpc/cache.c b/net/sunrpc/cache.c
> > index aef2607b3d7ffb61a42b9ea2ec17947465c026dc..09389ce8b961fe0cb5a472bcf2d3dd0b3faa13a6 100644
> > --- a/net/sunrpc/cache.c
> > +++ b/net/sunrpc/cache.c
> > @@ -399,9 +399,11 @@ static struct delayed_work cache_cleaner;
> > void sunrpc_init_cache_detail(struct cache_detail *cd)
> > {
> > spin_lock_init(&cd->hash_lock);
> > - INIT_LIST_HEAD(&cd->queue);
> > + INIT_LIST_HEAD(&cd->requests);
> > + INIT_LIST_HEAD(&cd->readers);
> > spin_lock_init(&cd->queue_lock);
> > init_waitqueue_head(&cd->queue_wait);
> > + cd->next_seqno = 0;
> > spin_lock(&cache_list_lock);
> > cd->nextcheck = 0;
> > cd->entries = 0;
> > @@ -796,29 +798,20 @@ void cache_clean_deferred(void *owner)
> > * On read, you get a full request, or block.
> > * On write, an update request is processed.
> > * Poll works if anything to read, and always allows write.
> > - *
> > - * Implemented by linked list of requests. Each open file has
> > - * a ->private that also exists in this list. New requests are added
> > - * to the end and may wakeup and preceding readers.
> > - * New readers are added to the head. If, on read, an item is found with
> > - * CACHE_UPCALLING clear, we free it from the list.
> > - *
> > */
> >
> > -struct cache_queue {
> > - struct list_head list;
> > - int reader; /* if 0, then request */
> > -};
> > struct cache_request {
> > - struct cache_queue q;
> > + struct list_head list;
> > struct cache_head *item;
> > - char * buf;
> > + char *buf;
> > int len;
> > int readers;
> > + u64 seqno;
> > };
> > struct cache_reader {
> > - struct cache_queue q;
> > + struct list_head list;
> > int offset; /* if non-0, we have a refcnt on next request */
> > + u64 next_seqno;
> > };
> >
> > static int cache_request(struct cache_detail *detail,
> > @@ -833,6 +826,17 @@ static int cache_request(struct cache_detail *detail,
> > return PAGE_SIZE - len;
> > }
> >
> > +static struct cache_request *
> > +cache_next_request(struct cache_detail *cd, u64 seqno)
> > +{
> > + struct cache_request *rq;
> > +
> > + list_for_each_entry(rq, &cd->requests, list)
> > + if (rq->seqno >= seqno)
> > + return rq;
> > + return NULL;
> > +}
> > +
> > static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> > loff_t *ppos, struct cache_detail *cd)
> > {
> > @@ -849,20 +853,13 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> > again:
> > spin_lock(&cd->queue_lock);
> > /* need to find next request */
> > - while (rp->q.list.next != &cd->queue &&
> > - list_entry(rp->q.list.next, struct cache_queue, list)
> > - ->reader) {
> > - struct list_head *next = rp->q.list.next;
> > - list_move(&rp->q.list, next);
> > - }
> > - if (rp->q.list.next == &cd->queue) {
> > + rq = cache_next_request(cd, rp->next_seqno);
> > + if (!rq) {
> > spin_unlock(&cd->queue_lock);
> > inode_unlock(inode);
> > WARN_ON_ONCE(rp->offset);
> > return 0;
> > }
> > - rq = container_of(rp->q.list.next, struct cache_request, q.list);
> > - WARN_ON_ONCE(rq->q.reader);
> > if (rp->offset == 0)
> > rq->readers++;
> > spin_unlock(&cd->queue_lock);
> > @@ -877,7 +874,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> > if (rp->offset == 0 && !test_bit(CACHE_PENDING, &rq->item->flags)) {
> > err = -EAGAIN;
> > spin_lock(&cd->queue_lock);
> > - list_move(&rp->q.list, &rq->q.list);
> > + rp->next_seqno = rq->seqno + 1;
> > spin_unlock(&cd->queue_lock);
>
> I don't think we need the spin_lock here any more.
>
Good point in both places.
>
> > } else {
> > if (rp->offset + count > rq->len)
> > @@ -889,7 +886,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> > if (rp->offset >= rq->len) {
> > rp->offset = 0;
> > spin_lock(&cd->queue_lock);
> > - list_move(&rp->q.list, &rq->q.list);
> > + rp->next_seqno = rq->seqno + 1;
> > spin_unlock(&cd->queue_lock);
>
> Nor here.
>
>
> > }
> > err = 0;
> > @@ -901,7 +898,7 @@ static ssize_t cache_read(struct file *filp, char __user *buf, size_t count,
> > rq->readers--;
> > if (rq->readers == 0 &&
> > !test_bit(CACHE_PENDING, &rq->item->flags)) {
> > - list_del(&rq->q.list);
> > + list_del(&rq->list);
> > spin_unlock(&cd->queue_lock);
> > cache_put(rq->item, cd);
> > kfree(rq->buf);
> > @@ -976,7 +973,6 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
> > {
> > __poll_t mask;
> > struct cache_reader *rp = filp->private_data;
> > - struct cache_queue *cq;
> >
> > poll_wait(filp, &cd->queue_wait, wait);
> >
> > @@ -988,12 +984,8 @@ static __poll_t cache_poll(struct file *filp, poll_table *wait,
> >
> > spin_lock(&cd->queue_lock);
> >
> > - for (cq= &rp->q; &cq->list != &cd->queue;
> > - cq = list_entry(cq->list.next, struct cache_queue, list))
> > - if (!cq->reader) {
> > - mask |= EPOLLIN | EPOLLRDNORM;
> > - break;
> > - }
> > + if (cache_next_request(cd, rp->next_seqno))
> > + mask |= EPOLLIN | EPOLLRDNORM;
> > spin_unlock(&cd->queue_lock);
> > return mask;
> > }
> > @@ -1004,7 +996,7 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
> > {
> > int len = 0;
> > struct cache_reader *rp = filp->private_data;
> > - struct cache_queue *cq;
> > + struct cache_request *rq;
> >
> > if (cmd != FIONREAD || !rp)
> > return -EINVAL;
> > @@ -1014,14 +1006,9 @@ static int cache_ioctl(struct inode *ino, struct file *filp,
> > /* only find the length remaining in current request,
> > * or the length of the next request
> > */
> > - for (cq= &rp->q; &cq->list != &cd->queue;
> > - cq = list_entry(cq->list.next, struct cache_queue, list))
> > - if (!cq->reader) {
> > - struct cache_request *cr =
> > - container_of(cq, struct cache_request, q);
> > - len = cr->len - rp->offset;
> > - break;
> > - }
> > + rq = cache_next_request(cd, rp->next_seqno);
> > + if (rq)
> > + len = rq->len - rp->offset;
> > spin_unlock(&cd->queue_lock);
> >
> > return put_user(len, (int __user *)arg);
> > @@ -1042,10 +1029,10 @@ static int cache_open(struct inode *inode, struct file *filp,
> > return -ENOMEM;
> > }
> > rp->offset = 0;
> > - rp->q.reader = 1;
> > + rp->next_seqno = 0;
> >
> > spin_lock(&cd->queue_lock);
> > - list_add(&rp->q.list, &cd->queue);
> > + list_add(&rp->list, &cd->readers);
> > spin_unlock(&cd->queue_lock);
> > }
> > if (filp->f_mode & FMODE_WRITE)
> > @@ -1062,17 +1049,14 @@ static int cache_release(struct inode *inode, struct file *filp,
> > if (rp) {
> > spin_lock(&cd->queue_lock);
> > if (rp->offset) {
> > - struct cache_queue *cq;
> > - for (cq= &rp->q; &cq->list != &cd->queue;
> > - cq = list_entry(cq->list.next, struct cache_queue, list))
> > - if (!cq->reader) {
> > - container_of(cq, struct cache_request, q)
> > - ->readers--;
> > - break;
> > - }
> > + struct cache_request *rq;
> > +
> > + rq = cache_next_request(cd, rp->next_seqno);
> > + if (rq)
> > + rq->readers--;
>
> Hmm.. The other place where we decrement ->readers we then check if it
> is zero and if CACHE_PENDING is clear - and do something.
> I suspect we should do that here.
> This bug (if I'm right and it is a bug) is there before your patch, but
> now might be a good time to fix it?
>
Good catch. I'll add a patch to fix that preceding these patches, so it
can go to stable (if we think that's worthwhile).
> Thanks. Nice cleanups.
>
Thanks for the review! I'll send a v2 with the spinlock fixes and the
above bugfix.
> NeilBrown
>
>
> > rp->offset = 0;
> > }
> > - list_del(&rp->q.list);
> > + list_del(&rp->list);
> > spin_unlock(&cd->queue_lock);
> >
> > filp->private_data = NULL;
> > @@ -1091,27 +1075,24 @@ static int cache_release(struct inode *inode, struct file *filp,
> >
> > static void cache_dequeue(struct cache_detail *detail, struct cache_head *ch)
> > {
> > - struct cache_queue *cq, *tmp;
> > - struct cache_request *cr;
> > + struct cache_request *cr, *tmp;
> > LIST_HEAD(dequeued);
> >
> > spin_lock(&detail->queue_lock);
> > - list_for_each_entry_safe(cq, tmp, &detail->queue, list)
> > - if (!cq->reader) {
> > - cr = container_of(cq, struct cache_request, q);
> > - if (cr->item != ch)
> > - continue;
> > - if (test_bit(CACHE_PENDING, &ch->flags))
> > - /* Lost a race and it is pending again */
> > - break;
> > - if (cr->readers != 0)
> > - continue;
> > - list_move(&cr->q.list, &dequeued);
> > - }
> > + list_for_each_entry_safe(cr, tmp, &detail->requests, list) {
> > + if (cr->item != ch)
> > + continue;
> > + if (test_bit(CACHE_PENDING, &ch->flags))
> > + /* Lost a race and it is pending again */
> > + break;
> > + if (cr->readers != 0)
> > + continue;
> > + list_move(&cr->list, &dequeued);
> > + }
> > spin_unlock(&detail->queue_lock);
> > while (!list_empty(&dequeued)) {
> > - cr = list_entry(dequeued.next, struct cache_request, q.list);
> > - list_del(&cr->q.list);
> > + cr = list_entry(dequeued.next, struct cache_request, list);
> > + list_del(&cr->list);
> > cache_put(cr->item, detail);
> > kfree(cr->buf);
> > kfree(cr);
> > @@ -1229,14 +1210,14 @@ static int cache_pipe_upcall(struct cache_detail *detail, struct cache_head *h)
> > return -EAGAIN;
> > }
> >
> > - crq->q.reader = 0;
> > crq->buf = buf;
> > crq->len = 0;
> > crq->readers = 0;
> > spin_lock(&detail->queue_lock);
> > if (test_bit(CACHE_PENDING, &h->flags)) {
> > crq->item = cache_get(h);
> > - list_add_tail(&crq->q.list, &detail->queue);
> > + crq->seqno = detail->next_seqno++;
> > + list_add_tail(&crq->list, &detail->requests);
> > trace_cache_entry_upcall(detail, h);
> > } else
> > /* Lost a race, no longer PENDING, so don't enqueue */
> >
> > --
> > 2.53.0
> >
> >
--
Jeff Layton <jlayton@kernel.org>
© 2016 - 2026 Red Hat, Inc.