[PATCH v7 09/12] migration/multifd: Enable DSA offloading in multifd sender path.

Yichen Wang posted 12 patches 1 year, 2 months ago
There is a newer version of this series
[PATCH v7 09/12] migration/multifd: Enable DSA offloading in multifd sender path.
Posted by Yichen Wang 1 year, 2 months ago
From: Hao Xiang <hao.xiang@linux.dev>

Multifd sender path gets an array of pages queued by the migration
thread. It performs zero page checking on every page in the array.
The pages are classfied as either a zero page or a normal page. This
change uses Intel DSA to offload the zero page checking from CPU to
the DSA accelerator. The sender thread submits a batch of pages to DSA
hardware and waits for the DSA completion thread to signal for work
completion.

Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
---
 migration/multifd-zero-page.c | 129 ++++++++++++++++++++++++++++++----
 migration/multifd.c           |  29 +++++++-
 migration/multifd.h           |   5 ++
 3 files changed, 147 insertions(+), 16 deletions(-)

diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
index f1e988a959..639aed9f6b 100644
--- a/migration/multifd-zero-page.c
+++ b/migration/multifd-zero-page.c
@@ -21,7 +21,9 @@
 
 static bool multifd_zero_page_enabled(void)
 {
-    return migrate_zero_page_detection() == ZERO_PAGE_DETECTION_MULTIFD;
+    ZeroPageDetection curMethod = migrate_zero_page_detection();
+    return (curMethod == ZERO_PAGE_DETECTION_MULTIFD ||
+            curMethod == ZERO_PAGE_DETECTION_DSA_ACCEL);
 }
 
 static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
@@ -37,26 +39,49 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
     pages_offset[b] = temp;
 }
 
+#ifdef CONFIG_DSA_OPT
+
+static void swap_result(bool *results, int a, int b)
+{
+    bool temp;
+
+    if (a == b) {
+        return;
+    }
+
+    temp = results[a];
+    results[a] = results[b];
+    results[b] = temp;
+}
+
 /**
- * multifd_send_zero_page_detect: Perform zero page detection on all pages.
+ * zero_page_detect_dsa: Perform zero page detection using
+ * Intel Data Streaming Accelerator (DSA).
  *
- * Sorts normal pages before zero pages in p->pages->offset and updates
- * p->pages->normal_num.
+ * Sorts normal pages before zero pages in pages->offset and updates
+ * pages->normal_num.
  *
  * @param p A pointer to the send params.
  */
-void multifd_send_zero_page_detect(MultiFDSendParams *p)
+static void zero_page_detect_dsa(MultiFDSendParams *p)
 {
     MultiFDPages_t *pages = &p->data->u.ram;
     RAMBlock *rb = pages->block;
-    int i = 0;
-    int j = pages->num - 1;
+    bool *results = p->dsa_batch_task->results;
 
-    if (!multifd_zero_page_enabled()) {
-        pages->normal_num = pages->num;
-        goto out;
+    for (int i = 0; i < pages->num; i++) {
+        p->dsa_batch_task->addr[i] =
+            (ram_addr_t)(rb->host + pages->offset[i]);
     }
 
+    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
+                                  (const void **)p->dsa_batch_task->addr,
+                                  pages->num,
+                                  multifd_ram_page_size());
+
+    int i = 0;
+    int j = pages->num - 1;
+
     /*
      * Sort the page offset array by moving all normal pages to
      * the left and all zero pages to the right of the array.
@@ -64,23 +89,39 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
     while (i <= j) {
         uint64_t offset = pages->offset[i];
 
-        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
+        if (!results[i]) {
             i++;
             continue;
         }
 
+        swap_result(results, i, j);
         swap_page_offset(pages->offset, i, j);
         ram_release_page(rb->idstr, offset);
         j--;
     }
 
     pages->normal_num = i;
+}
 
-out:
-    stat64_add(&mig_stats.normal_pages, pages->normal_num);
-    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
+void multifd_dsa_cleanup(void)
+{
+    qemu_dsa_cleanup();
+}
+
+#else
+
+static void zero_page_detect_dsa(MultiFDSendParams *p)
+{
+    g_assert_not_reached();
+}
+
+void multifd_dsa_cleanup(void)
+{
+    return ;
 }
 
+#endif
+
 void multifd_recv_zero_page_process(MultiFDRecvParams *p)
 {
     for (int i = 0; i < p->zero_num; i++) {
@@ -92,3 +133,63 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
         }
     }
 }
+
+/**
+ * zero_page_detect_cpu: Perform zero page detection using CPU.
+ *
+ * Sorts normal pages before zero pages in p->pages->offset and updates
+ * p->pages->normal_num.
+ *
+ * @param p A pointer to the send params.
+ */
+static void zero_page_detect_cpu(MultiFDSendParams *p)
+{
+    MultiFDPages_t *pages = &p->data->u.ram;
+    RAMBlock *rb = pages->block;
+    int i = 0;
+    int j = pages->num - 1;
+
+    /*
+     * Sort the page offset array by moving all normal pages to
+     * the left and all zero pages to the right of the array.
+     */
+    while (i <= j) {
+        uint64_t offset = pages->offset[i];
+
+        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
+            i++;
+            continue;
+        }
+
+        swap_page_offset(pages->offset, i, j);
+        ram_release_page(rb->idstr, offset);
+        j--;
+    }
+
+    pages->normal_num = i;
+}
+
+/**
+ * multifd_send_zero_page_detect: Perform zero page detection on all pages.
+ *
+ * @param p A pointer to the send params.
+ */
+void multifd_send_zero_page_detect(MultiFDSendParams *p)
+{
+    MultiFDPages_t *pages = &p->data->u.ram;
+
+    if (!multifd_zero_page_enabled()) {
+        pages->normal_num = pages->num;
+        goto out;
+    }
+
+    if (qemu_dsa_is_running()) {
+        zero_page_detect_dsa(p);
+    } else {
+        zero_page_detect_cpu(p);
+    }
+
+out:
+    stat64_add(&mig_stats.normal_pages, pages->normal_num);
+    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
+}
diff --git a/migration/multifd.c b/migration/multifd.c
index 4374e14a96..689acceff2 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -13,6 +13,7 @@
 #include "qemu/osdep.h"
 #include "qemu/cutils.h"
 #include "qemu/rcu.h"
+#include "qemu/dsa.h"
 #include "exec/target_page.h"
 #include "sysemu/sysemu.h"
 #include "exec/ramblock.h"
@@ -462,6 +463,8 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
     p->name = NULL;
     g_free(p->data);
     p->data = NULL;
+    buffer_zero_batch_task_destroy(p->dsa_batch_task);
+    p->dsa_batch_task = NULL;
     p->packet_len = 0;
     g_free(p->packet);
     p->packet = NULL;
@@ -493,6 +496,8 @@ void multifd_send_shutdown(void)
 
     multifd_send_terminate_threads();
 
+    multifd_dsa_cleanup();
+
     for (i = 0; i < migrate_multifd_channels(); i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
         Error *local_err = NULL;
@@ -814,11 +819,31 @@ bool multifd_send_setup(void)
     uint32_t page_count = multifd_ram_page_count();
     bool use_packets = multifd_use_packets();
     uint8_t i;
+    Error *local_err = NULL;
 
     if (!migrate_multifd()) {
         return true;
     }
 
+    if (s &&
+        s->parameters.zero_page_detection == ZERO_PAGE_DETECTION_DSA_ACCEL) {
+        // Populate the dsa device path from accel-path
+        const strList *accel_path = migrate_accel_path();
+        g_autofree strList *dsa_parameter = g_malloc0(sizeof(strList));
+        strList **tail = &dsa_parameter;
+        while (accel_path) {
+            if (strncmp(accel_path->value, "dsa:", 4) == 0) {
+                QAPI_LIST_APPEND(tail, &accel_path->value[4]);
+            }
+            accel_path = accel_path->next;
+        }
+        if (qemu_dsa_init(dsa_parameter, &local_err)) {
+            ret = -1;
+        } else {
+            qemu_dsa_start();
+        }
+    }
+
     thread_count = migrate_multifd_channels();
     multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
     multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
@@ -829,12 +854,12 @@ bool multifd_send_setup(void)
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
-        Error *local_err = NULL;
 
         qemu_sem_init(&p->sem, 0);
         qemu_sem_init(&p->sem_sync, 0);
         p->id = i;
         p->data = multifd_send_data_alloc();
+        p->dsa_batch_task = buffer_zero_batch_task_init(page_count);
 
         if (use_packets) {
             p->packet_len = sizeof(MultiFDPacket_t)
@@ -865,7 +890,6 @@ bool multifd_send_setup(void)
 
     for (i = 0; i < thread_count; i++) {
         MultiFDSendParams *p = &multifd_send_state->params[i];
-        Error *local_err = NULL;
 
         ret = multifd_send_state->ops->send_setup(p, &local_err);
         if (ret) {
@@ -1047,6 +1071,7 @@ void multifd_recv_cleanup(void)
             qemu_thread_join(&p->thread);
         }
     }
+    multifd_dsa_cleanup();
     for (i = 0; i < migrate_multifd_channels(); i++) {
         multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
     }
diff --git a/migration/multifd.h b/migration/multifd.h
index 50d58c0c9c..e293ddbc1d 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -15,6 +15,7 @@
 
 #include "exec/target_page.h"
 #include "ram.h"
+#include "qemu/dsa.h"
 
 typedef struct MultiFDRecvData MultiFDRecvData;
 typedef struct MultiFDSendData MultiFDSendData;
@@ -155,6 +156,9 @@ typedef struct {
     bool pending_sync;
     MultiFDSendData *data;
 
+    /* Zero page checking batch task */
+    QemuDsaBatchTask *dsa_batch_task;
+
     /* thread local variables. No locking required */
 
     /* pointer to the packet */
@@ -313,6 +317,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p);
 bool multifd_send_prepare_common(MultiFDSendParams *p);
 void multifd_send_zero_page_detect(MultiFDSendParams *p);
 void multifd_recv_zero_page_process(MultiFDRecvParams *p);
+void multifd_dsa_cleanup(void);
 
 static inline void multifd_send_prepare_header(MultiFDSendParams *p)
 {
-- 
Yichen Wang
Re: [PATCH v7 09/12] migration/multifd: Enable DSA offloading in multifd sender path.
Posted by Fabiano Rosas 1 year, 2 months ago
Yichen Wang <yichen.wang@bytedance.com> writes:

> From: Hao Xiang <hao.xiang@linux.dev>
>
> Multifd sender path gets an array of pages queued by the migration
> thread. It performs zero page checking on every page in the array.
> The pages are classfied as either a zero page or a normal page. This
> change uses Intel DSA to offload the zero page checking from CPU to
> the DSA accelerator. The sender thread submits a batch of pages to DSA
> hardware and waits for the DSA completion thread to signal for work
> completion.
>
> Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
> Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
> ---
>  migration/multifd-zero-page.c | 129 ++++++++++++++++++++++++++++++----
>  migration/multifd.c           |  29 +++++++-
>  migration/multifd.h           |   5 ++
>  3 files changed, 147 insertions(+), 16 deletions(-)
>
> diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
> index f1e988a959..639aed9f6b 100644
> --- a/migration/multifd-zero-page.c
> +++ b/migration/multifd-zero-page.c
> @@ -21,7 +21,9 @@
>  
>  static bool multifd_zero_page_enabled(void)
>  {
> -    return migrate_zero_page_detection() == ZERO_PAGE_DETECTION_MULTIFD;
> +    ZeroPageDetection curMethod = migrate_zero_page_detection();
> +    return (curMethod == ZERO_PAGE_DETECTION_MULTIFD ||
> +            curMethod == ZERO_PAGE_DETECTION_DSA_ACCEL);
>  }
>  
>  static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
> @@ -37,26 +39,49 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
>      pages_offset[b] = temp;
>  }
>  
> +#ifdef CONFIG_DSA_OPT
> +
> +static void swap_result(bool *results, int a, int b)
> +{
> +    bool temp;
> +
> +    if (a == b) {
> +        return;
> +    }
> +
> +    temp = results[a];
> +    results[a] = results[b];
> +    results[b] = temp;
> +}
> +
>  /**
> - * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> + * zero_page_detect_dsa: Perform zero page detection using
> + * Intel Data Streaming Accelerator (DSA).
>   *
> - * Sorts normal pages before zero pages in p->pages->offset and updates
> - * p->pages->normal_num.
> + * Sorts normal pages before zero pages in pages->offset and updates
> + * pages->normal_num.
>   *
>   * @param p A pointer to the send params.
>   */
> -void multifd_send_zero_page_detect(MultiFDSendParams *p)
> +static void zero_page_detect_dsa(MultiFDSendParams *p)
>  {
>      MultiFDPages_t *pages = &p->data->u.ram;
>      RAMBlock *rb = pages->block;
> -    int i = 0;
> -    int j = pages->num - 1;
> +    bool *results = p->dsa_batch_task->results;
>  
> -    if (!multifd_zero_page_enabled()) {
> -        pages->normal_num = pages->num;
> -        goto out;
> +    for (int i = 0; i < pages->num; i++) {
> +        p->dsa_batch_task->addr[i] =
> +            (ram_addr_t)(rb->host + pages->offset[i]);
>      }
>  
> +    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
> +                                  (const void **)p->dsa_batch_task->addr,
> +                                  pages->num,
> +                                  multifd_ram_page_size());
> +
> +    int i = 0;
> +    int j = pages->num - 1;
> +
>      /*
>       * Sort the page offset array by moving all normal pages to
>       * the left and all zero pages to the right of the array.
> @@ -64,23 +89,39 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
>      while (i <= j) {
>          uint64_t offset = pages->offset[i];
>  
> -        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
> +        if (!results[i]) {
>              i++;
>              continue;
>          }
>  
> +        swap_result(results, i, j);
>          swap_page_offset(pages->offset, i, j);
>          ram_release_page(rb->idstr, offset);
>          j--;
>      }
>  
>      pages->normal_num = i;
> +}
>  
> -out:
> -    stat64_add(&mig_stats.normal_pages, pages->normal_num);
> -    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
> +void multifd_dsa_cleanup(void)
> +{
> +    qemu_dsa_cleanup();
> +}
> +
> +#else
> +
> +static void zero_page_detect_dsa(MultiFDSendParams *p)
> +{
> +    g_assert_not_reached();
> +}
> +
> +void multifd_dsa_cleanup(void)
> +{
> +    return ;
>  }
>  
> +#endif
> +
>  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>  {
>      for (int i = 0; i < p->zero_num; i++) {
> @@ -92,3 +133,63 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>          }
>      }
>  }
> +
> +/**
> + * zero_page_detect_cpu: Perform zero page detection using CPU.
> + *
> + * Sorts normal pages before zero pages in p->pages->offset and updates
> + * p->pages->normal_num.
> + *
> + * @param p A pointer to the send params.
> + */
> +static void zero_page_detect_cpu(MultiFDSendParams *p)
> +{
> +    MultiFDPages_t *pages = &p->data->u.ram;
> +    RAMBlock *rb = pages->block;
> +    int i = 0;
> +    int j = pages->num - 1;
> +
> +    /*
> +     * Sort the page offset array by moving all normal pages to
> +     * the left and all zero pages to the right of the array.
> +     */
> +    while (i <= j) {
> +        uint64_t offset = pages->offset[i];
> +
> +        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
> +            i++;
> +            continue;
> +        }
> +
> +        swap_page_offset(pages->offset, i, j);
> +        ram_release_page(rb->idstr, offset);
> +        j--;
> +    }
> +
> +    pages->normal_num = i;
> +}
> +
> +/**
> + * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> + *
> + * @param p A pointer to the send params.
> + */
> +void multifd_send_zero_page_detect(MultiFDSendParams *p)
> +{
> +    MultiFDPages_t *pages = &p->data->u.ram;
> +
> +    if (!multifd_zero_page_enabled()) {
> +        pages->normal_num = pages->num;
> +        goto out;
> +    }
> +
> +    if (qemu_dsa_is_running()) {
> +        zero_page_detect_dsa(p);
> +    } else {
> +        zero_page_detect_cpu(p);
> +    }
> +
> +out:
> +    stat64_add(&mig_stats.normal_pages, pages->normal_num);
> +    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
> +}
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 4374e14a96..689acceff2 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -13,6 +13,7 @@
>  #include "qemu/osdep.h"
>  #include "qemu/cutils.h"
>  #include "qemu/rcu.h"
> +#include "qemu/dsa.h"
>  #include "exec/target_page.h"
>  #include "sysemu/sysemu.h"
>  #include "exec/ramblock.h"
> @@ -462,6 +463,8 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
>      p->name = NULL;
>      g_free(p->data);
>      p->data = NULL;
> +    buffer_zero_batch_task_destroy(p->dsa_batch_task);
> +    p->dsa_batch_task = NULL;
>      p->packet_len = 0;
>      g_free(p->packet);
>      p->packet = NULL;
> @@ -493,6 +496,8 @@ void multifd_send_shutdown(void)
>  
>      multifd_send_terminate_threads();
>  
> +    multifd_dsa_cleanup();
> +
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
>          Error *local_err = NULL;
> @@ -814,11 +819,31 @@ bool multifd_send_setup(void)
>      uint32_t page_count = multifd_ram_page_count();
>      bool use_packets = multifd_use_packets();
>      uint8_t i;
> +    Error *local_err = NULL;
>  
>      if (!migrate_multifd()) {
>          return true;
>      }
>  
> +    if (s &&
> +        s->parameters.zero_page_detection == ZERO_PAGE_DETECTION_DSA_ACCEL) {
> +        // Populate the dsa device path from accel-path

scripts/checkpatch.pl would have rejected this.

> +        const strList *accel_path = migrate_accel_path();
> +        g_autofree strList *dsa_parameter = g_malloc0(sizeof(strList));
> +        strList **tail = &dsa_parameter;
> +        while (accel_path) {
> +            if (strncmp(accel_path->value, "dsa:", 4) == 0) {
> +                QAPI_LIST_APPEND(tail, &accel_path->value[4]);
> +            }
> +            accel_path = accel_path->next;
> +        }

The parsing of the parameter should be in options.c. In fact, Peter
suggested in v4 to make all of this a multifd_dsa_send_setup() or
multifd_dsa_init(), I think that's a good idea.

> +        if (qemu_dsa_init(dsa_parameter, &local_err)) {
> +            ret = -1;

migrate_set_error(s, local_err);
goto err;

> +        } else {
> +            qemu_dsa_start();
> +        }
> +    }
> +
>      thread_count = migrate_multifd_channels();
>      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> @@ -829,12 +854,12 @@ bool multifd_send_setup(void)
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
> -        Error *local_err = NULL;
>  
>          qemu_sem_init(&p->sem, 0);
>          qemu_sem_init(&p->sem_sync, 0);
>          p->id = i;
>          p->data = multifd_send_data_alloc();
> +        p->dsa_batch_task = buffer_zero_batch_task_init(page_count);
>  
>          if (use_packets) {
>              p->packet_len = sizeof(MultiFDPacket_t)
> @@ -865,7 +890,6 @@ bool multifd_send_setup(void)
>  
>      for (i = 0; i < thread_count; i++) {
>          MultiFDSendParams *p = &multifd_send_state->params[i];
> -        Error *local_err = NULL;
>  
>          ret = multifd_send_state->ops->send_setup(p, &local_err);
>          if (ret) {
> @@ -1047,6 +1071,7 @@ void multifd_recv_cleanup(void)
>              qemu_thread_join(&p->thread);
>          }
>      }
> +    multifd_dsa_cleanup();
>      for (i = 0; i < migrate_multifd_channels(); i++) {
>          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
>      }
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 50d58c0c9c..e293ddbc1d 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -15,6 +15,7 @@
>  
>  #include "exec/target_page.h"
>  #include "ram.h"
> +#include "qemu/dsa.h"
>  
>  typedef struct MultiFDRecvData MultiFDRecvData;
>  typedef struct MultiFDSendData MultiFDSendData;
> @@ -155,6 +156,9 @@ typedef struct {
>      bool pending_sync;
>      MultiFDSendData *data;
>  
> +    /* Zero page checking batch task */
> +    QemuDsaBatchTask *dsa_batch_task;
> +
>      /* thread local variables. No locking required */
>  
>      /* pointer to the packet */
> @@ -313,6 +317,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p);
>  bool multifd_send_prepare_common(MultiFDSendParams *p);
>  void multifd_send_zero_page_detect(MultiFDSendParams *p);
>  void multifd_recv_zero_page_process(MultiFDRecvParams *p);
> +void multifd_dsa_cleanup(void);
>  
>  static inline void multifd_send_prepare_header(MultiFDSendParams *p)
>  {
Re: [External] Re: [PATCH v7 09/12] migration/multifd: Enable DSA offloading in multifd sender path.
Posted by Yichen Wang 1 year, 2 months ago
On Thu, Nov 21, 2024 at 12:52 PM Fabiano Rosas <farosas@suse.de> wrote:
>
> Yichen Wang <yichen.wang@bytedance.com> writes:
>
> > From: Hao Xiang <hao.xiang@linux.dev>
> >
> > Multifd sender path gets an array of pages queued by the migration
> > thread. It performs zero page checking on every page in the array.
> > The pages are classfied as either a zero page or a normal page. This
> > change uses Intel DSA to offload the zero page checking from CPU to
> > the DSA accelerator. The sender thread submits a batch of pages to DSA
> > hardware and waits for the DSA completion thread to signal for work
> > completion.
> >
> > Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
> > Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
> > ---
> >  migration/multifd-zero-page.c | 129 ++++++++++++++++++++++++++++++----
> >  migration/multifd.c           |  29 +++++++-
> >  migration/multifd.h           |   5 ++
> >  3 files changed, 147 insertions(+), 16 deletions(-)
> >
> > diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
> > index f1e988a959..639aed9f6b 100644
> > --- a/migration/multifd-zero-page.c
> > +++ b/migration/multifd-zero-page.c
> > @@ -21,7 +21,9 @@
> >
> >  static bool multifd_zero_page_enabled(void)
> >  {
> > -    return migrate_zero_page_detection() == ZERO_PAGE_DETECTION_MULTIFD;
> > +    ZeroPageDetection curMethod = migrate_zero_page_detection();
> > +    return (curMethod == ZERO_PAGE_DETECTION_MULTIFD ||
> > +            curMethod == ZERO_PAGE_DETECTION_DSA_ACCEL);
> >  }
> >
> >  static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
> > @@ -37,26 +39,49 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
> >      pages_offset[b] = temp;
> >  }
> >
> > +#ifdef CONFIG_DSA_OPT
> > +
> > +static void swap_result(bool *results, int a, int b)
> > +{
> > +    bool temp;
> > +
> > +    if (a == b) {
> > +        return;
> > +    }
> > +
> > +    temp = results[a];
> > +    results[a] = results[b];
> > +    results[b] = temp;
> > +}
> > +
> >  /**
> > - * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> > + * zero_page_detect_dsa: Perform zero page detection using
> > + * Intel Data Streaming Accelerator (DSA).
> >   *
> > - * Sorts normal pages before zero pages in p->pages->offset and updates
> > - * p->pages->normal_num.
> > + * Sorts normal pages before zero pages in pages->offset and updates
> > + * pages->normal_num.
> >   *
> >   * @param p A pointer to the send params.
> >   */
> > -void multifd_send_zero_page_detect(MultiFDSendParams *p)
> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
> >  {
> >      MultiFDPages_t *pages = &p->data->u.ram;
> >      RAMBlock *rb = pages->block;
> > -    int i = 0;
> > -    int j = pages->num - 1;
> > +    bool *results = p->dsa_batch_task->results;
> >
> > -    if (!multifd_zero_page_enabled()) {
> > -        pages->normal_num = pages->num;
> > -        goto out;
> > +    for (int i = 0; i < pages->num; i++) {
> > +        p->dsa_batch_task->addr[i] =
> > +            (ram_addr_t)(rb->host + pages->offset[i]);
> >      }
> >
> > +    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
> > +                                  (const void **)p->dsa_batch_task->addr,
> > +                                  pages->num,
> > +                                  multifd_ram_page_size());
> > +
> > +    int i = 0;
> > +    int j = pages->num - 1;
> > +
> >      /*
> >       * Sort the page offset array by moving all normal pages to
> >       * the left and all zero pages to the right of the array.
> > @@ -64,23 +89,39 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
> >      while (i <= j) {
> >          uint64_t offset = pages->offset[i];
> >
> > -        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
> > +        if (!results[i]) {
> >              i++;
> >              continue;
> >          }
> >
> > +        swap_result(results, i, j);
> >          swap_page_offset(pages->offset, i, j);
> >          ram_release_page(rb->idstr, offset);
> >          j--;
> >      }
> >
> >      pages->normal_num = i;
> > +}
> >
> > -out:
> > -    stat64_add(&mig_stats.normal_pages, pages->normal_num);
> > -    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
> > +void multifd_dsa_cleanup(void)
> > +{
> > +    qemu_dsa_cleanup();
> > +}
> > +
> > +#else
> > +
> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
> > +{
> > +    g_assert_not_reached();
> > +}
> > +
> > +void multifd_dsa_cleanup(void)
> > +{
> > +    return ;
> >  }
> >
> > +#endif
> > +
> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
> >  {
> >      for (int i = 0; i < p->zero_num; i++) {
> > @@ -92,3 +133,63 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
> >          }
> >      }
> >  }
> > +
> > +/**
> > + * zero_page_detect_cpu: Perform zero page detection using CPU.
> > + *
> > + * Sorts normal pages before zero pages in p->pages->offset and updates
> > + * p->pages->normal_num.
> > + *
> > + * @param p A pointer to the send params.
> > + */
> > +static void zero_page_detect_cpu(MultiFDSendParams *p)
> > +{
> > +    MultiFDPages_t *pages = &p->data->u.ram;
> > +    RAMBlock *rb = pages->block;
> > +    int i = 0;
> > +    int j = pages->num - 1;
> > +
> > +    /*
> > +     * Sort the page offset array by moving all normal pages to
> > +     * the left and all zero pages to the right of the array.
> > +     */
> > +    while (i <= j) {
> > +        uint64_t offset = pages->offset[i];
> > +
> > +        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
> > +            i++;
> > +            continue;
> > +        }
> > +
> > +        swap_page_offset(pages->offset, i, j);
> > +        ram_release_page(rb->idstr, offset);
> > +        j--;
> > +    }
> > +
> > +    pages->normal_num = i;
> > +}
> > +
> > +/**
> > + * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> > + *
> > + * @param p A pointer to the send params.
> > + */
> > +void multifd_send_zero_page_detect(MultiFDSendParams *p)
> > +{
> > +    MultiFDPages_t *pages = &p->data->u.ram;
> > +
> > +    if (!multifd_zero_page_enabled()) {
> > +        pages->normal_num = pages->num;
> > +        goto out;
> > +    }
> > +
> > +    if (qemu_dsa_is_running()) {
> > +        zero_page_detect_dsa(p);
> > +    } else {
> > +        zero_page_detect_cpu(p);
> > +    }
> > +
> > +out:
> > +    stat64_add(&mig_stats.normal_pages, pages->normal_num);
> > +    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
> > +}
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index 4374e14a96..689acceff2 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -13,6 +13,7 @@
> >  #include "qemu/osdep.h"
> >  #include "qemu/cutils.h"
> >  #include "qemu/rcu.h"
> > +#include "qemu/dsa.h"
> >  #include "exec/target_page.h"
> >  #include "sysemu/sysemu.h"
> >  #include "exec/ramblock.h"
> > @@ -462,6 +463,8 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> >      p->name = NULL;
> >      g_free(p->data);
> >      p->data = NULL;
> > +    buffer_zero_batch_task_destroy(p->dsa_batch_task);
> > +    p->dsa_batch_task = NULL;
> >      p->packet_len = 0;
> >      g_free(p->packet);
> >      p->packet = NULL;
> > @@ -493,6 +496,8 @@ void multifd_send_shutdown(void)
> >
> >      multifd_send_terminate_threads();
> >
> > +    multifd_dsa_cleanup();
> > +
> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >          Error *local_err = NULL;
> > @@ -814,11 +819,31 @@ bool multifd_send_setup(void)
> >      uint32_t page_count = multifd_ram_page_count();
> >      bool use_packets = multifd_use_packets();
> >      uint8_t i;
> > +    Error *local_err = NULL;
> >
> >      if (!migrate_multifd()) {
> >          return true;
> >      }
> >
> > +    if (s &&
> > +        s->parameters.zero_page_detection == ZERO_PAGE_DETECTION_DSA_ACCEL) {
> > +        // Populate the dsa device path from accel-path
>
> scripts/checkpatch.pl would have rejected this.
>

Sorry. I will make sure to run checkpatch.pl, unit test (both
with/without DSA), before the send-email...

> > +        const strList *accel_path = migrate_accel_path();
> > +        g_autofree strList *dsa_parameter = g_malloc0(sizeof(strList));
> > +        strList **tail = &dsa_parameter;
> > +        while (accel_path) {
> > +            if (strncmp(accel_path->value, "dsa:", 4) == 0) {
> > +                QAPI_LIST_APPEND(tail, &accel_path->value[4]);
> > +            }
> > +            accel_path = accel_path->next;
> > +        }
>
> The parsing of the parameter should be in options.c. In fact, Peter
> suggested in v4 to make all of this a multifd_dsa_send_setup() or
> multifd_dsa_init(), I think that's a good idea.
>

Will fix it in the next version.

> > +        if (qemu_dsa_init(dsa_parameter, &local_err)) {
> > +            ret = -1;
>
> migrate_set_error(s, local_err);
> goto err;

Will fix it in the next version. But here we can't goto err, because
the cleanup() function will be called when setup() fails, and it has
assumptions that a certain data structure is in place. If we exit
earlier, the cleanup() function will complain and fail.

>
> > +        } else {
> > +            qemu_dsa_start();
> > +        }
> > +    }
> > +
> >      thread_count = migrate_multifd_channels();
> >      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
> >      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> > @@ -829,12 +854,12 @@ bool multifd_send_setup(void)
> >
> >      for (i = 0; i < thread_count; i++) {
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> > -        Error *local_err = NULL;
> >
> >          qemu_sem_init(&p->sem, 0);
> >          qemu_sem_init(&p->sem_sync, 0);
> >          p->id = i;
> >          p->data = multifd_send_data_alloc();
> > +        p->dsa_batch_task = buffer_zero_batch_task_init(page_count);
> >
> >          if (use_packets) {
> >              p->packet_len = sizeof(MultiFDPacket_t)
> > @@ -865,7 +890,6 @@ bool multifd_send_setup(void)
> >
> >      for (i = 0; i < thread_count; i++) {
> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> > -        Error *local_err = NULL;
> >
> >          ret = multifd_send_state->ops->send_setup(p, &local_err);
> >          if (ret) {
> > @@ -1047,6 +1071,7 @@ void multifd_recv_cleanup(void)
> >              qemu_thread_join(&p->thread);
> >          }
> >      }
> > +    multifd_dsa_cleanup();
> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
> >      }
> > diff --git a/migration/multifd.h b/migration/multifd.h
> > index 50d58c0c9c..e293ddbc1d 100644
> > --- a/migration/multifd.h
> > +++ b/migration/multifd.h
> > @@ -15,6 +15,7 @@
> >
> >  #include "exec/target_page.h"
> >  #include "ram.h"
> > +#include "qemu/dsa.h"
> >
> >  typedef struct MultiFDRecvData MultiFDRecvData;
> >  typedef struct MultiFDSendData MultiFDSendData;
> > @@ -155,6 +156,9 @@ typedef struct {
> >      bool pending_sync;
> >      MultiFDSendData *data;
> >
> > +    /* Zero page checking batch task */
> > +    QemuDsaBatchTask *dsa_batch_task;
> > +
> >      /* thread local variables. No locking required */
> >
> >      /* pointer to the packet */
> > @@ -313,6 +317,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p);
> >  bool multifd_send_prepare_common(MultiFDSendParams *p);
> >  void multifd_send_zero_page_detect(MultiFDSendParams *p);
> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p);
> > +void multifd_dsa_cleanup(void);
> >
> >  static inline void multifd_send_prepare_header(MultiFDSendParams *p)
> >  {
Re: [External] Re: [PATCH v7 09/12] migration/multifd: Enable DSA offloading in multifd sender path.
Posted by Fabiano Rosas 1 year, 2 months ago
Yichen Wang <yichen.wang@bytedance.com> writes:

> On Thu, Nov 21, 2024 at 12:52 PM Fabiano Rosas <farosas@suse.de> wrote:
>>
>> Yichen Wang <yichen.wang@bytedance.com> writes:
>>
>> > From: Hao Xiang <hao.xiang@linux.dev>
>> >
>> > Multifd sender path gets an array of pages queued by the migration
>> > thread. It performs zero page checking on every page in the array.
>> > The pages are classfied as either a zero page or a normal page. This
>> > change uses Intel DSA to offload the zero page checking from CPU to
>> > the DSA accelerator. The sender thread submits a batch of pages to DSA
>> > hardware and waits for the DSA completion thread to signal for work
>> > completion.
>> >
>> > Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
>> > Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
>> > ---
>> >  migration/multifd-zero-page.c | 129 ++++++++++++++++++++++++++++++----
>> >  migration/multifd.c           |  29 +++++++-
>> >  migration/multifd.h           |   5 ++
>> >  3 files changed, 147 insertions(+), 16 deletions(-)
>> >
>> > diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
>> > index f1e988a959..639aed9f6b 100644
>> > --- a/migration/multifd-zero-page.c
>> > +++ b/migration/multifd-zero-page.c
>> > @@ -21,7 +21,9 @@
>> >
>> >  static bool multifd_zero_page_enabled(void)
>> >  {
>> > -    return migrate_zero_page_detection() == ZERO_PAGE_DETECTION_MULTIFD;
>> > +    ZeroPageDetection curMethod = migrate_zero_page_detection();
>> > +    return (curMethod == ZERO_PAGE_DETECTION_MULTIFD ||
>> > +            curMethod == ZERO_PAGE_DETECTION_DSA_ACCEL);
>> >  }
>> >
>> >  static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
>> > @@ -37,26 +39,49 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
>> >      pages_offset[b] = temp;
>> >  }
>> >
>> > +#ifdef CONFIG_DSA_OPT
>> > +
>> > +static void swap_result(bool *results, int a, int b)
>> > +{
>> > +    bool temp;
>> > +
>> > +    if (a == b) {
>> > +        return;
>> > +    }
>> > +
>> > +    temp = results[a];
>> > +    results[a] = results[b];
>> > +    results[b] = temp;
>> > +}
>> > +
>> >  /**
>> > - * multifd_send_zero_page_detect: Perform zero page detection on all pages.
>> > + * zero_page_detect_dsa: Perform zero page detection using
>> > + * Intel Data Streaming Accelerator (DSA).
>> >   *
>> > - * Sorts normal pages before zero pages in p->pages->offset and updates
>> > - * p->pages->normal_num.
>> > + * Sorts normal pages before zero pages in pages->offset and updates
>> > + * pages->normal_num.
>> >   *
>> >   * @param p A pointer to the send params.
>> >   */
>> > -void multifd_send_zero_page_detect(MultiFDSendParams *p)
>> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
>> >  {
>> >      MultiFDPages_t *pages = &p->data->u.ram;
>> >      RAMBlock *rb = pages->block;
>> > -    int i = 0;
>> > -    int j = pages->num - 1;
>> > +    bool *results = p->dsa_batch_task->results;
>> >
>> > -    if (!multifd_zero_page_enabled()) {
>> > -        pages->normal_num = pages->num;
>> > -        goto out;
>> > +    for (int i = 0; i < pages->num; i++) {
>> > +        p->dsa_batch_task->addr[i] =
>> > +            (ram_addr_t)(rb->host + pages->offset[i]);
>> >      }
>> >
>> > +    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
>> > +                                  (const void **)p->dsa_batch_task->addr,
>> > +                                  pages->num,
>> > +                                  multifd_ram_page_size());
>> > +
>> > +    int i = 0;
>> > +    int j = pages->num - 1;
>> > +
>> >      /*
>> >       * Sort the page offset array by moving all normal pages to
>> >       * the left and all zero pages to the right of the array.
>> > @@ -64,23 +89,39 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
>> >      while (i <= j) {
>> >          uint64_t offset = pages->offset[i];
>> >
>> > -        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
>> > +        if (!results[i]) {
>> >              i++;
>> >              continue;
>> >          }
>> >
>> > +        swap_result(results, i, j);
>> >          swap_page_offset(pages->offset, i, j);
>> >          ram_release_page(rb->idstr, offset);
>> >          j--;
>> >      }
>> >
>> >      pages->normal_num = i;
>> > +}
>> >
>> > -out:
>> > -    stat64_add(&mig_stats.normal_pages, pages->normal_num);
>> > -    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
>> > +void multifd_dsa_cleanup(void)
>> > +{
>> > +    qemu_dsa_cleanup();
>> > +}
>> > +
>> > +#else
>> > +
>> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
>> > +{
>> > +    g_assert_not_reached();
>> > +}
>> > +
>> > +void multifd_dsa_cleanup(void)
>> > +{
>> > +    return ;
>> >  }
>> >
>> > +#endif
>> > +
>> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>> >  {
>> >      for (int i = 0; i < p->zero_num; i++) {
>> > @@ -92,3 +133,63 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
>> >          }
>> >      }
>> >  }
>> > +
>> > +/**
>> > + * zero_page_detect_cpu: Perform zero page detection using CPU.
>> > + *
>> > + * Sorts normal pages before zero pages in p->pages->offset and updates
>> > + * p->pages->normal_num.
>> > + *
>> > + * @param p A pointer to the send params.
>> > + */
>> > +static void zero_page_detect_cpu(MultiFDSendParams *p)
>> > +{
>> > +    MultiFDPages_t *pages = &p->data->u.ram;
>> > +    RAMBlock *rb = pages->block;
>> > +    int i = 0;
>> > +    int j = pages->num - 1;
>> > +
>> > +    /*
>> > +     * Sort the page offset array by moving all normal pages to
>> > +     * the left and all zero pages to the right of the array.
>> > +     */
>> > +    while (i <= j) {
>> > +        uint64_t offset = pages->offset[i];
>> > +
>> > +        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
>> > +            i++;
>> > +            continue;
>> > +        }
>> > +
>> > +        swap_page_offset(pages->offset, i, j);
>> > +        ram_release_page(rb->idstr, offset);
>> > +        j--;
>> > +    }
>> > +
>> > +    pages->normal_num = i;
>> > +}
>> > +
>> > +/**
>> > + * multifd_send_zero_page_detect: Perform zero page detection on all pages.
>> > + *
>> > + * @param p A pointer to the send params.
>> > + */
>> > +void multifd_send_zero_page_detect(MultiFDSendParams *p)
>> > +{
>> > +    MultiFDPages_t *pages = &p->data->u.ram;
>> > +
>> > +    if (!multifd_zero_page_enabled()) {
>> > +        pages->normal_num = pages->num;
>> > +        goto out;
>> > +    }
>> > +
>> > +    if (qemu_dsa_is_running()) {
>> > +        zero_page_detect_dsa(p);
>> > +    } else {
>> > +        zero_page_detect_cpu(p);
>> > +    }
>> > +
>> > +out:
>> > +    stat64_add(&mig_stats.normal_pages, pages->normal_num);
>> > +    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
>> > +}
>> > diff --git a/migration/multifd.c b/migration/multifd.c
>> > index 4374e14a96..689acceff2 100644
>> > --- a/migration/multifd.c
>> > +++ b/migration/multifd.c
>> > @@ -13,6 +13,7 @@
>> >  #include "qemu/osdep.h"
>> >  #include "qemu/cutils.h"
>> >  #include "qemu/rcu.h"
>> > +#include "qemu/dsa.h"
>> >  #include "exec/target_page.h"
>> >  #include "sysemu/sysemu.h"
>> >  #include "exec/ramblock.h"
>> > @@ -462,6 +463,8 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
>> >      p->name = NULL;
>> >      g_free(p->data);
>> >      p->data = NULL;
>> > +    buffer_zero_batch_task_destroy(p->dsa_batch_task);
>> > +    p->dsa_batch_task = NULL;
>> >      p->packet_len = 0;
>> >      g_free(p->packet);
>> >      p->packet = NULL;
>> > @@ -493,6 +496,8 @@ void multifd_send_shutdown(void)
>> >
>> >      multifd_send_terminate_threads();
>> >
>> > +    multifd_dsa_cleanup();
>> > +
>> >      for (i = 0; i < migrate_multifd_channels(); i++) {
>> >          MultiFDSendParams *p = &multifd_send_state->params[i];
>> >          Error *local_err = NULL;
>> > @@ -814,11 +819,31 @@ bool multifd_send_setup(void)
>> >      uint32_t page_count = multifd_ram_page_count();
>> >      bool use_packets = multifd_use_packets();
>> >      uint8_t i;
>> > +    Error *local_err = NULL;
>> >
>> >      if (!migrate_multifd()) {
>> >          return true;
>> >      }
>> >
>> > +    if (s &&
>> > +        s->parameters.zero_page_detection == ZERO_PAGE_DETECTION_DSA_ACCEL) {
>> > +        // Populate the dsa device path from accel-path
>>
>> scripts/checkpatch.pl would have rejected this.
>>
>
> Sorry. I will make sure to run checkpatch.pl, unit test (both
> with/without DSA), before the send-email...
>
>> > +        const strList *accel_path = migrate_accel_path();
>> > +        g_autofree strList *dsa_parameter = g_malloc0(sizeof(strList));
>> > +        strList **tail = &dsa_parameter;
>> > +        while (accel_path) {
>> > +            if (strncmp(accel_path->value, "dsa:", 4) == 0) {
>> > +                QAPI_LIST_APPEND(tail, &accel_path->value[4]);
>> > +            }
>> > +            accel_path = accel_path->next;
>> > +        }
>>
>> The parsing of the parameter should be in options.c. In fact, Peter
>> suggested in v4 to make all of this a multifd_dsa_send_setup() or
>> multifd_dsa_init(), I think that's a good idea.
>>
>
> Will fix it in the next version.
>
>> > +        if (qemu_dsa_init(dsa_parameter, &local_err)) {
>> > +            ret = -1;
>>
>> migrate_set_error(s, local_err);
>> goto err;
>
> Will fix it in the next version. But here we can't goto err, because
> the cleanup() function will be called when setup() fails, and it has
> assumptions that a certain data structure is in place. If we exit
> earlier, the cleanup() function will complain and fail.
>

Which data structure? Is that multifd_send_state below? You could move
those before qemu_dsa_init if that's the case.

>>
>> > +        } else {
>> > +            qemu_dsa_start();
>> > +        }
>> > +    }
>> > +
>> >      thread_count = migrate_multifd_channels();
>> >      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
>> >      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
>> > @@ -829,12 +854,12 @@ bool multifd_send_setup(void)
>> >
>> >      for (i = 0; i < thread_count; i++) {
>> >          MultiFDSendParams *p = &multifd_send_state->params[i];
>> > -        Error *local_err = NULL;
>> >
>> >          qemu_sem_init(&p->sem, 0);
>> >          qemu_sem_init(&p->sem_sync, 0);
>> >          p->id = i;
>> >          p->data = multifd_send_data_alloc();
>> > +        p->dsa_batch_task = buffer_zero_batch_task_init(page_count);
>> >
>> >          if (use_packets) {
>> >              p->packet_len = sizeof(MultiFDPacket_t)
>> > @@ -865,7 +890,6 @@ bool multifd_send_setup(void)
>> >
>> >      for (i = 0; i < thread_count; i++) {
>> >          MultiFDSendParams *p = &multifd_send_state->params[i];
>> > -        Error *local_err = NULL;
>> >
>> >          ret = multifd_send_state->ops->send_setup(p, &local_err);
>> >          if (ret) {
>> > @@ -1047,6 +1071,7 @@ void multifd_recv_cleanup(void)
>> >              qemu_thread_join(&p->thread);
>> >          }
>> >      }
>> > +    multifd_dsa_cleanup();
>> >      for (i = 0; i < migrate_multifd_channels(); i++) {
>> >          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
>> >      }
>> > diff --git a/migration/multifd.h b/migration/multifd.h
>> > index 50d58c0c9c..e293ddbc1d 100644
>> > --- a/migration/multifd.h
>> > +++ b/migration/multifd.h
>> > @@ -15,6 +15,7 @@
>> >
>> >  #include "exec/target_page.h"
>> >  #include "ram.h"
>> > +#include "qemu/dsa.h"
>> >
>> >  typedef struct MultiFDRecvData MultiFDRecvData;
>> >  typedef struct MultiFDSendData MultiFDSendData;
>> > @@ -155,6 +156,9 @@ typedef struct {
>> >      bool pending_sync;
>> >      MultiFDSendData *data;
>> >
>> > +    /* Zero page checking batch task */
>> > +    QemuDsaBatchTask *dsa_batch_task;
>> > +
>> >      /* thread local variables. No locking required */
>> >
>> >      /* pointer to the packet */
>> > @@ -313,6 +317,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p);
>> >  bool multifd_send_prepare_common(MultiFDSendParams *p);
>> >  void multifd_send_zero_page_detect(MultiFDSendParams *p);
>> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p);
>> > +void multifd_dsa_cleanup(void);
>> >
>> >  static inline void multifd_send_prepare_header(MultiFDSendParams *p)
>> >  {
Re: [External] Re: [PATCH v7 09/12] migration/multifd: Enable DSA offloading in multifd sender path.
Posted by Yichen Wang 1 year, 2 months ago
On Tue, Nov 26, 2024 at 5:23 AM Fabiano Rosas <farosas@suse.de> wrote:
>
> Yichen Wang <yichen.wang@bytedance.com> writes:
>
> > On Thu, Nov 21, 2024 at 12:52 PM Fabiano Rosas <farosas@suse.de> wrote:
> >>
> >> Yichen Wang <yichen.wang@bytedance.com> writes:
> >>
> >> > From: Hao Xiang <hao.xiang@linux.dev>
> >> >
> >> > Multifd sender path gets an array of pages queued by the migration
> >> > thread. It performs zero page checking on every page in the array.
> >> > The pages are classfied as either a zero page or a normal page. This
> >> > change uses Intel DSA to offload the zero page checking from CPU to
> >> > the DSA accelerator. The sender thread submits a batch of pages to DSA
> >> > hardware and waits for the DSA completion thread to signal for work
> >> > completion.
> >> >
> >> > Signed-off-by: Hao Xiang <hao.xiang@linux.dev>
> >> > Signed-off-by: Yichen Wang <yichen.wang@bytedance.com>
> >> > ---
> >> >  migration/multifd-zero-page.c | 129 ++++++++++++++++++++++++++++++----
> >> >  migration/multifd.c           |  29 +++++++-
> >> >  migration/multifd.h           |   5 ++
> >> >  3 files changed, 147 insertions(+), 16 deletions(-)
> >> >
> >> > diff --git a/migration/multifd-zero-page.c b/migration/multifd-zero-page.c
> >> > index f1e988a959..639aed9f6b 100644
> >> > --- a/migration/multifd-zero-page.c
> >> > +++ b/migration/multifd-zero-page.c
> >> > @@ -21,7 +21,9 @@
> >> >
> >> >  static bool multifd_zero_page_enabled(void)
> >> >  {
> >> > -    return migrate_zero_page_detection() == ZERO_PAGE_DETECTION_MULTIFD;
> >> > +    ZeroPageDetection curMethod = migrate_zero_page_detection();
> >> > +    return (curMethod == ZERO_PAGE_DETECTION_MULTIFD ||
> >> > +            curMethod == ZERO_PAGE_DETECTION_DSA_ACCEL);
> >> >  }
> >> >
> >> >  static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
> >> > @@ -37,26 +39,49 @@ static void swap_page_offset(ram_addr_t *pages_offset, int a, int b)
> >> >      pages_offset[b] = temp;
> >> >  }
> >> >
> >> > +#ifdef CONFIG_DSA_OPT
> >> > +
> >> > +static void swap_result(bool *results, int a, int b)
> >> > +{
> >> > +    bool temp;
> >> > +
> >> > +    if (a == b) {
> >> > +        return;
> >> > +    }
> >> > +
> >> > +    temp = results[a];
> >> > +    results[a] = results[b];
> >> > +    results[b] = temp;
> >> > +}
> >> > +
> >> >  /**
> >> > - * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> >> > + * zero_page_detect_dsa: Perform zero page detection using
> >> > + * Intel Data Streaming Accelerator (DSA).
> >> >   *
> >> > - * Sorts normal pages before zero pages in p->pages->offset and updates
> >> > - * p->pages->normal_num.
> >> > + * Sorts normal pages before zero pages in pages->offset and updates
> >> > + * pages->normal_num.
> >> >   *
> >> >   * @param p A pointer to the send params.
> >> >   */
> >> > -void multifd_send_zero_page_detect(MultiFDSendParams *p)
> >> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
> >> >  {
> >> >      MultiFDPages_t *pages = &p->data->u.ram;
> >> >      RAMBlock *rb = pages->block;
> >> > -    int i = 0;
> >> > -    int j = pages->num - 1;
> >> > +    bool *results = p->dsa_batch_task->results;
> >> >
> >> > -    if (!multifd_zero_page_enabled()) {
> >> > -        pages->normal_num = pages->num;
> >> > -        goto out;
> >> > +    for (int i = 0; i < pages->num; i++) {
> >> > +        p->dsa_batch_task->addr[i] =
> >> > +            (ram_addr_t)(rb->host + pages->offset[i]);
> >> >      }
> >> >
> >> > +    buffer_is_zero_dsa_batch_sync(p->dsa_batch_task,
> >> > +                                  (const void **)p->dsa_batch_task->addr,
> >> > +                                  pages->num,
> >> > +                                  multifd_ram_page_size());
> >> > +
> >> > +    int i = 0;
> >> > +    int j = pages->num - 1;
> >> > +
> >> >      /*
> >> >       * Sort the page offset array by moving all normal pages to
> >> >       * the left and all zero pages to the right of the array.
> >> > @@ -64,23 +89,39 @@ void multifd_send_zero_page_detect(MultiFDSendParams *p)
> >> >      while (i <= j) {
> >> >          uint64_t offset = pages->offset[i];
> >> >
> >> > -        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
> >> > +        if (!results[i]) {
> >> >              i++;
> >> >              continue;
> >> >          }
> >> >
> >> > +        swap_result(results, i, j);
> >> >          swap_page_offset(pages->offset, i, j);
> >> >          ram_release_page(rb->idstr, offset);
> >> >          j--;
> >> >      }
> >> >
> >> >      pages->normal_num = i;
> >> > +}
> >> >
> >> > -out:
> >> > -    stat64_add(&mig_stats.normal_pages, pages->normal_num);
> >> > -    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
> >> > +void multifd_dsa_cleanup(void)
> >> > +{
> >> > +    qemu_dsa_cleanup();
> >> > +}
> >> > +
> >> > +#else
> >> > +
> >> > +static void zero_page_detect_dsa(MultiFDSendParams *p)
> >> > +{
> >> > +    g_assert_not_reached();
> >> > +}
> >> > +
> >> > +void multifd_dsa_cleanup(void)
> >> > +{
> >> > +    return ;
> >> >  }
> >> >
> >> > +#endif
> >> > +
> >> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p)
> >> >  {
> >> >      for (int i = 0; i < p->zero_num; i++) {
> >> > @@ -92,3 +133,63 @@ void multifd_recv_zero_page_process(MultiFDRecvParams *p)
> >> >          }
> >> >      }
> >> >  }
> >> > +
> >> > +/**
> >> > + * zero_page_detect_cpu: Perform zero page detection using CPU.
> >> > + *
> >> > + * Sorts normal pages before zero pages in p->pages->offset and updates
> >> > + * p->pages->normal_num.
> >> > + *
> >> > + * @param p A pointer to the send params.
> >> > + */
> >> > +static void zero_page_detect_cpu(MultiFDSendParams *p)
> >> > +{
> >> > +    MultiFDPages_t *pages = &p->data->u.ram;
> >> > +    RAMBlock *rb = pages->block;
> >> > +    int i = 0;
> >> > +    int j = pages->num - 1;
> >> > +
> >> > +    /*
> >> > +     * Sort the page offset array by moving all normal pages to
> >> > +     * the left and all zero pages to the right of the array.
> >> > +     */
> >> > +    while (i <= j) {
> >> > +        uint64_t offset = pages->offset[i];
> >> > +
> >> > +        if (!buffer_is_zero(rb->host + offset, multifd_ram_page_size())) {
> >> > +            i++;
> >> > +            continue;
> >> > +        }
> >> > +
> >> > +        swap_page_offset(pages->offset, i, j);
> >> > +        ram_release_page(rb->idstr, offset);
> >> > +        j--;
> >> > +    }
> >> > +
> >> > +    pages->normal_num = i;
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_send_zero_page_detect: Perform zero page detection on all pages.
> >> > + *
> >> > + * @param p A pointer to the send params.
> >> > + */
> >> > +void multifd_send_zero_page_detect(MultiFDSendParams *p)
> >> > +{
> >> > +    MultiFDPages_t *pages = &p->data->u.ram;
> >> > +
> >> > +    if (!multifd_zero_page_enabled()) {
> >> > +        pages->normal_num = pages->num;
> >> > +        goto out;
> >> > +    }
> >> > +
> >> > +    if (qemu_dsa_is_running()) {
> >> > +        zero_page_detect_dsa(p);
> >> > +    } else {
> >> > +        zero_page_detect_cpu(p);
> >> > +    }
> >> > +
> >> > +out:
> >> > +    stat64_add(&mig_stats.normal_pages, pages->normal_num);
> >> > +    stat64_add(&mig_stats.zero_pages, pages->num - pages->normal_num);
> >> > +}
> >> > diff --git a/migration/multifd.c b/migration/multifd.c
> >> > index 4374e14a96..689acceff2 100644
> >> > --- a/migration/multifd.c
> >> > +++ b/migration/multifd.c
> >> > @@ -13,6 +13,7 @@
> >> >  #include "qemu/osdep.h"
> >> >  #include "qemu/cutils.h"
> >> >  #include "qemu/rcu.h"
> >> > +#include "qemu/dsa.h"
> >> >  #include "exec/target_page.h"
> >> >  #include "sysemu/sysemu.h"
> >> >  #include "exec/ramblock.h"
> >> > @@ -462,6 +463,8 @@ static bool multifd_send_cleanup_channel(MultiFDSendParams *p, Error **errp)
> >> >      p->name = NULL;
> >> >      g_free(p->data);
> >> >      p->data = NULL;
> >> > +    buffer_zero_batch_task_destroy(p->dsa_batch_task);
> >> > +    p->dsa_batch_task = NULL;
> >> >      p->packet_len = 0;
> >> >      g_free(p->packet);
> >> >      p->packet = NULL;
> >> > @@ -493,6 +496,8 @@ void multifd_send_shutdown(void)
> >> >
> >> >      multifd_send_terminate_threads();
> >> >
> >> > +    multifd_dsa_cleanup();
> >> > +
> >> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >> >          Error *local_err = NULL;
> >> > @@ -814,11 +819,31 @@ bool multifd_send_setup(void)
> >> >      uint32_t page_count = multifd_ram_page_count();
> >> >      bool use_packets = multifd_use_packets();
> >> >      uint8_t i;
> >> > +    Error *local_err = NULL;
> >> >
> >> >      if (!migrate_multifd()) {
> >> >          return true;
> >> >      }
> >> >
> >> > +    if (s &&
> >> > +        s->parameters.zero_page_detection == ZERO_PAGE_DETECTION_DSA_ACCEL) {
> >> > +        // Populate the dsa device path from accel-path
> >>
> >> scripts/checkpatch.pl would have rejected this.
> >>
> >
> > Sorry. I will make sure to run checkpatch.pl, unit test (both
> > with/without DSA), before the send-email...
> >
> >> > +        const strList *accel_path = migrate_accel_path();
> >> > +        g_autofree strList *dsa_parameter = g_malloc0(sizeof(strList));
> >> > +        strList **tail = &dsa_parameter;
> >> > +        while (accel_path) {
> >> > +            if (strncmp(accel_path->value, "dsa:", 4) == 0) {
> >> > +                QAPI_LIST_APPEND(tail, &accel_path->value[4]);
> >> > +            }
> >> > +            accel_path = accel_path->next;
> >> > +        }
> >>
> >> The parsing of the parameter should be in options.c. In fact, Peter
> >> suggested in v4 to make all of this a multifd_dsa_send_setup() or
> >> multifd_dsa_init(), I think that's a good idea.
> >>
> >
> > Will fix it in the next version.
> >
> >> > +        if (qemu_dsa_init(dsa_parameter, &local_err)) {
> >> > +            ret = -1;
> >>
> >> migrate_set_error(s, local_err);
> >> goto err;
> >
> > Will fix it in the next version. But here we can't goto err, because
> > the cleanup() function will be called when setup() fails, and it has
> > assumptions that a certain data structure is in place. If we exit
> > earlier, the cleanup() function will complain and fail.
> >
>
> Which data structure? Is that multifd_send_state below? You could move
> those before qemu_dsa_init if that's the case.
>

Yes, actually the whole below code including the for-loop are expected
to be done and cannot exit earlier. For example, inside the
thread_count loop, multifd_new_send_channel_create() is also not exit
earlier, and the whole data structure will be referred to in the
cleanup() function. I also thought to put dsa setup function post
these original initializations, but "p->dsa_batch_task =
buffer_zero_batch_task_init(page_count);" has dependence on DSA and
dsa setup needs to be earlier. So I guess this is the only way...

> >>
> >> > +        } else {
> >> > +            qemu_dsa_start();
> >> > +        }
> >> > +    }
> >> > +
> >> >      thread_count = migrate_multifd_channels();
> >> >      multifd_send_state = g_malloc0(sizeof(*multifd_send_state));
> >> >      multifd_send_state->params = g_new0(MultiFDSendParams, thread_count);
> >> > @@ -829,12 +854,12 @@ bool multifd_send_setup(void)
> >> >
> >> >      for (i = 0; i < thread_count; i++) {
> >> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >> > -        Error *local_err = NULL;
> >> >
> >> >          qemu_sem_init(&p->sem, 0);
> >> >          qemu_sem_init(&p->sem_sync, 0);
> >> >          p->id = i;
> >> >          p->data = multifd_send_data_alloc();
> >> > +        p->dsa_batch_task = buffer_zero_batch_task_init(page_count);
> >> >
> >> >          if (use_packets) {
> >> >              p->packet_len = sizeof(MultiFDPacket_t)
> >> > @@ -865,7 +890,6 @@ bool multifd_send_setup(void)
> >> >
> >> >      for (i = 0; i < thread_count; i++) {
> >> >          MultiFDSendParams *p = &multifd_send_state->params[i];
> >> > -        Error *local_err = NULL;
> >> >
> >> >          ret = multifd_send_state->ops->send_setup(p, &local_err);
> >> >          if (ret) {
> >> > @@ -1047,6 +1071,7 @@ void multifd_recv_cleanup(void)
> >> >              qemu_thread_join(&p->thread);
> >> >          }
> >> >      }
> >> > +    multifd_dsa_cleanup();
> >> >      for (i = 0; i < migrate_multifd_channels(); i++) {
> >> >          multifd_recv_cleanup_channel(&multifd_recv_state->params[i]);
> >> >      }
> >> > diff --git a/migration/multifd.h b/migration/multifd.h
> >> > index 50d58c0c9c..e293ddbc1d 100644
> >> > --- a/migration/multifd.h
> >> > +++ b/migration/multifd.h
> >> > @@ -15,6 +15,7 @@
> >> >
> >> >  #include "exec/target_page.h"
> >> >  #include "ram.h"
> >> > +#include "qemu/dsa.h"
> >> >
> >> >  typedef struct MultiFDRecvData MultiFDRecvData;
> >> >  typedef struct MultiFDSendData MultiFDSendData;
> >> > @@ -155,6 +156,9 @@ typedef struct {
> >> >      bool pending_sync;
> >> >      MultiFDSendData *data;
> >> >
> >> > +    /* Zero page checking batch task */
> >> > +    QemuDsaBatchTask *dsa_batch_task;
> >> > +
> >> >      /* thread local variables. No locking required */
> >> >
> >> >      /* pointer to the packet */
> >> > @@ -313,6 +317,7 @@ void multifd_send_fill_packet(MultiFDSendParams *p);
> >> >  bool multifd_send_prepare_common(MultiFDSendParams *p);
> >> >  void multifd_send_zero_page_detect(MultiFDSendParams *p);
> >> >  void multifd_recv_zero_page_process(MultiFDRecvParams *p);
> >> > +void multifd_dsa_cleanup(void);
> >> >
> >> >  static inline void multifd_send_prepare_header(MultiFDSendParams *p)
> >> >  {