[PATCH v3 13/20] migration/multifd: Prepare to introduce DSA acceleration on the multifd path.

Hao Xiang posted 20 patches 10 months, 3 weeks ago
Maintainers: "Michael S. Tsirkin" <mst@redhat.com>, Cornelia Huck <cohuck@redhat.com>, Paolo Bonzini <pbonzini@redhat.com>, "Marc-André Lureau" <marcandre.lureau@redhat.com>, "Daniel P. Berrangé" <berrange@redhat.com>, Thomas Huth <thuth@redhat.com>, "Philippe Mathieu-Daudé" <philmd@linaro.org>, Juan Quintela <quintela@redhat.com>, Peter Xu <peterx@redhat.com>, Fabiano Rosas <farosas@suse.de>, Leonardo Bras <leobras@redhat.com>, Eric Blake <eblake@redhat.com>, Markus Armbruster <armbru@redhat.com>, Laurent Vivier <lvivier@redhat.com>
There is a newer version of this series
[PATCH v3 13/20] migration/multifd: Prepare to introduce DSA acceleration on the multifd path.
Posted by Hao Xiang 10 months, 3 weeks ago
1. Refactor multifd_send_thread function.
2. Implement buffer_is_zero_use_cpu to handle CPU based zero page
checking.
3. Introduce the batch task structure in MultiFDSendParams.

Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
---
 include/qemu/dsa.h  | 43 +++++++++++++++++++++++--
 migration/multifd.c | 77 ++++++++++++++++++++++++++++++++++++---------
 migration/multifd.h |  2 ++
 util/dsa.c          | 51 +++++++++++++++++++++++++-----
 4 files changed, 148 insertions(+), 25 deletions(-)

diff --git a/include/qemu/dsa.h b/include/qemu/dsa.h
index e002652879..fe7772107a 100644
--- a/include/qemu/dsa.h
+++ b/include/qemu/dsa.h
@@ -2,6 +2,7 @@
 #define QEMU_DSA_H
 
 #include "qemu/error-report.h"
+#include "exec/cpu-common.h"
 #include "qemu/thread.h"
 #include "qemu/queue.h"
 
@@ -42,6 +43,20 @@ typedef struct dsa_batch_task {
     QSIMPLEQ_ENTRY(dsa_batch_task) entry;
 } dsa_batch_task;
 
+#endif
+
+struct batch_task {
+    /* Address of each pages in pages */
+    ram_addr_t *addr;
+    /* Zero page checking results */
+    bool *results;
+#ifdef CONFIG_DSA_OPT
+    struct dsa_batch_task *dsa_batch;
+#endif
+};
+
+#ifdef CONFIG_DSA_OPT
+
 /**
  * @brief Initializes DSA devices.
  *
@@ -74,7 +89,7 @@ void dsa_cleanup(void);
 bool dsa_is_running(void);
 
 /**
- * @brief Initializes a buffer zero batch task.
+ * @brief Initializes a buffer zero DSA batch task.
  *
  * @param task A pointer to the batch task to initialize.
  * @param results A pointer to an array of zero page checking results.
@@ -102,7 +117,7 @@ void buffer_zero_batch_task_destroy(struct dsa_batch_task *task);
  * @return Zero if successful, otherwise non-zero.
  */
 int
-buffer_is_zero_dsa_batch_async(struct dsa_batch_task *batch_task,
+buffer_is_zero_dsa_batch_async(struct batch_task *batch_task,
                                const void **buf, size_t count, size_t len);
 
 #else
@@ -128,6 +143,30 @@ static inline void dsa_stop(void) {}
 
 static inline void dsa_cleanup(void) {}
 
+static inline int
+buffer_is_zero_dsa_batch_async(struct batch_task *batch_task,
+                               const void **buf, size_t count, size_t len)
+{
+    exit(1);
+}
+
 #endif
 
+/**
+ * @brief Initializes a general buffer zero batch task.
+ *
+ * @param task A pointer to the general batch task to initialize.
+ * @param batch_size The number of zero page checking tasks in the batch.
+ */
+void
+batch_task_init(struct batch_task *task, int batch_size);
+
+/**
+ * @brief Destroys a general buffer zero batch task.
+ *
+ * @param task A pointer to the general batch task to destroy.
+ */
+void
+batch_task_destroy(struct batch_task *task);
+
 #endif
diff --git a/migration/multifd.c b/migration/multifd.c
index eece85569f..e7c549b93e 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -14,6 +14,8 @@
 #include "qemu/cutils.h"
 #include "qemu/rcu.h"
 #include "qemu/cutils.h"
+#include "qemu/dsa.h"
+#include "qemu/memalign.h"
 #include "exec/target_page.h"
 #include "sysemu/sysemu.h"
 #include "exec/ramblock.h"
@@ -574,6 +576,8 @@ void multifd_save_cleanup(void)
         p->name = NULL;
         multifd_pages_clear(p->pages);
         p->pages = NULL;
+        batch_task_destroy(p->batch_task);
+        p->batch_task = NULL;
         p->packet_len = 0;
         g_free(p->packet);
         p->packet = NULL;
@@ -678,13 +682,66 @@ int multifd_send_sync_main(QEMUFile *f)
     return 0;
 }
 
+static void set_page(MultiFDSendParams *p, bool zero_page, uint64_t offset)
+{
+    RAMBlock *rb = p->pages->block;
+    if (zero_page) {
+        p->zero[p->zero_num] = offset;
+        p->zero_num++;
+        ram_release_page(rb->idstr, offset);
+    } else {
+        p->normal[p->normal_num] = offset;
+        p->normal_num++;
+    }
+}
+
+static void buffer_is_zero_use_cpu(MultiFDSendParams *p)
+{
+    const void **buf = (const void **)p->batch_task->addr;
+    assert(!migrate_use_main_zero_page());
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->batch_task->results[i] = buffer_is_zero(buf[i], p->page_size);
+    }
+}
+
+static void set_normal_pages(MultiFDSendParams *p)
+{
+    for (int i = 0; i < p->pages->num; i++) {
+        p->batch_task->results[i] = false;
+    }
+}
+
+static void multifd_zero_page_check(MultiFDSendParams *p)
+{
+    /* older qemu don't understand zero page on multifd channel */
+    bool use_multifd_zero_page = !migrate_use_main_zero_page();
+
+    RAMBlock *rb = p->pages->block;
+
+    for (int i = 0; i < p->pages->num; i++) {
+        p->batch_task->addr[i] = (ram_addr_t)(rb->host + p->pages->offset[i]);
+    }
+
+    if (use_multifd_zero_page) {
+        buffer_is_zero_use_cpu(p);
+    } else {
+        /* No zero page checking. All pages are normal pages. */
+        set_normal_pages(p);
+    }
+
+    for (int i = 0; i < p->pages->num; i++) {
+        uint64_t offset = p->pages->offset[i];
+        bool zero_page = p->batch_task->results[i];
+        set_page(p, zero_page, offset);
+    }
+}
+
 static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     MigrationThread *thread = NULL;
     Error *local_err = NULL;
-    /* qemu older than 8.2 don't understand zero page on multifd channel */
-    bool use_multifd_zero_page = !migrate_use_main_zero_page();
     int ret = 0;
     bool use_zero_copy_send = migrate_zero_copy_send();
 
@@ -710,7 +767,6 @@ static void *multifd_send_thread(void *opaque)
         qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
-            RAMBlock *rb = p->pages->block;
             uint64_t packet_num = p->packet_num;
             uint32_t flags;
 
@@ -723,18 +779,7 @@ static void *multifd_send_thread(void *opaque)
                 p->iovs_num = 1;
             }
 
-            for (int i = 0; i < p->pages->num; i++) {
-                uint64_t offset = p->pages->offset[i];
-                if (use_multifd_zero_page &&
-                    buffer_is_zero(rb->host + offset, p->page_size)) {
-                    p->zero[p->zero_num] = offset;
-                    p->zero_num++;
-                    ram_release_page(rb->idstr, offset);
-                } else {
-                    p->normal[p->normal_num] = offset;
-                    p->normal_num++;
-                }
-            }
+            multifd_zero_page_check(p);
 
             if (p->normal_num) {
                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
@@ -975,6 +1020,8 @@ int multifd_save_setup(Error **errp)
         p->pending_job = 0;
         p->id = i;
         p->pages = multifd_pages_init(page_count);
+        p->batch_task = g_malloc0(sizeof(struct batch_task));
+        batch_task_init(p->batch_task, page_count);
         p->packet_len = sizeof(MultiFDPacket_t)
                       + sizeof(uint64_t) * page_count;
         p->packet = g_malloc0(p->packet_len);
diff --git a/migration/multifd.h b/migration/multifd.h
index 13762900d4..97b5f888a7 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -119,6 +119,8 @@ typedef struct {
      * pending_job != 0 -> multifd_channel can use it.
      */
     MultiFDPages_t *pages;
+    /* Zero page checking batch task */
+    struct batch_task *batch_task;
 
     /* thread local variables. No locking required */
 
diff --git a/util/dsa.c b/util/dsa.c
index 5a2bf33651..f6224a27d4 100644
--- a/util/dsa.c
+++ b/util/dsa.c
@@ -802,7 +802,7 @@ buffer_zero_task_init_int(struct dsa_hw_desc *descriptor,
 }
 
 /**
- * @brief Initializes a buffer zero batch task.
+ * @brief Initializes a buffer zero DSA batch task.
  *
  * @param task A pointer to the batch task to initialize.
  * @param results A pointer to an array of zero page checking results.
@@ -1107,29 +1107,64 @@ void dsa_cleanup(void)
  * @return Zero if successful, otherwise non-zero.
  */
 int
-buffer_is_zero_dsa_batch_async(struct dsa_batch_task *batch_task,
+buffer_is_zero_dsa_batch_async(struct batch_task *batch_task,
                                const void **buf, size_t count, size_t len)
 {
-    if (count <= 0 || count > batch_task->batch_size) {
+    struct dsa_batch_task *dsa_batch = batch_task->dsa_batch;
+
+    if (count <= 0 || count > dsa_batch->batch_size) {
         return -1;
     }
 
-    assert(batch_task != NULL);
+    assert(dsa_batch != NULL);
     assert(len != 0);
     assert(buf != NULL);
 
     if (count == 1) {
         /* DSA doesn't take batch operation with only 1 task. */
-        buffer_zero_dsa_async(batch_task, buf[0], len);
+        buffer_zero_dsa_async(dsa_batch, buf[0], len);
     } else {
-        buffer_zero_dsa_batch_async(batch_task, buf, count, len);
+        buffer_zero_dsa_batch_async(dsa_batch, buf, count, len);
     }
 
-    buffer_zero_dsa_wait(batch_task);
-    buffer_zero_cpu_fallback(batch_task);
+    buffer_zero_dsa_wait(dsa_batch);
+    buffer_zero_cpu_fallback(dsa_batch);
 
     return 0;
 }
 
 #endif
 
+/**
+ * @brief Initializes a general buffer zero batch task.
+ *
+ * @param task A pointer to the general batch task to initialize.
+ * @param batch_size The number of zero page checking tasks in the batch.
+ */
+void
+batch_task_init(struct batch_task *task, int batch_size)
+{
+    task->addr = g_new0(ram_addr_t, batch_size);
+    task->results = g_new0(bool, batch_size);
+#ifdef CONFIG_DSA_OPT
+    task->dsa_batch = qemu_memalign(64, sizeof(struct dsa_batch_task));
+    buffer_zero_batch_task_init(task->dsa_batch, task->results, batch_size);
+#endif
+}
+
+/**
+ * @brief Destroys a general buffer zero batch task.
+ *
+ * @param task A pointer to the general batch task to destroy.
+ */
+void
+batch_task_destroy(struct batch_task *task)
+{
+    g_free(task->addr);
+    g_free(task->results);
+#ifdef CONFIG_DSA_OPT
+    buffer_zero_batch_task_destroy(task->dsa_batch);
+    qemu_vfree(task->dsa_batch);
+#endif
+}
+
-- 
2.30.2
Re: [PATCH v3 13/20] migration/multifd: Prepare to introduce DSA acceleration on the multifd path.
Posted by Shivam Kumar 10 months, 2 weeks ago

> On 04-Jan-2024, at 6:14 AM, Hao Xiang <hao.xiang@bytedance.com> wrote:
> 
> 1. Refactor multifd_send_thread function.
> 2. Implement buffer_is_zero_use_cpu to handle CPU based zero page
> checking.
> 3. Introduce the batch task structure in MultiFDSendParams.
> 
> Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
> ---
> include/qemu/dsa.h  | 43 +++++++++++++++++++++++--
> migration/multifd.c | 77 ++++++++++++++++++++++++++++++++++++---------
> migration/multifd.h |  2 ++
> util/dsa.c          | 51 +++++++++++++++++++++++++-----
> 4 files changed, 148 insertions(+), 25 deletions(-)
> 
> diff --git a/include/qemu/dsa.h b/include/qemu/dsa.h
> index e002652879..fe7772107a 100644
> --- a/include/qemu/dsa.h
> +++ b/include/qemu/dsa.h
> @@ -2,6 +2,7 @@
> #define QEMU_DSA_H
> 
> #include "qemu/error-report.h"
> +#include "exec/cpu-common.h"
> #include "qemu/thread.h"
> #include "qemu/queue.h"
> 
> @@ -42,6 +43,20 @@ typedef struct dsa_batch_task {
>     QSIMPLEQ_ENTRY(dsa_batch_task) entry;
> } dsa_batch_task;
> 
> +#endif
> +
> +struct batch_task {
> +    /* Address of each pages in pages */
> +    ram_addr_t *addr;
> +    /* Zero page checking results */
> +    bool *results;
> +#ifdef CONFIG_DSA_OPT
> +    struct dsa_batch_task *dsa_batch;
> +#endif
> +};
> +
> +#ifdef CONFIG_DSA_OPT
> +
> /**
>  * @brief Initializes DSA devices.
>  *
> @@ -74,7 +89,7 @@ void dsa_cleanup(void);
> bool dsa_is_running(void);
> 
> /**
> - * @brief Initializes a buffer zero batch task.
> + * @brief Initializes a buffer zero DSA batch task.
>  *
>  * @param task A pointer to the batch task to initialize.
>  * @param results A pointer to an array of zero page checking results.
> @@ -102,7 +117,7 @@ void buffer_zero_batch_task_destroy(struct dsa_batch_task *task);
>  * @return Zero if successful, otherwise non-zero.
>  */
> int
> -buffer_is_zero_dsa_batch_async(struct dsa_batch_task *batch_task,
> +buffer_is_zero_dsa_batch_async(struct batch_task *batch_task,
>                                const void **buf, size_t count, size_t len);
> 
> #else
> @@ -128,6 +143,30 @@ static inline void dsa_stop(void) {}
> 
> static inline void dsa_cleanup(void) {}
> 
> +static inline int
> +buffer_is_zero_dsa_batch_async(struct batch_task *batch_task,
> +                               const void **buf, size_t count, size_t len)
> +{
> +    exit(1);
> +}
> +
> #endif
> 
> +/**
> + * @brief Initializes a general buffer zero batch task.
> + *
> + * @param task A pointer to the general batch task to initialize.
> + * @param batch_size The number of zero page checking tasks in the batch.
> + */
> +void
> +batch_task_init(struct batch_task *task, int batch_size);
> +
> +/**
> + * @brief Destroys a general buffer zero batch task.
> + *
> + * @param task A pointer to the general batch task to destroy.
> + */
> +void
> +batch_task_destroy(struct batch_task *task);
> +
> #endif
> diff --git a/migration/multifd.c b/migration/multifd.c
> index eece85569f..e7c549b93e 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -14,6 +14,8 @@
> #include "qemu/cutils.h"
> #include "qemu/rcu.h"
> #include "qemu/cutils.h"
> +#include "qemu/dsa.h"
> +#include "qemu/memalign.h"
> #include "exec/target_page.h"
> #include "sysemu/sysemu.h"
> #include "exec/ramblock.h"
> @@ -574,6 +576,8 @@ void multifd_save_cleanup(void)
>         p->name = NULL;
>         multifd_pages_clear(p->pages);
>         p->pages = NULL;
> +        batch_task_destroy(p->batch_task);
> +        p->batch_task = NULL;
>         p->packet_len = 0;
>         g_free(p->packet);
>         p->packet = NULL;
> @@ -678,13 +682,66 @@ int multifd_send_sync_main(QEMUFile *f)
>     return 0;
> }
> 
> +static void set_page(MultiFDSendParams *p, bool zero_page, uint64_t offset)
> +{
> +    RAMBlock *rb = p->pages->block;
> +    if (zero_page) {
> +        p->zero[p->zero_num] = offset;
> +        p->zero_num++;
> +        ram_release_page(rb->idstr, offset);
> +    } else {
> +        p->normal[p->normal_num] = offset;
> +        p->normal_num++;
> +    }
> +}
> +
> +static void buffer_is_zero_use_cpu(MultiFDSendParams *p)
> +{
> +    const void **buf = (const void **)p->batch_task->addr;
> +    assert(!migrate_use_main_zero_page());
> +
> +    for (int i = 0; i < p->pages->num; i++) {
> +        p->batch_task->results[i] = buffer_is_zero(buf[i], p->page_size);
> +    }
> +}
> +
> +static void set_normal_pages(MultiFDSendParams *p)
> +{
> +    for (int i = 0; i < p->pages->num; i++) {
> +        p->batch_task->results[i] = false;
> +    }
> +}
Please correct me if I am wrong, but set_normal_pages will not be part of the final patch, right? It is there for testing performance against different zero-page ratio scenarios. If so, can we isolate these parts into a separate patch?
> +
> +static void multifd_zero_page_check(MultiFDSendParams *p)
> +{
> +    /* older qemu don't understand zero page on multifd channel */
> +    bool use_multifd_zero_page = !migrate_use_main_zero_page();
> +
> +    RAMBlock *rb = p->pages->block;
> +
> +    for (int i = 0; i < p->pages->num; i++) {
> +        p->batch_task->addr[i] = (ram_addr_t)(rb->host + p->pages->offset[i]);
> +    }
> +
> +    if (use_multifd_zero_page) {
> +        buffer_is_zero_use_cpu(p);
> +    } else {
> +        /* No zero page checking. All pages are normal pages. */
> +        set_normal_pages(p);
> +    }
> +
> +    for (int i = 0; i < p->pages->num; i++) {
> +        uint64_t offset = p->pages->offset[i];
> +        bool zero_page = p->batch_task->results[i];
> +        set_page(p, zero_page, offset);
> +    }
> +}
> +
> static void *multifd_send_thread(void *opaque)
> {
>     MultiFDSendParams *p = opaque;
>     MigrationThread *thread = NULL;
>     Error *local_err = NULL;
> -    /* qemu older than 8.2 don't understand zero page on multifd channel */
> -    bool use_multifd_zero_page = !migrate_use_main_zero_page();
>     int ret = 0;
>     bool use_zero_copy_send = migrate_zero_copy_send();
> 
> @@ -710,7 +767,6 @@ static void *multifd_send_thread(void *opaque)
>         qemu_mutex_lock(&p->mutex);
> 
>         if (p->pending_job) {
> -            RAMBlock *rb = p->pages->block;
>             uint64_t packet_num = p->packet_num;
>             uint32_t flags;
> 
> @@ -723,18 +779,7 @@ static void *multifd_send_thread(void *opaque)
>                 p->iovs_num = 1;
>             }
> 
> -            for (int i = 0; i < p->pages->num; i++) {
> -                uint64_t offset = p->pages->offset[i];
> -                if (use_multifd_zero_page &&
> -                    buffer_is_zero(rb->host + offset, p->page_size)) {
> -                    p->zero[p->zero_num] = offset;
> -                    p->zero_num++;
> -                    ram_release_page(rb->idstr, offset);
> -                } else {
> -                    p->normal[p->normal_num] = offset;
> -                    p->normal_num++;
> -                }
> -            }
> +            multifd_zero_page_check(p);
> 
>             if (p->normal_num) {
>                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
> @@ -975,6 +1020,8 @@ int multifd_save_setup(Error **errp)
>         p->pending_job = 0;
>         p->id = i;
>         p->pages = multifd_pages_init(page_count);
> +        p->batch_task = g_malloc0(sizeof(struct batch_task));
> +        batch_task_init(p->batch_task, page_count);
>         p->packet_len = sizeof(MultiFDPacket_t)
>                       + sizeof(uint64_t) * page_count;
>         p->packet = g_malloc0(p->packet_len);
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 13762900d4..97b5f888a7 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -119,6 +119,8 @@ typedef struct {
>      * pending_job != 0 -> multifd_channel can use it.
>      */
>     MultiFDPages_t *pages;
> +    /* Zero page checking batch task */
> +    struct batch_task *batch_task;
> 
>     /* thread local variables. No locking required */
> 
> diff --git a/util/dsa.c b/util/dsa.c
> index 5a2bf33651..f6224a27d4 100644
> --- a/util/dsa.c
> +++ b/util/dsa.c
> @@ -802,7 +802,7 @@ buffer_zero_task_init_int(struct dsa_hw_desc *descriptor,
> }
> 
> /**
> - * @brief Initializes a buffer zero batch task.
> + * @brief Initializes a buffer zero DSA batch task.
>  *
>  * @param task A pointer to the batch task to initialize.
>  * @param results A pointer to an array of zero page checking results.
> @@ -1107,29 +1107,64 @@ void dsa_cleanup(void)
>  * @return Zero if successful, otherwise non-zero.
>  */
> int
> -buffer_is_zero_dsa_batch_async(struct dsa_batch_task *batch_task,
> +buffer_is_zero_dsa_batch_async(struct batch_task *batch_task,
>                                const void **buf, size_t count, size_t len)
> {
> -    if (count <= 0 || count > batch_task->batch_size) {
> +    struct dsa_batch_task *dsa_batch = batch_task->dsa_batch;
> +
> +    if (count <= 0 || count > dsa_batch->batch_size) {
>         return -1;
>     }
> 
> -    assert(batch_task != NULL);
> +    assert(dsa_batch != NULL);
>     assert(len != 0);
>     assert(buf != NULL);
> 
>     if (count == 1) {
>         /* DSA doesn't take batch operation with only 1 task. */
> -        buffer_zero_dsa_async(batch_task, buf[0], len);
> +        buffer_zero_dsa_async(dsa_batch, buf[0], len);
>     } else {
> -        buffer_zero_dsa_batch_async(batch_task, buf, count, len);
> +        buffer_zero_dsa_batch_async(dsa_batch, buf, count, len);
>     }
> 
> -    buffer_zero_dsa_wait(batch_task);
> -    buffer_zero_cpu_fallback(batch_task);
> +    buffer_zero_dsa_wait(dsa_batch);
> +    buffer_zero_cpu_fallback(dsa_batch);
> 
>     return 0;
> }
> 
> #endif
> 
> +/**
> + * @brief Initializes a general buffer zero batch task.
> + *
> + * @param task A pointer to the general batch task to initialize.
> + * @param batch_size The number of zero page checking tasks in the batch.
> + */
> +void
> +batch_task_init(struct batch_task *task, int batch_size)
> +{
> +    task->addr = g_new0(ram_addr_t, batch_size);
> +    task->results = g_new0(bool, batch_size);
> +#ifdef CONFIG_DSA_OPT
> +    task->dsa_batch = qemu_memalign(64, sizeof(struct dsa_batch_task));
> +    buffer_zero_batch_task_init(task->dsa_batch, task->results, batch_size);
> +#endif
> +}
> +
> +/**
> + * @brief Destroys a general buffer zero batch task.
> + *
> + * @param task A pointer to the general batch task to destroy.
> + */
> +void
> +batch_task_destroy(struct batch_task *task)
> +{
> +    g_free(task->addr);
> +    g_free(task->results);
> +#ifdef CONFIG_DSA_OPT
> +    buffer_zero_batch_task_destroy(task->dsa_batch);
> +    qemu_vfree(task->dsa_batch);
> +#endif
> +}
> +
> -- 
> 2.30.2
> 
> 
> 

Re: [External] Re: [PATCH v3 13/20] migration/multifd: Prepare to introduce DSA acceleration on the multifd path.
Posted by Hao Xiang 10 months, 1 week ago
On Sun, Jan 14, 2024 at 10:46 PM Shivam Kumar <shivam.kumar1@nutanix.com> wrote:
>
>
>
> > On 04-Jan-2024, at 6:14 AM, Hao Xiang <hao.xiang@bytedance.com> wrote:
> >
> > 1. Refactor multifd_send_thread function.
> > 2. Implement buffer_is_zero_use_cpu to handle CPU based zero page
> > checking.
> > 3. Introduce the batch task structure in MultiFDSendParams.
> >
> > Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
> > ---
> > include/qemu/dsa.h  | 43 +++++++++++++++++++++++--
> > migration/multifd.c | 77 ++++++++++++++++++++++++++++++++++++---------
> > migration/multifd.h |  2 ++
> > util/dsa.c          | 51 +++++++++++++++++++++++++-----
> > 4 files changed, 148 insertions(+), 25 deletions(-)
> >
> > diff --git a/include/qemu/dsa.h b/include/qemu/dsa.h
> > index e002652879..fe7772107a 100644
> > --- a/include/qemu/dsa.h
> > +++ b/include/qemu/dsa.h
> > @@ -2,6 +2,7 @@
> > #define QEMU_DSA_H
> >
> > #include "qemu/error-report.h"
> > +#include "exec/cpu-common.h"
> > #include "qemu/thread.h"
> > #include "qemu/queue.h"
> >
> > @@ -42,6 +43,20 @@ typedef struct dsa_batch_task {
> >     QSIMPLEQ_ENTRY(dsa_batch_task) entry;
> > } dsa_batch_task;
> >
> > +#endif
> > +
> > +struct batch_task {
> > +    /* Address of each pages in pages */
> > +    ram_addr_t *addr;
> > +    /* Zero page checking results */
> > +    bool *results;
> > +#ifdef CONFIG_DSA_OPT
> > +    struct dsa_batch_task *dsa_batch;
> > +#endif
> > +};
> > +
> > +#ifdef CONFIG_DSA_OPT
> > +
> > /**
> >  * @brief Initializes DSA devices.
> >  *
> > @@ -74,7 +89,7 @@ void dsa_cleanup(void);
> > bool dsa_is_running(void);
> >
> > /**
> > - * @brief Initializes a buffer zero batch task.
> > + * @brief Initializes a buffer zero DSA batch task.
> >  *
> >  * @param task A pointer to the batch task to initialize.
> >  * @param results A pointer to an array of zero page checking results.
> > @@ -102,7 +117,7 @@ void buffer_zero_batch_task_destroy(struct dsa_batch_task *task);
> >  * @return Zero if successful, otherwise non-zero.
> >  */
> > int
> > -buffer_is_zero_dsa_batch_async(struct dsa_batch_task *batch_task,
> > +buffer_is_zero_dsa_batch_async(struct batch_task *batch_task,
> >                                const void **buf, size_t count, size_t len);
> >
> > #else
> > @@ -128,6 +143,30 @@ static inline void dsa_stop(void) {}
> >
> > static inline void dsa_cleanup(void) {}
> >
> > +static inline int
> > +buffer_is_zero_dsa_batch_async(struct batch_task *batch_task,
> > +                               const void **buf, size_t count, size_t len)
> > +{
> > +    exit(1);
> > +}
> > +
> > #endif
> >
> > +/**
> > + * @brief Initializes a general buffer zero batch task.
> > + *
> > + * @param task A pointer to the general batch task to initialize.
> > + * @param batch_size The number of zero page checking tasks in the batch.
> > + */
> > +void
> > +batch_task_init(struct batch_task *task, int batch_size);
> > +
> > +/**
> > + * @brief Destroys a general buffer zero batch task.
> > + *
> > + * @param task A pointer to the general batch task to destroy.
> > + */
> > +void
> > +batch_task_destroy(struct batch_task *task);
> > +
> > #endif
> > diff --git a/migration/multifd.c b/migration/multifd.c
> > index eece85569f..e7c549b93e 100644
> > --- a/migration/multifd.c
> > +++ b/migration/multifd.c
> > @@ -14,6 +14,8 @@
> > #include "qemu/cutils.h"
> > #include "qemu/rcu.h"
> > #include "qemu/cutils.h"
> > +#include "qemu/dsa.h"
> > +#include "qemu/memalign.h"
> > #include "exec/target_page.h"
> > #include "sysemu/sysemu.h"
> > #include "exec/ramblock.h"
> > @@ -574,6 +576,8 @@ void multifd_save_cleanup(void)
> >         p->name = NULL;
> >         multifd_pages_clear(p->pages);
> >         p->pages = NULL;
> > +        batch_task_destroy(p->batch_task);
> > +        p->batch_task = NULL;
> >         p->packet_len = 0;
> >         g_free(p->packet);
> >         p->packet = NULL;
> > @@ -678,13 +682,66 @@ int multifd_send_sync_main(QEMUFile *f)
> >     return 0;
> > }
> >
> > +static void set_page(MultiFDSendParams *p, bool zero_page, uint64_t offset)
> > +{
> > +    RAMBlock *rb = p->pages->block;
> > +    if (zero_page) {
> > +        p->zero[p->zero_num] = offset;
> > +        p->zero_num++;
> > +        ram_release_page(rb->idstr, offset);
> > +    } else {
> > +        p->normal[p->normal_num] = offset;
> > +        p->normal_num++;
> > +    }
> > +}
> > +
> > +static void buffer_is_zero_use_cpu(MultiFDSendParams *p)
> > +{
> > +    const void **buf = (const void **)p->batch_task->addr;
> > +    assert(!migrate_use_main_zero_page());
> > +
> > +    for (int i = 0; i < p->pages->num; i++) {
> > +        p->batch_task->results[i] = buffer_is_zero(buf[i], p->page_size);
> > +    }
> > +}
> > +
> > +static void set_normal_pages(MultiFDSendParams *p)
> > +{
> > +    for (int i = 0; i < p->pages->num; i++) {
> > +        p->batch_task->results[i] = false;
> > +    }
> > +}
> Please correct me if I am wrong, but set_normal_pages will not be part of the final patch, right? It is there for testing performance against different zero-page ratio scenarios. If so, can we isolate these parts into a separate patch?

set_normal_pages is used for performance testing only. It
won't introduce any "incorrect" behavior, and I would love to see it
become part of the upstream code. But the argument that test-only changes
should remain private is always correct, so I am totally OK with
isolating these parts into a separate patch.

> > +
> > +static void multifd_zero_page_check(MultiFDSendParams *p)
> > +{
> > +    /* older qemu don't understand zero page on multifd channel */
> > +    bool use_multifd_zero_page = !migrate_use_main_zero_page();
> > +
> > +    RAMBlock *rb = p->pages->block;
> > +
> > +    for (int i = 0; i < p->pages->num; i++) {
> > +        p->batch_task->addr[i] = (ram_addr_t)(rb->host + p->pages->offset[i]);
> > +    }
> > +
> > +    if (use_multifd_zero_page) {
> > +        buffer_is_zero_use_cpu(p);
> > +    } else {
> > +        /* No zero page checking. All pages are normal pages. */
> > +        set_normal_pages(p);
> > +    }
> > +
> > +    for (int i = 0; i < p->pages->num; i++) {
> > +        uint64_t offset = p->pages->offset[i];
> > +        bool zero_page = p->batch_task->results[i];
> > +        set_page(p, zero_page, offset);
> > +    }
> > +}
> > +
> > static void *multifd_send_thread(void *opaque)
> > {
> >     MultiFDSendParams *p = opaque;
> >     MigrationThread *thread = NULL;
> >     Error *local_err = NULL;
> > -    /* qemu older than 8.2 don't understand zero page on multifd channel */
> > -    bool use_multifd_zero_page = !migrate_use_main_zero_page();
> >     int ret = 0;
> >     bool use_zero_copy_send = migrate_zero_copy_send();
> >
> > @@ -710,7 +767,6 @@ static void *multifd_send_thread(void *opaque)
> >         qemu_mutex_lock(&p->mutex);
> >
> >         if (p->pending_job) {
> > -            RAMBlock *rb = p->pages->block;
> >             uint64_t packet_num = p->packet_num;
> >             uint32_t flags;
> >
> > @@ -723,18 +779,7 @@ static void *multifd_send_thread(void *opaque)
> >                 p->iovs_num = 1;
> >             }
> >
> > -            for (int i = 0; i < p->pages->num; i++) {
> > -                uint64_t offset = p->pages->offset[i];
> > -                if (use_multifd_zero_page &&
> > -                    buffer_is_zero(rb->host + offset, p->page_size)) {
> > -                    p->zero[p->zero_num] = offset;
> > -                    p->zero_num++;
> > -                    ram_release_page(rb->idstr, offset);
> > -                } else {
> > -                    p->normal[p->normal_num] = offset;
> > -                    p->normal_num++;
> > -                }
> > -            }
> > +            multifd_zero_page_check(p);
> >
> >             if (p->normal_num) {
> >                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
> > @@ -975,6 +1020,8 @@ int multifd_save_setup(Error **errp)
> >         p->pending_job = 0;
> >         p->id = i;
> >         p->pages = multifd_pages_init(page_count);
> > +        p->batch_task = g_malloc0(sizeof(struct batch_task));
> > +        batch_task_init(p->batch_task, page_count);
> >         p->packet_len = sizeof(MultiFDPacket_t)
> >                       + sizeof(uint64_t) * page_count;
> >         p->packet = g_malloc0(p->packet_len);
> > diff --git a/migration/multifd.h b/migration/multifd.h
> > index 13762900d4..97b5f888a7 100644
> > --- a/migration/multifd.h
> > +++ b/migration/multifd.h
> > @@ -119,6 +119,8 @@ typedef struct {
> >      * pending_job != 0 -> multifd_channel can use it.
> >      */
> >     MultiFDPages_t *pages;
> > +    /* Zero page checking batch task */
> > +    struct batch_task *batch_task;
> >
> >     /* thread local variables. No locking required */
> >
> > diff --git a/util/dsa.c b/util/dsa.c
> > index 5a2bf33651..f6224a27d4 100644
> > --- a/util/dsa.c
> > +++ b/util/dsa.c
> > @@ -802,7 +802,7 @@ buffer_zero_task_init_int(struct dsa_hw_desc *descriptor,
> > }
> >
> > /**
> > - * @brief Initializes a buffer zero batch task.
> > + * @brief Initializes a buffer zero DSA batch task.
> >  *
> >  * @param task A pointer to the batch task to initialize.
> >  * @param results A pointer to an array of zero page checking results.
> > @@ -1107,29 +1107,64 @@ void dsa_cleanup(void)
> >  * @return Zero if successful, otherwise non-zero.
> >  */
> > int
> > -buffer_is_zero_dsa_batch_async(struct dsa_batch_task *batch_task,
> > +buffer_is_zero_dsa_batch_async(struct batch_task *batch_task,
> >                                const void **buf, size_t count, size_t len)
> > {
> > -    if (count <= 0 || count > batch_task->batch_size) {
> > +    struct dsa_batch_task *dsa_batch = batch_task->dsa_batch;
> > +
> > +    if (count <= 0 || count > dsa_batch->batch_size) {
> >         return -1;
> >     }
> >
> > -    assert(batch_task != NULL);
> > +    assert(dsa_batch != NULL);
> >     assert(len != 0);
> >     assert(buf != NULL);
> >
> >     if (count == 1) {
> >         /* DSA doesn't take batch operation with only 1 task. */
> > -        buffer_zero_dsa_async(batch_task, buf[0], len);
> > +        buffer_zero_dsa_async(dsa_batch, buf[0], len);
> >     } else {
> > -        buffer_zero_dsa_batch_async(batch_task, buf, count, len);
> > +        buffer_zero_dsa_batch_async(dsa_batch, buf, count, len);
> >     }
> >
> > -    buffer_zero_dsa_wait(batch_task);
> > -    buffer_zero_cpu_fallback(batch_task);
> > +    buffer_zero_dsa_wait(dsa_batch);
> > +    buffer_zero_cpu_fallback(dsa_batch);
> >
> >     return 0;
> > }
> >
> > #endif
> >
> > +/**
> > + * @brief Initializes a general buffer zero batch task.
> > + *
> > + * @param task A pointer to the general batch task to initialize.
> > + * @param batch_size The number of zero page checking tasks in the batch.
> > + */
> > +void
> > +batch_task_init(struct batch_task *task, int batch_size)
> > +{
> > +    task->addr = g_new0(ram_addr_t, batch_size);
> > +    task->results = g_new0(bool, batch_size);
> > +#ifdef CONFIG_DSA_OPT
> > +    task->dsa_batch = qemu_memalign(64, sizeof(struct dsa_batch_task));
> > +    buffer_zero_batch_task_init(task->dsa_batch, task->results, batch_size);
> > +#endif
> > +}
> > +
> > +/**
> > + * @brief Destroys a general buffer zero batch task.
> > + *
> > + * @param task A pointer to the general batch task to destroy.
> > + */
> > +void
> > +batch_task_destroy(struct batch_task *task)
> > +{
> > +    g_free(task->addr);
> > +    g_free(task->results);
> > +#ifdef CONFIG_DSA_OPT
> > +    buffer_zero_batch_task_destroy(task->dsa_batch);
> > +    qemu_vfree(task->dsa_batch);
> > +#endif
> > +}
> > +
> > --
> > 2.30.2
> >
> >
> >
>
Re: [External] Re: [PATCH v3 13/20] migration/multifd: Prepare to introduce DSA acceleration on the multifd path.
Posted by Peter Xu 9 months, 4 weeks ago
On Mon, Jan 22, 2024 at 04:37:09PM -0800, Hao Xiang wrote:
> > > +static void set_normal_pages(MultiFDSendParams *p)
> > > +{
> > > +    for (int i = 0; i < p->pages->num; i++) {
> > > +        p->batch_task->results[i] = false;
> > > +    }
> > > +}
> > > Please correct me if I am wrong, but set_normal_pages will not be part of the final patch, right? It is there for testing performance against different zero-page ratio scenarios. If so, can we isolate these parts into a separate patch?
> 
> set_normal_pages is used for performance testing only. It
> won't introduce any "incorrect" behavior, and I would love to see it
> become part of the upstream code. But the argument that test-only changes
> should remain private is always correct, so I am totally OK with
> isolating these parts into a separate patch.

IMHO we can allow that to be production code: as long as the new zero-page
detection parameter lets the user choose "none" (as I mentioned in
another reply), it is not test-only code but a way for the user to
disable zero-page detection when desired.  Thanks,

-- 
Peter Xu