From: Xiao Guangrong <xiaoguangrong@tencent.com>
Adapt the compression code to the threaded workqueue
Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
migration/ram.c | 308 ++++++++++++++++++++------------------------------------
1 file changed, 110 insertions(+), 198 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index 7e7deec4d8..254c08f27b 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -57,6 +57,7 @@
#include "qemu/uuid.h"
#include "savevm.h"
#include "qemu/iov.h"
+#include "qemu/threaded-workqueue.h"
/***********************************************************/
/* ram save/restore */
@@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus;
CompressionStats compression_counters;
-struct CompressParam {
- bool done;
- bool quit;
- bool zero_page;
- QEMUFile *file;
- QemuMutex mutex;
- QemuCond cond;
- RAMBlock *block;
- ram_addr_t offset;
-
- /* internally used fields */
- z_stream stream;
- uint8_t *originbuf;
-};
-typedef struct CompressParam CompressParam;
-
struct DecompressParam {
bool done;
bool quit;
@@ -377,15 +362,6 @@ struct DecompressParam {
};
typedef struct DecompressParam DecompressParam;
-static CompressParam *comp_param;
-static QemuThread *compress_threads;
-/* comp_done_cond is used to wake up the migration thread when
- * one of the compression threads has finished the compression.
- * comp_done_lock is used to co-work with comp_done_cond.
- */
-static QemuMutex comp_done_lock;
-static QemuCond comp_done_cond;
-/* The empty QEMUFileOps will be used by file in CompressParam */
static const QEMUFileOps empty_ops = { };
static QEMUFile *decomp_file;
@@ -394,125 +370,6 @@ static QemuThread *decompress_threads;
static QemuMutex decomp_done_lock;
static QemuCond decomp_done_cond;
-static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
- ram_addr_t offset, uint8_t *source_buf);
-
-static void *do_data_compress(void *opaque)
-{
- CompressParam *param = opaque;
- RAMBlock *block;
- ram_addr_t offset;
- bool zero_page;
-
- qemu_mutex_lock(&param->mutex);
- while (!param->quit) {
- if (param->block) {
- block = param->block;
- offset = param->offset;
- param->block = NULL;
- qemu_mutex_unlock(&param->mutex);
-
- zero_page = do_compress_ram_page(param->file, &param->stream,
- block, offset, param->originbuf);
-
- qemu_mutex_lock(&comp_done_lock);
- param->done = true;
- param->zero_page = zero_page;
- qemu_cond_signal(&comp_done_cond);
- qemu_mutex_unlock(&comp_done_lock);
-
- qemu_mutex_lock(&param->mutex);
- } else {
- qemu_cond_wait(&param->cond, &param->mutex);
- }
- }
- qemu_mutex_unlock(&param->mutex);
-
- return NULL;
-}
-
-static void compress_threads_save_cleanup(void)
-{
- int i, thread_count;
-
- if (!migrate_use_compression() || !comp_param) {
- return;
- }
-
- thread_count = migrate_compress_threads();
- for (i = 0; i < thread_count; i++) {
- /*
- * we use it as a indicator which shows if the thread is
- * properly init'd or not
- */
- if (!comp_param[i].file) {
- break;
- }
-
- qemu_mutex_lock(&comp_param[i].mutex);
- comp_param[i].quit = true;
- qemu_cond_signal(&comp_param[i].cond);
- qemu_mutex_unlock(&comp_param[i].mutex);
-
- qemu_thread_join(compress_threads + i);
- qemu_mutex_destroy(&comp_param[i].mutex);
- qemu_cond_destroy(&comp_param[i].cond);
- deflateEnd(&comp_param[i].stream);
- g_free(comp_param[i].originbuf);
- qemu_fclose(comp_param[i].file);
- comp_param[i].file = NULL;
- }
- qemu_mutex_destroy(&comp_done_lock);
- qemu_cond_destroy(&comp_done_cond);
- g_free(compress_threads);
- g_free(comp_param);
- compress_threads = NULL;
- comp_param = NULL;
-}
-
-static int compress_threads_save_setup(void)
-{
- int i, thread_count;
-
- if (!migrate_use_compression()) {
- return 0;
- }
- thread_count = migrate_compress_threads();
- compress_threads = g_new0(QemuThread, thread_count);
- comp_param = g_new0(CompressParam, thread_count);
- qemu_cond_init(&comp_done_cond);
- qemu_mutex_init(&comp_done_lock);
- for (i = 0; i < thread_count; i++) {
- comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
- if (!comp_param[i].originbuf) {
- goto exit;
- }
-
- if (deflateInit(&comp_param[i].stream,
- migrate_compress_level()) != Z_OK) {
- g_free(comp_param[i].originbuf);
- goto exit;
- }
-
- /* comp_param[i].file is just used as a dummy buffer to save data,
- * set its ops to empty.
- */
- comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
- comp_param[i].done = true;
- comp_param[i].quit = false;
- qemu_mutex_init(&comp_param[i].mutex);
- qemu_cond_init(&comp_param[i].cond);
- qemu_thread_create(compress_threads + i, "compress",
- do_data_compress, comp_param + i,
- QEMU_THREAD_JOINABLE);
- }
- return 0;
-
-exit:
- compress_threads_save_cleanup();
- return -1;
-}
-
/* Multiple fd's */
#define MULTIFD_MAGIC 0x11223344U
@@ -1909,12 +1766,25 @@ exit:
return zero_page;
}
+struct CompressData {
+ /* filled by migration thread.*/
+ RAMBlock *block;
+ ram_addr_t offset;
+
+ /* filled by compress thread. */
+ QEMUFile *file;
+ z_stream stream;
+ uint8_t *originbuf;
+ bool zero_page;
+};
+typedef struct CompressData CompressData;
+
static void
-update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
+update_compress_thread_counts(CompressData *cd, int bytes_xmit)
{
ram_counters.transferred += bytes_xmit;
- if (param->zero_page) {
+ if (cd->zero_page) {
ram_counters.duplicate++;
return;
}
@@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
compression_counters.pages++;
}
+static int compress_thread_data_init(void *request)
+{
+ CompressData *cd = request;
+
+ cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
+ if (!cd->originbuf) {
+ return -1;
+ }
+
+ if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
+ g_free(cd->originbuf);
+ return -1;
+ }
+
+ cd->file = qemu_fopen_ops(NULL, &empty_ops);
+ return 0;
+}
+
+static void compress_thread_data_fini(void *request)
+{
+ CompressData *cd = request;
+
+ qemu_fclose(cd->file);
+ deflateEnd(&cd->stream);
+ g_free(cd->originbuf);
+}
+
+static void compress_thread_data_handler(void *request)
+{
+ CompressData *cd = request;
+
+ /*
+ * if compression fails, it will be indicated by
+ * migrate_get_current()->to_dst_file.
+ */
+ cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
+ cd->offset, cd->originbuf);
+}
+
+static void compress_thread_data_done(void *request)
+{
+ CompressData *cd = request;
+ RAMState *rs = ram_state;
+ int bytes_xmit;
+
+ bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
+ update_compress_thread_counts(cd, bytes_xmit);
+}
+
+static const ThreadedWorkqueueOps compress_ops = {
+ .thread_request_init = compress_thread_data_init,
+ .thread_request_uninit = compress_thread_data_fini,
+ .thread_request_handler = compress_thread_data_handler,
+ .thread_request_done = compress_thread_data_done,
+ .request_size = sizeof(CompressData),
+};
+
+static Threads *compress_threads;
+
static bool save_page_use_compression(RAMState *rs);
static void flush_compressed_data(RAMState *rs)
{
- int idx, len, thread_count;
-
if (!save_page_use_compression(rs)) {
return;
}
- thread_count = migrate_compress_threads();
- qemu_mutex_lock(&comp_done_lock);
- for (idx = 0; idx < thread_count; idx++) {
- while (!comp_param[idx].done) {
- qemu_cond_wait(&comp_done_cond, &comp_done_lock);
- }
- }
- qemu_mutex_unlock(&comp_done_lock);
+ threaded_workqueue_wait_for_requests(compress_threads);
+}
- for (idx = 0; idx < thread_count; idx++) {
- qemu_mutex_lock(&comp_param[idx].mutex);
- if (!comp_param[idx].quit) {
- len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
- /*
- * it's safe to fetch zero_page without holding comp_done_lock
- * as there is no further request submitted to the thread,
- * i.e, the thread should be waiting for a request at this point.
- */
- update_compress_thread_counts(&comp_param[idx], len);
- }
- qemu_mutex_unlock(&comp_param[idx].mutex);
+static void compress_threads_save_cleanup(void)
+{
+ if (!compress_threads) {
+ return;
}
+
+ threaded_workqueue_destroy(compress_threads);
+ compress_threads = NULL;
}
-static inline void set_compress_params(CompressParam *param, RAMBlock *block,
- ram_addr_t offset)
+static int compress_threads_save_setup(void)
{
- param->block = block;
- param->offset = offset;
+ if (!migrate_use_compression()) {
+ return 0;
+ }
+
+ compress_threads = threaded_workqueue_create("compress",
+ migrate_compress_threads(),
+ DEFAULT_THREAD_REQUEST_NR, &compress_ops);
+ return compress_threads ? 0 : -1;
}
static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
ram_addr_t offset)
{
- int idx, thread_count, bytes_xmit = -1, pages = -1;
+ CompressData *cd;
bool wait = migrate_compress_wait_thread();
- thread_count = migrate_compress_threads();
- qemu_mutex_lock(&comp_done_lock);
retry:
- for (idx = 0; idx < thread_count; idx++) {
- if (comp_param[idx].done) {
- comp_param[idx].done = false;
- bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
- qemu_mutex_lock(&comp_param[idx].mutex);
- set_compress_params(&comp_param[idx], block, offset);
- qemu_cond_signal(&comp_param[idx].cond);
- qemu_mutex_unlock(&comp_param[idx].mutex);
- pages = 1;
- update_compress_thread_counts(&comp_param[idx], bytes_xmit);
- break;
+ cd = threaded_workqueue_get_request(compress_threads);
+ if (!cd) {
+ /*
+ * wait for the free thread if the user specifies
+ * 'compress-wait-thread', otherwise we will post
+ * the page out in the main thread as normal page.
+ */
+ if (wait) {
+ cpu_relax();
+ goto retry;
}
- }
- /*
- * wait for the free thread if the user specifies 'compress-wait-thread',
- * otherwise we will post the page out in the main thread as normal page.
- */
- if (pages < 0 && wait) {
- qemu_cond_wait(&comp_done_cond, &comp_done_lock);
- goto retry;
- }
- qemu_mutex_unlock(&comp_done_lock);
-
- return pages;
+ return -1;
+ }
+ cd->block = block;
+ cd->offset = offset;
+ threaded_workqueue_submit_request(compress_threads, cd);
+ return 1;
}
/**
--
2.14.5
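
For readers coming to this patch without the earlier ones in the series, here is a
minimal sketch of how the proposed ThreadedWorkqueueOps interface is meant to be
driven. The function names and signatures are assumed from the calls visible in this
patch (threaded_workqueue_create/get_request/submit_request/wait_for_requests/destroy
and DEFAULT_THREAD_REQUEST_NR); the SquareRequest type and the square_* callbacks are
invented purely for illustration and are not part of the series:

    #include "qemu/osdep.h"
    #include "qemu/processor.h"          /* cpu_relax() */
    #include "qemu/threaded-workqueue.h" /* proposed in patch 1 of this series */

    typedef struct SquareRequest {
        int in;      /* filled by the submitter before submit_request() */
        int out;     /* filled by a worker thread in the handler */
    } SquareRequest;

    /* called once per pre-allocated request slot when the workqueue is created */
    static int square_init(void *request)
    {
        return 0;    /* nothing to allocate per request in this toy */
    }

    /* called once per request slot when the workqueue is destroyed */
    static void square_uninit(void *request)
    {
    }

    /* runs on one of the worker threads */
    static void square_handler(void *request)
    {
        SquareRequest *req = request;

        req->out = req->in * req->in;
    }

    /* runs back on the submitter side once the finished request is reaped */
    static void square_done(void *request)
    {
        SquareRequest *req = request;

        printf("%d^2 = %d\n", req->in, req->out);
    }

    static const ThreadedWorkqueueOps square_ops = {
        .thread_request_init = square_init,
        .thread_request_uninit = square_uninit,
        .thread_request_handler = square_handler,
        .thread_request_done = square_done,
        .request_size = sizeof(SquareRequest),
    };

    static void square_demo(void)
    {
        Threads *threads;
        int i;

        threads = threaded_workqueue_create("square", 4,
                                            DEFAULT_THREAD_REQUEST_NR,
                                            &square_ops);
        for (i = 0; i < 16; i++) {
            SquareRequest *req;

            /* NULL means every request slot is currently in flight */
            while (!(req = threaded_workqueue_get_request(threads))) {
                cpu_relax();
            }
            req->in = i;
            threaded_workqueue_submit_request(threads, req);
        }
        threaded_workqueue_wait_for_requests(threads);
        threaded_workqueue_destroy(threads);
    }

The compression code in the patch follows the same shape: the migration thread grabs
a free CompressData slot, fills in block/offset, submits it, and the compressed
stream is copied into the migration channel from thread_request_done when the
request is reaped on the submitter side.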
* guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>
> Adapt the compression code to the threaded workqueue
>
> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> ---
> migration/ram.c | 308 ++++++++++++++++++++------------------------------------
> 1 file changed, 110 insertions(+), 198 deletions(-)
>
> diff --git a/migration/ram.c b/migration/ram.c
> index 7e7deec4d8..254c08f27b 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -57,6 +57,7 @@
> #include "qemu/uuid.h"
> #include "savevm.h"
> #include "qemu/iov.h"
> +#include "qemu/threaded-workqueue.h"
>
> /***********************************************************/
> /* ram save/restore */
> @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus;
>
> CompressionStats compression_counters;
>
> -struct CompressParam {
> - bool done;
> - bool quit;
> - bool zero_page;
> - QEMUFile *file;
> - QemuMutex mutex;
> - QemuCond cond;
> - RAMBlock *block;
> - ram_addr_t offset;
> -
> - /* internally used fields */
> - z_stream stream;
> - uint8_t *originbuf;
> -};
> -typedef struct CompressParam CompressParam;
> -
> struct DecompressParam {
> bool done;
> bool quit;
> @@ -377,15 +362,6 @@ struct DecompressParam {
> };
> typedef struct DecompressParam DecompressParam;
>
> -static CompressParam *comp_param;
> -static QemuThread *compress_threads;
> -/* comp_done_cond is used to wake up the migration thread when
> - * one of the compression threads has finished the compression.
> - * comp_done_lock is used to co-work with comp_done_cond.
> - */
> -static QemuMutex comp_done_lock;
> -static QemuCond comp_done_cond;
> -/* The empty QEMUFileOps will be used by file in CompressParam */
> static const QEMUFileOps empty_ops = { };
>
> static QEMUFile *decomp_file;
> @@ -394,125 +370,6 @@ static QemuThread *decompress_threads;
> static QemuMutex decomp_done_lock;
> static QemuCond decomp_done_cond;
>
> -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
> - ram_addr_t offset, uint8_t *source_buf);
> -
> -static void *do_data_compress(void *opaque)
> -{
> - CompressParam *param = opaque;
> - RAMBlock *block;
> - ram_addr_t offset;
> - bool zero_page;
> -
> - qemu_mutex_lock(&param->mutex);
> - while (!param->quit) {
> - if (param->block) {
> - block = param->block;
> - offset = param->offset;
> - param->block = NULL;
> - qemu_mutex_unlock(&param->mutex);
> -
> - zero_page = do_compress_ram_page(param->file, &param->stream,
> - block, offset, param->originbuf);
> -
> - qemu_mutex_lock(&comp_done_lock);
> - param->done = true;
> - param->zero_page = zero_page;
> - qemu_cond_signal(&comp_done_cond);
> - qemu_mutex_unlock(&comp_done_lock);
> -
> - qemu_mutex_lock(&param->mutex);
> - } else {
> - qemu_cond_wait(&param->cond, &param->mutex);
> - }
> - }
> - qemu_mutex_unlock(&param->mutex);
> -
> - return NULL;
> -}
> -
> -static void compress_threads_save_cleanup(void)
> -{
> - int i, thread_count;
> -
> - if (!migrate_use_compression() || !comp_param) {
> - return;
> - }
> -
> - thread_count = migrate_compress_threads();
> - for (i = 0; i < thread_count; i++) {
> - /*
> - * we use it as a indicator which shows if the thread is
> - * properly init'd or not
> - */
> - if (!comp_param[i].file) {
> - break;
> - }
> -
> - qemu_mutex_lock(&comp_param[i].mutex);
> - comp_param[i].quit = true;
> - qemu_cond_signal(&comp_param[i].cond);
> - qemu_mutex_unlock(&comp_param[i].mutex);
> -
> - qemu_thread_join(compress_threads + i);
> - qemu_mutex_destroy(&comp_param[i].mutex);
> - qemu_cond_destroy(&comp_param[i].cond);
> - deflateEnd(&comp_param[i].stream);
> - g_free(comp_param[i].originbuf);
> - qemu_fclose(comp_param[i].file);
> - comp_param[i].file = NULL;
> - }
> - qemu_mutex_destroy(&comp_done_lock);
> - qemu_cond_destroy(&comp_done_cond);
> - g_free(compress_threads);
> - g_free(comp_param);
> - compress_threads = NULL;
> - comp_param = NULL;
> -}
> -
> -static int compress_threads_save_setup(void)
> -{
> - int i, thread_count;
> -
> - if (!migrate_use_compression()) {
> - return 0;
> - }
> - thread_count = migrate_compress_threads();
> - compress_threads = g_new0(QemuThread, thread_count);
> - comp_param = g_new0(CompressParam, thread_count);
> - qemu_cond_init(&comp_done_cond);
> - qemu_mutex_init(&comp_done_lock);
> - for (i = 0; i < thread_count; i++) {
> - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
> - if (!comp_param[i].originbuf) {
> - goto exit;
> - }
> -
> - if (deflateInit(&comp_param[i].stream,
> - migrate_compress_level()) != Z_OK) {
> - g_free(comp_param[i].originbuf);
> - goto exit;
> - }
> -
> - /* comp_param[i].file is just used as a dummy buffer to save data,
> - * set its ops to empty.
> - */
> - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> - comp_param[i].done = true;
> - comp_param[i].quit = false;
> - qemu_mutex_init(&comp_param[i].mutex);
> - qemu_cond_init(&comp_param[i].cond);
> - qemu_thread_create(compress_threads + i, "compress",
> - do_data_compress, comp_param + i,
> - QEMU_THREAD_JOINABLE);
> - }
> - return 0;
> -
> -exit:
> - compress_threads_save_cleanup();
> - return -1;
> -}
> -
> /* Multiple fd's */
>
> #define MULTIFD_MAGIC 0x11223344U
> @@ -1909,12 +1766,25 @@ exit:
> return zero_page;
> }
>
> +struct CompressData {
> + /* filled by migration thread.*/
> + RAMBlock *block;
> + ram_addr_t offset;
> +
> + /* filled by compress thread. */
> + QEMUFile *file;
> + z_stream stream;
> + uint8_t *originbuf;
> + bool zero_page;
> +};
> +typedef struct CompressData CompressData;
> +
> static void
> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
> +update_compress_thread_counts(CompressData *cd, int bytes_xmit)
Keep the const?
> {
> ram_counters.transferred += bytes_xmit;
>
> - if (param->zero_page) {
> + if (cd->zero_page) {
> ram_counters.duplicate++;
> return;
> }
> @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
> compression_counters.pages++;
> }
>
> +static int compress_thread_data_init(void *request)
> +{
> + CompressData *cd = request;
> +
> + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
> + if (!cd->originbuf) {
> + return -1;
> + }
> +
> + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
> + g_free(cd->originbuf);
> + return -1;
> + }
Please print errors if you fail in any case so we can easily tell what
happened.
> + cd->file = qemu_fopen_ops(NULL, &empty_ops);
> + return 0;
> +}
> +
> +static void compress_thread_data_fini(void *request)
> +{
> + CompressData *cd = request;
> +
> + qemu_fclose(cd->file);
> + deflateEnd(&cd->stream);
> + g_free(cd->originbuf);
> +}
> +
> +static void compress_thread_data_handler(void *request)
> +{
> + CompressData *cd = request;
> +
> + /*
> + * if compression fails, it will be indicated by
> + * migrate_get_current()->to_dst_file.
> + */
> + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
> + cd->offset, cd->originbuf);
> +}
> +
> +static void compress_thread_data_done(void *request)
> +{
> + CompressData *cd = request;
> + RAMState *rs = ram_state;
> + int bytes_xmit;
> +
> + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
> + update_compress_thread_counts(cd, bytes_xmit);
> +}
> +
> +static const ThreadedWorkqueueOps compress_ops = {
> + .thread_request_init = compress_thread_data_init,
> + .thread_request_uninit = compress_thread_data_fini,
> + .thread_request_handler = compress_thread_data_handler,
> + .thread_request_done = compress_thread_data_done,
> + .request_size = sizeof(CompressData),
> +};
> +
> +static Threads *compress_threads;
> +
> static bool save_page_use_compression(RAMState *rs);
>
> static void flush_compressed_data(RAMState *rs)
> {
> - int idx, len, thread_count;
> -
> if (!save_page_use_compression(rs)) {
> return;
> }
> - thread_count = migrate_compress_threads();
>
> - qemu_mutex_lock(&comp_done_lock);
> - for (idx = 0; idx < thread_count; idx++) {
> - while (!comp_param[idx].done) {
> - qemu_cond_wait(&comp_done_cond, &comp_done_lock);
> - }
> - }
> - qemu_mutex_unlock(&comp_done_lock);
> + threaded_workqueue_wait_for_requests(compress_threads);
> +}
>
> - for (idx = 0; idx < thread_count; idx++) {
> - qemu_mutex_lock(&comp_param[idx].mutex);
> - if (!comp_param[idx].quit) {
> - len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
> - /*
> - * it's safe to fetch zero_page without holding comp_done_lock
> - * as there is no further request submitted to the thread,
> - * i.e, the thread should be waiting for a request at this point.
> - */
> - update_compress_thread_counts(&comp_param[idx], len);
> - }
> - qemu_mutex_unlock(&comp_param[idx].mutex);
> +static void compress_threads_save_cleanup(void)
> +{
> + if (!compress_threads) {
> + return;
> }
> +
> + threaded_workqueue_destroy(compress_threads);
> + compress_threads = NULL;
> }
>
> -static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> - ram_addr_t offset)
> +static int compress_threads_save_setup(void)
> {
> - param->block = block;
> - param->offset = offset;
> + if (!migrate_use_compression()) {
> + return 0;
> + }
> +
> + compress_threads = threaded_workqueue_create("compress",
> + migrate_compress_threads(),
> + DEFAULT_THREAD_REQUEST_NR, &compress_ops);
> + return compress_threads ? 0 : -1;
> }
>
> static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
> ram_addr_t offset)
> {
> - int idx, thread_count, bytes_xmit = -1, pages = -1;
> + CompressData *cd;
> bool wait = migrate_compress_wait_thread();
>
> - thread_count = migrate_compress_threads();
> - qemu_mutex_lock(&comp_done_lock);
> retry:
> - for (idx = 0; idx < thread_count; idx++) {
> - if (comp_param[idx].done) {
> - comp_param[idx].done = false;
> - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
> - qemu_mutex_lock(&comp_param[idx].mutex);
> - set_compress_params(&comp_param[idx], block, offset);
> - qemu_cond_signal(&comp_param[idx].cond);
> - qemu_mutex_unlock(&comp_param[idx].mutex);
> - pages = 1;
> - update_compress_thread_counts(&comp_param[idx], bytes_xmit);
> - break;
> + cd = threaded_workqueue_get_request(compress_threads);
> + if (!cd) {
> + /*
> + * wait for the free thread if the user specifies
> + * 'compress-wait-thread', otherwise we will post
> + * the page out in the main thread as normal page.
> + */
> + if (wait) {
> + cpu_relax();
> + goto retry;
Is there nothing better we can use to wait without eating CPU time?
Dave
> }
> - }
>
> - /*
> - * wait for the free thread if the user specifies 'compress-wait-thread',
> - * otherwise we will post the page out in the main thread as normal page.
> - */
> - if (pages < 0 && wait) {
> - qemu_cond_wait(&comp_done_cond, &comp_done_lock);
> - goto retry;
> - }
> - qemu_mutex_unlock(&comp_done_lock);
> -
> - return pages;
> + return -1;
> + }
> + cd->block = block;
> + cd->offset = offset;
> + threaded_workqueue_submit_request(compress_threads, cd);
> + return 1;
> }
>
> /**
> --
> 2.14.5
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
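
As a sketch of the error reporting asked for above (assuming QEMU's error_report()
helper, and otherwise keeping compress_thread_data_init() exactly as posted), the
init callback could become:

    #include "qemu/error-report.h"   /* if ram.c does not already pull it in */

    static int compress_thread_data_init(void *request)
    {
        CompressData *cd = request;

        cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
        if (!cd->originbuf) {
            error_report("%s: failed to allocate %zu bytes for the compression "
                         "buffer", __func__, (size_t)TARGET_PAGE_SIZE);
            return -1;
        }

        if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
            error_report("%s: deflateInit() failed (compression level %d)",
                         __func__, migrate_compress_level());
            g_free(cd->originbuf);
            return -1;
        }

        cd->file = qemu_fopen_ops(NULL, &empty_ops);
        return 0;
    }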
On 23/11/18 19:17, Dr. David Alan Gilbert wrote:
> * guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
>> From: Xiao Guangrong <xiaoguangrong@tencent.com>
>>
>> Adapt the compression code to the threaded workqueue
>>
>> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
>> ---
>> migration/ram.c | 308 ++++++++++++++++++++------------------------------------
>> 1 file changed, 110 insertions(+), 198 deletions(-)
>>
>> diff --git a/migration/ram.c b/migration/ram.c
>> index 7e7deec4d8..254c08f27b 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -57,6 +57,7 @@
>> #include "qemu/uuid.h"
>> #include "savevm.h"
>> #include "qemu/iov.h"
>> +#include "qemu/threaded-workqueue.h"
>>
>> /***********************************************************/
>> /* ram save/restore */
>> @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus;
>>
>> CompressionStats compression_counters;
>>
>> -struct CompressParam {
>> - bool done;
>> - bool quit;
>> - bool zero_page;
>> - QEMUFile *file;
>> - QemuMutex mutex;
>> - QemuCond cond;
>> - RAMBlock *block;
>> - ram_addr_t offset;
>> -
>> - /* internally used fields */
>> - z_stream stream;
>> - uint8_t *originbuf;
>> -};
>> -typedef struct CompressParam CompressParam;
>> -
>> struct DecompressParam {
>> bool done;
>> bool quit;
>> @@ -377,15 +362,6 @@ struct DecompressParam {
>> };
>> typedef struct DecompressParam DecompressParam;
>>
>> -static CompressParam *comp_param;
>> -static QemuThread *compress_threads;
>> -/* comp_done_cond is used to wake up the migration thread when
>> - * one of the compression threads has finished the compression.
>> - * comp_done_lock is used to co-work with comp_done_cond.
>> - */
>> -static QemuMutex comp_done_lock;
>> -static QemuCond comp_done_cond;
>> -/* The empty QEMUFileOps will be used by file in CompressParam */
>> static const QEMUFileOps empty_ops = { };
>>
>> static QEMUFile *decomp_file;
>> @@ -394,125 +370,6 @@ static QemuThread *decompress_threads;
>> static QemuMutex decomp_done_lock;
>> static QemuCond decomp_done_cond;
>>
>> -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
>> - ram_addr_t offset, uint8_t *source_buf);
>> -
>> -static void *do_data_compress(void *opaque)
>> -{
>> - CompressParam *param = opaque;
>> - RAMBlock *block;
>> - ram_addr_t offset;
>> - bool zero_page;
>> -
>> - qemu_mutex_lock(&param->mutex);
>> - while (!param->quit) {
>> - if (param->block) {
>> - block = param->block;
>> - offset = param->offset;
>> - param->block = NULL;
>> - qemu_mutex_unlock(&param->mutex);
>> -
>> - zero_page = do_compress_ram_page(param->file, &param->stream,
>> - block, offset, param->originbuf);
>> -
>> - qemu_mutex_lock(&comp_done_lock);
>> - param->done = true;
>> - param->zero_page = zero_page;
>> - qemu_cond_signal(&comp_done_cond);
>> - qemu_mutex_unlock(&comp_done_lock);
>> -
>> - qemu_mutex_lock(&param->mutex);
>> - } else {
>> - qemu_cond_wait(&param->cond, &param->mutex);
>> - }
>> - }
>> - qemu_mutex_unlock(&param->mutex);
>> -
>> - return NULL;
>> -}
>> -
>> -static void compress_threads_save_cleanup(void)
>> -{
>> - int i, thread_count;
>> -
>> - if (!migrate_use_compression() || !comp_param) {
>> - return;
>> - }
>> -
>> - thread_count = migrate_compress_threads();
>> - for (i = 0; i < thread_count; i++) {
>> - /*
>> - * we use it as a indicator which shows if the thread is
>> - * properly init'd or not
>> - */
>> - if (!comp_param[i].file) {
>> - break;
>> - }
>> -
>> - qemu_mutex_lock(&comp_param[i].mutex);
>> - comp_param[i].quit = true;
>> - qemu_cond_signal(&comp_param[i].cond);
>> - qemu_mutex_unlock(&comp_param[i].mutex);
>> -
>> - qemu_thread_join(compress_threads + i);
>> - qemu_mutex_destroy(&comp_param[i].mutex);
>> - qemu_cond_destroy(&comp_param[i].cond);
>> - deflateEnd(&comp_param[i].stream);
>> - g_free(comp_param[i].originbuf);
>> - qemu_fclose(comp_param[i].file);
>> - comp_param[i].file = NULL;
>> - }
>> - qemu_mutex_destroy(&comp_done_lock);
>> - qemu_cond_destroy(&comp_done_cond);
>> - g_free(compress_threads);
>> - g_free(comp_param);
>> - compress_threads = NULL;
>> - comp_param = NULL;
>> -}
>> -
>> -static int compress_threads_save_setup(void)
>> -{
>> - int i, thread_count;
>> -
>> - if (!migrate_use_compression()) {
>> - return 0;
>> - }
>> - thread_count = migrate_compress_threads();
>> - compress_threads = g_new0(QemuThread, thread_count);
>> - comp_param = g_new0(CompressParam, thread_count);
>> - qemu_cond_init(&comp_done_cond);
>> - qemu_mutex_init(&comp_done_lock);
>> - for (i = 0; i < thread_count; i++) {
>> - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
>> - if (!comp_param[i].originbuf) {
>> - goto exit;
>> - }
>> -
>> - if (deflateInit(&comp_param[i].stream,
>> - migrate_compress_level()) != Z_OK) {
>> - g_free(comp_param[i].originbuf);
>> - goto exit;
>> - }
>> -
>> - /* comp_param[i].file is just used as a dummy buffer to save data,
>> - * set its ops to empty.
>> - */
>> - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
>> - comp_param[i].done = true;
>> - comp_param[i].quit = false;
>> - qemu_mutex_init(&comp_param[i].mutex);
>> - qemu_cond_init(&comp_param[i].cond);
>> - qemu_thread_create(compress_threads + i, "compress",
>> - do_data_compress, comp_param + i,
>> - QEMU_THREAD_JOINABLE);
>> - }
>> - return 0;
>> -
>> -exit:
>> - compress_threads_save_cleanup();
>> - return -1;
>> -}
>> -
>> /* Multiple fd's */
>>
>> #define MULTIFD_MAGIC 0x11223344U
>> @@ -1909,12 +1766,25 @@ exit:
>> return zero_page;
>> }
>>
>> +struct CompressData {
>> + /* filled by migration thread.*/
>> + RAMBlock *block;
>> + ram_addr_t offset;
>> +
>> + /* filled by compress thread. */
>> + QEMUFile *file;
>> + z_stream stream;
>> + uint8_t *originbuf;
>> + bool zero_page;
>> +};
>> +typedef struct CompressData CompressData;
>> +
>> static void
>> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
>> +update_compress_thread_counts(CompressData *cd, int bytes_xmit)
>
> Keep the const?
>> {
>> ram_counters.transferred += bytes_xmit;
>>
>> - if (param->zero_page) {
>> + if (cd->zero_page) {
>> ram_counters.duplicate++;
>> return;
>> }
>> @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
>> compression_counters.pages++;
>> }
>>
>> +static int compress_thread_data_init(void *request)
>> +{
>> + CompressData *cd = request;
>> +
>> + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
>> + if (!cd->originbuf) {
>> + return -1;
>> + }
>> +
>> + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
>> + g_free(cd->originbuf);
>> + return -1;
>> + }
>
> Please print errors if you fail in any case so we can easily tell what
> happened.
>
>> + cd->file = qemu_fopen_ops(NULL, &empty_ops);
>> + return 0;
>> +}
>> +
>> +static void compress_thread_data_fini(void *request)
>> +{
>> + CompressData *cd = request;
>> +
>> + qemu_fclose(cd->file);
>> + deflateEnd(&cd->stream);
>> + g_free(cd->originbuf);
>> +}
>> +
>> +static void compress_thread_data_handler(void *request)
>> +{
>> + CompressData *cd = request;
>> +
>> + /*
>> + * if compression fails, it will be indicated by
>> + * migrate_get_current()->to_dst_file.
>> + */
>> + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
>> + cd->offset, cd->originbuf);
>> +}
>> +
>> +static void compress_thread_data_done(void *request)
>> +{
>> + CompressData *cd = request;
>> + RAMState *rs = ram_state;
>> + int bytes_xmit;
>> +
>> + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
>> + update_compress_thread_counts(cd, bytes_xmit);
>> +}
>> +
>> +static const ThreadedWorkqueueOps compress_ops = {
>> + .thread_request_init = compress_thread_data_init,
>> + .thread_request_uninit = compress_thread_data_fini,
>> + .thread_request_handler = compress_thread_data_handler,
>> + .thread_request_done = compress_thread_data_done,
>> + .request_size = sizeof(CompressData),
>> +};
>> +
>> +static Threads *compress_threads;
>> +
>> static bool save_page_use_compression(RAMState *rs);
>>
>> static void flush_compressed_data(RAMState *rs)
>> {
>> - int idx, len, thread_count;
>> -
>> if (!save_page_use_compression(rs)) {
>> return;
>> }
>> - thread_count = migrate_compress_threads();
>>
>> - qemu_mutex_lock(&comp_done_lock);
>> - for (idx = 0; idx < thread_count; idx++) {
>> - while (!comp_param[idx].done) {
>> - qemu_cond_wait(&comp_done_cond, &comp_done_lock);
>> - }
>> - }
>> - qemu_mutex_unlock(&comp_done_lock);
>> + threaded_workqueue_wait_for_requests(compress_threads);
>> +}
>>
>> - for (idx = 0; idx < thread_count; idx++) {
>> - qemu_mutex_lock(&comp_param[idx].mutex);
>> - if (!comp_param[idx].quit) {
>> - len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
>> - /*
>> - * it's safe to fetch zero_page without holding comp_done_lock
>> - * as there is no further request submitted to the thread,
>> - * i.e, the thread should be waiting for a request at this point.
>> - */
>> - update_compress_thread_counts(&comp_param[idx], len);
>> - }
>> - qemu_mutex_unlock(&comp_param[idx].mutex);
>> +static void compress_threads_save_cleanup(void)
>> +{
>> + if (!compress_threads) {
>> + return;
>> }
>> +
>> + threaded_workqueue_destroy(compress_threads);
>> + compress_threads = NULL;
>> }
>>
>> -static inline void set_compress_params(CompressParam *param, RAMBlock *block,
>> - ram_addr_t offset)
>> +static int compress_threads_save_setup(void)
>> {
>> - param->block = block;
>> - param->offset = offset;
>> + if (!migrate_use_compression()) {
>> + return 0;
>> + }
>> +
>> + compress_threads = threaded_workqueue_create("compress",
>> + migrate_compress_threads(),
>> + DEFAULT_THREAD_REQUEST_NR, &compress_ops);
>> + return compress_threads ? 0 : -1;
>> }
>>
>> static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
>> ram_addr_t offset)
>> {
>> - int idx, thread_count, bytes_xmit = -1, pages = -1;
>> + CompressData *cd;
>> bool wait = migrate_compress_wait_thread();
>>
>> - thread_count = migrate_compress_threads();
>> - qemu_mutex_lock(&comp_done_lock);
>> retry:
>> - for (idx = 0; idx < thread_count; idx++) {
>> - if (comp_param[idx].done) {
>> - comp_param[idx].done = false;
>> - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
>> - qemu_mutex_lock(&comp_param[idx].mutex);
>> - set_compress_params(&comp_param[idx], block, offset);
>> - qemu_cond_signal(&comp_param[idx].cond);
>> - qemu_mutex_unlock(&comp_param[idx].mutex);
>> - pages = 1;
>> - update_compress_thread_counts(&comp_param[idx], bytes_xmit);
>> - break;
>> + cd = threaded_workqueue_get_request(compress_threads);
>> + if (!cd) {
>> + /*
>> + * wait for the free thread if the user specifies
>> + * 'compress-wait-thread', otherwise we will post
>> + * the page out in the main thread as normal page.
>> + */
>> + if (wait) {
>> + cpu_relax();
>> + goto retry;
>
> Is there nothing better we can use to wait without eating CPU time?
There is a mechanism to wait without eating CPU time in the data
structure, but it makes sense to busy wait. There are 4 threads in the
workqueue, so you have to compare 1/4th of the time spent compressing a
page, with the trip into the kernel to wake you up. You're adding 20%
CPU usage, but I'm not surprised it's worthwhile.
Paolo
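
(A back-of-the-envelope illustration of that comparison, with made-up numbers: if
deflating one 4 KiB page takes on the order of 40 µs, then with 4 worker threads a
request slot frees up on average roughly every 10 µs, which is in the same ballpark
as a futex sleep/wake round trip; blocking instead of spinning would therefore save
little CPU per stall while adding wake-up latency to every one of them.)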
* Paolo Bonzini (pbonzini@redhat.com) wrote:
> On 23/11/18 19:17, Dr. David Alan Gilbert wrote:
> > * guangrong.xiao@gmail.com (guangrong.xiao@gmail.com) wrote:
> >> From: Xiao Guangrong <xiaoguangrong@tencent.com>
> >>
> >> Adapt the compression code to the threaded workqueue
> >>
> >> Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
> >> ---
> >> migration/ram.c | 308 ++++++++++++++++++++------------------------------------
> >> 1 file changed, 110 insertions(+), 198 deletions(-)
> >>
> >> diff --git a/migration/ram.c b/migration/ram.c
> >> index 7e7deec4d8..254c08f27b 100644
> >> --- a/migration/ram.c
> >> +++ b/migration/ram.c
> >> @@ -57,6 +57,7 @@
> >> #include "qemu/uuid.h"
> >> #include "savevm.h"
> >> #include "qemu/iov.h"
> >> +#include "qemu/threaded-workqueue.h"
> >>
> >> /***********************************************************/
> >> /* ram save/restore */
> >> @@ -349,22 +350,6 @@ typedef struct PageSearchStatus PageSearchStatus;
> >>
> >> CompressionStats compression_counters;
> >>
> >> -struct CompressParam {
> >> - bool done;
> >> - bool quit;
> >> - bool zero_page;
> >> - QEMUFile *file;
> >> - QemuMutex mutex;
> >> - QemuCond cond;
> >> - RAMBlock *block;
> >> - ram_addr_t offset;
> >> -
> >> - /* internally used fields */
> >> - z_stream stream;
> >> - uint8_t *originbuf;
> >> -};
> >> -typedef struct CompressParam CompressParam;
> >> -
> >> struct DecompressParam {
> >> bool done;
> >> bool quit;
> >> @@ -377,15 +362,6 @@ struct DecompressParam {
> >> };
> >> typedef struct DecompressParam DecompressParam;
> >>
> >> -static CompressParam *comp_param;
> >> -static QemuThread *compress_threads;
> >> -/* comp_done_cond is used to wake up the migration thread when
> >> - * one of the compression threads has finished the compression.
> >> - * comp_done_lock is used to co-work with comp_done_cond.
> >> - */
> >> -static QemuMutex comp_done_lock;
> >> -static QemuCond comp_done_cond;
> >> -/* The empty QEMUFileOps will be used by file in CompressParam */
> >> static const QEMUFileOps empty_ops = { };
> >>
> >> static QEMUFile *decomp_file;
> >> @@ -394,125 +370,6 @@ static QemuThread *decompress_threads;
> >> static QemuMutex decomp_done_lock;
> >> static QemuCond decomp_done_cond;
> >>
> >> -static bool do_compress_ram_page(QEMUFile *f, z_stream *stream, RAMBlock *block,
> >> - ram_addr_t offset, uint8_t *source_buf);
> >> -
> >> -static void *do_data_compress(void *opaque)
> >> -{
> >> - CompressParam *param = opaque;
> >> - RAMBlock *block;
> >> - ram_addr_t offset;
> >> - bool zero_page;
> >> -
> >> - qemu_mutex_lock(&param->mutex);
> >> - while (!param->quit) {
> >> - if (param->block) {
> >> - block = param->block;
> >> - offset = param->offset;
> >> - param->block = NULL;
> >> - qemu_mutex_unlock(&param->mutex);
> >> -
> >> - zero_page = do_compress_ram_page(param->file, &param->stream,
> >> - block, offset, param->originbuf);
> >> -
> >> - qemu_mutex_lock(&comp_done_lock);
> >> - param->done = true;
> >> - param->zero_page = zero_page;
> >> - qemu_cond_signal(&comp_done_cond);
> >> - qemu_mutex_unlock(&comp_done_lock);
> >> -
> >> - qemu_mutex_lock(&param->mutex);
> >> - } else {
> >> - qemu_cond_wait(&param->cond, &param->mutex);
> >> - }
> >> - }
> >> - qemu_mutex_unlock(&param->mutex);
> >> -
> >> - return NULL;
> >> -}
> >> -
> >> -static void compress_threads_save_cleanup(void)
> >> -{
> >> - int i, thread_count;
> >> -
> >> - if (!migrate_use_compression() || !comp_param) {
> >> - return;
> >> - }
> >> -
> >> - thread_count = migrate_compress_threads();
> >> - for (i = 0; i < thread_count; i++) {
> >> - /*
> >> - * we use it as a indicator which shows if the thread is
> >> - * properly init'd or not
> >> - */
> >> - if (!comp_param[i].file) {
> >> - break;
> >> - }
> >> -
> >> - qemu_mutex_lock(&comp_param[i].mutex);
> >> - comp_param[i].quit = true;
> >> - qemu_cond_signal(&comp_param[i].cond);
> >> - qemu_mutex_unlock(&comp_param[i].mutex);
> >> -
> >> - qemu_thread_join(compress_threads + i);
> >> - qemu_mutex_destroy(&comp_param[i].mutex);
> >> - qemu_cond_destroy(&comp_param[i].cond);
> >> - deflateEnd(&comp_param[i].stream);
> >> - g_free(comp_param[i].originbuf);
> >> - qemu_fclose(comp_param[i].file);
> >> - comp_param[i].file = NULL;
> >> - }
> >> - qemu_mutex_destroy(&comp_done_lock);
> >> - qemu_cond_destroy(&comp_done_cond);
> >> - g_free(compress_threads);
> >> - g_free(comp_param);
> >> - compress_threads = NULL;
> >> - comp_param = NULL;
> >> -}
> >> -
> >> -static int compress_threads_save_setup(void)
> >> -{
> >> - int i, thread_count;
> >> -
> >> - if (!migrate_use_compression()) {
> >> - return 0;
> >> - }
> >> - thread_count = migrate_compress_threads();
> >> - compress_threads = g_new0(QemuThread, thread_count);
> >> - comp_param = g_new0(CompressParam, thread_count);
> >> - qemu_cond_init(&comp_done_cond);
> >> - qemu_mutex_init(&comp_done_lock);
> >> - for (i = 0; i < thread_count; i++) {
> >> - comp_param[i].originbuf = g_try_malloc(TARGET_PAGE_SIZE);
> >> - if (!comp_param[i].originbuf) {
> >> - goto exit;
> >> - }
> >> -
> >> - if (deflateInit(&comp_param[i].stream,
> >> - migrate_compress_level()) != Z_OK) {
> >> - g_free(comp_param[i].originbuf);
> >> - goto exit;
> >> - }
> >> -
> >> - /* comp_param[i].file is just used as a dummy buffer to save data,
> >> - * set its ops to empty.
> >> - */
> >> - comp_param[i].file = qemu_fopen_ops(NULL, &empty_ops);
> >> - comp_param[i].done = true;
> >> - comp_param[i].quit = false;
> >> - qemu_mutex_init(&comp_param[i].mutex);
> >> - qemu_cond_init(&comp_param[i].cond);
> >> - qemu_thread_create(compress_threads + i, "compress",
> >> - do_data_compress, comp_param + i,
> >> - QEMU_THREAD_JOINABLE);
> >> - }
> >> - return 0;
> >> -
> >> -exit:
> >> - compress_threads_save_cleanup();
> >> - return -1;
> >> -}
> >> -
> >> /* Multiple fd's */
> >>
> >> #define MULTIFD_MAGIC 0x11223344U
> >> @@ -1909,12 +1766,25 @@ exit:
> >> return zero_page;
> >> }
> >>
> >> +struct CompressData {
> >> + /* filled by migration thread.*/
> >> + RAMBlock *block;
> >> + ram_addr_t offset;
> >> +
> >> + /* filled by compress thread. */
> >> + QEMUFile *file;
> >> + z_stream stream;
> >> + uint8_t *originbuf;
> >> + bool zero_page;
> >> +};
> >> +typedef struct CompressData CompressData;
> >> +
> >> static void
> >> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
> >> +update_compress_thread_counts(CompressData *cd, int bytes_xmit)
> >
> > Keep the const?
> >> {
> >> ram_counters.transferred += bytes_xmit;
> >>
> >> - if (param->zero_page) {
> >> + if (cd->zero_page) {
> >> ram_counters.duplicate++;
> >> return;
> >> }
> >> @@ -1924,81 +1794,123 @@ update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
> >> compression_counters.pages++;
> >> }
> >>
> >> +static int compress_thread_data_init(void *request)
> >> +{
> >> + CompressData *cd = request;
> >> +
> >> + cd->originbuf = g_try_malloc(TARGET_PAGE_SIZE);
> >> + if (!cd->originbuf) {
> >> + return -1;
> >> + }
> >> +
> >> + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
> >> + g_free(cd->originbuf);
> >> + return -1;
> >> + }
> >
> > Please print errors if you fail in any case so we can easily tell what
> > happened.
> >
> >> + cd->file = qemu_fopen_ops(NULL, &empty_ops);
> >> + return 0;
> >> +}
> >> +
> >> +static void compress_thread_data_fini(void *request)
> >> +{
> >> + CompressData *cd = request;
> >> +
> >> + qemu_fclose(cd->file);
> >> + deflateEnd(&cd->stream);
> >> + g_free(cd->originbuf);
> >> +}
> >> +
> >> +static void compress_thread_data_handler(void *request)
> >> +{
> >> + CompressData *cd = request;
> >> +
> >> + /*
> >> + * if compression fails, it will be indicated by
> >> + * migrate_get_current()->to_dst_file.
> >> + */
> >> + cd->zero_page = do_compress_ram_page(cd->file, &cd->stream, cd->block,
> >> + cd->offset, cd->originbuf);
> >> +}
> >> +
> >> +static void compress_thread_data_done(void *request)
> >> +{
> >> + CompressData *cd = request;
> >> + RAMState *rs = ram_state;
> >> + int bytes_xmit;
> >> +
> >> + bytes_xmit = qemu_put_qemu_file(rs->f, cd->file);
> >> + update_compress_thread_counts(cd, bytes_xmit);
> >> +}
> >> +
> >> +static const ThreadedWorkqueueOps compress_ops = {
> >> + .thread_request_init = compress_thread_data_init,
> >> + .thread_request_uninit = compress_thread_data_fini,
> >> + .thread_request_handler = compress_thread_data_handler,
> >> + .thread_request_done = compress_thread_data_done,
> >> + .request_size = sizeof(CompressData),
> >> +};
> >> +
> >> +static Threads *compress_threads;
> >> +
> >> static bool save_page_use_compression(RAMState *rs);
> >>
> >> static void flush_compressed_data(RAMState *rs)
> >> {
> >> - int idx, len, thread_count;
> >> -
> >> if (!save_page_use_compression(rs)) {
> >> return;
> >> }
> >> - thread_count = migrate_compress_threads();
> >>
> >> - qemu_mutex_lock(&comp_done_lock);
> >> - for (idx = 0; idx < thread_count; idx++) {
> >> - while (!comp_param[idx].done) {
> >> - qemu_cond_wait(&comp_done_cond, &comp_done_lock);
> >> - }
> >> - }
> >> - qemu_mutex_unlock(&comp_done_lock);
> >> + threaded_workqueue_wait_for_requests(compress_threads);
> >> +}
> >>
> >> - for (idx = 0; idx < thread_count; idx++) {
> >> - qemu_mutex_lock(&comp_param[idx].mutex);
> >> - if (!comp_param[idx].quit) {
> >> - len = qemu_put_qemu_file(rs->f, comp_param[idx].file);
> >> - /*
> >> - * it's safe to fetch zero_page without holding comp_done_lock
> >> - * as there is no further request submitted to the thread,
> >> - * i.e, the thread should be waiting for a request at this point.
> >> - */
> >> - update_compress_thread_counts(&comp_param[idx], len);
> >> - }
> >> - qemu_mutex_unlock(&comp_param[idx].mutex);
> >> +static void compress_threads_save_cleanup(void)
> >> +{
> >> + if (!compress_threads) {
> >> + return;
> >> }
> >> +
> >> + threaded_workqueue_destroy(compress_threads);
> >> + compress_threads = NULL;
> >> }
> >>
> >> -static inline void set_compress_params(CompressParam *param, RAMBlock *block,
> >> - ram_addr_t offset)
> >> +static int compress_threads_save_setup(void)
> >> {
> >> - param->block = block;
> >> - param->offset = offset;
> >> + if (!migrate_use_compression()) {
> >> + return 0;
> >> + }
> >> +
> >> + compress_threads = threaded_workqueue_create("compress",
> >> + migrate_compress_threads(),
> >> + DEFAULT_THREAD_REQUEST_NR, &compress_ops);
> >> + return compress_threads ? 0 : -1;
> >> }
> >>
> >> static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
> >> ram_addr_t offset)
> >> {
> >> - int idx, thread_count, bytes_xmit = -1, pages = -1;
> >> + CompressData *cd;
> >> bool wait = migrate_compress_wait_thread();
> >>
> >> - thread_count = migrate_compress_threads();
> >> - qemu_mutex_lock(&comp_done_lock);
> >> retry:
> >> - for (idx = 0; idx < thread_count; idx++) {
> >> - if (comp_param[idx].done) {
> >> - comp_param[idx].done = false;
> >> - bytes_xmit = qemu_put_qemu_file(rs->f, comp_param[idx].file);
> >> - qemu_mutex_lock(&comp_param[idx].mutex);
> >> - set_compress_params(&comp_param[idx], block, offset);
> >> - qemu_cond_signal(&comp_param[idx].cond);
> >> - qemu_mutex_unlock(&comp_param[idx].mutex);
> >> - pages = 1;
> >> - update_compress_thread_counts(&comp_param[idx], bytes_xmit);
> >> - break;
> >> + cd = threaded_workqueue_get_request(compress_threads);
> >> + if (!cd) {
> >> + /*
> >> + * wait for the free thread if the user specifies
> >> + * 'compress-wait-thread', otherwise we will post
> >> + * the page out in the main thread as normal page.
> >> + */
> >> + if (wait) {
> >> + cpu_relax();
> >> + goto retry;
> >
> > Is there nothing better we can use to wait without eating CPU time?
>
> There is a mechanism to wait without eating CPU time in the data
> structure, but it makes sense to busy wait. There are 4 threads in the
> workqueue, so you have to compare 1/4th of the time spent compressing a
> page, with the trip into the kernel to wake you up. You're adding 20%
> CPU usage, but I'm not surprised it's worthwhile.
Hmm OK; in that case it does at least need a comment because it's a bit
odd, and we should watch out how that scales - I guess it's less of
an overhead the more threads you use.
Dave
> Paolo
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
On 11/24/18 2:29 AM, Dr. David Alan Gilbert wrote:
>>>> static void
>>>> -update_compress_thread_counts(const CompressParam *param, int bytes_xmit)
>>>> +update_compress_thread_counts(CompressData *cd, int bytes_xmit)
>>>
>>> Keep the const?
Yes, indeed. Will correct it in the next version.
>>>> + if (deflateInit(&cd->stream, migrate_compress_level()) != Z_OK) {
>>>> + g_free(cd->originbuf);
>>>> + return -1;
>>>> + }
>>>
>>> Please print errors if you fail in any case so we can easily tell what
>>> happened.
Sure, will do.
>>>> + if (wait) {
>>>> + cpu_relax();
>>>> + goto retry;
>>>
>>> Is there nothing better we can use to wait without eating CPU time?
>>
>> There is a mechanism to wait without eating CPU time in the data
>> structure, but it makes sense to busy wait. There are 4 threads in the
>> workqueue, so you have to compare 1/4th of the time spent compressing a
>> page, with the trip into the kernel to wake you up. You're adding 20%
>> CPU usage, but I'm not surprised it's worthwhile.
>
> Hmm OK; in that case it does at least need a comment because it's a bit
> odd, and we should watch out how that scales - I guess it's less of
> an overhead the more threads you use.
>
Sure, will add some comments to explain the purpose.
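
For what it is worth, one possible shape for that comment, reusing
compress_page_with_multi_thread() exactly as posted (the wording is only a
suggestion):

    static int compress_page_with_multi_thread(RAMState *rs, RAMBlock *block,
                                               ram_addr_t offset)
    {
        CompressData *cd;
        bool wait = migrate_compress_wait_thread();

    retry:
        cd = threaded_workqueue_get_request(compress_threads);
        if (!cd) {
            /*
             * All request slots are currently in flight.  If the user set
             * 'compress-wait-thread', busy wait for a free slot rather than
             * sleeping on a condition variable: with several compression
             * threads a slot is expected to free up within a fraction of the
             * per-page compression time, so a trip into the kernel to be
             * woken up would cost more than the CPU time it saves.  Otherwise
             * fall back to posting the page out uncompressed from the
             * migration thread.
             */
            if (wait) {
                cpu_relax();
                goto retry;
            }
            return -1;
        }
        cd->block = block;
        cd->offset = offset;
        threaded_workqueue_submit_request(compress_threads, cd);
        return 1;
    }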