[PATCH v7 07/13] multifd: Make flags field thread local

Juan Quintela posted 13 patches 2 years, 5 months ago
Maintainers: Eduardo Habkost <eduardo@habkost.net>, Marcel Apfelbaum <marcel.apfelbaum@gmail.com>, "Philippe Mathieu-Daudé" <f4bug@amsat.org>, Yanan Wang <wangyanan55@huawei.com>, Juan Quintela <quintela@redhat.com>, "Dr. David Alan Gilbert" <dgilbert@redhat.com>
There is a newer version of this series
[PATCH v7 07/13] multifd: Make flags field thread local
Posted by Juan Quintela 2 years, 5 months ago
Use of flags with respect to locking was inconsistent.  For the
sending side:
- it was set to 0 with mutex held on the multifd channel.
- MULTIFD_FLAG_SYNC was set with mutex held on the migration thread.
- Everything else was done without the mutex held on the multifd channel.

On the reception side, it is not used on the migration thread, only on
the multifd channel threads.

So we move it to the variables that are only used by the multifd channel
thread, and we introduce a new bool sync_needed on the send side so the
migration thread can tell the channel thread that a sync is required.

Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/multifd.h | 10 ++++++----
 migration/multifd.c | 23 +++++++++++++----------
 2 files changed, 19 insertions(+), 14 deletions(-)

diff --git a/migration/multifd.h b/migration/multifd.h
index 8a45dda58c..af8ce8921d 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -98,12 +98,12 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
-    /* multifd flags for each packet */
-    uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
     /* How many bytes have we sent on the last packet */
     uint64_t sent_bytes;
+    /* Do we need to do an iteration sync */
+    bool sync_needed;
     /* thread has work to do */
     int pending_job;
     /* array of pages to sent.
@@ -117,6 +117,8 @@ typedef struct {
 
     /* pointer to the packet */
     MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
     /* size of the next packet that contains pages */
     uint32_t next_packet_size;
     /* packets sent through this channel */
@@ -163,8 +165,6 @@ typedef struct {
     bool running;
     /* should this thread finish */
     bool quit;
-    /* multifd flags for each packet */
-    uint32_t flags;
     /* global number of generated multifd packets */
     uint64_t packet_num;
 
@@ -172,6 +172,8 @@ typedef struct {
 
     /* pointer to the packet */
     MultiFDPacket_t *packet;
+    /* multifd flags for each packet */
+    uint32_t flags;
     /* size of the next packet that contains pages */
     uint32_t next_packet_size;
     /* packets sent through this channel */
diff --git a/migration/multifd.c b/migration/multifd.c
index eef47c274f..69b9d7cf98 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -602,7 +602,7 @@ int multifd_send_sync_main(QEMUFile *f)
         }
 
         p->packet_num = multifd_send_state->packet_num++;
-        p->flags |= MULTIFD_FLAG_SYNC;
+        p->sync_needed = true;
         p->pending_job++;
         qemu_mutex_unlock(&p->mutex);
         qemu_sem_post(&p->sem);
@@ -656,7 +656,11 @@ static void *multifd_send_thread(void *opaque)
 
         if (p->pending_job) {
             uint64_t packet_num = p->packet_num;
-            uint32_t flags = p->flags;
+            p->flags = 0;
+            if (p->sync_needed) {
+                p->flags |= MULTIFD_FLAG_SYNC;
+                p->sync_needed = false;
+            }
             p->normal_num = 0;
 
             if (use_zero_copy_send) {
@@ -678,14 +682,13 @@ static void *multifd_send_thread(void *opaque)
                 }
             }
             multifd_send_fill_packet(p);
-            p->flags = 0;
             p->num_packets++;
             p->total_normal_pages += p->normal_num;
             p->pages->num = 0;
             p->pages->block = NULL;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
+            trace_multifd_send(p->id, packet_num, p->normal_num, p->flags,
                                p->next_packet_size);
 
             if (use_zero_copy_send) {
@@ -713,7 +716,7 @@ static void *multifd_send_thread(void *opaque)
             p->pending_job--;
             qemu_mutex_unlock(&p->mutex);
 
-            if (flags & MULTIFD_FLAG_SYNC) {
+            if (p->flags & MULTIFD_FLAG_SYNC) {
                 qemu_sem_post(&p->sem_sync);
             }
             qemu_sem_post(&multifd_send_state->channels_ready);
@@ -1090,7 +1093,7 @@ static void *multifd_recv_thread(void *opaque)
     rcu_register_thread();
 
     while (true) {
-        uint32_t flags;
+        bool sync_needed = false;
 
         if (p->quit) {
             break;
@@ -1112,11 +1115,11 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
 
-        flags = p->flags;
+        trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
+                           p->next_packet_size);
+        sync_needed = p->flags & MULTIFD_FLAG_SYNC;
         /* recv methods don't know how to handle the SYNC flag */
         p->flags &= ~MULTIFD_FLAG_SYNC;
-        trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
-                           p->next_packet_size);
         p->num_packets++;
         p->total_normal_pages += p->normal_num;
         qemu_mutex_unlock(&p->mutex);
@@ -1128,7 +1131,7 @@ static void *multifd_recv_thread(void *opaque)
             }
         }
 
-        if (flags & MULTIFD_FLAG_SYNC) {
+        if (sync_needed) {
             qemu_sem_post(&multifd_recv_state->sem_sync);
             qemu_sem_wait(&p->sem_sync);
         }
-- 
2.35.3
Re: [PATCH v7 07/13] multifd: Make flags field thread local
Posted by Dr. David Alan Gilbert 2 years, 3 months ago
* Juan Quintela (quintela@redhat.com) wrote:
> Use of flags with respect to locking was inconsistent.  For the
> sending side:
> - it was set to 0 with mutex held on the multifd channel.
> - MULTIFD_FLAG_SYNC was set with mutex held on the migration thread.
> - Everything else was done without the mutex held on the multifd channel.
> 
> On the reception side, it is not used on the migration thread, only on
> the multifd channels threads.
> 
> So we move it to the multifd channels thread only variables, and we
> introduce a new bool sync_needed on the send side to pass that information.

Does this need rework after Leo's recent changes?

Dave

> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
>  migration/multifd.h | 10 ++++++----
>  migration/multifd.c | 23 +++++++++++++----------
>  2 files changed, 19 insertions(+), 14 deletions(-)
> 
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 8a45dda58c..af8ce8921d 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -98,12 +98,12 @@ typedef struct {
>      bool running;
>      /* should this thread finish */
>      bool quit;
> -    /* multifd flags for each packet */
> -    uint32_t flags;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
>      /* How many bytes have we sent on the last packet */
>      uint64_t sent_bytes;
> +    /* Do we need to do an iteration sync */
> +    bool sync_needed;
>      /* thread has work to do */
>      int pending_job;
>      /* array of pages to sent.
> @@ -117,6 +117,8 @@ typedef struct {
>  
>      /* pointer to the packet */
>      MultiFDPacket_t *packet;
> +    /* multifd flags for each packet */
> +    uint32_t flags;
>      /* size of the next packet that contains pages */
>      uint32_t next_packet_size;
>      /* packets sent through this channel */
> @@ -163,8 +165,6 @@ typedef struct {
>      bool running;
>      /* should this thread finish */
>      bool quit;
> -    /* multifd flags for each packet */
> -    uint32_t flags;
>      /* global number of generated multifd packets */
>      uint64_t packet_num;
>  
> @@ -172,6 +172,8 @@ typedef struct {
>  
>      /* pointer to the packet */
>      MultiFDPacket_t *packet;
> +    /* multifd flags for each packet */
> +    uint32_t flags;
>      /* size of the next packet that contains pages */
>      uint32_t next_packet_size;
>      /* packets sent through this channel */
> diff --git a/migration/multifd.c b/migration/multifd.c
> index eef47c274f..69b9d7cf98 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -602,7 +602,7 @@ int multifd_send_sync_main(QEMUFile *f)
>          }
>  
>          p->packet_num = multifd_send_state->packet_num++;
> -        p->flags |= MULTIFD_FLAG_SYNC;
> +        p->sync_needed = true;
>          p->pending_job++;
>          qemu_mutex_unlock(&p->mutex);
>          qemu_sem_post(&p->sem);
> @@ -656,7 +656,11 @@ static void *multifd_send_thread(void *opaque)
>  
>          if (p->pending_job) {
>              uint64_t packet_num = p->packet_num;
> -            uint32_t flags = p->flags;
> +            p->flags = 0;
> +            if (p->sync_needed) {
> +                p->flags |= MULTIFD_FLAG_SYNC;
> +                p->sync_needed = false;
> +            }
>              p->normal_num = 0;
>  
>              if (use_zero_copy_send) {
> @@ -678,14 +682,13 @@ static void *multifd_send_thread(void *opaque)
>                  }
>              }
>              multifd_send_fill_packet(p);
> -            p->flags = 0;
>              p->num_packets++;
>              p->total_normal_pages += p->normal_num;
>              p->pages->num = 0;
>              p->pages->block = NULL;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
> +            trace_multifd_send(p->id, packet_num, p->normal_num, p->flags,
>                                 p->next_packet_size);
>  
>              if (use_zero_copy_send) {
> @@ -713,7 +716,7 @@ static void *multifd_send_thread(void *opaque)
>              p->pending_job--;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            if (flags & MULTIFD_FLAG_SYNC) {
> +            if (p->flags & MULTIFD_FLAG_SYNC) {
>                  qemu_sem_post(&p->sem_sync);
>              }
>              qemu_sem_post(&multifd_send_state->channels_ready);
> @@ -1090,7 +1093,7 @@ static void *multifd_recv_thread(void *opaque)
>      rcu_register_thread();
>  
>      while (true) {
> -        uint32_t flags;
> +        bool sync_needed = false;
>  
>          if (p->quit) {
>              break;
> @@ -1112,11 +1115,11 @@ static void *multifd_recv_thread(void *opaque)
>              break;
>          }
>  
> -        flags = p->flags;
> +        trace_multifd_recv(p->id, p->packet_num, p->normal_num, p->flags,
> +                           p->next_packet_size);
> +        sync_needed = p->flags & MULTIFD_FLAG_SYNC;
>          /* recv methods don't know how to handle the SYNC flag */
>          p->flags &= ~MULTIFD_FLAG_SYNC;
> -        trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
> -                           p->next_packet_size);
>          p->num_packets++;
>          p->total_normal_pages += p->normal_num;
>          qemu_mutex_unlock(&p->mutex);
> @@ -1128,7 +1131,7 @@ static void *multifd_recv_thread(void *opaque)
>              }
>          }
>  
> -        if (flags & MULTIFD_FLAG_SYNC) {
> +        if (sync_needed) {
>              qemu_sem_post(&multifd_recv_state->sem_sync);
>              qemu_sem_wait(&p->sem_sync);
>          }
> -- 
> 2.35.3
> 
-- 
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK