[PATCH v7 6/7] migration/multifd: implement qpl compression and decompression

Yuan Liu posted 7 patches 5 months, 3 weeks ago
Maintainers: Peter Xu <peterx@redhat.com>, Fabiano Rosas <farosas@suse.de>, Paolo Bonzini <pbonzini@redhat.com>, "Daniel P. Berrangé" <berrange@redhat.com>, Eduardo Habkost <eduardo@habkost.net>, "Marc-André Lureau" <marcandre.lureau@redhat.com>, Thomas Huth <thuth@redhat.com>, "Philippe Mathieu-Daudé" <philmd@linaro.org>, Eric Blake <eblake@redhat.com>, Markus Armbruster <armbru@redhat.com>, Laurent Vivier <lvivier@redhat.com>
There is a newer version of this series
Posted by Yuan Liu 5 months, 3 weeks ago
QPL compression and decompression will use the IAA hardware first.
If the IAA hardware is not available, they automatically fall back
to the QPL software path. If the software job also fails, the page
is sent uncompressed.

Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
 migration/multifd-qpl.c | 412 +++++++++++++++++++++++++++++++++++++++-
 1 file changed, 408 insertions(+), 4 deletions(-)

diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 6791a204d5..18b3384bd5 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -13,9 +13,14 @@
 #include "qemu/osdep.h"
 #include "qemu/module.h"
 #include "qapi/error.h"
+#include "qapi/qapi-types-migration.h"
+#include "exec/ramblock.h"
 #include "multifd.h"
 #include "qpl/qpl.h"
 
+/* Maximum number of retries to resubmit a job if IAA work queues are full */
+#define MAX_SUBMIT_RETRY_NUM (3)
+
 typedef struct {
     /* the QPL hardware path job */
     qpl_job *job;
@@ -260,6 +265,219 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
     p->iov = NULL;
 }
 
+/**
+ * multifd_qpl_prepare_job: prepare the job
+ *
+ * Set the QPL job parameters and properties.
+ *
+ * @job: pointer to the qpl_job structure
+ * @is_compression: indicates compression and decompression
+ * @input: pointer to the input data buffer
+ * @input_len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @output_len: the length of the output data
+ */
+static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
+                                    uint8_t *input, uint32_t input_len,
+                                    uint8_t *output, uint32_t output_len)
+{
+    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
+    job->next_in_ptr = input;
+    job->next_out_ptr = output;
+    job->available_in = input_len;
+    job->available_out = output_len;
+    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+    /* only supports compression level 1 */
+    job->level = 1;
+}
+
+/**
+ * multifd_qpl_prepare_job: prepare the compression job
+ *
+ * Set the compression job parameters and properties.
+ *
+ * @job: pointer to the qpl_job structure
+ * @input: pointer to the input data buffer
+ * @input_len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @output_len: the length of the output data
+ */
+static void multifd_qpl_prepare_comp_job(qpl_job *job, uint8_t *input,
+                                         uint32_t input_len, uint8_t *output,
+                                         uint32_t output_len)
+{
+    multifd_qpl_prepare_job(job, true, input, input_len, output, output_len);
+}
+
+/**
+ * multifd_qpl_prepare_job: prepare the decompression job
+ *
+ * Set the decompression job parameters and properties.
+ *
+ * @job: pointer to the qpl_job structure
+ * @input: pointer to the input data buffer
+ * @input_len: the length of the input data
+ * @output: pointer to the output data buffer
+ * @output_len: the length of the output data
+ */
+static void multifd_qpl_prepare_decomp_job(qpl_job *job, uint8_t *input,
+                                           uint32_t input_len, uint8_t *output,
+                                           uint32_t output_len)
+{
+    multifd_qpl_prepare_job(job, false, input, input_len, output, output_len);
+}
+
+/**
+ * multifd_qpl_fill_iov: fill in the IOV
+ *
+ * Fill in the QPL packet IOV
+ *
+ * @p: Params for the channel being used
+ * @data: pointer to the IOV data
+ * @len: The length of the IOV data
+ */
+static void multifd_qpl_fill_iov(MultiFDSendParams *p, uint8_t *data,
+                                 uint32_t len)
+{
+    p->iov[p->iovs_num].iov_base = data;
+    p->iov[p->iovs_num].iov_len = len;
+    p->iovs_num++;
+    p->next_packet_size += len;
+}
+
+/**
+ * multifd_qpl_fill_packet: fill the compressed page into the QPL packet
+ *
+ * Fill the compressed page length and IOV into the QPL packet
+ *
+ * @idx: The index of the compressed length array
+ * @p: Params for the channel being used
+ * @data: pointer to the compressed page buffer
+ * @len: The length of the compressed page
+ */
+static void multifd_qpl_fill_packet(uint32_t idx, MultiFDSendParams *p,
+                                    uint8_t *data, uint32_t len)
+{
+    QplData *qpl = p->compress_data;
+
+    qpl->zlen[idx] = cpu_to_be32(len);
+    multifd_qpl_fill_iov(p, data, len);
+}
+
+/**
+ * multifd_qpl_submit_job: submit a job to the hardware
+ *
+ * Submit a QPL hardware job to the IAA device
+ *
+ * Returns true if the job is submitted successfully, otherwise false.
+ *
+ * @job: pointer to the qpl_job structure
+ */
+static bool multifd_qpl_submit_job(qpl_job *job)
+{
+    qpl_status status;
+    uint32_t num = 0;
+
+retry:
+    status = qpl_submit_job(job);
+    if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+        if (num < MAX_SUBMIT_RETRY_NUM) {
+            num++;
+            goto retry;
+        }
+    }
+    return (status == QPL_STS_OK);
+}
+
+/**
+ * multifd_qpl_compress_pages_slow_path: compress pages using slow path
+ *
+ * Compress the pages using software. If compression fails, the page will
+ * be sent directly.
+ *
+ * @p: Params for the channel being used
+ */
+static void multifd_qpl_compress_pages_slow_path(MultiFDSendParams *p)
+{
+    QplData *qpl = p->compress_data;
+    uint32_t size = p->page_size;
+    qpl_job *job = qpl->sw_job;
+    uint8_t *zbuf = qpl->zbuf;
+    uint8_t *buf;
+
+    for (int i = 0; i < p->pages->normal_num; i++) {
+        buf = p->pages->block->host + p->pages->offset[i];
+        /* Set output length to less than the page to reduce decompression */
+        multifd_qpl_prepare_comp_job(job, buf, size, zbuf, size - 1);
+        if (qpl_execute_job(job) == QPL_STS_OK) {
+            multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
+        } else {
+            /* send the page directly */
+            multifd_qpl_fill_packet(i, p, buf, size);
+        }
+        zbuf += size;
+    }
+}
+
+/**
+ * multifd_qpl_compress_pages: compress pages
+ *
+ * Submit the pages to the IAA hardware for compression. If hardware
+ * compression fails, it falls back to software compression. If software
+ * compression also fails, the page is sent directly
+ *
+ * @p: Params for the channel being used
+ */
+static void multifd_qpl_compress_pages(MultiFDSendParams *p)
+{
+    QplData *qpl = p->compress_data;
+    MultiFDPages_t *pages = p->pages;
+    uint32_t size = p->page_size;
+    QplHwJob *hw_job;
+    uint8_t *buf;
+    uint8_t *zbuf;
+
+    for (int i = 0; i < pages->normal_num; i++) {
+        buf = pages->block->host + pages->offset[i];
+        zbuf = qpl->zbuf + (size * i);
+        hw_job = &qpl->hw_jobs[i];
+        /* Set output length to less than the page to reduce decompression */
+        multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf, size - 1);
+        if (multifd_qpl_submit_job(hw_job->job)) {
+            hw_job->fallback_sw_path = false;
+        } else {
+            hw_job->fallback_sw_path = true;
+            /* Set output length less than page size to reduce decompression */
+            multifd_qpl_prepare_comp_job(qpl->sw_job, buf, size, zbuf,
+                                         size - 1);
+            if (qpl_execute_job(qpl->sw_job) == QPL_STS_OK) {
+                hw_job->sw_output = zbuf;
+                hw_job->sw_output_len = qpl->sw_job->total_out;
+            } else {
+                hw_job->sw_output = buf;
+                hw_job->sw_output_len = size;
+            }
+        }
+    }
+
+    for (int i = 0; i < pages->normal_num; i++) {
+        buf = pages->block->host + pages->offset[i];
+        zbuf = qpl->zbuf + (size * i);
+        hw_job = &qpl->hw_jobs[i];
+        if (hw_job->fallback_sw_path) {
+            multifd_qpl_fill_packet(i, p, hw_job->sw_output,
+                                    hw_job->sw_output_len);
+            continue;
+        }
+        if (qpl_wait_job(hw_job->job) == QPL_STS_OK) {
+            multifd_qpl_fill_packet(i, p, zbuf, hw_job->job->total_out);
+        } else {
+            /* send the page directly */
+            multifd_qpl_fill_packet(i, p, buf, size);
+        }
+    }
+}
+
 /**
  * multifd_qpl_send_prepare: prepare data to be able to send
  *
@@ -273,8 +491,26 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
  */
 static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
 {
-    /* Implement in next patch */
-    return -1;
+    QplData *qpl = p->compress_data;
+    uint32_t len = 0;
+
+    if (!multifd_send_prepare_common(p)) {
+        goto out;
+    }
+
+    /* The first IOV is used to store the compressed page lengths */
+    len = p->pages->normal_num * sizeof(uint32_t);
+    multifd_qpl_fill_iov(p, (uint8_t *) qpl->zlen, len);
+    if (qpl->hw_avail) {
+        multifd_qpl_compress_pages(p);
+    } else {
+        multifd_qpl_compress_pages_slow_path(p);
+    }
+
+out:
+    p->flags |= MULTIFD_FLAG_QPL;
+    multifd_send_fill_packet(p);
+    return 0;
 }
 
 /**
@@ -312,6 +548,134 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
     p->compress_data = NULL;
 }
 
+/**
+ * multifd_qpl_process_and_check_job: process and check a QPL job
+ *
+ * Process the job and check whether the job output length is the
+ * same as the specified length
+ *
+ * Returns true if the job execution succeeded and the output length
+ * is equal to the specified length, otherwise false.
+ *
+ * @job: pointer to the qpl_job structure
+ * @is_hardware: indicates whether the job is a hardware job
+ * @len: Specified output length
+ * @errp: pointer to an error
+ */
+static bool multifd_qpl_process_and_check_job(qpl_job *job, bool is_hardware,
+                                              uint32_t len, Error **errp)
+{
+    qpl_status status;
+
+    status = (is_hardware ? qpl_wait_job(job) : qpl_execute_job(job));
+    if (status != QPL_STS_OK) {
+        error_setg(errp, "qpl_execute_job failed with error %d", status);
+        return false;
+    }
+    if (job->total_out != len) {
+        error_setg(errp, "qpl decompressed len %u, expected len %u",
+                   job->total_out, len);
+        return false;
+    }
+    return true;
+}
+
+/**
+ * multifd_qpl_decompress_pages_slow_path: decompress pages using slow path
+ *
+ * Decompress the pages using software
+ *
+ * Returns 0 on success or -1 on error
+ *
+ * @p: Params for the channel being used
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_decompress_pages_slow_path(MultiFDRecvParams *p,
+                                                  Error **errp)
+{
+    QplData *qpl = p->compress_data;
+    uint32_t size = p->page_size;
+    qpl_job *job = qpl->sw_job;
+    uint8_t *zbuf = qpl->zbuf;
+    uint8_t *addr;
+    uint32_t len;
+
+    for (int i = 0; i < p->normal_num; i++) {
+        len = qpl->zlen[i];
+        addr = p->host + p->normal[i];
+        /* the page is uncompressed, load it */
+        if (len == size) {
+            memcpy(addr, zbuf, size);
+            zbuf += size;
+            continue;
+        }
+        multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
+        if (!multifd_qpl_process_and_check_job(job, false, size, errp)) {
+            return -1;
+        }
+        zbuf += len;
+    }
+    return 0;
+}
+
+/**
+ * multifd_qpl_decompress_pages: decompress pages
+ *
+ * Decompress the pages using the IAA hardware. If hardware
+ * decompression fails, it falls back to software decompression.
+ *
+ * Returns 0 on success or -1 on error
+ *
+ * @p: Params for the channel being used
+ * @errp: pointer to an error
+ */
+static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error **errp)
+{
+    QplData *qpl = p->compress_data;
+    uint32_t size = p->page_size;
+    uint8_t *zbuf = qpl->zbuf;
+    uint8_t *addr;
+    uint32_t len;
+    qpl_job *job;
+
+    for (int i = 0; i < p->normal_num; i++) {
+        addr = p->host + p->normal[i];
+        len = qpl->zlen[i];
+        /* the page is uncompressed if received length equals the page size */
+        if (len == size) {
+            memcpy(addr, zbuf, size);
+            zbuf += size;
+            continue;
+        }
+
+        job = qpl->hw_jobs[i].job;
+        multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
+        if (multifd_qpl_submit_job(job)) {
+            qpl->hw_jobs[i].fallback_sw_path = false;
+        } else {
+            qpl->hw_jobs[i].fallback_sw_path = true;
+            job = qpl->sw_job;
+            multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
+            if (!multifd_qpl_process_and_check_job(job, false, size, errp)) {
+                return -1;
+            }
+        }
+        zbuf += len;
+    }
+
+    for (int i = 0; i < p->normal_num; i++) {
+        /* ignore pages that have already been processed */
+        if (qpl->zlen[i] == size || qpl->hw_jobs[i].fallback_sw_path) {
+            continue;
+        }
+
+        job = qpl->hw_jobs[i].job;
+        if (!multifd_qpl_process_and_check_job(job, true, size, errp)) {
+            return -1;
+        }
+    }
+    return 0;
+}
 /**
  * multifd_qpl_recv: read the data from the channel into actual pages
  *
@@ -325,8 +689,48 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
  */
 static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
 {
-    /* Implement in next patch */
-    return -1;
+    QplData *qpl = p->compress_data;
+    uint32_t in_size = p->next_packet_size;
+    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
+    uint32_t len = 0;
+    uint32_t zbuf_len = 0;
+    int ret;
+
+    if (flags != MULTIFD_FLAG_QPL) {
+        error_setg(errp, "multifd %u: flags received %x flags expected %x",
+                   p->id, flags, MULTIFD_FLAG_QPL);
+        return -1;
+    }
+    multifd_recv_zero_page_process(p);
+    if (!p->normal_num) {
+        assert(in_size == 0);
+        return 0;
+    }
+
+    /* read compressed page lengths */
+    len = p->normal_num * sizeof(uint32_t);
+    assert(len < in_size);
+    ret = qio_channel_read_all(p->c, (void *) qpl->zlen, len, errp);
+    if (ret != 0) {
+        return ret;
+    }
+    for (int i = 0; i < p->normal_num; i++) {
+        qpl->zlen[i] = be32_to_cpu(qpl->zlen[i]);
+        assert(qpl->zlen[i] <= p->page_size);
+        zbuf_len += qpl->zlen[i];
+    }
+
+    /* read compressed pages */
+    assert(in_size == len + zbuf_len);
+    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, zbuf_len, errp);
+    if (ret != 0) {
+        return ret;
+    }
+
+    if (qpl->hw_avail) {
+        return multifd_qpl_decompress_pages(p, errp);
+    }
+    return multifd_qpl_decompress_pages_slow_path(p, errp);
 }
 
 static MultiFDMethods multifd_qpl_ops = {
-- 
2.43.0
Re: [PATCH v7 6/7] migration/multifd: implement qpl compression and decompression
Posted by Fabiano Rosas 5 months, 3 weeks ago
Yuan Liu <yuan1.liu@intel.com> writes:

> QPL compression and decompression will use the IAA hardware first.
> If the IAA hardware is not available, they automatically fall back
> to the QPL software path. If the software job also fails, the page
> is sent uncompressed.
>
> Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> ---
>  migration/multifd-qpl.c | 412 +++++++++++++++++++++++++++++++++++++++-
>  1 file changed, 408 insertions(+), 4 deletions(-)
>
> diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> index 6791a204d5..18b3384bd5 100644
> --- a/migration/multifd-qpl.c
> +++ b/migration/multifd-qpl.c
> @@ -13,9 +13,14 @@
>  #include "qemu/osdep.h"
>  #include "qemu/module.h"
>  #include "qapi/error.h"
> +#include "qapi/qapi-types-migration.h"
> +#include "exec/ramblock.h"
>  #include "multifd.h"
>  #include "qpl/qpl.h"
>  
> +/* Maximum number of retries to resubmit a job if IAA work queues are full */
> +#define MAX_SUBMIT_RETRY_NUM (3)
> +
>  typedef struct {
>      /* the QPL hardware path job */
>      qpl_job *job;
> @@ -260,6 +265,219 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
>      p->iov = NULL;
>  }
>  
> +/**
> + * multifd_qpl_prepare_job: prepare the job
> + *
> + * Set the QPL job parameters and properties.
> + *
> + * @job: pointer to the qpl_job structure
> + * @is_compression: indicates compression and decompression
> + * @input: pointer to the input data buffer
> + * @input_len: the length of the input data
> + * @output: pointer to the output data buffer
> + * @output_len: the length of the output data
> + */
> +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
> +                                    uint8_t *input, uint32_t input_len,
> +                                    uint8_t *output, uint32_t output_len)
> +{
> +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
> +    job->next_in_ptr = input;
> +    job->next_out_ptr = output;
> +    job->available_in = input_len;
> +    job->available_out = output_len;
> +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
> +    /* only supports compression level 1 */
> +    job->level = 1;
> +}
> +
> +/**
> + * multifd_qpl_prepare_job: prepare the compression job

function name is wrong, should be multifd_qpl_prepare_comp_job

> + *
> + * Set the compression job parameters and properties.
> + *
> + * @job: pointer to the qpl_job structure
> + * @input: pointer to the input data buffer
> + * @input_len: the length of the input data
> + * @output: pointer to the output data buffer
> + * @output_len: the length of the output data
> + */
> +static void multifd_qpl_prepare_comp_job(qpl_job *job, uint8_t *input,
> +                                         uint32_t input_len, uint8_t *output,
> +                                         uint32_t output_len)
> +{
> +    multifd_qpl_prepare_job(job, true, input, input_len, output, output_len);
> +}
> +
> +/**
> + * multifd_qpl_prepare_job: prepare the decompression job

here as well, should be multifd_qpl_prepare_decomp_job

> + *
> + * Set the decompression job parameters and properties.
> + *
> + * @job: pointer to the qpl_job structure
> + * @input: pointer to the input data buffer
> + * @input_len: the length of the input data
> + * @output: pointer to the output data buffer
> + * @output_len: the length of the output data
> + */
> +static void multifd_qpl_prepare_decomp_job(qpl_job *job, uint8_t *input,
> +                                           uint32_t input_len, uint8_t *output,
> +                                           uint32_t output_len)
> +{
> +    multifd_qpl_prepare_job(job, false, input, input_len, output, output_len);
> +}
> +
> +/**
> + * multifd_qpl_fill_iov: fill in the IOV
> + *
> + * Fill in the QPL packet IOV
> + *
> + * @p: Params for the channel being used
> + * @data: pointer to the IOV data
> + * @len: The length of the IOV data
> + */
> +static void multifd_qpl_fill_iov(MultiFDSendParams *p, uint8_t *data,
> +                                 uint32_t len)
> +{
> +    p->iov[p->iovs_num].iov_base = data;
> +    p->iov[p->iovs_num].iov_len = len;
> +    p->iovs_num++;
> +    p->next_packet_size += len;
> +}
> +
> +/**
> + * multifd_qpl_fill_packet: fill the compressed page into the QPL packet
> + *
> + * Fill the compressed page length and IOV into the QPL packet
> + *
> + * @idx: The index of the compressed length array
> + * @p: Params for the channel being used
> + * @data: pointer to the compressed page buffer
> + * @len: The length of the compressed page
> + */
> +static void multifd_qpl_fill_packet(uint32_t idx, MultiFDSendParams *p,
> +                                    uint8_t *data, uint32_t len)
> +{
> +    QplData *qpl = p->compress_data;
> +
> +    qpl->zlen[idx] = cpu_to_be32(len);
> +    multifd_qpl_fill_iov(p, data, len);
> +}
> +
> +/**
> + * multifd_qpl_submit_job: submit a job to the hardware
> + *
> + * Submit a QPL hardware job to the IAA device
> + *
> + * Returns true if the job is submitted successfully, otherwise false.
> + *
> + * @job: pointer to the qpl_job structure
> + */
> +static bool multifd_qpl_submit_job(qpl_job *job)
> +{
> +    qpl_status status;
> +    uint32_t num = 0;
> +
> +retry:
> +    status = qpl_submit_job(job);
> +    if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> +        if (num < MAX_SUBMIT_RETRY_NUM) {
> +            num++;
> +            goto retry;
> +        }
> +    }
> +    return (status == QPL_STS_OK);

How often do we expect this to fail? Will the queues be busy frequently
or is this an unlikely event? I'm wondering whether we really need to
allow a fallback for the hw path. Sorry if this has been discussed
already, I don't remember.

> +}
> +
> +/**
> + * multifd_qpl_compress_pages_slow_path: compress pages using slow path
> + *
> + * Compress the pages using software. If compression fails, the page will
> + * be sent directly.
> + *
> + * @p: Params for the channel being used
> + */
> +static void multifd_qpl_compress_pages_slow_path(MultiFDSendParams *p)
> +{
> +    QplData *qpl = p->compress_data;
> +    uint32_t size = p->page_size;
> +    qpl_job *job = qpl->sw_job;
> +    uint8_t *zbuf = qpl->zbuf;
> +    uint8_t *buf;
> +
> +    for (int i = 0; i < p->pages->normal_num; i++) {
> +        buf = p->pages->block->host + p->pages->offset[i];
> +        /* Set output length to less than the page to reduce decompression */
> +        multifd_qpl_prepare_comp_job(job, buf, size, zbuf, size - 1);
> +        if (qpl_execute_job(job) == QPL_STS_OK) {
> +            multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
> +        } else {
> +            /* send the page directly */

s/directly/uncompressed/

a bit clearer.

> +            multifd_qpl_fill_packet(i, p, buf, size);
> +        }
> +        zbuf += size;
> +    }
> +}
> +
> +/**
> + * multifd_qpl_compress_pages: compress pages
> + *
> + * Submit the pages to the IAA hardware for compression. If hardware
> + * compression fails, it falls back to software compression. If software
> + * compression also fails, the page is sent directly
> + *
> + * @p: Params for the channel being used
> + */
> +static void multifd_qpl_compress_pages(MultiFDSendParams *p)
> +{
> +    QplData *qpl = p->compress_data;
> +    MultiFDPages_t *pages = p->pages;
> +    uint32_t size = p->page_size;
> +    QplHwJob *hw_job;
> +    uint8_t *buf;
> +    uint8_t *zbuf;
> +

Let's document the output size choice more explicitly:

    /*
     * Set output length to less than the page size to force the job to
     * fail in case it compresses to a larger size. We'll send that page
     * without compression and skip the decompression operation on the
     * destination.
     */
    out_size = size - 1;

you can then omit the other comments.

> +    for (int i = 0; i < pages->normal_num; i++) {
> +        buf = pages->block->host + pages->offset[i];
> +        zbuf = qpl->zbuf + (size * i);
> +        hw_job = &qpl->hw_jobs[i];
> +        /* Set output length to less than the page to reduce decompression */
> +        multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf, size - 1);
> +        if (multifd_qpl_submit_job(hw_job->job)) {
> +            hw_job->fallback_sw_path = false;
> +        } else {
> +            hw_job->fallback_sw_path = true;
> +            /* Set output length less than page size to reduce decompression */
> +            multifd_qpl_prepare_comp_job(qpl->sw_job, buf, size, zbuf,
> +                                         size - 1);
> +            if (qpl_execute_job(qpl->sw_job) == QPL_STS_OK) {
> +                hw_job->sw_output = zbuf;
> +                hw_job->sw_output_len = qpl->sw_job->total_out;
> +            } else {
> +                hw_job->sw_output = buf;
> +                hw_job->sw_output_len = size;
> +            }

Hmm, these look a bit cumbersome. Would it work if we moved the fallback
qpl_execute_job() down into the second loop? We could then avoid the
extra sw_output/sw_output_len fields. Something like:

static void multifd_qpl_compress_pages(MultiFDSendParams *p)
{
    QplData *qpl = p->compress_data;
    MultiFDPages_t *pages = p->pages;
    uint32_t out_size, size = p->page_size;
    uint8_t *buf, *zbuf;

    /*
     * Set output length to less than the page size to force the job to
     * fail in case it compresses to a larger size. We'll send that page
     * without compression to skip the decompression operation on the
     * destination.
     */
    out_size = size - 1;

    for (int i = 0; i < pages->normal_num; i++) {
        QplHwJob *hw_job = &qpl->hw_jobs[i];

        hw_job->fallback_sw_path = false;
        buf = pages->block->host + pages->offset[i];
        zbuf = qpl->zbuf + (size * i);

        multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf, out_size);

        if (!multifd_qpl_submit_job(hw_job->job)) {
            hw_job->fallback_sw_path = true;
        }
    }

    for (int i = 0; i < pages->normal_num; i++) {
        QplHwJob *hw_job = &qpl->hw_jobs[i];
        qpl_job *job;
        qpl_status ret;

        buf = pages->block->host + pages->offset[i];
        zbuf = qpl->zbuf + (size * i);

        if (hw_job->fallback_sw_path) {
            job = qpl->sw_job;
            multifd_qpl_prepare_comp_job(job, buf, size, zbuf, out_size);
            ret = qpl_execute_job(job);
        } else {
            job = hw_job->job;
            ret = qpl_wait_job(job);
        }

        if (ret == QPL_STS_OK) {
            multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
        } else {
            multifd_qpl_fill_packet(i, p, buf, size);
        }
    }
}

> +        }
> +    }
> +
> +    for (int i = 0; i < pages->normal_num; i++) {
> +        buf = pages->block->host + pages->offset[i];
> +        zbuf = qpl->zbuf + (size * i);
> +        hw_job = &qpl->hw_jobs[i];
> +        if (hw_job->fallback_sw_path) {
> +            multifd_qpl_fill_packet(i, p, hw_job->sw_output,
> +                                    hw_job->sw_output_len);
> +            continue;
> +        }
> +        if (qpl_wait_job(hw_job->job) == QPL_STS_OK) {
> +            multifd_qpl_fill_packet(i, p, zbuf, hw_job->job->total_out);
> +        } else {
> +            /* send the page directly */
> +            multifd_qpl_fill_packet(i, p, buf, size);
> +        }
> +    }
> +}
> +
>  /**
>   * multifd_qpl_send_prepare: prepare data to be able to send
>   *
> @@ -273,8 +491,26 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
>   */
>  static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
>  {
> -    /* Implement in next patch */
> -    return -1;
> +    QplData *qpl = p->compress_data;
> +    uint32_t len = 0;
> +
> +    if (!multifd_send_prepare_common(p)) {
> +        goto out;
> +    }
> +
> +    /* The first IOV is used to store the compressed page lengths */
> +    len = p->pages->normal_num * sizeof(uint32_t);
> +    multifd_qpl_fill_iov(p, (uint8_t *) qpl->zlen, len);
> +    if (qpl->hw_avail) {
> +        multifd_qpl_compress_pages(p);
> +    } else {
> +        multifd_qpl_compress_pages_slow_path(p);
> +    }
> +
> +out:
> +    p->flags |= MULTIFD_FLAG_QPL;
> +    multifd_send_fill_packet(p);
> +    return 0;
>  }
>  
>  /**
> @@ -312,6 +548,134 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
>      p->compress_data = NULL;
>  }
>  
> +/**
> + * multifd_qpl_process_and_check_job: process and check a QPL job
> + *
> + * Process the job and check whether the job output length is the
> + * same as the specified length
> + *
> + * Returns true if the job execution succeeded and the output length
> + * is equal to the specified length, otherwise false.
> + *
> + * @job: pointer to the qpl_job structure
> + * @is_hardware: indicates whether the job is a hardware job
> + * @len: Specified output length
> + * @errp: pointer to an error
> + */
> +static bool multifd_qpl_process_and_check_job(qpl_job *job, bool is_hardware,
> +                                              uint32_t len, Error **errp)
> +{
> +    qpl_status status;
> +
> +    status = (is_hardware ? qpl_wait_job(job) : qpl_execute_job(job));
> +    if (status != QPL_STS_OK) {
> +        error_setg(errp, "qpl_execute_job failed with error %d", status);

The error message should also cover qpl_wait_job(), right? Maybe just
use "qpl job failed".

> +        return false;
> +    }
> +    if (job->total_out != len) {
> +        error_setg(errp, "qpl decompressed len %u, expected len %u",
> +                   job->total_out, len);
> +        return false;
> +    }
> +    return true;
> +}
> +
> +/**
> + * multifd_qpl_decompress_pages_slow_path: decompress pages using slow path
> + *
> + * Decompress the pages using software
> + *
> + * Returns 0 on success or -1 on error
> + *
> + * @p: Params for the channel being used
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_decompress_pages_slow_path(MultiFDRecvParams *p,
> +                                                  Error **errp)
> +{
> +    QplData *qpl = p->compress_data;
> +    uint32_t size = p->page_size;
> +    qpl_job *job = qpl->sw_job;
> +    uint8_t *zbuf = qpl->zbuf;
> +    uint8_t *addr;
> +    uint32_t len;
> +
> +    for (int i = 0; i < p->normal_num; i++) {
> +        len = qpl->zlen[i];
> +        addr = p->host + p->normal[i];
> +        /* the page is uncompressed, load it */
> +        if (len == size) {
> +            memcpy(addr, zbuf, size);
> +            zbuf += size;
> +            continue;
> +        }
> +        multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
> +        if (!multifd_qpl_process_and_check_job(job, false, size, errp)) {
> +            return -1;
> +        }
> +        zbuf += len;
> +    }
> +    return 0;
> +}
> +
> +/**
> + * multifd_qpl_decompress_pages: decompress pages
> + *
> + * Decompress the pages using the IAA hardware. If hardware
> + * decompression fails, it falls back to software decompression.
> + *
> + * Returns 0 on success or -1 on error
> + *
> + * @p: Params for the channel being used
> + * @errp: pointer to an error
> + */
> +static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error **errp)
> +{
> +    QplData *qpl = p->compress_data;
> +    uint32_t size = p->page_size;
> +    uint8_t *zbuf = qpl->zbuf;
> +    uint8_t *addr;
> +    uint32_t len;
> +    qpl_job *job;
> +
> +    for (int i = 0; i < p->normal_num; i++) {
> +        addr = p->host + p->normal[i];
> +        len = qpl->zlen[i];
> +        /* the page is uncompressed if received length equals the page size */
> +        if (len == size) {
> +            memcpy(addr, zbuf, size);
> +            zbuf += size;
> +            continue;
> +        }
> +
> +        job = qpl->hw_jobs[i].job;
> +        multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
> +        if (multifd_qpl_submit_job(job)) {
> +            qpl->hw_jobs[i].fallback_sw_path = false;
> +        } else {
> +            qpl->hw_jobs[i].fallback_sw_path = true;
> +            job = qpl->sw_job;
> +            multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
> +            if (!multifd_qpl_process_and_check_job(job, false, size, errp)) {
> +                return -1;
> +            }

Here the same suggestion applies. You created
multifd_qpl_process_and_check_job() but are now calling it twice, which
seems to lose the purpose. If the fallback moves to the loop below, then
you do it all in one place:

    for (int i = 0; i < p->normal_num; i++) {
        bool is_sw = qpl->hw_jobs[i].fallback_sw_path;

        if (is_sw) {
            job = qpl->sw_job;
            multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
        } else {
            job = qpl->hw_jobs[i].job;
        }

        if (!multifd_qpl_process_and_check_job(job, !is_sw, size, errp)) {
            return -1;
        }
    }

> +        }
> +        zbuf += len;
> +    }
> +
> +    for (int i = 0; i < p->normal_num; i++) {
> +        /* ignore pages that have already been processed */
> +        if (qpl->zlen[i] == size || qpl->hw_jobs[i].fallback_sw_path) {
> +            continue;
> +        }
> +
> +        job = qpl->hw_jobs[i].job;
> +        if (!multifd_qpl_process_and_check_job(job, true, size, errp)) {
> +            return -1;
> +        }
> +    }
> +    return 0;
> +}
>  /**
>   * multifd_qpl_recv: read the data from the channel into actual pages
>   *
> @@ -325,8 +689,48 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
>   */
>  static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
>  {
> -    /* Implement in next patch */
> -    return -1;
> +    QplData *qpl = p->compress_data;
> +    uint32_t in_size = p->next_packet_size;
> +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> +    uint32_t len = 0;
> +    uint32_t zbuf_len = 0;
> +    int ret;
> +
> +    if (flags != MULTIFD_FLAG_QPL) {
> +        error_setg(errp, "multifd %u: flags received %x flags expected %x",
> +                   p->id, flags, MULTIFD_FLAG_QPL);
> +        return -1;
> +    }
> +    multifd_recv_zero_page_process(p);
> +    if (!p->normal_num) {
> +        assert(in_size == 0);
> +        return 0;
> +    }
> +
> +    /* read compressed page lengths */
> +    len = p->normal_num * sizeof(uint32_t);
> +    assert(len < in_size);
> +    ret = qio_channel_read_all(p->c, (void *) qpl->zlen, len, errp);
> +    if (ret != 0) {
> +        return ret;
> +    }
> +    for (int i = 0; i < p->normal_num; i++) {
> +        qpl->zlen[i] = be32_to_cpu(qpl->zlen[i]);
> +        assert(qpl->zlen[i] <= p->page_size);
> +        zbuf_len += qpl->zlen[i];
> +    }
> +
> +    /* read compressed pages */
> +    assert(in_size == len + zbuf_len);
> +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, zbuf_len, errp);
> +    if (ret != 0) {
> +        return ret;
> +    }
> +
> +    if (qpl->hw_avail) {
> +        return multifd_qpl_decompress_pages(p, errp);
> +    }
> +    return multifd_qpl_decompress_pages_slow_path(p, errp);
>  }
>  
>  static MultiFDMethods multifd_qpl_ops = {
RE: [PATCH v7 6/7] migration/multifd: implement qpl compression and decompression
Posted by Liu, Yuan1 5 months, 3 weeks ago
> -----Original Message-----
> From: Fabiano Rosas <farosas@suse.de>
> Sent: Thursday, June 6, 2024 6:26 AM
> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com;
> pbonzini@redhat.com; marcandre.lureau@redhat.com; berrange@redhat.com;
> thuth@redhat.com; philmd@linaro.org
> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
> <nanhai.zou@intel.com>; shameerali.kolothum.thodi@huawei.com
> Subject: Re: [PATCH v7 6/7] migration/multifd: implement qpl compression
> and decompression
> 
> Yuan Liu <yuan1.liu@intel.com> writes:
> 
> > QPL compression and decompression will use IAA hardware first.
> > If IAA hardware is not available, it will automatically fall
> > back to QPL software path, if the software job also fails,
> > the uncompressed page is sent directly.
> >
> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> > ---
> >  migration/multifd-qpl.c | 412 +++++++++++++++++++++++++++++++++++++++-
> >  1 file changed, 408 insertions(+), 4 deletions(-)
> >
> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> > index 6791a204d5..18b3384bd5 100644
> > --- a/migration/multifd-qpl.c
> > +++ b/migration/multifd-qpl.c
> > @@ -13,9 +13,14 @@
> >  #include "qemu/osdep.h"
> >  #include "qemu/module.h"
> >  #include "qapi/error.h"
> > +#include "qapi/qapi-types-migration.h"
> > +#include "exec/ramblock.h"
> >  #include "multifd.h"
> >  #include "qpl/qpl.h"
> >
> > +/* Maximum number of retries to resubmit a job if IAA work queues are full */
> > +#define MAX_SUBMIT_RETRY_NUM (3)
> > +
> >  typedef struct {
> >      /* the QPL hardware path job */
> >      qpl_job *job;
> > @@ -260,6 +265,219 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >      p->iov = NULL;
> >  }
> >
> > +/**
> > + * multifd_qpl_prepare_job: prepare the job
> > + *
> > + * Set the QPL job parameters and properties.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + * @is_compression: indicates compression and decompression
> > + * @input: pointer to the input data buffer
> > + * @input_len: the length of the input data
> > + * @output: pointer to the output data buffer
> > + * @output_len: the length of the output data
> > + */
> > +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
> > +                                    uint8_t *input, uint32_t input_len,
> > +                                    uint8_t *output, uint32_t output_len)
> > +{
> > +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
> > +    job->next_in_ptr = input;
> > +    job->next_out_ptr = output;
> > +    job->available_in = input_len;
> > +    job->available_out = output_len;
> > +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
> > +    /* only supports compression level 1 */
> > +    job->level = 1;
> > +}
> > +
> > +/**
> > + * multifd_qpl_prepare_job: prepare the compression job
> 
> function name is wrong

Thanks, I will fix this in the next version.
 
> > + *
> > + * Set the compression job parameters and properties.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + * @input: pointer to the input data buffer
> > + * @input_len: the length of the input data
> > + * @output: pointer to the output data buffer
> > + * @output_len: the length of the output data
> > + */
> > +static void multifd_qpl_prepare_comp_job(qpl_job *job, uint8_t *input,
> > +                                         uint32_t input_len, uint8_t *output,
> > +                                         uint32_t output_len)
> > +{
> > +    multifd_qpl_prepare_job(job, true, input, input_len, output, output_len);
> > +}
> > +
> > +/**
> > + * multifd_qpl_prepare_job: prepare the decompression job

Thanks, I will fix this in the next version.
 
> > + *
> > + * Set the decompression job parameters and properties.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + * @input: pointer to the input data buffer
> > + * @input_len: the length of the input data
> > + * @output: pointer to the output data buffer
> > + * @output_len: the length of the output data
> > + */
> > +static void multifd_qpl_prepare_decomp_job(qpl_job *job, uint8_t *input,
> > +                                           uint32_t input_len, uint8_t *output,
> > +                                           uint32_t output_len)
> > +{
> > +    multifd_qpl_prepare_job(job, false, input, input_len, output, output_len);
> > +}
> > +
> > +/**
> > + * multifd_qpl_fill_iov: fill in the IOV
> > + *
> > + * Fill in the QPL packet IOV
> > + *
> > + * @p: Params for the channel being used
> > + * @data: pointer to the IOV data
> > + * @len: The length of the IOV data
> > + */
> > +static void multifd_qpl_fill_iov(MultiFDSendParams *p, uint8_t *data,
> > +                                 uint32_t len)
> > +{
> > +    p->iov[p->iovs_num].iov_base = data;
> > +    p->iov[p->iovs_num].iov_len = len;
> > +    p->iovs_num++;
> > +    p->next_packet_size += len;
> > +}
> > +
> > +/**
> > + * multifd_qpl_fill_packet: fill the compressed page into the QPL packet
> > + *
> > + * Fill the compressed page length and IOV into the QPL packet
> > + *
> > + * @idx: The index of the compressed length array
> > + * @p: Params for the channel being used
> > + * @data: pointer to the compressed page buffer
> > + * @len: The length of the compressed page
> > + */
> > +static void multifd_qpl_fill_packet(uint32_t idx, MultiFDSendParams *p,
> > +                                    uint8_t *data, uint32_t len)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +
> > +    qpl->zlen[idx] = cpu_to_be32(len);
> > +    multifd_qpl_fill_iov(p, data, len);
> > +}
> > +
> > +/**
> > + * multifd_qpl_submit_job: submit a job to the hardware
> > + *
> > + * Submit a QPL hardware job to the IAA device
> > + *
> > + * Returns true if the job is submitted successfully, otherwise false.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + */
> > +static bool multifd_qpl_submit_job(qpl_job *job)
> > +{
> > +    qpl_status status;
> > +    uint32_t num = 0;
> > +
> > +retry:
> > +    status = qpl_submit_job(job);
> > +    if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> > +        if (num < MAX_SUBMIT_RETRY_NUM) {
> > +            num++;
> > +            goto retry;
> > +        }
> > +    }
> > +    return (status == QPL_STS_OK);
> 
> How often do we expect this to fail? Will the queues be busy frequently
> or is this an unlikely event? I'm thinking whether we really need to
> allow a fallback for the hw path. Sorry if this has been discussed
> already, I don't remember.

In some scenarios this may happen frequently, for example when 4 channels are
configured but only one IAA device is available. When IAA hardware resources
are insufficient, retry and fallback can help optimize performance. Here is a
comparison test:

1. Retry + SW fallback:
   total time: 14649 ms
   downtime: 25 ms
   throughput: 17666.57 mbps
   pages-per-second: 1509647

2. No fallback, always wait for work queues to become available
   total time: 18381 ms
   downtime: 25 ms
   throughput: 13698.65 mbps
   pages-per-second: 859607
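For reference, a minimal self-contained sketch of the bounded-retry submission in multifd_qpl_submit_job(), written as a loop instead of goto. mock_device and mock_submit() are hypothetical stand-ins for an IAA work queue and qpl_submit_job(): the mock reports "queues busy" for a fixed number of attempts. The sketch only shows how many attempts are made before the caller falls back to the software path.

```c
#include <assert.h>
#include <stdbool.h>

#define MAX_SUBMIT_RETRY_NUM 3

/* Hypothetical stand-ins for QPL_STS_OK / QPL_STS_QUEUES_ARE_BUSY_ERR. */
typedef enum { MOCK_STS_OK, MOCK_STS_QUEUES_BUSY } mock_status;

/* Hypothetical device: reports busy for the first busy_attempts submissions. */
typedef struct {
    int busy_attempts;
    int attempts;
} mock_device;

static mock_status mock_submit(mock_device *dev)
{
    dev->attempts++;
    return dev->attempts <= dev->busy_attempts ? MOCK_STS_QUEUES_BUSY
                                               : MOCK_STS_OK;
}

/*
 * Same policy as multifd_qpl_submit_job(): one initial submission plus up
 * to MAX_SUBMIT_RETRY_NUM retries, retrying only while the queues are busy.
 * Returns false when the caller should run the job on the software path.
 */
static bool submit_with_retry(mock_device *dev)
{
    int attempts = 1;
    mock_status status = mock_submit(dev);

    while (status == MOCK_STS_QUEUES_BUSY && attempts <= MAX_SUBMIT_RETRY_NUM) {
        status = mock_submit(dev);
        attempts++;
    }
    assert(attempts <= 1 + MAX_SUBMIT_RETRY_NUM);
    return status == MOCK_STS_OK;
}
```

With MAX_SUBMIT_RETRY_NUM at 3, a job is attempted at most four times (one submission plus three retries) before falling back to software.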

> > +}
> > +
> > +/**
> > + * multifd_qpl_compress_pages_slow_path: compress pages using slow path
> > + *
> > + * Compress the pages using software. If compression fails, the page will
> > + * be sent directly.
> > + *
> > + * @p: Params for the channel being used
> > + */
> > +static void multifd_qpl_compress_pages_slow_path(MultiFDSendParams *p)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t size = p->page_size;
> > +    qpl_job *job = qpl->sw_job;
> > +    uint8_t *zbuf = qpl->zbuf;
> > +    uint8_t *buf;
> > +
> > +    for (int i = 0; i < p->pages->normal_num; i++) {
> > +        buf = p->pages->block->host + p->pages->offset[i];
> > +        /* Set output length to less than the page to reduce decompression */
> > +        multifd_qpl_prepare_comp_job(job, buf, size, zbuf, size - 1);
> > +        if (qpl_execute_job(job) == QPL_STS_OK) {
> > +            multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
> > +        } else {
> > +            /* send the page directly */
> 
> s/directly/uncompressed/
> 
> a bit clearer.

Sure, I will fix it in the next version.

> > +            multifd_qpl_fill_packet(i, p, buf, size);
> > +        }
> > +        zbuf += size;
> > +    }
> > +}
> > +
> > +/**
> > + * multifd_qpl_compress_pages: compress pages
> > + *
> > + * Submit the pages to the IAA hardware for compression. If hardware
> > + * compression fails, it falls back to software compression. If software
> > + * compression also fails, the page is sent directly
> > + *
> > + * @p: Params for the channel being used
> > + */
> > +static void multifd_qpl_compress_pages(MultiFDSendParams *p)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +    MultiFDPages_t *pages = p->pages;
> > +    uint32_t size = p->page_size;
> > +    QplHwJob *hw_job;
> > +    uint8_t *buf;
> > +    uint8_t *zbuf;
> > +
> 
> Let's document the output size choice more explicitly:
> 
>     /*
>      * Set output length to less than the page size to force the job to
>      * fail in case it compresses to a larger size. We'll send that page
>      * without compression and skip the decompression operation on the
>      * destination.
>      */
>      out_size = size - 1;
> 
> you can then omit the other comments.

Thanks for the comments, I will refine this in the next version.
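A hypothetical sketch of the output-size rule discussed above, assuming (as the suggested comment states) that a compression job given out_size = size - 1 fails instead of producing a page-sized or larger output. Under that assumption, a recorded length equal to the page size can only mean "sent uncompressed", which is the len == size test the receiver relies on. The helper names are illustrative and not part of the patch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Length the sender records in zlen[] for one page. A successful job had
 * only page_size - 1 bytes of output space, so its result is strictly
 * smaller than a page; otherwise the page is sent uncompressed.
 */
static uint32_t sender_payload_len(bool compress_ok, uint32_t total_out,
                                   uint32_t page_size)
{
    if (compress_ok) {
        assert(total_out <= page_size - 1);
        return total_out;
    }
    return page_size;
}

/* Receiver side: mirrors the "len == size" check in the decompress loops. */
static bool receiver_page_is_raw(uint32_t len, uint32_t page_size)
{
    assert(len <= page_size); /* also asserted in multifd_qpl_recv() */
    return len == page_size;
}
```

Because a successful result can never reach page_size bytes, the receiver never has to decompress a page that was sent raw, and never mistakes a compressed page for a raw one.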
 
> > +    for (int i = 0; i < pages->normal_num; i++) {
> > +        buf = pages->block->host + pages->offset[i];
> > +        zbuf = qpl->zbuf + (size * i);
> > +        hw_job = &qpl->hw_jobs[i];
> > +        /* Set output length to less than the page to reduce decompression */
> > +        multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf, size - 1);
> > +        if (multifd_qpl_submit_job(hw_job->job)) {
> > +            hw_job->fallback_sw_path = false;
> > +        } else {
> > +            hw_job->fallback_sw_path = true;
> > +            /* Set output length less than page size to reduce decompression */
> > +            multifd_qpl_prepare_comp_job(qpl->sw_job, buf, size, zbuf,
> > +                                         size - 1);
> > +            if (qpl_execute_job(qpl->sw_job) == QPL_STS_OK) {
> > +                hw_job->sw_output = zbuf;
> > +                hw_job->sw_output_len = qpl->sw_job->total_out;
> > +            } else {
> > +                hw_job->sw_output = buf;
> > +                hw_job->sw_output_len = size;
> > +            }
> 
> Hmm, these look a bit cumbersome, would it work if we moved the fallback
> qpl_execute_job() down into the other loop? We could then avoid the
> extra fields. Something like:
> 
> static void multifd_qpl_compress_pages(MultiFDSendParams *p)
> {
>     QplData *qpl = p->compress_data;
>     MultiFDPages_t *pages = p->pages;
>     uint32_t out_size, size = p->page_size;
>     uint8_t *buf, *zbuf;
> 
>     /*
>      * Set output length to less than the page size to force the job to
>      * fail in case it compresses to a larger size. We'll send that page
>      * without compression to skip the decompression operation on the
>      * destination.
>      */
>     out_size = size - 1;
> 
>     for (int i = 0; i < pages->normal_num; i++) {
>         QplHwJob *hw_job = &qpl->hw_jobs[i];
> 
>         hw_job->fallback_sw_path = false;
>         buf = pages->block->host + pages->offset[i];
>         zbuf = qpl->zbuf + (size * i);
> 
>         multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf,
>                                      out_size);
> 
>         if (!multifd_qpl_submit_job(hw_job->job)) {
>             hw_job->fallback_sw_path = true;
>         }
>     }
> 
>     for (int i = 0; i < pages->normal_num; i++) {
>         QplHwJob *hw_job = &qpl->hw_jobs[i];
>         qpl_job *job;
>         qpl_status ret;
> 
>         buf = pages->block->host + pages->offset[i];
>         zbuf = qpl->zbuf + (size * i);
> 
>         if (hw_job->fallback_sw_path) {
>             job = qpl->sw_job;
>             multifd_qpl_prepare_comp_job(job, buf, size, zbuf, out_size);
>             ret = qpl_execute_job(job);
>         } else {
>             job = hw_job->job;
>             ret = qpl_wait_job(job);
>         }
> 
>         if (ret == QPL_STS_OK) {
>             multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
>         } else {
>             multifd_qpl_fill_packet(i, p, buf, size);
>         }
>     }
> }

Many thanks for the reference code. I have tested it, and the performance is not good.
When the work queues are full and a hardware job fails to be submitted, the next
submission will most likely fail as well. So my idea is to execute that job in software
immediately, while all subsequent jobs still try the hardware path first.

Job submission has almost no overhead because Intel uses the new "enqcmd" instruction,
which allows a user program to submit a job directly to the hardware.

With the reference code, once a job fails to be submitted, there is a high probability
that "ALL" subsequent jobs also fail to be submitted and fall back to software
compression, so the IAA hardware is not fully utilized.

In the 4 channel, 1 IAA device test case, the reference code reduces IAA throughput
from 3.4GBps to 2.2GBps, which hurts live migration performance (total time goes from 14s to 18s).

> > +        }
> > +    }
> > +
> > +    for (int i = 0; i < pages->normal_num; i++) {
> > +        buf = pages->block->host + pages->offset[i];
> > +        zbuf = qpl->zbuf + (size * i);
> > +        hw_job = &qpl->hw_jobs[i];
> > +        if (hw_job->fallback_sw_path) {
> > +            multifd_qpl_fill_packet(i, p, hw_job->sw_output,
> > +                                    hw_job->sw_output_len);
> > +            continue;
> > +        }
> > +        if (qpl_wait_job(hw_job->job) == QPL_STS_OK) {
> > +            multifd_qpl_fill_packet(i, p, zbuf, hw_job->job->total_out);
> > +        } else {
> > +            /* send the page directly */
> > +            multifd_qpl_fill_packet(i, p, buf, size);
> > +        }
> > +    }
> > +}
> > +
> >  /**
> >   * multifd_qpl_send_prepare: prepare data to be able to send
> >   *
> > @@ -273,8 +491,26 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >   */
> >  static int multifd_qpl_send_prepare(MultiFDSendParams *p, Error **errp)
> >  {
> > -    /* Implement in next patch */
> > -    return -1;
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t len = 0;
> > +
> > +    if (!multifd_send_prepare_common(p)) {
> > +        goto out;
> > +    }
> > +
> > +    /* The first IOV is used to store the compressed page lengths */
> > +    len = p->pages->normal_num * sizeof(uint32_t);
> > +    multifd_qpl_fill_iov(p, (uint8_t *) qpl->zlen, len);
> > +    if (qpl->hw_avail) {
> > +        multifd_qpl_compress_pages(p);
> > +    } else {
> > +        multifd_qpl_compress_pages_slow_path(p);
> > +    }
> > +
> > +out:
> > +    p->flags |= MULTIFD_FLAG_QPL;
> > +    multifd_send_fill_packet(p);
> > +    return 0;
> >  }
> >
> >  /**
> > @@ -312,6 +548,134 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >      p->compress_data = NULL;
> >  }
> >
> > +/**
> > + * multifd_qpl_process_and_check_job: process and check a QPL job
> > + *
> > + * Process the job and check whether the job output length is the
> > + * same as the specified length
> > + *
> > + * Returns true if the job execution succeeded and the output length
> > + * is equal to the specified length, otherwise false.
> > + *
> > + * @job: pointer to the qpl_job structure
> > + * @is_hardware: indicates whether the job is a hardware job
> > + * @len: Specified output length
> > + * @errp: pointer to an error
> > + */
> > +static bool multifd_qpl_process_and_check_job(qpl_job *job, bool is_hardware,
> > +                                              uint32_t len, Error **errp)
> > +{
> > +    qpl_status status;
> > +
> > +    status = (is_hardware ? qpl_wait_job(job) : qpl_execute_job(job));
> > +    if (status != QPL_STS_OK) {
> > +        error_setg(errp, "qpl_execute_job failed with error %d", status);
> 
> The error message should also cover qpl_wait_job(), right? Maybe just
> use "qpl job failed".

You are right, I will fix this in the next version.

> > +        return false;
> > +    }
> > +    if (job->total_out != len) {
> > +        error_setg(errp, "qpl decompressed len %u, expected len %u",
> > +                   job->total_out, len);
> > +        return false;
> > +    }
> > +    return true;
> > +}
> > +
> > +/**
> > + * multifd_qpl_decompress_pages_slow_path: decompress pages using slow path
> > + *
> > + * Decompress the pages using software
> > + *
> > + * Returns 0 on success or -1 on error
> > + *
> > + * @p: Params for the channel being used
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_decompress_pages_slow_path(MultiFDRecvParams *p,
> > +                                                  Error **errp)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t size = p->page_size;
> > +    qpl_job *job = qpl->sw_job;
> > +    uint8_t *zbuf = qpl->zbuf;
> > +    uint8_t *addr;
> > +    uint32_t len;
> > +
> > +    for (int i = 0; i < p->normal_num; i++) {
> > +        len = qpl->zlen[i];
> > +        addr = p->host + p->normal[i];
> > +        /* the page is uncompressed, load it */
> > +        if (len == size) {
> > +            memcpy(addr, zbuf, size);
> > +            zbuf += size;
> > +            continue;
> > +        }
> > +        multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
> > +        if (!multifd_qpl_process_and_check_job(job, false, size, errp)) {
> > +            return -1;
> > +        }
> > +        zbuf += len;
> > +    }
> > +    return 0;
> > +}
> > +
> > +/**
> > + * multifd_qpl_decompress_pages: decompress pages
> > + *
> > + * Decompress the pages using the IAA hardware. If hardware
> > + * decompression fails, it falls back to software decompression.
> > + *
> > + * Returns 0 on success or -1 on error
> > + *
> > + * @p: Params for the channel being used
> > + * @errp: pointer to an error
> > + */
> > +static int multifd_qpl_decompress_pages(MultiFDRecvParams *p, Error **errp)
> > +{
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t size = p->page_size;
> > +    uint8_t *zbuf = qpl->zbuf;
> > +    uint8_t *addr;
> > +    uint32_t len;
> > +    qpl_job *job;
> > +
> > +    for (int i = 0; i < p->normal_num; i++) {
> > +        addr = p->host + p->normal[i];
> > +        len = qpl->zlen[i];
> > +        /* the page is uncompressed if received length equals the page size */
> > +        if (len == size) {
> > +            memcpy(addr, zbuf, size);
> > +            zbuf += size;
> > +            continue;
> > +        }
> > +
> > +        job = qpl->hw_jobs[i].job;
> > +        multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
> > +        if (multifd_qpl_submit_job(job)) {
> > +            qpl->hw_jobs[i].fallback_sw_path = false;
> > +        } else {
> > +            qpl->hw_jobs[i].fallback_sw_path = true;
> > +            job = qpl->sw_job;
> > +            multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
> > +            if (!multifd_qpl_process_and_check_job(job, false, size, errp)) {
> > +                return -1;
> > +            }
> 
> Here the same suggestion applies. You created
> multifd_qpl_process_and_check_job() but are now calling it twice, which
> seems to lose the purpose. If the fallback moves to the loop below, then
> you do it all in one place:
> 
>     for (int i = 0; i < p->normal_num; i++) {
>         bool is_sw = qpl->hw_jobs[i].fallback_sw_path;
> 
>         if (is_sw) {
>             job = qpl->sw_job;
>             multifd_qpl_prepare_decomp_job(job, zbuf, len, addr, size);
>         } else {
>             job = qpl->hw_jobs[i].job;
>         }
> 
>         if (!multifd_qpl_process_and_check_job(job, !is_sw, size, errp)) {
>             return -1;
>         }
>     }

I think this is the same issue as discussed above: after a hardware job fails to
be submitted, a software job is executed immediately, and subsequent jobs still
try the hardware path first. That is why multifd_qpl_process_and_check_job is
used in both places.
 
> > +        }
> > +        zbuf += len;
> > +    }
> > +
> > +    for (int i = 0; i < p->normal_num; i++) {
> > +        /* ignore pages that have already been processed */
> > +        if (qpl->zlen[i] == size || qpl->hw_jobs[i].fallback_sw_path) {
> > +            continue;
> > +        }
> > +
> > +        job = qpl->hw_jobs[i].job;
> > +        if (!multifd_qpl_process_and_check_job(job, true, size, errp)) {
> > +            return -1;
> > +        }
> > +    }
> > +    return 0;
> > +}
> >  /**
> >   * multifd_qpl_recv: read the data from the channel into actual pages
> >   *
> > @@ -325,8 +689,48 @@ static void multifd_qpl_recv_cleanup(MultiFDRecvParams *p)
> >   */
> >  static int multifd_qpl_recv(MultiFDRecvParams *p, Error **errp)
> >  {
> > -    /* Implement in next patch */
> > -    return -1;
> > +    QplData *qpl = p->compress_data;
> > +    uint32_t in_size = p->next_packet_size;
> > +    uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
> > +    uint32_t len = 0;
> > +    uint32_t zbuf_len = 0;
> > +    int ret;
> > +
> > +    if (flags != MULTIFD_FLAG_QPL) {
> > +        error_setg(errp, "multifd %u: flags received %x flags expected %x",
> > +                   p->id, flags, MULTIFD_FLAG_QPL);
> > +        return -1;
> > +    }
> > +    multifd_recv_zero_page_process(p);
> > +    if (!p->normal_num) {
> > +        assert(in_size == 0);
> > +        return 0;
> > +    }
> > +
> > +    /* read compressed page lengths */
> > +    len = p->normal_num * sizeof(uint32_t);
> > +    assert(len < in_size);
> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zlen, len, errp);
> > +    if (ret != 0) {
> > +        return ret;
> > +    }
> > +    for (int i = 0; i < p->normal_num; i++) {
> > +        qpl->zlen[i] = be32_to_cpu(qpl->zlen[i]);
> > +        assert(qpl->zlen[i] <= p->page_size);
> > +        zbuf_len += qpl->zlen[i];
> > +    }
> > +
> > +    /* read compressed pages */
> > +    assert(in_size == len + zbuf_len);
> > +    ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, zbuf_len, errp);
> > +    if (ret != 0) {
> > +        return ret;
> > +    }
> > +
> > +    if (qpl->hw_avail) {
> > +        return multifd_qpl_decompress_pages(p, errp);
> > +    }
> > +    return multifd_qpl_decompress_pages_slow_path(p, errp);
> >  }
> >
> >  static MultiFDMethods multifd_qpl_ops = {
RE: [PATCH v7 6/7] migration/multifd: implement qpl compression and decompression
Posted by Fabiano Rosas 5 months, 3 weeks ago
"Liu, Yuan1" <yuan1.liu@intel.com> writes:

>> -----Original Message-----
>> From: Fabiano Rosas <farosas@suse.de>
>> Sent: Thursday, June 6, 2024 6:26 AM
>> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com;
>> pbonzini@redhat.com; marcandre.lureau@redhat.com; berrange@redhat.com;
>> thuth@redhat.com; philmd@linaro.org
>> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
>> <nanhai.zou@intel.com>; shameerali.kolothum.thodi@huawei.com
>> Subject: Re: [PATCH v7 6/7] migration/multifd: implement qpl compression
>> and decompression
>> 
>> Yuan Liu <yuan1.liu@intel.com> writes:
>> 
>> > QPL compression and decompression will use IAA hardware first.
>> > If IAA hardware is not available, it will automatically fall
>> > back to QPL software path, if the software job also fails,
>> > the uncompressed page is sent directly.
>> >
>> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
>> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
>> > ---
>> >  migration/multifd-qpl.c | 412 +++++++++++++++++++++++++++++++++++++++-
>> >  1 file changed, 408 insertions(+), 4 deletions(-)
>> >
>> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
>> > index 6791a204d5..18b3384bd5 100644
>> > --- a/migration/multifd-qpl.c
>> > +++ b/migration/multifd-qpl.c
>> > @@ -13,9 +13,14 @@
>> >  #include "qemu/osdep.h"
>> >  #include "qemu/module.h"
>> >  #include "qapi/error.h"
>> > +#include "qapi/qapi-types-migration.h"
>> > +#include "exec/ramblock.h"
>> >  #include "multifd.h"
>> >  #include "qpl/qpl.h"
>> >
>> > +/* Maximum number of retries to resubmit a job if IAA work queues are full */
>> > +#define MAX_SUBMIT_RETRY_NUM (3)
>> > +
>> >  typedef struct {
>> >      /* the QPL hardware path job */
>> >      qpl_job *job;
>> > @@ -260,6 +265,219 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
>> >      p->iov = NULL;
>> >  }
>> >
>> > +/**
>> > + * multifd_qpl_prepare_job: prepare the job
>> > + *
>> > + * Set the QPL job parameters and properties.
>> > + *
>> > + * @job: pointer to the qpl_job structure
>> > + * @is_compression: indicates compression and decompression
>> > + * @input: pointer to the input data buffer
>> > + * @input_len: the length of the input data
>> > + * @output: pointer to the output data buffer
>> > + * @output_len: the length of the output data
>> > + */
>> > +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
>> > +                                    uint8_t *input, uint32_t input_len,
>> > +                                    uint8_t *output, uint32_t output_len)
>> > +{
>> > +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
>> > +    job->next_in_ptr = input;
>> > +    job->next_out_ptr = output;
>> > +    job->available_in = input_len;
>> > +    job->available_out = output_len;
>> > +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
>> > +    /* only supports compression level 1 */
>> > +    job->level = 1;
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_prepare_job: prepare the compression job
>> 
>> function name is wrong
>
> Thanks, I will fix this in the next version.
>  
>> > + *
>> > + * Set the compression job parameters and properties.
>> > + *
>> > + * @job: pointer to the qpl_job structure
>> > + * @input: pointer to the input data buffer
>> > + * @input_len: the length of the input data
>> > + * @output: pointer to the output data buffer
>> > + * @output_len: the length of the output data
>> > + */
>> > +static void multifd_qpl_prepare_comp_job(qpl_job *job, uint8_t *input,
>> > +                                         uint32_t input_len, uint8_t *output,
>> > +                                         uint32_t output_len)
>> > +{
>> > +    multifd_qpl_prepare_job(job, true, input, input_len, output, output_len);
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_prepare_job: prepare the decompression job
>
> Thanks, I will fix this in the next version.
>  
>> > + *
>> > + * Set the decompression job parameters and properties.
>> > + *
>> > + * @job: pointer to the qpl_job structure
>> > + * @input: pointer to the input data buffer
>> > + * @input_len: the length of the input data
>> > + * @output: pointer to the output data buffer
>> > + * @output_len: the length of the output data
>> > + */
>> > +static void multifd_qpl_prepare_decomp_job(qpl_job *job, uint8_t *input,
>> > +                                           uint32_t input_len, uint8_t *output,
>> > +                                           uint32_t output_len)
>> > +{
>> > +    multifd_qpl_prepare_job(job, false, input, input_len, output, output_len);
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_fill_iov: fill in the IOV
>> > + *
>> > + * Fill in the QPL packet IOV
>> > + *
>> > + * @p: Params for the channel being used
>> > + * @data: pointer to the IOV data
>> > + * @len: The length of the IOV data
>> > + */
>> > +static void multifd_qpl_fill_iov(MultiFDSendParams *p, uint8_t *data,
>> > +                                 uint32_t len)
>> > +{
>> > +    p->iov[p->iovs_num].iov_base = data;
>> > +    p->iov[p->iovs_num].iov_len = len;
>> > +    p->iovs_num++;
>> > +    p->next_packet_size += len;
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_fill_packet: fill the compressed page into the QPL packet
>> > + *
>> > + * Fill the compressed page length and IOV into the QPL packet
>> > + *
>> > + * @idx: The index of the compressed length array
>> > + * @p: Params for the channel being used
>> > + * @data: pointer to the compressed page buffer
>> > + * @len: The length of the compressed page
>> > + */
>> > +static void multifd_qpl_fill_packet(uint32_t idx, MultiFDSendParams *p,
>> > +                                    uint8_t *data, uint32_t len)
>> > +{
>> > +    QplData *qpl = p->compress_data;
>> > +
>> > +    qpl->zlen[idx] = cpu_to_be32(len);
>> > +    multifd_qpl_fill_iov(p, data, len);
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_submit_job: submit a job to the hardware
>> > + *
>> > + * Submit a QPL hardware job to the IAA device
>> > + *
>> > + * Returns true if the job is submitted successfully, otherwise false.
>> > + *
>> > + * @job: pointer to the qpl_job structure
>> > + */
>> > +static bool multifd_qpl_submit_job(qpl_job *job)
>> > +{
>> > +    qpl_status status;
>> > +    uint32_t num = 0;
>> > +
>> > +retry:
>> > +    status = qpl_submit_job(job);
>> > +    if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
>> > +        if (num < MAX_SUBMIT_RETRY_NUM) {
>> > +            num++;
>> > +            goto retry;
>> > +        }
>> > +    }
>> > +    return (status == QPL_STS_OK);
>> 
>> How often do we expect this to fail? Will the queues be busy frequently
>> or is this an unlikely event? I'm thinking whether we really need to
>> allow a fallback for the hw path. Sorry if this has been discussed
>> already, I don't remember.
>
> In some scenarios, this may happen frequently, such as configuring 4 channels 
> but only one IAA device is available. In the case of insufficient IAA hardware 
> resources, retry and fallback can help optimize performance.
> I have a comparison test below
>
> 1. Retry + SW fallback:
>    total time: 14649 ms
>    downtime: 25 ms
>    throughput: 17666.57 mbps
>    pages-per-second: 1509647
>
> 2. No fallback, always wait for work queues to become available
>    total time: 18381 ms
>    downtime: 25 ms
>    throughput: 13698.65 mbps
>    pages-per-second: 859607

Thanks for the data, this is helpful. Let's include it in the commit
message, it's important to let people know you actually did that
analysis. I put a suggestion below:

---
QPL compression and decompression will use the IAA hardware path if the
IAA hardware is available. Otherwise, the QPL library software path is used.

The hardware path will automatically fall back to the QPL software path if
the IAA queues are busy. In some scenarios, this may happen frequently,
such as configuring 4 channels but only one IAA device is available. In
the case of insufficient IAA hardware resources, retry and fallback can
help optimize performance:

 1. Retry + SW fallback:
    total time: 14649 ms
    downtime: 25 ms
    throughput: 17666.57 mbps
    pages-per-second: 1509647

 2. No fallback, always wait for work queues to become available
    total time: 18381 ms
    downtime: 25 ms
    throughput: 13698.65 mbps
    pages-per-second: 859607

If both the hardware and software paths fail, the uncompressed page is
sent directly.
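The relative differences between the two runs quoted above can be checked with a few lines of C. This is only an illustrative calculation: the figures are copied from the test results (downtime, 25 ms in both runs, is left out), and the struct and helper names are made up for the example.

```c
#include <assert.h>
#include <stdio.h>

/* Results copied from the comparison test above (downtime omitted). */
struct run_stats {
    double total_ms;
    double throughput_mbps;
    double pages_per_sec;
};

static const struct run_stats no_fallback = { 18381.0, 13698.65, 859607.0 };
static const struct run_stats retry_sw_fallback = { 14649.0, 17666.57, 1509647.0 };

/* Percentage change when going from `from` to `to`. */
static double pct_change(double from, double to)
{
    assert(from != 0.0);
    return (to - from) / from * 100.0;
}

static void print_comparison(void)
{
    printf("total time:       %+.1f%%\n",
           pct_change(no_fallback.total_ms, retry_sw_fallback.total_ms));
    printf("throughput:       %+.1f%%\n",
           pct_change(no_fallback.throughput_mbps,
                      retry_sw_fallback.throughput_mbps));
    printf("pages-per-second: %+.1f%%\n",
           pct_change(no_fallback.pages_per_sec,
                      retry_sw_fallback.pages_per_sec));
}
```

Calling print_comparison() prints -20.3% for total time, +29.0% for throughput and +75.6% for pages-per-second, so the retry + software fallback run finishes about a fifth faster.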

>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_compress_pages_slow_path: compress pages using slow path
>> > + *
>> > + * Compress the pages using software. If compression fails, the page will
>> > + * be sent directly.
>> > + *
>> > + * @p: Params for the channel being used
>> > + */
>> > +static void multifd_qpl_compress_pages_slow_path(MultiFDSendParams *p)
>> > +{
>> > +    QplData *qpl = p->compress_data;
>> > +    uint32_t size = p->page_size;
>> > +    qpl_job *job = qpl->sw_job;
>> > +    uint8_t *zbuf = qpl->zbuf;
>> > +    uint8_t *buf;
>> > +
>> > +    for (int i = 0; i < p->pages->normal_num; i++) {
>> > +        buf = p->pages->block->host + p->pages->offset[i];
>> > +        /* Set output length to less than the page to reduce decompression */
>> > +        multifd_qpl_prepare_comp_job(job, buf, size, zbuf, size - 1);
>> > +        if (qpl_execute_job(job) == QPL_STS_OK) {
>> > +            multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
>> > +        } else {
>> > +            /* send the page directly */
>> 
>> s/directly/uncompressed/
>> 
>> a bit clearer.
>
> Sure, I will fix it next version. 
>
>> > +            multifd_qpl_fill_packet(i, p, buf, size);
>> > +        }
>> > +        zbuf += size;
>> > +    }
>> > +}
>> > +
>> > +/**
>> > + * multifd_qpl_compress_pages: compress pages
>> > + *
>> > + * Submit the pages to the IAA hardware for compression. If hardware
>> > + * compression fails, it falls back to software compression. If software
>> > + * compression also fails, the page is sent directly
>> > + *
>> > + * @p: Params for the channel being used
>> > + */
>> > +static void multifd_qpl_compress_pages(MultiFDSendParams *p)
>> > +{
>> > +    QplData *qpl = p->compress_data;
>> > +    MultiFDPages_t *pages = p->pages;
>> > +    uint32_t size = p->page_size;
>> > +    QplHwJob *hw_job;
>> > +    uint8_t *buf;
>> > +    uint8_t *zbuf;
>> > +
>> 
>> Let's document the output size choice more explicitly:
>> 
>>     /*
>>      * Set output length to less than the page size to force the job to
>>      * fail in case it compresses to a larger size. We'll send that page
>>      * without compression and skip the decompression operation on the
>>      * destination.
>>      */
>>      out_size = size - 1;
>> 
>> you can then omit the other comments.
>
> Thanks for the comments, I will refine this next version.
>  
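The suggested comment describes a small sender/receiver protocol. Capping the compressed output one byte below the page size makes any page that does not actually shrink fail to compress. That page is then sent raw with a length equal to the page size, and the destination can skip decompression. The sketch below illustrates the idea under stated assumptions. toy_compress() and toy_decompress() are a hypothetical run-length coder standing in for the QPL job. The receiver rule (a length equal to the page size means "uncompressed") is an assumption about how the destination distinguishes the two cases, not code taken from the patch.

```c
#include <assert.h>
#include <stdint.h>
#include <string.h>

/*
 * Toy run-length coder standing in for the QPL compression job
 * (hypothetical; not QPL code). It emits (count, byte) pairs and returns
 * the number of bytes written, or 0 when `out_len` is too small, which
 * plays the role of a job failing because its output buffer is full.
 */
static uint32_t toy_compress(const uint8_t *in, uint32_t in_len,
                             uint8_t *out, uint32_t out_len)
{
    uint32_t o = 0;

    for (uint32_t i = 0; i < in_len;) {
        uint32_t run = 1;

        while (i + run < in_len && run < 255 && in[i + run] == in[i]) {
            run++;
        }
        if (o + 2 > out_len) {
            return 0;
        }
        out[o++] = (uint8_t)run;
        out[o++] = in[i];
        i += run;
    }
    return o;
}

static void toy_decompress(const uint8_t *in, uint32_t in_len, uint8_t *out)
{
    for (uint32_t i = 0; i + 1 < in_len; i += 2) {
        memset(out, in[i + 1], in[i]);
        out += in[i];
    }
}

/*
 * Sender: cap the output at page_size - 1, so a page that does not
 * shrink fails to compress and goes out as-is with len == page_size.
 */
static uint32_t send_page(const uint8_t *page, uint32_t page_size,
                          uint8_t *zbuf, const uint8_t **payload)
{
    uint32_t len;

    assert(page_size > 1);
    len = toy_compress(page, page_size, zbuf, page_size - 1);
    if (len == 0) {
        *payload = page;                    /* send the page uncompressed */
        return page_size;
    }
    *payload = zbuf;
    return len;
}

/* Receiver: only a length equal to the page size means "not compressed". */
static void recv_page(const uint8_t *payload, uint32_t len,
                      uint32_t page_size, uint8_t *page)
{
    if (len == page_size) {
        memcpy(page, payload, page_size);   /* skip decompression */
    } else {
        toy_decompress(payload, len, page);
    }
}
```

Because the sender never accepts a compressed result of page_size bytes or more, the length alone is unambiguous and no extra per-page flag is needed.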
>> > +    for (int i = 0; i < pages->normal_num; i++) {
>> > +        buf = pages->block->host + pages->offset[i];
>> > +        zbuf = qpl->zbuf + (size * i);
>> > +        hw_job = &qpl->hw_jobs[i];
>> > +        /* Set output length to less than the page to reduce decompression */
>> > +        multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf, size - 1);
>> > +        if (multifd_qpl_submit_job(hw_job->job)) {
>> > +            hw_job->fallback_sw_path = false;
>> > +        } else {
>> > +            hw_job->fallback_sw_path = true;
>> > +            /* Set output length less than page size to reduce decompression */
>> > +            multifd_qpl_prepare_comp_job(qpl->sw_job, buf, size, zbuf,
>> > +                                         size - 1);
>> > +            if (qpl_execute_job(qpl->sw_job) == QPL_STS_OK) {
>> > +                hw_job->sw_output = zbuf;
>> > +                hw_job->sw_output_len = qpl->sw_job->total_out;
>> > +            } else {
>> > +                hw_job->sw_output = buf;
>> > +                hw_job->sw_output_len = size;
>> > +            }
>> 
>> Hmm, these look a bit cumbersome, would it work if we moved the fallback
>> qpl_execute_job() down into the other loop? We could then avoid the
>> extra fields. Something like:
>> 
>> static void multifd_qpl_compress_pages(MultiFDSendParams *p)
>> {
>>     QplData *qpl = p->compress_data;
>>     MultiFDPages_t *pages = p->pages;
>>     uint32_t out_size, size = p->page_size;
>>     uint8_t *buf, *zbuf;
>> 
>>     /*
>>      * Set output length to less than the page size to force the job to
>>      * fail in case it compresses to a larger size. We'll send that page
>>      * without compression to skip the decompression operation on the
>>      * destination.
>>      */
>>     out_size = size - 1;
>> 
>>     for (int i = 0; i < pages->normal_num; i++) {
>>         QplHwJob *hw_job = &qpl->hw_jobs[i];
>> 
>>         hw_job->fallback_sw_path = false;
>>         buf = pages->block->host + pages->offset[i];
>>         zbuf = qpl->zbuf + (size * i);
>> 
>>         multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf, out_size);
>> 
>>         if (!multifd_qpl_submit_job(hw_job->job)) {
>>             hw_job->fallback_sw_path = true;
>>         }
>>     }
>> 
>>     for (int i = 0; i < pages->normal_num; i++) {
>>         QplHwJob *hw_job = &qpl->hw_jobs[i];
>>         qpl_job *job;
>>         qpl_status ret;
>> 
>>         buf = pages->block->host + pages->offset[i];
>>         zbuf = qpl->zbuf + (size * i);
>> 
>>         if (hw_job->fallback_sw_path) {
>>             job = qpl->sw_job;
>>             multifd_qpl_prepare_comp_job(job, buf, size, zbuf, out_size);
>>             ret = qpl_execute_job(job);
>>         } else {
>>             job = hw_job->job;
>>             ret = qpl_wait_job(job);
>>         }
>> 
>>         if (ret == QPL_STS_OK) {
>>             multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
>>         } else {
>>             multifd_qpl_fill_packet(i, p, buf, size);
>>         }
>>     }
>> }
>
> Many thanks for the reference code. I have tested it, and the performance is not good.
> When the work queue is full and a hardware job fails to be submitted, the subsequent
> job submissions will most likely fail as well. So my idea is to execute a software job
> immediately instead, while all subsequent jobs still give priority to the hardware path.

So let me see if I get this, you're saying that going with the sw path
immediately after a hw path failure is beneficial because the time it
takes to call the sw path serves as a backoff time for the hw path?

Do you have an idea on the time difference of waiting for sw path
vs. introducing a delay to multifd_qpl_submit_job()? Aren't we leaving
performance on the table by going with a much slower sw path instead of
waiting for the queues to open up? Or some other strategy, such as going
once again over the not-submitted pages.

I understand there's a tradeoff here between your effort to investigate
these things and the amount of performance to be had, so feel free to
leave this question unanswered. We could choose to simply document this
with a comment:

    if (multifd_qpl_submit_job(hw_job->job)) {
        hw_job->fallback_sw_path = false;
        continue;
    }

    /* 
     * The IAA work queue is full, any immediate subsequent job
     * submission is likely to fail, sending the page via the QPL
     * software path at this point gives us a better chance of
     * finding the queue open for the next pages.
     */
    hw_job->fallback_sw_path = true;
    ...

> There is almost no overhead in job submission because Intel uses the new "enqcmd" instruction,
> which allows the user program to submit the job directly to the hardware.
>
> With the reference code, once a job fails to be submitted, there is a high probability
> that "ALL" subsequent jobs also fail to be submitted and fall back to software
> compression, leaving the IAA hardware underutilized.
>
> In the 4-channel, 1-IAA-device test case, the reference code reduces IAA throughput
> from 3.4 GBps to 2.2 GBps, which hurts live migration performance (total time grows
> from 14s to 18s).
>
RE: [PATCH v7 6/7] migration/multifd: implement qpl compression and decompression
Posted by Liu, Yuan1 5 months, 3 weeks ago
> -----Original Message-----
> From: Fabiano Rosas <farosas@suse.de>
> Sent: Thursday, June 6, 2024 9:52 PM
> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com;
> pbonzini@redhat.com; marcandre.lureau@redhat.com; berrange@redhat.com;
> thuth@redhat.com; philmd@linaro.org
> Cc: qemu-devel@nongnu.org; Zou, Nanhai <nanhai.zou@intel.com>;
> shameerali.kolothum.thodi@huawei.com
> Subject: RE: [PATCH v7 6/7] migration/multifd: implement qpl compression
> and decompression
> 
> "Liu, Yuan1" <yuan1.liu@intel.com> writes:
> 
> >> -----Original Message-----
> >> From: Fabiano Rosas <farosas@suse.de>
> >> Sent: Thursday, June 6, 2024 6:26 AM
> >> To: Liu, Yuan1 <yuan1.liu@intel.com>; peterx@redhat.com;
> >> pbonzini@redhat.com; marcandre.lureau@redhat.com; berrange@redhat.com;
> >> thuth@redhat.com; philmd@linaro.org
> >> Cc: qemu-devel@nongnu.org; Liu, Yuan1 <yuan1.liu@intel.com>; Zou, Nanhai
> >> <nanhai.zou@intel.com>; shameerali.kolothum.thodi@huawei.com
> >> Subject: Re: [PATCH v7 6/7] migration/multifd: implement qpl compression
> >> and decompression
> >>
> >> Yuan Liu <yuan1.liu@intel.com> writes:
> >>
> >> > QPL compression and decompression will use IAA hardware first.
> >> > If IAA hardware is not available, it will automatically fall
> >> > back to QPL software path, if the software job also fails,
> >> > the uncompressed page is sent directly.
> >> >
> >> > Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
> >> > Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
> >> > ---
> >> >  migration/multifd-qpl.c | 412 +++++++++++++++++++++++++++++++++++++++-
> >> >  1 file changed, 408 insertions(+), 4 deletions(-)
> >> >
> >> > diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
> >> > index 6791a204d5..18b3384bd5 100644
> >> > --- a/migration/multifd-qpl.c
> >> > +++ b/migration/multifd-qpl.c
> >> > @@ -13,9 +13,14 @@
> >> >  #include "qemu/osdep.h"
> >> >  #include "qemu/module.h"
> >> >  #include "qapi/error.h"
> >> > +#include "qapi/qapi-types-migration.h"
> >> > +#include "exec/ramblock.h"
> >> >  #include "multifd.h"
> >> >  #include "qpl/qpl.h"
> >> >
> >> > +/* Maximum number of retries to resubmit a job if IAA work queues are full */
> >> > +#define MAX_SUBMIT_RETRY_NUM (3)
> >> > +
> >> >  typedef struct {
> >> >      /* the QPL hardware path job */
> >> >      qpl_job *job;
> >> > @@ -260,6 +265,219 @@ static void multifd_qpl_send_cleanup(MultiFDSendParams *p, Error **errp)
> >> >      p->iov = NULL;
> >> >  }
> >> >
> >> > +/**
> >> > + * multifd_qpl_prepare_job: prepare the job
> >> > + *
> >> > + * Set the QPL job parameters and properties.
> >> > + *
> >> > + * @job: pointer to the qpl_job structure
> >> > + * @is_compression: indicates compression and decompression
> >> > + * @input: pointer to the input data buffer
> >> > + * @input_len: the length of the input data
> >> > + * @output: pointer to the output data buffer
> >> > + * @output_len: the length of the output data
> >> > + */
> >> > +static void multifd_qpl_prepare_job(qpl_job *job, bool is_compression,
> >> > +                                    uint8_t *input, uint32_t input_len,
> >> > +                                    uint8_t *output, uint32_t output_len)
> >> > +{
> >> > +    job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
> >> > +    job->next_in_ptr = input;
> >> > +    job->next_out_ptr = output;
> >> > +    job->available_in = input_len;
> >> > +    job->available_out = output_len;
> >> > +    job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
> >> > +    /* only supports compression level 1 */
> >> > +    job->level = 1;
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_prepare_job: prepare the compression job
> >>
> >> function name is wrong
> >
> > Thanks, I will fix this next version.
> >
> >> > + *
> >> > + * Set the compression job parameters and properties.
> >> > + *
> >> > + * @job: pointer to the qpl_job structure
> >> > + * @input: pointer to the input data buffer
> >> > + * @input_len: the length of the input data
> >> > + * @output: pointer to the output data buffer
> >> > + * @output_len: the length of the output data
> >> > + */
> >> > +static void multifd_qpl_prepare_comp_job(qpl_job *job, uint8_t *input,
> >> > +                                         uint32_t input_len, uint8_t *output,
> >> > +                                         uint32_t output_len)
> >> > +{
> >> > +    multifd_qpl_prepare_job(job, true, input, input_len, output, output_len);
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_prepare_job: prepare the decompression job
> >
> > Thanks, I will fix this next version.
> >
> >> > + *
> >> > + * Set the decompression job parameters and properties.
> >> > + *
> >> > + * @job: pointer to the qpl_job structure
> >> > + * @input: pointer to the input data buffer
> >> > + * @input_len: the length of the input data
> >> > + * @output: pointer to the output data buffer
> >> > + * @output_len: the length of the output data
> >> > + */
> >> > +static void multifd_qpl_prepare_decomp_job(qpl_job *job, uint8_t *input,
> >> > +                                           uint32_t input_len, uint8_t *output,
> >> > +                                           uint32_t output_len)
> >> > +{
> >> > +    multifd_qpl_prepare_job(job, false, input, input_len, output, output_len);
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_fill_iov: fill in the IOV
> >> > + *
> >> > + * Fill in the QPL packet IOV
> >> > + *
> >> > + * @p: Params for the channel being used
> >> > + * @data: pointer to the IOV data
> >> > + * @len: The length of the IOV data
> >> > + */
> >> > +static void multifd_qpl_fill_iov(MultiFDSendParams *p, uint8_t *data,
> >> > +                                 uint32_t len)
> >> > +{
> >> > +    p->iov[p->iovs_num].iov_base = data;
> >> > +    p->iov[p->iovs_num].iov_len = len;
> >> > +    p->iovs_num++;
> >> > +    p->next_packet_size += len;
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_fill_packet: fill the compressed page into the QPL packet
> >> > + *
> >> > + * Fill the compressed page length and IOV into the QPL packet
> >> > + *
> >> > + * @idx: The index of the compressed length array
> >> > + * @p: Params for the channel being used
> >> > + * @data: pointer to the compressed page buffer
> >> > + * @len: The length of the compressed page
> >> > + */
> >> > +static void multifd_qpl_fill_packet(uint32_t idx, MultiFDSendParams *p,
> >> > +                                    uint8_t *data, uint32_t len)
> >> > +{
> >> > +    QplData *qpl = p->compress_data;
> >> > +
> >> > +    qpl->zlen[idx] = cpu_to_be32(len);
> >> > +    multifd_qpl_fill_iov(p, data, len);
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_submit_job: submit a job to the hardware
> >> > + *
> >> > + * Submit a QPL hardware job to the IAA device
> >> > + *
> >> > + * Returns true if the job is submitted successfully, otherwise false.
> >> > + *
> >> > + * @job: pointer to the qpl_job structure
> >> > + */
> >> > +static bool multifd_qpl_submit_job(qpl_job *job)
> >> > +{
> >> > +    qpl_status status;
> >> > +    uint32_t num = 0;
> >> > +
> >> > +retry:
> >> > +    status = qpl_submit_job(job);
> >> > +    if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
> >> > +        if (num < MAX_SUBMIT_RETRY_NUM) {
> >> > +            num++;
> >> > +            goto retry;
> >> > +        }
> >> > +    }
> >> > +    return (status == QPL_STS_OK);
> >>
> >> How often do we expect this to fail? Will the queues be busy frequently
> >> or is this an unlikely event? I'm thinking whether we really need to
> >> allow a fallback for the hw path. Sorry if this has been discussed
> >> already, I don't remember.
> >
> > In some scenarios, this may happen frequently, such as configuring 4 channels
> > but only one IAA device is available. In the case of insufficient IAA hardware
> > resources, retry and fallback can help optimize performance.
> > I have a comparison test below
> >
> > 1. Retry + SW fallback:
> >    total time: 14649 ms
> >    downtime: 25 ms
> >    throughput: 17666.57 mbps
> >    pages-per-second: 1509647
> >
> > 2. No fallback, always wait for work queues to become available
> >    total time: 18381 ms
> >    downtime: 25 ms
> >    throughput: 13698.65 mbps
> >    pages-per-second: 859607
> 
> Thanks for the data, this is helpful. Let's include it in the commit
> message, it's important to let people know you actually did that
> analysis. I put a suggestion below:
> 
> ---
> QPL compression and decompression will use the IAA hardware path if the
> IAA hardware is available. Otherwise, the QPL library software path is used.
> 
> The hardware path will automatically fall back to the QPL software path if
> the IAA queues are busy. In some scenarios, this may happen frequently,
> such as configuring 4 channels but only one IAA device is available. In
> the case of insufficient IAA hardware resources, retry and fallback can
> help optimize performance:
> 
>  1. Retry + SW fallback:
>     total time: 14649 ms
>     downtime: 25 ms
>     throughput: 17666.57 mbps
>     pages-per-second: 1509647
> 
>  2. No fallback, always wait for work queues to become available
>     total time: 18381 ms
>     downtime: 25 ms
>     throughput: 13698.65 mbps
>     pages-per-second: 859607
> 
> If both the hardware and software paths fail, the uncompressed page is
> sent directly.

Many thanks for your comments; I will add these to the commit message.
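For reference, the goto-based loop in multifd_qpl_submit_job() makes at most 1 + MAX_SUBMIT_RETRY_NUM submission attempts and only retries while the queues report busy. A minimal sketch of the same policy as a plain loop is shown below. It assumes a hypothetical simulated queue (sim_queue, sim_submit(), sim_status) in place of qpl_submit_job() and the QPL_STS_* codes, so it runs without QPL.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MAX_SUBMIT_RETRY_NUM (3)

/* Hypothetical stand-ins for QPL_STS_OK / QPL_STS_QUEUES_ARE_BUSY_ERR. */
typedef enum { SIM_STS_OK, SIM_STS_QUEUES_BUSY } sim_status;

/* Hypothetical queue that reports "busy" for its first busy_count submits. */
typedef struct {
    uint32_t busy_count;
    uint32_t calls;
} sim_queue;

static sim_status sim_submit(sim_queue *q)
{
    q->calls++;
    return q->calls <= q->busy_count ? SIM_STS_QUEUES_BUSY : SIM_STS_OK;
}

/*
 * Same policy as multifd_qpl_submit_job(): one attempt, then up to
 * MAX_SUBMIT_RETRY_NUM immediate retries while the queues are busy.
 */
static bool submit_with_retry(sim_queue *q)
{
    sim_status status;

    assert(q != NULL);
    status = sim_submit(q);
    for (uint32_t num = 0;
         status == SIM_STS_QUEUES_BUSY && num < MAX_SUBMIT_RETRY_NUM; num++) {
        status = sim_submit(q);
    }
    return status == SIM_STS_OK;
}
```

With MAX_SUBMIT_RETRY_NUM set to 3, a queue that is busy for the first three attempts is accepted on the fourth. A queue that stays busy for four attempts makes the call return false after exactly four submissions, and the caller then takes the software path.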

> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_compress_pages_slow_path: compress pages using slow path
> >> > + *
> >> > + * Compress the pages using software. If compression fails, the page will
> >> > + * be sent directly.
> >> > + *
> >> > + * @p: Params for the channel being used
> >> > + */
> >> > +static void multifd_qpl_compress_pages_slow_path(MultiFDSendParams *p)
> >> > +{
> >> > +    QplData *qpl = p->compress_data;
> >> > +    uint32_t size = p->page_size;
> >> > +    qpl_job *job = qpl->sw_job;
> >> > +    uint8_t *zbuf = qpl->zbuf;
> >> > +    uint8_t *buf;
> >> > +
> >> > +    for (int i = 0; i < p->pages->normal_num; i++) {
> >> > +        buf = p->pages->block->host + p->pages->offset[i];
> >> > +        /* Set output length to less than the page to reduce decompression */
> >> > +        multifd_qpl_prepare_comp_job(job, buf, size, zbuf, size - 1);
> >> > +        if (qpl_execute_job(job) == QPL_STS_OK) {
> >> > +            multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
> >> > +        } else {
> >> > +            /* send the page directly */
> >>
> >> s/directly/uncompressed/
> >>
> >> a bit clearer.
> >
> > Sure, I will fix it next version.
> >
> >> > +            multifd_qpl_fill_packet(i, p, buf, size);
> >> > +        }
> >> > +        zbuf += size;
> >> > +    }
> >> > +}
> >> > +
> >> > +/**
> >> > + * multifd_qpl_compress_pages: compress pages
> >> > + *
> >> > + * Submit the pages to the IAA hardware for compression. If hardware
> >> > + * compression fails, it falls back to software compression. If software
> >> > + * compression also fails, the page is sent directly
> >> > + *
> >> > + * @p: Params for the channel being used
> >> > + */
> >> > +static void multifd_qpl_compress_pages(MultiFDSendParams *p)
> >> > +{
> >> > +    QplData *qpl = p->compress_data;
> >> > +    MultiFDPages_t *pages = p->pages;
> >> > +    uint32_t size = p->page_size;
> >> > +    QplHwJob *hw_job;
> >> > +    uint8_t *buf;
> >> > +    uint8_t *zbuf;
> >> > +
> >>
> >> Let's document the output size choice more explicitly:
> >>
> >>     /*
> >>      * Set output length to less than the page size to force the job to
> >>      * fail in case it compresses to a larger size. We'll send that page
> >>      * without compression and skip the decompression operation on the
> >>      * destination.
> >>      */
> >>      out_size = size - 1;
> >>
> >> you can then omit the other comments.
> >
> > Thanks for the comments, I will refine this next version.
> >
> >> > +    for (int i = 0; i < pages->normal_num; i++) {
> >> > +        buf = pages->block->host + pages->offset[i];
> >> > +        zbuf = qpl->zbuf + (size * i);
> >> > +        hw_job = &qpl->hw_jobs[i];
> >> > +        /* Set output length to less than the page to reduce decompression */
> >> > +        multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf, size - 1);
> >> > +        if (multifd_qpl_submit_job(hw_job->job)) {
> >> > +            hw_job->fallback_sw_path = false;
> >> > +        } else {
> >> > +            hw_job->fallback_sw_path = true;
> >> > +            /* Set output length less than page size to reduce decompression */
> >> > +            multifd_qpl_prepare_comp_job(qpl->sw_job, buf, size, zbuf,
> >> > +                                         size - 1);
> >> > +            if (qpl_execute_job(qpl->sw_job) == QPL_STS_OK) {
> >> > +                hw_job->sw_output = zbuf;
> >> > +                hw_job->sw_output_len = qpl->sw_job->total_out;
> >> > +            } else {
> >> > +                hw_job->sw_output = buf;
> >> > +                hw_job->sw_output_len = size;
> >> > +            }
> >>
> >> Hmm, these look a bit cumbersome, would it work if we moved the fallback
> >> qpl_execute_job() down into the other loop? We could then avoid the
> >> extra fields. Something like:
> >>
> >> static void multifd_qpl_compress_pages(MultiFDSendParams *p)
> >> {
> >>     QplData *qpl = p->compress_data;
> >>     MultiFDPages_t *pages = p->pages;
> >>     uint32_t out_size, size = p->page_size;
> >>     uint8_t *buf, *zbuf;
> >>
> >>     /*
> >>      * Set output length to less than the page size to force the job to
> >>      * fail in case it compresses to a larger size. We'll send that page
> >>      * without compression to skip the decompression operation on the
> >>      * destination.
> >>      */
> >>     out_size = size - 1;
> >>
> >>     for (int i = 0; i < pages->normal_num; i++) {
> >>         QplHwJob *hw_job = &qpl->hw_jobs[i];
> >>
> >>         hw_job->fallback_sw_path = false;
> >>         buf = pages->block->host + pages->offset[i];
> >>         zbuf = qpl->zbuf + (size * i);
> >>
> >>         multifd_qpl_prepare_comp_job(hw_job->job, buf, size, zbuf, out_size);
> >>
> >>         if (!multifd_qpl_submit_job(hw_job->job)) {
> >>             hw_job->fallback_sw_path = true;
> >>         }
> >>     }
> >>
> >>     for (int i = 0; i < pages->normal_num; i++) {
> >>         QplHwJob *hw_job = &qpl->hw_jobs[i];
> >>         qpl_job *job;
> >>         qpl_status ret;
> >>
> >>         buf = pages->block->host + pages->offset[i];
> >>         zbuf = qpl->zbuf + (size * i);
> >>
> >>         if (hw_job->fallback_sw_path) {
> >>             job = qpl->sw_job;
> >>             multifd_qpl_prepare_comp_job(job, buf, size, zbuf, out_size);
> >>             ret = qpl_execute_job(job);
> >>         } else {
> >>             job = hw_job->job;
> >>             ret = qpl_wait_job(job);
> >>         }
> >>
> >>         if (ret == QPL_STS_OK) {
> >>             multifd_qpl_fill_packet(i, p, zbuf, job->total_out);
> >>         } else {
> >>             multifd_qpl_fill_packet(i, p, buf, size);
> >>         }
> >>     }
> >> }
> >
> > Many thanks for the reference code. I have tested it, and the performance is not good.
> > When the work queue is full and a hardware job fails to be submitted, the subsequent
> > job submissions will most likely fail as well. So my idea is to execute a software job
> > immediately instead, while all subsequent jobs still give priority to the hardware path.
> 
> So let me see if I get this, you're saying that going with the sw path
> immediately after a hw path failure is beneficial because the time it
> takes to call the sw path serves as a backoff time for the hw path?

Exactly, I want to use the sw path as the backoff time for the hardware path.
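The backoff argument can be illustrated with a deliberately simplified model. It is hypothetical and not derived from QPL or from the measurements in this thread. The work queue holds `capacity` jobs, and the device retires one job per tick. Submission takes no time, in line with the enqcmd remark in this thread. A software compression takes `sw_ticks` ticks, during which the device keeps draining. Immediate retries are ignored because, in this model, they see the same full queue.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Toy model (hypothetical, not QPL code): the work queue has `capacity`
 * slots and the device retires one job per tick. Submitting costs no
 * time; a software compression costs `sw_ticks` ticks, during which the
 * device keeps draining the queue.
 */
typedef struct {
    uint32_t capacity;
    uint32_t in_flight;
} toy_queue;

static void toy_advance(toy_queue *q, uint32_t ticks)
{
    q->in_flight = ticks >= q->in_flight ? 0 : q->in_flight - ticks;
}

static bool toy_submit(toy_queue *q)
{
    if (q->in_flight == q->capacity) {
        return false;                   /* queues are busy */
    }
    q->in_flight++;
    return true;
}

/*
 * v7 policy: a page whose submit fails is compressed in software right
 * away; the next page tries the hardware again.
 */
static uint32_t hw_pages_interleaved(uint32_t pages, uint32_t capacity,
                                     uint32_t sw_ticks)
{
    toy_queue q = { capacity, 0 };
    uint32_t hw = 0;

    assert(capacity > 0);
    for (uint32_t i = 0; i < pages; i++) {
        if (toy_submit(&q)) {
            hw++;
        } else {
            toy_advance(&q, sw_ticks);  /* the sw job doubles as backoff */
        }
    }
    return hw;
}

/*
 * Reference-code policy: submit every page first; rejected pages are
 * compressed in software in a second loop (not modelled here).
 */
static uint32_t hw_pages_two_pass(uint32_t pages, uint32_t capacity)
{
    toy_queue q = { capacity, 0 };
    uint32_t hw = 0;

    assert(capacity > 0);
    for (uint32_t i = 0; i < pages; i++) {
        if (toy_submit(&q)) {           /* no time passes between submits */
            hw++;
        }
    }
    return hw;
}
```

Consider 128 pages, a 16-slot queue and sw_ticks = 8. The two-pass policy sends only 16 pages to the hardware, while the interleaved policy sends 115. In the two-pass version no time passes between submissions, so once the queue is full every later submission fails, which is the effect described above. The absolute numbers depend entirely on the made-up parameters.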

> Do you have an idea on the time difference of waiting for sw path
> vs. introducing a delay to multifd_qpl_submit_job()? Aren't we leaving
> performance on the table by going with a much slower sw path instead of
> waiting for the queues to open up? Or some other strategy, such as going
> once again over the not-submitted pages.

Guaranteeing performance with a fixed delay may be difficult for now, because
the solution only supports shared work queue mode: while live migration waits
for the specified time, other workloads may still fill the device work queue,
so the migration job submission can fail again after the delay.

I agree with your point. Using the software path as the backoff may also cost
performance because of the software path overhead. I will consider how to solve
this in the future.
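The point about shared work queue mode can be illustrated with a toy model. It is hypothetical: `drain` and `others` are made-up parameters, not IAA characteristics. During each tick of a fixed delay, the device retires `drain` jobs while other tenants of the shared queue submit `others` jobs. Whenever the other tenants keep pace with the device, the queue is still full when the delay expires, so no fixed delay guarantees a successful submission.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/*
 * Toy shared work queue (hypothetical model, not IAA/QPL code): returns
 * whether a slot is free after waiting `delay_ticks` ticks, given that the
 * device retires `drain` jobs per tick and other tenants submit `others`
 * jobs per tick.
 */
static bool slot_free_after_delay(uint32_t capacity, uint32_t in_flight,
                                  uint32_t drain, uint32_t others,
                                  uint32_t delay_ticks)
{
    assert(in_flight <= capacity);

    for (uint32_t t = 0; t < delay_ticks; t++) {
        /* The device completes up to `drain` queued jobs... */
        in_flight = in_flight > drain ? in_flight - drain : 0;
        /* ...and other tenants refill the queue, up to its capacity. */
        in_flight = in_flight + others > capacity ? capacity
                                                  : in_flight + others;
    }
    return in_flight < capacity;
}
```

Starting from a full 16-slot queue, slot_free_after_delay(16, 16, 1, 0, 5) returns true because nobody else submits. slot_free_after_delay(16, 16, 2, 2, 100) returns false regardless of how long the delay is.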

> I understand there's a tradeoff here between your effort to investigate
> these things and the amount of performance to be had, so feel free to
> leave this question unanswered. We could choose to simply document this
> with a comment:

Sure, I'll add this comment so I can improve it in the future.

>     if (multifd_qpl_submit_job(hw_job->job)) {
>         hw_job->fallback_sw_path = false;
>         continue;
>     }
> 
>     /*
>      * The IAA work queue is full, any immediate subsequent job
>      * submission is likely to fail, sending the page via the QPL
>      * software path at this point gives us a better chance of
>      * finding the queue open for the next pages.
>      */
>     hw_job->fallback_sw_path = true;
>     ...
> 
> > There is almost no overhead in job submission because Intel uses the new "enqcmd" instruction,
> > which allows the user program to submit the job directly to the hardware.
> >
> > With the reference code, once a job fails to be submitted, there is a high probability
> > that "ALL" subsequent jobs also fail to be submitted and fall back to software
> > compression, leaving the IAA hardware underutilized.
> >
> > In the 4-channel, 1-IAA-device test case, the reference code reduces IAA throughput
> > from 3.4 GBps to 2.2 GBps, which hurts live migration performance (total time grows
> > from 14s to 18s).
> >