[Qemu-devel] [PATCH v2 03/10] migration: stop decompression to allocate and free memory frequently

guangrong.xiao@gmail.com posted 10 patches 7 years, 7 months ago
There is a newer version of this series
[Qemu-devel] [PATCH v2 03/10] migration: stop decompression to allocate and free memory frequently
Posted by guangrong.xiao@gmail.com 7 years, 7 months ago
From: Xiao Guangrong <xiaoguangrong@tencent.com>

Current code uses uncompress() to decompress memory. uncompress() manages
its memory internally, which causes a huge amount of memory to be allocated
and freed very frequently. Worse still, frequently returning memory to the
kernel flushes TLBs.

So, we maintain the memory ourselves and reuse it for each
decompression.

Reviewed-by: Jiang Biao <jiang.biao2@zte.com.cn>
Signed-off-by: Xiao Guangrong <xiaoguangrong@tencent.com>
---
 migration/ram.c | 110 ++++++++++++++++++++++++++++++++++++++++----------------
 1 file changed, 80 insertions(+), 30 deletions(-)

diff --git a/migration/ram.c b/migration/ram.c
index e043a192e1..6b699650ca 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -281,6 +281,7 @@ struct DecompressParam {
     void *des;
     uint8_t *compbuf;
     int len;
+    z_stream stream;
 };
 typedef struct DecompressParam DecompressParam;
 
@@ -2526,6 +2527,31 @@ void ram_handle_compressed(void *host, uint8_t ch, uint64_t size)
     }
 }
 
+/* return the size after decompression, or negative value on error */
+static int
+qemu_uncompress_data(z_stream *stream, uint8_t *dest, size_t dest_len,
+                     const uint8_t *source, size_t source_len)
+{
+    int err;
+
+    err = inflateReset(stream);
+    if (err != Z_OK) {
+        return -1;
+    }
+
+    stream->avail_in = source_len;
+    stream->next_in = (uint8_t *)source;
+    stream->avail_out = dest_len;
+    stream->next_out = dest;
+
+    err = inflate(stream, Z_NO_FLUSH);
+    if (err != Z_STREAM_END) {
+        return -1;
+    }
+
+    return stream->total_out;
+}
+
 static void *do_data_decompress(void *opaque)
 {
     DecompressParam *param = opaque;
@@ -2542,13 +2568,13 @@ static void *do_data_decompress(void *opaque)
             qemu_mutex_unlock(&param->mutex);
 
             pagesize = TARGET_PAGE_SIZE;
-            /* uncompress() will return failed in some case, especially
-             * when the page is dirted when doing the compression, it's
-             * not a problem because the dirty page will be retransferred
+            /* qemu_uncompress_data() will return failed in some case,
+             * especially when the page is dirtied when doing the compression,
+             * it's not a problem because the dirty page will be retransferred
              * and uncompress() won't break the data in other pages.
              */
-            uncompress((Bytef *)des, &pagesize,
-                       (const Bytef *)param->compbuf, len);
+            qemu_uncompress_data(&param->stream, des, pagesize, param->compbuf,
+                                 len);
 
             qemu_mutex_lock(&decomp_done_lock);
             param->done = true;
@@ -2583,30 +2609,6 @@ static void wait_for_decompress_done(void)
     qemu_mutex_unlock(&decomp_done_lock);
 }
 
-static void compress_threads_load_setup(void)
-{
-    int i, thread_count;
-
-    if (!migrate_use_compression()) {
-        return;
-    }
-    thread_count = migrate_decompress_threads();
-    decompress_threads = g_new0(QemuThread, thread_count);
-    decomp_param = g_new0(DecompressParam, thread_count);
-    qemu_mutex_init(&decomp_done_lock);
-    qemu_cond_init(&decomp_done_cond);
-    for (i = 0; i < thread_count; i++) {
-        qemu_mutex_init(&decomp_param[i].mutex);
-        qemu_cond_init(&decomp_param[i].cond);
-        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
-        decomp_param[i].done = true;
-        decomp_param[i].quit = false;
-        qemu_thread_create(decompress_threads + i, "decompress",
-                           do_data_decompress, decomp_param + i,
-                           QEMU_THREAD_JOINABLE);
-    }
-}
-
 static void compress_threads_load_cleanup(void)
 {
     int i, thread_count;
@@ -2616,16 +2618,27 @@ static void compress_threads_load_cleanup(void)
     }
     thread_count = migrate_decompress_threads();
     for (i = 0; i < thread_count; i++) {
+        /* see the comments in compress_threads_save_cleanup() */
+        if (!decomp_param[i].stream.opaque) {
+            break;
+        }
+
         qemu_mutex_lock(&decomp_param[i].mutex);
         decomp_param[i].quit = true;
         qemu_cond_signal(&decomp_param[i].cond);
         qemu_mutex_unlock(&decomp_param[i].mutex);
     }
     for (i = 0; i < thread_count; i++) {
+        if (!decomp_param[i].stream.opaque) {
+            break;
+        }
+
         qemu_thread_join(decompress_threads + i);
         qemu_mutex_destroy(&decomp_param[i].mutex);
         qemu_cond_destroy(&decomp_param[i].cond);
         g_free(decomp_param[i].compbuf);
+        inflateEnd(&decomp_param[i].stream);
+        decomp_param[i].stream.opaque = NULL;
     }
     g_free(decompress_threads);
     g_free(decomp_param);
@@ -2633,6 +2646,40 @@ static void compress_threads_load_cleanup(void)
     decomp_param = NULL;
 }
 
+static int compress_threads_load_setup(void)
+{
+    int i, thread_count;
+
+    if (!migrate_use_compression()) {
+        return 0;
+    }
+
+    thread_count = migrate_decompress_threads();
+    decompress_threads = g_new0(QemuThread, thread_count);
+    decomp_param = g_new0(DecompressParam, thread_count);
+    qemu_mutex_init(&decomp_done_lock);
+    qemu_cond_init(&decomp_done_cond);
+    for (i = 0; i < thread_count; i++) {
+        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
+            goto exit;
+        }
+        decomp_param[i].stream.opaque = &decomp_param[i];
+
+        qemu_mutex_init(&decomp_param[i].mutex);
+        qemu_cond_init(&decomp_param[i].cond);
+        decomp_param[i].compbuf = g_malloc0(compressBound(TARGET_PAGE_SIZE));
+        decomp_param[i].done = true;
+        decomp_param[i].quit = false;
+        qemu_thread_create(decompress_threads + i, "decompress",
+                           do_data_decompress, decomp_param + i,
+                           QEMU_THREAD_JOINABLE);
+    }
+    return 0;
+exit:
+    compress_threads_load_cleanup();
+    return -1;
+}
+
 static void decompress_data_with_multi_threads(QEMUFile *f,
                                                void *host, int len)
 {
@@ -2672,8 +2719,11 @@ static void decompress_data_with_multi_threads(QEMUFile *f,
  */
 static int ram_load_setup(QEMUFile *f, void *opaque)
 {
+    if (compress_threads_load_setup()) {
+        return -1;
+    }
+
     xbzrle_load_setup();
-    compress_threads_load_setup();
     ramblock_recv_map_init();
     return 0;
 }
-- 
2.14.3


Re: [Qemu-devel] [PATCH v2 03/10] migration: stop decompression to allocate and free memory frequently
Posted by Peter Xu 7 years, 7 months ago
On Tue, Mar 27, 2018 at 05:10:36PM +0800, guangrong.xiao@gmail.com wrote:

[...]

> +static int compress_threads_load_setup(void)
> +{
> +    int i, thread_count;
> +
> +    if (!migrate_use_compression()) {
> +        return 0;
> +    }
> +
> +    thread_count = migrate_decompress_threads();
> +    decompress_threads = g_new0(QemuThread, thread_count);
> +    decomp_param = g_new0(DecompressParam, thread_count);
> +    qemu_mutex_init(&decomp_done_lock);
> +    qemu_cond_init(&decomp_done_cond);
> +    for (i = 0; i < thread_count; i++) {
> +        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
> +            goto exit;
> +        }
> +        decomp_param[i].stream.opaque = &decomp_param[i];

Same question as the encoding patch here, otherwise looks good to me.

Thanks,

-- 
Peter Xu

Re: [Qemu-devel] [PATCH v2 03/10] migration: stop decompression to allocate and free memory frequently
Posted by Xiao Guangrong 7 years, 7 months ago

On 03/28/2018 05:42 PM, Peter Xu wrote:
> On Tue, Mar 27, 2018 at 05:10:36PM +0800, guangrong.xiao@gmail.com wrote:
> 
> [...]
> 
>> +static int compress_threads_load_setup(void)
>> +{
>> +    int i, thread_count;
>> +
>> +    if (!migrate_use_compression()) {
>> +        return 0;
>> +    }
>> +
>> +    thread_count = migrate_decompress_threads();
>> +    decompress_threads = g_new0(QemuThread, thread_count);
>> +    decomp_param = g_new0(DecompressParam, thread_count);
>> +    qemu_mutex_init(&decomp_done_lock);
>> +    qemu_cond_init(&decomp_done_cond);
>> +    for (i = 0; i < thread_count; i++) {
>> +        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
>> +            goto exit;
>> +        }
>> +        decomp_param[i].stream.opaque = &decomp_param[i];
> 
> Same question as the encoding patch here, otherwise looks good to me.

Thanks for pointing that out, will fix.

Hmm, can I treat it as your Reviewed-by for the next version?

Re: [Qemu-devel] [PATCH v2 03/10] migration: stop decompression to allocate and free memory frequently
Posted by Peter Xu 7 years, 7 months ago
On Thu, Mar 29, 2018 at 11:43:07AM +0800, Xiao Guangrong wrote:
> 
> 
> On 03/28/2018 05:42 PM, Peter Xu wrote:
> > On Tue, Mar 27, 2018 at 05:10:36PM +0800, guangrong.xiao@gmail.com wrote:
> > 
> > [...]
> > 
> > > +static int compress_threads_load_setup(void)
> > > +{
> > > +    int i, thread_count;
> > > +
> > > +    if (!migrate_use_compression()) {
> > > +        return 0;
> > > +    }
> > > +
> > > +    thread_count = migrate_decompress_threads();
> > > +    decompress_threads = g_new0(QemuThread, thread_count);
> > > +    decomp_param = g_new0(DecompressParam, thread_count);
> > > +    qemu_mutex_init(&decomp_done_lock);
> > > +    qemu_cond_init(&decomp_done_cond);
> > > +    for (i = 0; i < thread_count; i++) {
> > > +        if (inflateInit(&decomp_param[i].stream) != Z_OK) {
> > > +            goto exit;
> > > +        }
> > > +        decomp_param[i].stream.opaque = &decomp_param[i];
> > 
> > Same question as the encoding patch here, otherwise looks good to me.
> 
> Thanks for pointing that out, will fix.
> 
> Hmm, can I treat it as your Reviewed-by for the next version?

Yes :), as long as we drop the usage of zstream.opaque and use any
existing fields.

And also for the previous patch too, since they are mostly the same.

Thanks,

-- 
Peter Xu