From nobody Mon May  5 00:09:57 2025
Delivered-To: importer@patchew.org
Received-SPF: pass (zoho.com: domain of gnu.org designates 208.118.235.17 as
 permitted sender) client-ip=208.118.235.17;
 envelope-from=qemu-devel-bounces+importer=patchew.org@nongnu.org;
 helo=lists.gnu.org;
Authentication-Results: mx.zoho.com;
	spf=pass (zoho.com: domain of gnu.org designates 208.118.235.17 as permitted
 sender)  smtp.mailfrom=qemu-devel-bounces+importer=patchew.org@nongnu.org;
Return-Path: <qemu-devel-bounces+importer=patchew.org@nongnu.org>
Received: from lists.gnu.org (lists.gnu.org [208.118.235.17]) by
 mx.zohomail.com
	with SMTPS id 1487680638522710.3598646912864;
 Tue, 21 Feb 2017 04:37:18 -0800 (PST)
Received: from localhost ([::1]:44130 helo=lists.gnu.org)
	by lists.gnu.org with esmtp (Exim 4.71)
	(envelope-from <qemu-devel-bounces+importer=patchew.org@nongnu.org>)
	id 1cg9gx-0000eU-Sm
	for importer@patchew.org; Tue, 21 Feb 2017 07:37:15 -0500
Received: from eggs.gnu.org ([2001:4830:134:3::10]:38847)
	by lists.gnu.org with esmtp (Exim 4.71)
	(envelope-from <stefanha@redhat.com>) id 1cg94V-000793-95
	for qemu-devel@nongnu.org; Tue, 21 Feb 2017 06:57:33 -0500
Received: from Debian-exim by eggs.gnu.org with spam-scanned (Exim 4.71)
	(envelope-from <stefanha@redhat.com>) id 1cg94T-0005Wh-HI
	for qemu-devel@nongnu.org; Tue, 21 Feb 2017 06:57:31 -0500
Received: from mx1.redhat.com ([209.132.183.28]:48326)
	by eggs.gnu.org with esmtps (TLS1.0:DHE_RSA_AES_256_CBC_SHA1:32)
	(Exim 4.71) (envelope-from <stefanha@redhat.com>) id 1cg94T-0005Wc-8W
	for qemu-devel@nongnu.org; Tue, 21 Feb 2017 06:57:29 -0500
Received: from int-mx10.intmail.prod.int.phx2.redhat.com
	(int-mx10.intmail.prod.int.phx2.redhat.com [10.5.11.23])
	(using TLSv1.2 with cipher ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits))
	(No client certificate requested)
	by mx1.redhat.com (Postfix) with ESMTPS id 4E3051B11;
	Tue, 21 Feb 2017 11:57:29 +0000 (UTC)
Received: from localhost (ovpn-117-191.ams2.redhat.com [10.36.117.191])
	by int-mx10.intmail.prod.int.phx2.redhat.com (8.14.4/8.14.4) with ESMTP
	id v1LBvQxH004811; Tue, 21 Feb 2017 06:57:28 -0500
From: Stefan Hajnoczi <stefanha@redhat.com>
To: <qemu-devel@nongnu.org>
Date: Tue, 21 Feb 2017 11:56:39 +0000
Message-Id: <20170221115644.28264-20-stefanha@redhat.com>
In-Reply-To: <20170221115644.28264-1-stefanha@redhat.com>
References: <20170221115644.28264-1-stefanha@redhat.com>
X-Scanned-By: MIMEDefang 2.68 on 10.5.11.23
X-Greylist: Sender IP whitelisted, not delayed by milter-greylist-4.5.16
	(mx1.redhat.com [10.5.110.28]);
	Tue, 21 Feb 2017 11:57:29 +0000 (UTC)
X-detected-operating-system: by eggs.gnu.org: GNU/Linux 2.2.x-3.x [generic]
	[fuzzy]
X-Received-From: 209.132.183.28
Subject: [Qemu-devel] [PULL v2 19/24] coroutine-lock: make CoMutex
 thread-safe
X-BeenThere: qemu-devel@nongnu.org
X-Mailman-Version: 2.1.21
Precedence: list
List-Id: <qemu-devel.nongnu.org>
List-Unsubscribe: <https://lists.nongnu.org/mailman/options/qemu-devel>,
	<mailto:qemu-devel-request@nongnu.org?subject=unsubscribe>
List-Archive: <http://lists.nongnu.org/archive/html/qemu-devel/>
List-Post: <mailto:qemu-devel@nongnu.org>
List-Help: <mailto:qemu-devel-request@nongnu.org?subject=help>
List-Subscribe: <https://lists.nongnu.org/mailman/listinfo/qemu-devel>,
	<mailto:qemu-devel-request@nongnu.org?subject=subscribe>
Cc: Peter Maydell <peter.maydell@linaro.org>,
	Stefan Hajnoczi <stefanha@redhat.com>, Paolo Bonzini <pbonzini@redhat.com>
Errors-To: qemu-devel-bounces+importer=patchew.org@nongnu.org
Sender: "Qemu-devel" <qemu-devel-bounces+importer=patchew.org@nongnu.org>
X-ZohoMail: RSF_0  Z_629925259 SPT_0
Content-Transfer-Encoding: quoted-printable
MIME-Version: 1.0
Content-Type: text/plain; charset="utf-8"

From: Paolo Bonzini <pbonzini@redhat.com>

This uses the lock-free mutex described in the paper '"Blocking without
Locking", or LFTHREADS: A lock-free thread library' by Gidenstam and
Papatriantafilou.  The same technique is used in OSv, and in fact
the code is essentially a conversion to C of OSv's code.

[Added missing coroutine_fn in tests/test-aio-multithread.c.
--Stefan]

Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
Reviewed-by: Fam Zheng <famz@redhat.com>
Message-id: 20170213181244.16297-2-pbonzini@redhat.com
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
---
 include/qemu/coroutine.h     |  17 ++++-
 tests/test-aio-multithread.c |  86 ++++++++++++++++++++++++
 util/qemu-coroutine-lock.c   | 155 +++++++++++++++++++++++++++++++++++++++=
+---
 util/trace-events            |   1 +
 4 files changed, 246 insertions(+), 13 deletions(-)

diff --git a/include/qemu/coroutine.h b/include/qemu/coroutine.h
index 12584ed..fce228f 100644
--- a/include/qemu/coroutine.h
+++ b/include/qemu/coroutine.h
@@ -160,10 +160,23 @@ bool qemu_co_queue_empty(CoQueue *queue);
 /**
  * Provides a mutex that can be used to synchronise coroutines
  */
+struct CoWaitRecord;
 typedef struct CoMutex {
-    bool locked;
+    /* Count of pending lockers; 0 for a free mutex, 1 for an
+     * uncontended mutex.
+     */
+    unsigned locked;
+
+    /* A queue of waiters.  Elements are added atomically in front of
+     * from_push.  to_pop is only populated, and popped from, by whoever
+     * is in charge of the next wakeup.  This can be an unlocker or,
+     * through the handoff protocol, a locker that is about to go to sleep.
+     */
+    QSLIST_HEAD(, CoWaitRecord) from_push, to_pop;
+
+    unsigned handoff, sequence;
+
     Coroutine *holder;
-    CoQueue queue;
 } CoMutex;
=20
 /**
diff --git a/tests/test-aio-multithread.c b/tests/test-aio-multithread.c
index 534807d..4fa2e9b 100644
--- a/tests/test-aio-multithread.c
+++ b/tests/test-aio-multithread.c
@@ -196,6 +196,88 @@ static void test_multi_co_schedule_10(void)
     test_multi_co_schedule(10);
 }
=20
+/* CoMutex thread-safety.  */
+
+static uint32_t atomic_counter;
+static uint32_t running;
+static uint32_t counter;
+static CoMutex comutex;
+
+static void coroutine_fn test_multi_co_mutex_entry(void *opaque)
+{
+    while (!atomic_mb_read(&now_stopping)) {
+        qemu_co_mutex_lock(&comutex);
+        counter++;
+        qemu_co_mutex_unlock(&comutex);
+
+        /* Increase atomic_counter *after* releasing the mutex.  Otherwise
+         * there is a chance (it happens about 1 in 3 runs) that the iothr=
ead
+         * exits before the coroutine is woken up, causing a spurious
+         * assertion failure.
+         */
+        atomic_inc(&atomic_counter);
+    }
+    atomic_dec(&running);
+}
+
+static void test_multi_co_mutex(int threads, int seconds)
+{
+    int i;
+
+    qemu_co_mutex_init(&comutex);
+    counter =3D 0;
+    atomic_counter =3D 0;
+    now_stopping =3D false;
+
+    create_aio_contexts();
+    assert(threads <=3D NUM_CONTEXTS);
+    running =3D threads;
+    for (i =3D 0; i < threads; i++) {
+        Coroutine *co1 =3D qemu_coroutine_create(test_multi_co_mutex_entry=
, NULL);
+        aio_co_schedule(ctx[i], co1);
+    }
+
+    g_usleep(seconds * 1000000);
+
+    atomic_mb_set(&now_stopping, true);
+    while (running > 0) {
+        g_usleep(100000);
+    }
+
+    join_aio_contexts();
+    g_test_message("%d iterations/second\n", counter / seconds);
+    g_assert_cmpint(counter, =3D=3D, atomic_counter);
+}
+
+/* Testing with NUM_CONTEXTS threads focuses on the queue.  The mutex howe=
ver
+ * is too contended (and the threads spend too much time in aio_poll)
+ * to actually stress the handoff protocol.
+ */
+static void test_multi_co_mutex_1(void)
+{
+    test_multi_co_mutex(NUM_CONTEXTS, 1);
+}
+
+static void test_multi_co_mutex_10(void)
+{
+    test_multi_co_mutex(NUM_CONTEXTS, 10);
+}
+
+/* Testing with fewer threads stresses the handoff protocol too.  Still, t=
he
+ * case where the locker _can_ pick up a handoff is very rare, happening
+ * about 10 times in 1 million, so increase the runtime a bit compared to
+ * other "quick" testcases that only run for 1 second.
+ */
+static void test_multi_co_mutex_2_3(void)
+{
+    test_multi_co_mutex(2, 3);
+}
+
+static void test_multi_co_mutex_2_30(void)
+{
+    test_multi_co_mutex(2, 30);
+}
+
 /* End of tests.  */
=20
 int main(int argc, char **argv)
@@ -206,8 +288,12 @@ int main(int argc, char **argv)
     g_test_add_func("/aio/multi/lifecycle", test_lifecycle);
     if (g_test_quick()) {
         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_1);
+        g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_=
1);
+        g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_=
3);
     } else {
         g_test_add_func("/aio/multi/schedule", test_multi_co_schedule_10);
+        g_test_add_func("/aio/multi/mutex/contended", test_multi_co_mutex_=
10);
+        g_test_add_func("/aio/multi/mutex/handoff", test_multi_co_mutex_2_=
30);
     }
     return g_test_run();
 }
diff --git a/util/qemu-coroutine-lock.c b/util/qemu-coroutine-lock.c
index e6afd1a..25da9fa 100644
--- a/util/qemu-coroutine-lock.c
+++ b/util/qemu-coroutine-lock.c
@@ -20,6 +20,10 @@
  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING=
 FROM,
  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS =
IN
  * THE SOFTWARE.
+ *
+ * The lock-free mutex implementation is based on OSv
+ * (core/lfmutex.cc, include/lockfree/mutex.hh).
+ * Copyright (C) 2013 Cloudius Systems, Ltd.
  */
=20
 #include "qemu/osdep.h"
@@ -111,43 +115,172 @@ bool qemu_co_queue_empty(CoQueue *queue)
     return QSIMPLEQ_FIRST(&queue->entries) =3D=3D NULL;
 }
=20
+/* The wait records are handled with a multiple-producer, single-consumer
+ * lock-free queue.  There cannot be two concurrent pop_waiter() calls
+ * because pop_waiter() can only be called while mutex->handoff is zero.
+ * This can happen in three cases:
+ * - in qemu_co_mutex_unlock, before the hand-off protocol has started.
+ *   In this case, qemu_co_mutex_lock will see mutex->handoff =3D=3D 0 and
+ *   not take part in the handoff.
+ * - in qemu_co_mutex_lock, if it steals the hand-off responsibility from
+ *   qemu_co_mutex_unlock.  In this case, qemu_co_mutex_unlock will fail
+ *   the cmpxchg (it will see either 0 or the next sequence value) and
+ *   exit.  The next hand-off cannot begin until qemu_co_mutex_lock has
+ *   woken up someone.
+ * - in qemu_co_mutex_unlock, if it takes the hand-off token itself.
+ *   In this case another iteration starts with mutex->handoff =3D=3D 0;
+ *   a concurrent qemu_co_mutex_lock will fail the cmpxchg, and
+ *   qemu_co_mutex_unlock will go back to case (1).
+ *
+ * The following functions manage this queue.
+ */
+typedef struct CoWaitRecord {
+    Coroutine *co;
+    QSLIST_ENTRY(CoWaitRecord) next;
+} CoWaitRecord;
+
+static void push_waiter(CoMutex *mutex, CoWaitRecord *w)
+{
+    w->co =3D qemu_coroutine_self();
+    QSLIST_INSERT_HEAD_ATOMIC(&mutex->from_push, w, next);
+}
+
+static void move_waiters(CoMutex *mutex)
+{
+    QSLIST_HEAD(, CoWaitRecord) reversed;
+    QSLIST_MOVE_ATOMIC(&reversed, &mutex->from_push);
+    while (!QSLIST_EMPTY(&reversed)) {
+        CoWaitRecord *w =3D QSLIST_FIRST(&reversed);
+        QSLIST_REMOVE_HEAD(&reversed, next);
+        QSLIST_INSERT_HEAD(&mutex->to_pop, w, next);
+    }
+}
+
+static CoWaitRecord *pop_waiter(CoMutex *mutex)
+{
+    CoWaitRecord *w;
+
+    if (QSLIST_EMPTY(&mutex->to_pop)) {
+        move_waiters(mutex);
+        if (QSLIST_EMPTY(&mutex->to_pop)) {
+            return NULL;
+        }
+    }
+    w =3D QSLIST_FIRST(&mutex->to_pop);
+    QSLIST_REMOVE_HEAD(&mutex->to_pop, next);
+    return w;
+}
+
+static bool has_waiters(CoMutex *mutex)
+{
+    return QSLIST_EMPTY(&mutex->to_pop) || QSLIST_EMPTY(&mutex->from_push);
+}
+
 void qemu_co_mutex_init(CoMutex *mutex)
 {
     memset(mutex, 0, sizeof(*mutex));
-    qemu_co_queue_init(&mutex->queue);
 }
=20
-void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
+static void coroutine_fn qemu_co_mutex_lock_slowpath(CoMutex *mutex)
 {
     Coroutine *self =3D qemu_coroutine_self();
+    CoWaitRecord w;
+    unsigned old_handoff;
=20
     trace_qemu_co_mutex_lock_entry(mutex, self);
+    w.co =3D self;
+    push_waiter(mutex, &w);
=20
-    while (mutex->locked) {
-        qemu_co_queue_wait(&mutex->queue);
+    /* This is the "Responsibility Hand-Off" protocol; a lock() picks from
+     * a concurrent unlock() the responsibility of waking somebody up.
+     */
+    old_handoff =3D atomic_mb_read(&mutex->handoff);
+    if (old_handoff &&
+        has_waiters(mutex) &&
+        atomic_cmpxchg(&mutex->handoff, old_handoff, 0) =3D=3D old_handoff=
) {
+        /* There can be no concurrent pops, because there can be only
+         * one active handoff at a time.
+         */
+        CoWaitRecord *to_wake =3D pop_waiter(mutex);
+        Coroutine *co =3D to_wake->co;
+        if (co =3D=3D self) {
+            /* We got the lock ourselves!  */
+            assert(to_wake =3D=3D &w);
+            return;
+        }
+
+        aio_co_wake(co);
     }
=20
-    mutex->locked =3D true;
-    mutex->holder =3D self;
-    self->locks_held++;
-
+    qemu_coroutine_yield();
     trace_qemu_co_mutex_lock_return(mutex, self);
 }
=20
+void coroutine_fn qemu_co_mutex_lock(CoMutex *mutex)
+{
+    Coroutine *self =3D qemu_coroutine_self();
+
+    if (atomic_fetch_inc(&mutex->locked) =3D=3D 0) {
+        /* Uncontended.  */
+        trace_qemu_co_mutex_lock_uncontended(mutex, self);
+    } else {
+        qemu_co_mutex_lock_slowpath(mutex);
+    }
+    mutex->holder =3D self;
+    self->locks_held++;
+}
+
 void coroutine_fn qemu_co_mutex_unlock(CoMutex *mutex)
 {
     Coroutine *self =3D qemu_coroutine_self();
=20
     trace_qemu_co_mutex_unlock_entry(mutex, self);
=20
-    assert(mutex->locked =3D=3D true);
+    assert(mutex->locked);
     assert(mutex->holder =3D=3D self);
     assert(qemu_in_coroutine());
=20
-    mutex->locked =3D false;
     mutex->holder =3D NULL;
     self->locks_held--;
-    qemu_co_queue_next(&mutex->queue);
+    if (atomic_fetch_dec(&mutex->locked) =3D=3D 1) {
+        /* No waiting qemu_co_mutex_lock().  Pfew, that was easy!  */
+        return;
+    }
+
+    for (;;) {
+        CoWaitRecord *to_wake =3D pop_waiter(mutex);
+        unsigned our_handoff;
+
+        if (to_wake) {
+            Coroutine *co =3D to_wake->co;
+            aio_co_wake(co);
+            break;
+        }
+
+        /* Some concurrent lock() is in progress (we know this because
+         * mutex->locked was >1) but it hasn't yet put itself on the wait
+         * queue.  Pick a sequence number for the handoff protocol (not 0).
+         */
+        if (++mutex->sequence =3D=3D 0) {
+            mutex->sequence =3D 1;
+        }
+
+        our_handoff =3D mutex->sequence;
+        atomic_mb_set(&mutex->handoff, our_handoff);
+        if (!has_waiters(mutex)) {
+            /* The concurrent lock has not added itself yet, so it
+             * will be able to pick our handoff.
+             */
+            break;
+        }
+
+        /* Try to do the handoff protocol ourselves; if somebody else has
+         * already taken it, however, we're done and they're responsible.
+         */
+        if (atomic_cmpxchg(&mutex->handoff, our_handoff, 0) !=3D our_hando=
ff) {
+            break;
+        }
+    }
=20
     trace_qemu_co_mutex_unlock_return(mutex, self);
 }
diff --git a/util/trace-events b/util/trace-events
index 65c9787..ac27d94 100644
--- a/util/trace-events
+++ b/util/trace-events
@@ -28,6 +28,7 @@ qemu_coroutine_terminate(void *co) "self %p"
=20
 # util/qemu-coroutine-lock.c
 qemu_co_queue_run_restart(void *co) "co %p"
+qemu_co_mutex_lock_uncontended(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_lock_entry(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_lock_return(void *mutex, void *self) "mutex %p self %p"
 qemu_co_mutex_unlock_entry(void *mutex, void *self) "mutex %p self %p"
--=20
2.9.3