[PATCH 3/5] rcu: Use call_rcu() in synchronize_rcu()

Previously, synchronize_rcu() was a single-threaded implementation
protected by a mutex. It was used only by the RCU thread and by tests;
real users instead use call_rcu(), which relies on the RCU thread.
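
For reference, the caller-side pattern real users rely on looks roughly
like this (illustrative sketch only; OldType, global_ptr, new and
reclaim_cb are hypothetical, with a struct rcu_head field named rcu
embedded in OldType):

    /* Updater: publish the new version and defer the free. */
    OldType *old = qatomic_rcu_read(&global_ptr);
    qatomic_rcu_set(&global_ptr, new);
    /* Returns immediately; reclaim_cb runs on the RCU thread after a
     * grace period has elapsed. */
    call_rcu(old, reclaim_cb, rcu);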

The use of synchronize_rcu() in tests did not accurately represent real
use cases: it took the mutex, which never happens in real use cases,
and it did not exercise the logic in the RCU thread.

Add a new implementation of synchronize_rcu() that uses call_rcu(), so
that tests represent real use cases. The old synchronize_rcu() is
renamed to enter_qs() and used only in the RCU thread, making the mutex
unnecessary.
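
With this change, a test that calls synchronize_rcu() exercises the
same call_rcu()/RCU-thread path as production code; schematically
(hypothetical fragment, same illustrative names as above):

    qatomic_rcu_set(&global_ptr, new);
    /* Queues an on-stack callback via call_rcu() and blocks until the
     * RCU thread has run it, i.e. after a full grace period. */
    synchronize_rcu();
    g_free(old);  /* no reader can still reference the old version */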

Signed-off-by: Akihiko Odaki <odaki@rsg.ci.i.u-tokyo.ac.jp>
---
 util/rcu.c | 51 +++++++++++++++++++++++++++------------------------
 1 file changed, 27 insertions(+), 24 deletions(-)

diff --git a/util/rcu.c b/util/rcu.c
index acac9446ea98..3c4af9d213c8 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -38,7 +38,7 @@
 
 /*
  * Global grace period counter.  Bit 0 is always one in rcu_gp_ctr.
- * Bits 1 and above are defined in synchronize_rcu.
+ * Bits 1 and above are defined in enter_qs().
  */
 #define RCU_GP_LOCKED           (1UL << 0)
 #define RCU_GP_CTR              (1UL << 1)
@@ -52,7 +52,6 @@ QemuEvent rcu_gp_event;
 static int in_drain_call_rcu;
 static int rcu_call_count;
 static QemuMutex rcu_registry_lock;
-static QemuMutex rcu_sync_lock;
 
 /*
  * Check whether a quiescent state was crossed between the beginning of
@@ -111,7 +110,7 @@ static void wait_for_readers(void)
          *
          * If this is the last iteration, this barrier also prevents
          * frees from seeping upwards, and orders the two wait phases
-         * on architectures with 32-bit longs; see synchronize_rcu().
+         * on architectures with 32-bit longs; see enter_qs().
          */
         smp_mb_global();
 
@@ -137,9 +136,9 @@ static void wait_for_readers(void)
          * wait too much time.
          *
          * rcu_register_thread() may add nodes to &registry; it will not
-         * wake up synchronize_rcu, but that is okay because at least another
+         * wake up enter_qs(), but that is okay because at least another
          * thread must exit its RCU read-side critical section before
-         * synchronize_rcu is done.  The next iteration of the loop will
+         * enter_qs() is done.  The next iteration of the loop will
          * move the new thread's rcu_reader from &registry to &qsreaders,
          * because rcu_gp_ongoing() will return false.
          *
@@ -171,10 +170,8 @@ static void wait_for_readers(void)
     QLIST_SWAP(&registry, &qsreaders, node);
 }
 
-void synchronize_rcu(void)
+static void enter_qs(void)
 {
-    QEMU_LOCK_GUARD(&rcu_sync_lock);
-
     /* Write RCU-protected pointers before reading p_rcu_reader->ctr.
      * Pairs with smp_mb_placeholder() in rcu_read_lock().
      *
@@ -289,7 +286,7 @@ static void *call_rcu_thread(void *opaque)
 
         /*
          * Fetch rcu_call_count now, we only must process elements that were
-         * added before synchronize_rcu() starts.
+         * added before enter_qs() starts.
          */
         for (;;) {
             qemu_event_reset(&rcu_call_ready_event);
@@ -304,7 +301,7 @@ static void *call_rcu_thread(void *opaque)
             qemu_event_wait(&rcu_call_ready_event);
         }
 
-        synchronize_rcu();
+        enter_qs();
         qatomic_sub(&rcu_call_count, n);
         bql_lock();
         while (n > 0) {
@@ -337,15 +334,24 @@ void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
 }
 
 
-struct rcu_drain {
+typedef struct Sync {
     struct rcu_head rcu;
-    QemuEvent drain_complete_event;
-};
+    QemuEvent complete_event;
+} Sync;
 
-static void drain_rcu_callback(struct rcu_head *node)
+static void sync_rcu_callback(Sync *sync)
 {
-    struct rcu_drain *event = (struct rcu_drain *)node;
-    qemu_event_set(&event->drain_complete_event);
+    qemu_event_set(&sync->complete_event);
+}
+
+void synchronize_rcu(void)
+{
+    Sync sync;
+
+    qemu_event_init(&sync.complete_event, false);
+    call_rcu(&sync, sync_rcu_callback, rcu);
+    qemu_event_wait(&sync.complete_event);
+    qemu_event_destroy(&sync.complete_event);
 }
 
 /*
@@ -359,11 +365,11 @@ static void drain_rcu_callback(struct rcu_head *node)
 
 void drain_call_rcu(void)
 {
-    struct rcu_drain rcu_drain;
+    Sync sync;
     bool locked = bql_locked();
 
-    memset(&rcu_drain, 0, sizeof(struct rcu_drain));
-    qemu_event_init(&rcu_drain.drain_complete_event, false);
+    memset(&sync, 0, sizeof(sync));
+    qemu_event_init(&sync.complete_event, false);
 
     if (locked) {
         bql_unlock();
@@ -383,8 +389,8 @@ void drain_call_rcu(void)
      */
 
     qatomic_inc(&in_drain_call_rcu);
-    call_rcu1(&rcu_drain.rcu, drain_rcu_callback);
-    qemu_event_wait(&rcu_drain.drain_complete_event);
+    call_rcu(&sync, sync_rcu_callback, rcu);
+    qemu_event_wait(&sync.complete_event);
     qatomic_dec(&in_drain_call_rcu);
 
     if (locked) {
@@ -427,7 +433,6 @@ static void rcu_init_complete(void)
     QemuThread thread;
 
     qemu_mutex_init(&rcu_registry_lock);
-    qemu_mutex_init(&rcu_sync_lock);
     qemu_event_init(&rcu_gp_event, true);
 
     qemu_event_init(&rcu_call_ready_event, false);
@@ -460,7 +465,6 @@ static void rcu_init_lock(void)
         return;
     }
 
-    qemu_mutex_lock(&rcu_sync_lock);
     qemu_mutex_lock(&rcu_registry_lock);
 }
 
@@ -471,7 +475,6 @@ static void rcu_init_unlock(void)
     }
 
     qemu_mutex_unlock(&rcu_registry_lock);
-    qemu_mutex_unlock(&rcu_sync_lock);
 }
 
 static void rcu_init_child(void)

-- 
2.51.0