[Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread

Juan Quintela posted 14 patches 7 years, 9 months ago
There is a newer version of this series
[Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread
Posted by Juan Quintela 7 years, 9 months ago
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
 migration/ram.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 60 insertions(+), 9 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index b1ad7b2730..f636c7da0a 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -411,6 +411,19 @@ static void compress_threads_save_setup(void)
 /* used to continue on the same multifd group */
 #define MULTIFD_CONTINUE UINT16_MAX
 
+#define MULTIFD_MAGIC 0x112233d
+#define MULTIFD_VERSION 1
+
+typedef struct {
+    uint32_t magic;
+    uint32_t version;
+    uint32_t size;
+    uint32_t used;
+    uint32_t seq;
+    char ramblock[256];
+    ram_addr_t offset[];
+} __attribute__((packed)) MultiFDPacket_t;
+
 typedef struct {
     /* number of used pages */
     uint32_t used;
@@ -420,6 +433,8 @@ typedef struct {
     uint32_t seq;
     struct iovec *iov;
     RAMBlock *block;
+    uint32_t packet_len;
+    MultiFDPacket_t *packet;
 } multifd_pages_t;
 
 struct MultiFDSendParams {
@@ -456,6 +471,8 @@ static void multifd_pages_init(multifd_pages_t **ppages, size_t size)
 
     pages->allocated = size;
     pages->iov = g_new0(struct iovec, size);
+    pages->packet_len = sizeof(MultiFDPacket_t) + sizeof(ram_addr_t) * size;
+    pages->packet = g_malloc0(pages->packet_len);
     *ppages = pages;
 }
 
@@ -467,6 +484,9 @@ static void multifd_pages_clear(multifd_pages_t *pages)
     pages->block = NULL;
     g_free(pages->iov);
     pages->iov = NULL;
+    pages->packet_len = 0;
+    g_free(pages->packet);
+    pages->packet = NULL;
     g_free(pages);
 }
 
@@ -553,16 +573,27 @@ static void *multifd_send_thread(void *opaque)
             break;
         }
         if (p->pages->used) {
+            MultiFDPacket_t *packet = p->pages->packet;
             Error *local_err = NULL;
             size_t ret;
-            uint32_t used;
 
-            used = p->pages->used;
+            packet->used = p->pages->used;
             p->pages->used = 0;
             qemu_mutex_unlock(&p->mutex);
-
-            trace_multifd_send(p->id, p->pages->seq, used);
-            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
+            packet->magic = MULTIFD_MAGIC;
+            packet->version = MULTIFD_VERSION;
+            strncpy(packet->ramblock, p->pages->block->idstr, 256);
+            packet->size = migrate_multifd_page_count();
+            packet->seq = p->pages->seq;
+            ret = qio_channel_write_all(p->c, (void *)packet,
+                                        p->pages->packet_len, &local_err);
+            if (ret != 0) {
+                terminate_multifd_send_threads(local_err);
+                return NULL;
+            }
+            trace_multifd_send(p->id, p->pages->seq, packet->used);
+            ret = qio_channel_writev_all(p->c, p->pages->iov,
+                                         packet->used, &local_err);
             if (ret != 0) {
                 terminate_multifd_send_threads(local_err);
                 return NULL;
@@ -645,6 +676,7 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
         pages->block = block;
     }
 
+    pages->packet->offset[pages->used] = offset;
     pages->iov[pages->used].iov_base = block->host + offset;
     pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
     pages->used++;
@@ -776,16 +808,35 @@ static void *multifd_recv_thread(void *opaque)
             break;
         }
         if (p->pages->used) {
+            MultiFDPacket_t *packet = p->pages->packet;
+            RAMBlock *block;
             Error *local_err = NULL;
             size_t ret;
-            uint32_t used;
+            int i;
 
-            used = p->pages->used;
             p->pages->used = 0;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_recv(p->id, p->pages->seq, used);
-            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
+            ret = qio_channel_read_all(p->c, (void *)packet,
+                                       p->pages->packet_len, &local_err);
+            if (ret != 0) {
+                terminate_multifd_recv_threads(local_err);
+                return NULL;
+            }
+            block = qemu_ram_block_by_name(packet->ramblock);
+            p->pages->seq = packet->seq;
+            for (i = 0; i < packet->used; i++) {
+                if (block->host + packet->offset[i]
+                    != p->pages->iov[i].iov_base) {
+                    printf("page offset %d packet %p pages %p\n", i,
+                           block->host + packet->offset[i],
+                           p->pages->iov[i].iov_base);
+                    break;
+                }
+            }
+            trace_multifd_recv(p->id, p->pages->seq, packet->used);
+            ret = qio_channel_readv_all(p->c, p->pages->iov,
+                                        packet->used, &local_err);
             if (ret != 0) {
                 terminate_multifd_recv_threads(local_err);
                 return NULL;
-- 
2.14.3


Re: [Qemu-devel] [PATCH v10 12/14] migration: Sent the page list over the normal thread
Posted by Dr. David Alan Gilbert 7 years, 9 months ago
* Juan Quintela (quintela@redhat.com) wrote:
> Signed-off-by: Juan Quintela <quintela@redhat.com>

Hmm, I'm not sure what this one is doing; is this permanent?
Is there already a guarantee that all the pages in one chunk sent over
multifd are from the same RAMBlock?
(Also, there's a printf left in the multifd_recv_thread hunk down there.)

Dave

> ---
>  migration/ram.c | 69 +++++++++++++++++++++++++++++++++++++++++++++++++--------
>  1 file changed, 60 insertions(+), 9 deletions(-)
> 
> diff --git a/migration/ram.c b/migration/ram.c
> index b1ad7b2730..f636c7da0a 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -411,6 +411,19 @@ static void compress_threads_save_setup(void)
>  /* used to continue on the same multifd group */
>  #define MULTIFD_CONTINUE UINT16_MAX
>  
> +#define MULTIFD_MAGIC 0x112233d
> +#define MULTIFD_VERSION 1
> +
> +typedef struct {
> +    uint32_t magic;
> +    uint32_t version;
> +    uint32_t size;
> +    uint32_t used;
> +    uint32_t seq;
> +    char ramblock[256];
> +    ram_addr_t offset[];
> +} __attribute__((packed)) MultiFDPacket_t;
> +
>  typedef struct {
>      /* number of used pages */
>      uint32_t used;
> @@ -420,6 +433,8 @@ typedef struct {
>      uint32_t seq;
>      struct iovec *iov;
>      RAMBlock *block;
> +    uint32_t packet_len;
> +    MultiFDPacket_t *packet;
>  } multifd_pages_t;
>  
>  struct MultiFDSendParams {
> @@ -456,6 +471,8 @@ static void multifd_pages_init(multifd_pages_t **ppages, size_t size)
>  
>      pages->allocated = size;
>      pages->iov = g_new0(struct iovec, size);
> +    pages->packet_len = sizeof(MultiFDPacket_t) + sizeof(ram_addr_t) * size;
> +    pages->packet = g_malloc0(pages->packet_len);
>      *ppages = pages;
>  }
>  
> @@ -467,6 +484,9 @@ static void multifd_pages_clear(multifd_pages_t *pages)
>      pages->block = NULL;
>      g_free(pages->iov);
>      pages->iov = NULL;
> +    pages->packet_len = 0;
> +    g_free(pages->packet);
> +    pages->packet = NULL;
>      g_free(pages);
>  }
>  
> @@ -553,16 +573,27 @@ static void *multifd_send_thread(void *opaque)
>              break;
>          }
>          if (p->pages->used) {
> +            MultiFDPacket_t *packet = p->pages->packet;
>              Error *local_err = NULL;
>              size_t ret;
> -            uint32_t used;
>  
> -            used = p->pages->used;
> +            packet->used = p->pages->used;
>              p->pages->used = 0;
>              qemu_mutex_unlock(&p->mutex);
> -
> -            trace_multifd_send(p->id, p->pages->seq, used);
> -            ret = qio_channel_writev_all(p->c, p->pages->iov, used, &local_err);
> +            packet->magic = MULTIFD_MAGIC;
> +            packet->version = MULTIFD_VERSION;
> +            strncpy(packet->ramblock, p->pages->block->idstr, 256);
> +            packet->size = migrate_multifd_page_count();
> +            packet->seq = p->pages->seq;
> +            ret = qio_channel_write_all(p->c, (void *)packet,
> +                                        p->pages->packet_len, &local_err);
> +            if (ret != 0) {
> +                terminate_multifd_send_threads(local_err);
> +                return NULL;
> +            }
> +            trace_multifd_send(p->id, p->pages->seq, packet->used);
> +            ret = qio_channel_writev_all(p->c, p->pages->iov,
> +                                         packet->used, &local_err);
>              if (ret != 0) {
>                  terminate_multifd_send_threads(local_err);
>                  return NULL;
> @@ -645,6 +676,7 @@ static uint16_t multifd_send_page(RAMBlock *block, ram_addr_t offset,
>          pages->block = block;
>      }
>  
> +    pages->packet->offset[pages->used] = offset;
>      pages->iov[pages->used].iov_base = block->host + offset;
>      pages->iov[pages->used].iov_len = TARGET_PAGE_SIZE;
>      pages->used++;
> @@ -776,16 +808,35 @@ static void *multifd_recv_thread(void *opaque)
>              break;
>          }
>          if (p->pages->used) {
> +            MultiFDPacket_t *packet = p->pages->packet;
> +            RAMBlock *block;
>              Error *local_err = NULL;
>              size_t ret;
> -            uint32_t used;
> +            int i;
>  
> -            used = p->pages->used;
>              p->pages->used = 0;
>              qemu_mutex_unlock(&p->mutex);
>  
> -            trace_multifd_recv(p->id, p->pages->seq, used);
> -            ret = qio_channel_readv_all(p->c, p->pages->iov, used, &local_err);
> +            ret = qio_channel_read_all(p->c, (void *)packet,
> +                                       p->pages->packet_len, &local_err);
> +            if (ret != 0) {
> +                terminate_multifd_recv_threads(local_err);
> +                return NULL;
> +            }
> +            block = qemu_ram_block_by_name(packet->ramblock);
> +            p->pages->seq = packet->seq;
> +            for (i = 0; i < packet->used; i++) {
> +                if (block->host + packet->offset[i]
> +                    != p->pages->iov[i].iov_base) {
> +                    printf("page offset %d packet %p pages %p\n", i,
> +                           block->host + packet->offset[i],
> +                           p->pages->iov[i].iov_base);
> +                    break;
> +                }
> +            }
> +            trace_multifd_recv(p->id, p->pages->seq, packet->used);
> +            ret = qio_channel_readv_all(p->c, p->pages->iov,
> +                                        packet->used, &local_err);
>              if (ret != 0) {
>                  terminate_multifd_recv_threads(local_err);
>                  return NULL;
> -- 
> 2.14.3
> 
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK