[PATCH 5/5] migration iaa-compress: Implement IAA compression

Yuan Liu posted 5 patches 1 year, 1 month ago
Maintainers: Paolo Bonzini <pbonzini@redhat.com>, "Marc-André Lureau" <marcandre.lureau@redhat.com>, "Daniel P. Berrangé" <berrange@redhat.com>, Thomas Huth <thuth@redhat.com>, "Philippe Mathieu-Daudé" <philmd@linaro.org>, Juan Quintela <quintela@redhat.com>, Peter Xu <peterx@redhat.com>, Fabiano Rosas <farosas@suse.de>, Leonardo Bras <leobras@redhat.com>, Eric Blake <eblake@redhat.com>, Markus Armbruster <armbru@redhat.com>
There is a newer version of this series
[PATCH 5/5] migration iaa-compress: Implement IAA compression
Posted by Yuan Liu 1 year, 1 month ago
Implement the functions of IAA for data compression and decompression.
The implementation uses non-blocking job submission and polling to check
the job completion status to reduce IAA's overhead in the live migration
process.

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 migration/iaa-ram-compress.c | 167 +++++++++++++++++++++++++++++++++++
 migration/iaa-ram-compress.h |   7 ++
 migration/ram-compress.c     |  10 ++-
 migration/ram.c              |  56 ++++++++++--
 4 files changed, 232 insertions(+), 8 deletions(-)

diff --git a/migration/iaa-ram-compress.c b/migration/iaa-ram-compress.c
index da45952594..243aeb6d55 100644
--- a/migration/iaa-ram-compress.c
+++ b/migration/iaa-ram-compress.c
@@ -12,6 +12,7 @@
 
 #include "qemu/osdep.h"
 #include "qemu/cutils.h"
+
 #include "qemu/error-report.h"
 #include "migration.h"
 #include "options.h"
@@ -62,6 +63,31 @@ static IaaJobPool iaa_job_pool;
 static QSIMPLEQ_HEAD(, IaaJob) polling_queue =
                                    QSIMPLEQ_HEAD_INITIALIZER(polling_queue);
 
+static IaaJob *get_job(send_iaa_data send_page)
+{
+    IaaJob *job;
+
+retry:
+    /* Wait for a job to complete when there is no available job */
+    if (iaa_job_pool.cnt == IAA_JOB_NUM) {
+        flush_iaa_jobs(false, send_page);
+        goto retry;
+    }
+    job = iaa_job_pool.jobs[iaa_job_pool.pos];
+    iaa_job_pool.pos++;
+    iaa_job_pool.cnt++;
+    if (iaa_job_pool.pos == IAA_JOB_NUM) {
+        iaa_job_pool.pos = 0;
+    }
+    return job;
+}
+
+static void put_job(IaaJob *job)
+{
+    assert(iaa_job_pool.cnt > 0);
+    iaa_job_pool.cnt--;
+}
+
 void iaa_compress_deinit(void)
 {
     for (int i = 0; i < IAA_JOB_NUM; i++) {
@@ -150,3 +176,144 @@ init_err:
     iaa_compress_deinit();
     return -1;
 }
+
+static void process_completed_job(IaaJob *job, send_iaa_data send_page)
+{
+    if (job->is_compression) {
+        send_page(job->param.comp.block, job->param.comp.offset,
+                  job->out_buf, job->out_len, job->param.comp.result);
+    } else {
+        assert(job->out_len == qemu_target_page_size());
+        memcpy(job->param.decomp.host, job->out_buf, job->out_len);
+    }
+    put_job(job);
+}
+
+static qpl_status check_job_status(IaaJob *job, bool block)
+{
+    qpl_status status;
+    qpl_job *qpl = job->qpl;
+
+    status = block ? qpl_wait_job(qpl) : qpl_check_job(qpl);
+    if (status == QPL_STS_OK) {
+        job->out_len = qpl->total_out;
+        if (job->is_compression) {
+            job->param.comp.result = RES_COMPRESS;
+            /* if no compression benefit, send a normal page for migration */
+            if (job->out_len == qemu_target_page_size()) {
+                iaa_comp_param *param = &(job->param.comp);
+                memcpy(job->out_buf, (param->block->host + param->offset),
+                       job->out_len);
+                job->param.comp.result = RES_NONE;
+            }
+        }
+    } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+        if (job->is_compression) {
+            /*
+             * if the compressed data is larger than the original data, send a
+             * normal page for migration, in this case, IAA has copied the
+             * original data to job->out_buf automatically.
+             */
+            job->out_len = qemu_target_page_size();
+            job->param.comp.result = RES_NONE;
+            status = QPL_STS_OK;
+        }
+    }
+    return status;
+}
+
+static void check_polling_jobs(send_iaa_data send_page)
+{
+    IaaJob *job, *job_next;
+    qpl_status status;
+
+    QSIMPLEQ_FOREACH_SAFE(job, &polling_queue, entry, job_next) {
+        status = check_job_status(job, false);
+        if (status == QPL_STS_OK) { /* job has done */
+            process_completed_job(job, send_page);
+            QSIMPLEQ_REMOVE_HEAD(&polling_queue, entry);
+        } else if (status == QPL_STS_BEING_PROCESSED) { /* job is running */
+            break;
+        } else {
+            abort();
+        }
+    }
+}
+
+static int submit_new_job(IaaJob *job)
+{
+    qpl_status status;
+    qpl_job *qpl = job->qpl;
+
+    qpl->op = job->is_compression ? qpl_op_compress : qpl_op_decompress;
+    qpl->next_in_ptr = job->in_buf;
+    qpl->next_out_ptr = job->out_buf;
+    qpl->available_in = job->in_len;
+    qpl->available_out = qemu_target_page_size(); /* outbuf maximum size */
+    qpl->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+    qpl->level = 1; /* only level 1 compression is supported */
+
+    do {
+        status = qpl_submit_job(qpl);
+    } while (status == QPL_STS_QUEUES_ARE_BUSY_ERR);
+
+    if (status != QPL_STS_OK) {
+        error_report("Failed to submit iaa job, error %d", status);
+        return -1;
+    }
+    QSIMPLEQ_INSERT_TAIL(&polling_queue, job, entry);
+    return 0;
+}
+
+int flush_iaa_jobs(bool flush_all_jobs, send_iaa_data send_page)
+{
+    IaaJob *job, *job_next;
+
+    QSIMPLEQ_FOREACH_SAFE(job, &polling_queue, entry, job_next) {
+        if (check_job_status(job, true) != QPL_STS_OK) {
+            return -1;
+        }
+        process_completed_job(job, send_page);
+        QSIMPLEQ_REMOVE_HEAD(&polling_queue, entry);
+        if (!flush_all_jobs) {
+            break;
+        }
+    }
+    return 0;
+}
+
+int compress_page_with_iaa(RAMBlock *block, ram_addr_t offset,
+                           send_iaa_data send_page)
+{
+    IaaJob *job;
+
+    if (iaa_job_pool.cnt != 0) {
+        check_polling_jobs(send_page);
+    }
+    if (buffer_is_zero(block->host + offset, qemu_target_page_size())) {
+        send_page(block, offset, NULL, 0, RES_ZEROPAGE);
+        return 1;
+    }
+    job = get_job(send_page);
+    job->is_compression = true;
+    job->in_buf = block->host + offset;
+    job->in_len = qemu_target_page_size();
+    job->param.comp.offset = offset;
+    job->param.comp.block = block;
+    return (submit_new_job(job) == 0 ? 1 : 0);
+}
+
+int decompress_data_with_iaa(QEMUFile *f, void *host, int len)
+{
+    IaaJob *job;
+
+    if (iaa_job_pool.cnt != 0) {
+        check_polling_jobs(NULL);
+    }
+    job = get_job(NULL);
+    job->is_compression = false;
+    qemu_get_buffer(f, job->in_buf, len);
+    job->in_len = len;
+    job->param.decomp.host = host;
+    return submit_new_job(job);
+}
diff --git a/migration/iaa-ram-compress.h b/migration/iaa-ram-compress.h
index 27998b255b..5a555b3b8d 100644
--- a/migration/iaa-ram-compress.h
+++ b/migration/iaa-ram-compress.h
@@ -15,6 +15,13 @@
 #include "qemu-file.h"
 #include "ram-compress.h"
 
+typedef int (*send_iaa_data) (RAMBlock *block, ram_addr_t offset, uint8_t *data,
+                              uint32_t data_len, CompressResult result);
+
 int iaa_compress_init(bool is_decompression);
 void iaa_compress_deinit(void);
+int compress_page_with_iaa(RAMBlock *block, ram_addr_t offset,
+                           send_iaa_data send_page);
+int decompress_data_with_iaa(QEMUFile *f, void *host, int len);
+int flush_iaa_jobs(bool flush_all_jobs, send_iaa_data send_page);
 #endif
diff --git a/migration/ram-compress.c b/migration/ram-compress.c
index acc511ce57..0bddf8b9ea 100644
--- a/migration/ram-compress.c
+++ b/migration/ram-compress.c
@@ -370,10 +370,11 @@ int wait_for_decompress_done(void)
         return 0;
     }
 
+#ifdef CONFIG_QPL
     if (migrate_compress_with_iaa()) {
-        /* Implement in next patch */
-        return 0;
+        return flush_iaa_jobs(true, NULL);
     }
+#endif
 
     thread_count = migrate_decompress_threads();
     qemu_mutex_lock(&decomp_done_lock);
@@ -511,9 +512,12 @@ void ram_compress_save_cleanup(void)
 
 void ram_decompress_data(QEMUFile *f, void *host, int len)
 {
+#ifdef CONFIG_QPL
     if (migrate_compress_with_iaa()) {
-        /* Implement in next patch */
+        decompress_data_with_iaa(f, host, len);
+        return;
     }
+#endif
     decompress_data_with_multi_threads(f, host, len);
 }
 
diff --git a/migration/ram.c b/migration/ram.c
index 34ee1de332..5ef818112c 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -69,6 +69,9 @@
 #include "qemu/userfaultfd.h"
 #endif /* defined(__linux__) */
 
+#ifdef CONFIG_QPL
+#include "iaa-ram-compress.h"
+#endif
 /***********************************************************/
 /* ram save/restore */
 
@@ -1342,16 +1345,59 @@ static int send_queued_data(CompressParam *param)
     return len;
 }
 
+#ifdef CONFIG_QPL
+static int send_iaa_compressed_page(RAMBlock *block, ram_addr_t offset,
+                                    uint8_t *data, uint32_t data_len,
+                                    CompressResult result)
+{
+    PageSearchStatus *pss = &ram_state->pss[RAM_CHANNEL_PRECOPY];
+    MigrationState *ms = migrate_get_current();
+    QEMUFile *file = ms->to_dst_file;
+    int len = 0;
+
+    assert(block == pss->last_sent_block);
+    if (result == RES_ZEROPAGE) {
+        len += save_page_header(pss, file, block, offset | RAM_SAVE_FLAG_ZERO);
+        qemu_put_byte(file, 0);
+        len += 1;
+        ram_release_page(block->idstr, offset);
+        stat64_add(&mig_stats.zero_pages, 1);
+    } else if (result == RES_COMPRESS) {
+        assert(data != NULL);
+        assert((data_len > 0) && (data_len < qemu_target_page_size()));
+        len += save_page_header(pss, file, block,
+                                offset | RAM_SAVE_FLAG_COMPRESS_PAGE);
+        qemu_put_be32(file, data_len);
+        qemu_put_buffer(file, data, data_len);
+        len += data_len;
+        /* 8 means a header with RAM_SAVE_FLAG_CONTINUE. */
+        compression_counters.compressed_size += len - 8;
+        compression_counters.pages++;
+    } else if (result == RES_NONE) {
+        assert((data != NULL) && (data_len == TARGET_PAGE_SIZE));
+        len += save_page_header(pss, file, block, offset | RAM_SAVE_FLAG_PAGE);
+        qemu_put_buffer(file, data, data_len);
+        len += data_len;
+        stat64_add(&mig_stats.normal_pages, 1);
+    } else {
+        abort();
+    }
+    ram_transferred_add(len);
+    return len;
+}
+#endif
+
 static void ram_flush_compressed_data(RAMState *rs)
 {
     if (!save_page_use_compression(rs)) {
         return;
     }
+#ifdef CONFIG_QPL
     if (migrate_compress_with_iaa()) {
-        /* Implement in next patch */
+        flush_iaa_jobs(true, send_iaa_compressed_page);
         return;
     }
-
+#endif
     flush_compressed_data(send_queued_data);
 }
 
@@ -2102,11 +2148,11 @@ static bool save_compress_page(RAMState *rs, PageSearchStatus *pss,
         ram_flush_compressed_data(rs);
         return false;
     }
-
+#ifdef CONFIG_QPL
     if (migrate_compress_with_iaa()) {
-        /* Implement in next patch */
-        return true;
+        return compress_page_with_iaa(block, offset, send_iaa_compressed_page);
     }
+#endif
     if (compress_page_with_multi_thread(block, offset, send_queued_data) > 0) {
         return true;
     }
-- 
2.39.3
Re: [PATCH 5/5] migration iaa-compress: Implement IAA compression
Posted by Juan Quintela 1 year, 1 month ago
Yuan Liu <yuan1.liu@intel.com> wrote:
> Implement the functions of IAA for data compression and decompression.
> The implementation uses non-blocking job submission and polling to check
> the job completion status to reduce IAA's overhead in the live migration
> process.
>
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>


> +static void process_completed_job(IaaJob *job, send_iaa_data send_page)
> +{
> +    if (job->is_compression) {
> +        send_page(job->param.comp.block, job->param.comp.offset,
> +                  job->out_buf, job->out_len, job->param.comp.result);
> +    } else {
> +        assert(job->out_len == qemu_target_page_size());
> +        memcpy(job->param.decomp.host, job->out_buf, job->out_len);
> +    }
> +    put_job(job);
> +}

Wouldn't it be easier to add a helper to the job struct and not have that
"if" here?  I.e. it would become:

static void process_completed_job(IaaJob *job, send_iaa_data send_page)
{
    job->completed(job, send_page);
    put_job(job);
}

And do proper initializations.  You can even put the send_page callback
in the job struct.

> +static qpl_status check_job_status(IaaJob *job, bool block)
> +{
> +    qpl_status status;
> +    qpl_job *qpl = job->qpl;
> +
> +    status = block ? qpl_wait_job(qpl) : qpl_check_job(qpl);
> +    if (status == QPL_STS_OK) {
> +        job->out_len = qpl->total_out;
> +        if (job->is_compression) {
> +            job->param.comp.result = RES_COMPRESS;
> +            /* if no compression benefit, send a normal page for migration */
> +            if (job->out_len == qemu_target_page_size()) {
> +                iaa_comp_param *param = &(job->param.comp);
> +                memcpy(job->out_buf, (param->block->host + param->offset),
> +                       job->out_len);
> +                job->param.comp.result = RES_NONE;
> +            }
> +        }
> +    } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
> +        if (job->is_compression) {
> +            /*
> +             * if the compressed data is larger than the original data, send a
> +             * normal page for migration, in this case, IAA has copied the
> +             * original data to job->out_buf automatically.
> +             */
> +            job->out_len = qemu_target_page_size();
> +            job->param.comp.result = RES_NONE;
> +            status = QPL_STS_OK;
> +        }
> +    }

Again, this function for decompression becomes a single line:

    status = block ? qpl_wait_job(qpl) : qpl_check_job(qpl);
    if (status == QPL_STS_OK) {
        job->out_len = qpl->total_out;
    }

Why complicate it?

> +static void check_polling_jobs(send_iaa_data send_page)
> +{
> +    IaaJob *job, *job_next;
> +    qpl_status status;
> +
> +    QSIMPLEQ_FOREACH_SAFE(job, &polling_queue, entry, job_next) {
> +        status = check_job_status(job, false);
> +        if (status == QPL_STS_OK) { /* job has done */
> +            process_completed_job(job, send_page);
> +            QSIMPLEQ_REMOVE_HEAD(&polling_queue, entry);
> +        } else if (status == QPL_STS_BEING_PROCESSED) { /* job is running */
> +            break;
> +        } else {
> +            abort();

Not even printing an error message?

The two callers of check_polling_jobs() can return an error, so there is
no reason to abort() here.

Later, Juan.