[PATCH v8 10/12] bpf/rqspinlock: Use smp_cond_load_acquire_timeout()

Ankur Arora posted 12 patches 3 days, 8 hours ago
[PATCH v8 10/12] bpf/rqspinlock: Use smp_cond_load_acquire_timeout()
Posted by Ankur Arora 3 days, 8 hours ago
Switch out the conditional load interfaces used by rqspinlock
to smp_cond_load_acquire_timeout() and its wrapper,
atomic_cond_read_acquire_timeout().

Both these handle the timeout and amortize as needed, so use
clock_deadlock() directly instead of going through RES_CHECK_TIMEOUT().

For correctness, however, we need to ensure that the timeout case in
smp_cond_load_acquire_timeout() always agrees with that in
clock_deadlock(), which returns with -ETIMEDOUT.

For the most part, this is fine because smp_cond_load_acquire_timeout()
does not have an independent clock and does not do double reads from
clock_deadlock() which could cause its internal state to go out of
sync from that of clock_deadlock().

There is, however, an edge case where clock_deadlock() checks for:

        if (time > ts->timeout_end)
                return -ETIMEDOUT;

while smp_cond_load_acquire_timeout() checks for:

        __time_now = (time_expr_ns);
        if (__time_now <= 0 || __time_now >= __time_end) {
                VAL = READ_ONCE(*__PTR);
                break;
        }

This runs into a problem when (__time_now == __time_end) since
clock_deadlock() does not treat it as a timeout condition but
the second clause in the conditional above does.
So, add an equality check in clock_deadlock().

Finally, redefine SMP_TIMEOUT_POLL_COUNT to be 16k to be similar to the
spin-count used in RES_CHECK_TIMEOUT(). We only do this for non-arm64,
as arm64 generally uses a waiting implementation.

Cc: Kumar Kartikeya Dwivedi <memxor@gmail.com>
Cc: Alexei Starovoitov <ast@kernel.org>
Cc: bpf@vger.kernel.org
Signed-off-by: Ankur Arora <ankur.a.arora@oracle.com>
---
Notes:
  - change the check in clock_deadlock()

 kernel/bpf/rqspinlock.c | 37 ++++++++++++++++++++-----------------
 1 file changed, 20 insertions(+), 17 deletions(-)

diff --git a/kernel/bpf/rqspinlock.c b/kernel/bpf/rqspinlock.c
index ac9b3572e42f..2a361c4c7393 100644
--- a/kernel/bpf/rqspinlock.c
+++ b/kernel/bpf/rqspinlock.c
@@ -215,7 +215,7 @@ static noinline s64 clock_deadlock(rqspinlock_t *lock, u32 mask,
 	}
 
 	time = ktime_get_mono_fast_ns();
-	if (time > ts->timeout_end)
+	if (time >= ts->timeout_end)
 		return -ETIMEDOUT;
 
 	/*
@@ -235,20 +235,14 @@ static noinline s64 clock_deadlock(rqspinlock_t *lock, u32 mask,
 }
 
 /*
- * Do not amortize with spins when res_smp_cond_load_acquire is defined,
- * as the macro does internal amortization for us.
+ * Amortize timeout check for busy-wait loops.
  */
-#ifndef res_smp_cond_load_acquire
 #define RES_CHECK_TIMEOUT(ts, ret, mask)                              \
 	({                                                            \
 		if (!(ts).spin++)                                     \
 			(ret) = clock_deadlock((lock), (mask), &(ts));\
 		(ret);                                                \
 	})
-#else
-#define RES_CHECK_TIMEOUT(ts, ret, mask)			      \
-	({ (ret) = clock_deadlock((lock), (mask), &(ts)); })
-#endif
 
 /*
  * Initialize the 'spin' member.
@@ -262,6 +256,18 @@ static noinline s64 clock_deadlock(rqspinlock_t *lock, u32 mask,
  */
 #define RES_RESET_TIMEOUT(ts, _duration) ({ (ts).timeout_end = 0; (ts).duration = _duration; })
 
+/*
+ * Limit how often we invoke clock_deadlock() while spin-waiting in
+ * smp_cond_load_acquire_timeout() or atomic_cond_read_acquire_timeout().
+ *
+ * (ARM64 generally uses a waited implementation so we use the default
+ * value there.)
+ */
+#ifndef CONFIG_ARM64
+#undef SMP_TIMEOUT_POLL_COUNT
+#define SMP_TIMEOUT_POLL_COUNT	(16*1024)
+#endif
+
 /*
  * Provide a test-and-set fallback for cases when queued spin lock support is
  * absent from the architecture.
@@ -312,12 +318,6 @@ EXPORT_SYMBOL_GPL(resilient_tas_spin_lock);
  */
 static DEFINE_PER_CPU_ALIGNED(struct qnode, rqnodes[_Q_MAX_NODES]);
 
-#ifndef res_smp_cond_load_acquire
-#define res_smp_cond_load_acquire(v, c) smp_cond_load_acquire(v, c)
-#endif
-
-#define res_atomic_cond_read_acquire(v, c) res_smp_cond_load_acquire(&(v)->counter, (c))
-
 /**
  * resilient_queued_spin_lock_slowpath - acquire the queued spinlock
  * @lock: Pointer to queued spinlock structure
@@ -415,7 +415,9 @@ int __lockfunc resilient_queued_spin_lock_slowpath(rqspinlock_t *lock, u32 val)
 	 */
 	if (val & _Q_LOCKED_MASK) {
 		RES_RESET_TIMEOUT(ts, RES_DEF_TIMEOUT);
-		res_smp_cond_load_acquire(&lock->locked, !VAL || RES_CHECK_TIMEOUT(ts, timeout_err, _Q_LOCKED_MASK) < 0);
+		smp_cond_load_acquire_timeout(&lock->locked, !VAL,
+					      (timeout_err = clock_deadlock(lock, _Q_LOCKED_MASK, &ts)),
+					      ts.duration);
 	}
 
 	if (timeout_err < 0) {
@@ -577,8 +579,9 @@ int __lockfunc resilient_queued_spin_lock_slowpath(rqspinlock_t *lock, u32 val)
 	 * us.
 	 */
 	RES_RESET_TIMEOUT(ts, RES_DEF_TIMEOUT * 2);
-	val = res_atomic_cond_read_acquire(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK) ||
-					   RES_CHECK_TIMEOUT(ts, timeout_err, _Q_LOCKED_PENDING_MASK) < 0);
+	val = atomic_cond_read_acquire_timeout(&lock->val, !(VAL & _Q_LOCKED_PENDING_MASK),
+					       (timeout_err = clock_deadlock(lock, _Q_LOCKED_PENDING_MASK, &ts)),
+					       ts.duration);
 
 	/* Disable queue destruction when we detect deadlocks. */
 	if (timeout_err == -EDEADLK) {
-- 
2.31.1
Re: [PATCH v8 10/12] bpf/rqspinlock: Use smp_cond_load_acquire_timeout()
Posted by Alexei Starovoitov 2 days, 15 hours ago
On Sun, Dec 14, 2025 at 8:51 PM Ankur Arora <ankur.a.arora@oracle.com> wrote:
>
>  /**
>   * resilient_queued_spin_lock_slowpath - acquire the queued spinlock
>   * @lock: Pointer to queued spinlock structure
> @@ -415,7 +415,9 @@ int __lockfunc resilient_queued_spin_lock_slowpath(rqspinlock_t *lock, u32 val)
>          */
>         if (val & _Q_LOCKED_MASK) {
>                 RES_RESET_TIMEOUT(ts, RES_DEF_TIMEOUT);
> -               res_smp_cond_load_acquire(&lock->locked, !VAL || RES_CHECK_TIMEOUT(ts, timeout_err, _Q_LOCKED_MASK) < 0);
> +               smp_cond_load_acquire_timeout(&lock->locked, !VAL,
> +                                             (timeout_err = clock_deadlock(lock, _Q_LOCKED_MASK, &ts)),
> +                                             ts.duration);

I'm pretty sure we already discussed this and pointed out that
this approach is not acceptable.
We cannot call ktime_get_mono_fast_ns() first.
That's why RES_CHECK_TIMEOUT() exists and it does
if (!(ts).spin++)
before doing the first check_timeout() that will do ktime_get_mono_fast_ns().
Above is a performance critical lock acquisition path where
pending is spinning on the lock word waiting for the owner to
release the lock.
Adding unconditional ktime_get_mono_fast_ns() will destroy
performance for quick critical section.
Re: [PATCH v8 10/12] bpf/rqspinlock: Use smp_cond_load_acquire_timeout()
Posted by Ankur Arora 2 days, 5 hours ago
Alexei Starovoitov <alexei.starovoitov@gmail.com> writes:

> On Sun, Dec 14, 2025 at 8:51 PM Ankur Arora <ankur.a.arora@oracle.com> wrote:
>>
>>  /**
>>   * resilient_queued_spin_lock_slowpath - acquire the queued spinlock
>>   * @lock: Pointer to queued spinlock structure
>> @@ -415,7 +415,9 @@ int __lockfunc resilient_queued_spin_lock_slowpath(rqspinlock_t *lock, u32 val)
>>          */
>>         if (val & _Q_LOCKED_MASK) {
>>                 RES_RESET_TIMEOUT(ts, RES_DEF_TIMEOUT);
>> -               res_smp_cond_load_acquire(&lock->locked, !VAL || RES_CHECK_TIMEOUT(ts, timeout_err, _Q_LOCKED_MASK) < 0);
>> +               smp_cond_load_acquire_timeout(&lock->locked, !VAL,
>> +                                             (timeout_err = clock_deadlock(lock, _Q_LOCKED_MASK, &ts)),
>> +                                             ts.duration);
>
> I'm pretty sure we already discussed this and pointed out that
> this approach is not acceptable.

Where? I don't see any mail on this at all.

In any case your technical point below is quite reasonable. It's better
to lead with that instead of peremptorily declaring what you find
acceptable and what not.

> We cannot call ktime_get_mono_fast_ns() first.
> That's why RES_CHECK_TIMEOUT() exists and it does
> if (!(ts).spin++)
> before doing the first check_timeout() that will do ktime_get_mono_fast_ns().
> Above is a performance critical lock acquisition path where
> pending is spinning on the lock word waiting for the owner to
> release the lock.
> Adding unconditional ktime_get_mono_fast_ns() will destroy
> performance for quick critical section.

Yes that makes sense.

This is not ideal, but how about something like this:

  #define smp_cond_load_relaxed_timeout(ptr, cond_expr,                 \
                                      time_expr_ns, timeout_ns)         \
  ({                                                                    \
        typeof(ptr) __PTR = (ptr);                                      \
        __unqual_scalar_typeof(*ptr) VAL;                               \
        u32 __n = 0, __spin = SMP_TIMEOUT_POLL_COUNT;                   \
        s64 __timeout = (s64)timeout_ns;                                \
        s64 __time_now, __time_end = 0;                                 \
                                                                        \
        for (;;) {                                                      \
                VAL = READ_ONCE(*__PTR);                                \
                if (cond_expr)                                          \
                        break;                                          \
                cpu_poll_relax(__PTR, VAL, __timeout);                  \
                if (++__n < __spin)                                     \
                        continue;                                       \
                __time_now = (s64)(time_expr_ns);                       \
                if (unlikely(__time_end == 0))                          \
                        __time_end = __time_now + __timeout;            \
                __timeout = __time_end - __time_now;                    \
                if (__time_now <= 0 || __timeout <= 0) {                \
                        VAL = READ_ONCE(*__PTR);                        \
                        break;                                          \
                }                                                       \
                __n = 0;                                                \
        }                                                               \
                                                                        \
        (typeof(*ptr))VAL;                                              \
  })

The problem with this version is that there's no way to know how much
time has passed in the first cpu_poll_relax(). So for the waiting case
this has a built-in overshoot of up to __timeout with WFET and ~100us
with WFE; for x86 cpu_relax it is ~2us.

Of course we could specify a small __timeout value for the first iteration
which would help WFET.

Anyway let me take another look at this tomorrow.

But, that is more complexity.

Opinions?

--
ankur