On 11/14/2023 13:40, Hao Xiang wrote:
> 1. Refactor multifd_send_thread function.
> 2. Implement buffer_is_zero_use_cpu to handle CPU based zero page
> checking.
> 3. Introduce the batch task structure in MultiFDSendParams.
>
> Signed-off-by: Hao Xiang <hao.xiang@bytedance.com>
> ---
> migration/multifd.c | 82 ++++++++++++++++++++++++++++++++++++---------
> migration/multifd.h | 3 ++
> 2 files changed, 70 insertions(+), 15 deletions(-)
>
> diff --git a/migration/multifd.c b/migration/multifd.c
> index 1198ffde9c..68ab97f918 100644
> --- a/migration/multifd.c
> +++ b/migration/multifd.c
> @@ -14,6 +14,8 @@
> #include "qemu/cutils.h"
> #include "qemu/rcu.h"
> #include "qemu/cutils.h"
> +#include "qemu/dsa.h"
> +#include "qemu/memalign.h"
> #include "exec/target_page.h"
> #include "sysemu/sysemu.h"
> #include "exec/ramblock.h"
> @@ -574,6 +576,11 @@ void multifd_save_cleanup(void)
> p->name = NULL;
> multifd_pages_clear(p->pages);
> p->pages = NULL;
> + g_free(p->addr);
> + p->addr = NULL;
> + buffer_zero_batch_task_destroy(p->batch_task);
> + qemu_vfree(p->batch_task);
> + p->batch_task = NULL;
> p->packet_len = 0;
> g_free(p->packet);
> p->packet = NULL;
> @@ -678,13 +685,66 @@ int multifd_send_sync_main(QEMUFile *f)
> return 0;
> }
>
> +static void set_page(MultiFDSendParams *p, bool zero_page, uint64_t offset)
> +{
> + RAMBlock *rb = p->pages->block;
> + if (zero_page) {
> + p->zero[p->zero_num] = offset;
> + p->zero_num++;
> + ram_release_page(rb->idstr, offset);
> + } else {
> + p->normal[p->normal_num] = offset;
> + p->normal_num++;
> + }
> +}
> +
> +static void buffer_is_zero_use_cpu(MultiFDSendParams *p)
> +{
> + const void **buf = (const void **)p->addr;
> + assert(!migrate_use_main_zero_page());
> +
> + for (int i = 0; i < p->pages->num; i++) {
> + p->batch_task->results[i] = buffer_is_zero(buf[i], p->page_size);
> + }
> +}
> +
> +static void set_normal_pages(MultiFDSendParams *p)
> +{
> + for (int i = 0; i < p->pages->num; i++) {
> + p->batch_task->results[i] = false;
> + }
> +}
> +
> +static void multifd_zero_page_check(MultiFDSendParams *p)
> +{
> + /* older qemu don't understand zero page on multifd channel */
> + bool use_multifd_zero_page = !migrate_use_main_zero_page();
> +
> + RAMBlock *rb = p->pages->block;
> +
> + for (int i = 0; i < p->pages->num; i++) {
> + p->addr[i] = (ram_addr_t)(rb->host + p->pages->offset[i]);
> + }
> +
> + if (use_multifd_zero_page) {
> + buffer_is_zero_use_cpu(p);
> + } else {
> + // No zero page checking. All pages are normal pages.
Please pay attention to the comment style here and in other patches: QEMU
uses traditional C-style /* ... */ comments rather than // comments.
> + set_normal_pages(p);
> + }
> +
> + for (int i = 0; i < p->pages->num; i++) {
> + uint64_t offset = p->pages->offset[i];
> + bool zero_page = p->batch_task->results[i];
> + set_page(p, zero_page, offset);
> + }
> +}
> +
> static void *multifd_send_thread(void *opaque)
> {
> MultiFDSendParams *p = opaque;
> MigrationThread *thread = NULL;
> Error *local_err = NULL;
> - /* qemu older than 8.2 don't understand zero page on multifd channel */
> - bool use_multifd_zero_page = !migrate_use_main_zero_page();
> int ret = 0;
> bool use_zero_copy_send = migrate_zero_copy_send();
>
> @@ -710,7 +770,6 @@ static void *multifd_send_thread(void *opaque)
> qemu_mutex_lock(&p->mutex);
>
> if (p->pending_job) {
> - RAMBlock *rb = p->pages->block;
> uint64_t packet_num = p->packet_num;
> uint32_t flags;
>
> @@ -723,18 +782,7 @@ static void *multifd_send_thread(void *opaque)
> p->iovs_num = 1;
> }
>
> - for (int i = 0; i < p->pages->num; i++) {
> - uint64_t offset = p->pages->offset[i];
> - if (use_multifd_zero_page &&
> - buffer_is_zero(rb->host + offset, p->page_size)) {
> - p->zero[p->zero_num] = offset;
> - p->zero_num++;
> - ram_release_page(rb->idstr, offset);
> - } else {
> - p->normal[p->normal_num] = offset;
> - p->normal_num++;
> - }
> - }
> + multifd_zero_page_check(p);
>
> if (p->normal_num) {
> ret = multifd_send_state->ops->send_prepare(p, &local_err);
> @@ -976,6 +1024,10 @@ int multifd_save_setup(Error **errp)
> p->pending_job = 0;
> p->id = i;
> p->pages = multifd_pages_init(page_count);
> + p->addr = g_new0(ram_addr_t, page_count);
> + p->batch_task =
> + (struct buffer_zero_batch_task *)qemu_memalign(64, sizeof(*p->batch_task));
> + buffer_zero_batch_task_init(p->batch_task, page_count);
> p->packet_len = sizeof(MultiFDPacket_t)
> + sizeof(uint64_t) * page_count;
> p->packet = g_malloc0(p->packet_len);
> diff --git a/migration/multifd.h b/migration/multifd.h
> index 13762900d4..62f31b03c0 100644
> --- a/migration/multifd.h
> +++ b/migration/multifd.h
> @@ -119,6 +119,9 @@ typedef struct {
> * pending_job != 0 -> multifd_channel can use it.
> */
> MultiFDPages_t *pages;
> + /* Address of each pages in pages */
s/each pages/each page/
> + ram_addr_t *addr;
I think there is no need to introduce this variable, since it can simply be
derived by:
p->addr[i] = (ram_addr_t)(rb->host + p->pages->offset[i]);
and it is useless when we check for zero pages in the main thread (main-zero-page=y).
> + struct buffer_zero_batch_task *batch_task;
>
> /* thread local variables. No locking required */
>