This is still a work in progress, but get everything sent as expected
and it is faster than the code that is already there.
Signed-off-by: Juan Quintela <quintela@redhat.com>
---
migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 104 insertions(+), 2 deletions(-)
diff --git a/migration/ram.c b/migration/ram.c
index fdb5bf07a5..efbb253c1a 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = {
.recv_pages = none_recv_pages
};
+/* Multifd zlib compression */
+
+static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used)
+{
+ struct iovec *iov = p->pages->iov;
+ z_stream *zs = &p->zs;
+ uint32_t out_size = 0;
+ int ret;
+ int i;
+
+ for (i = 0; i < used; i++) {
+ uint32_t available = p->zbuff_len - out_size;
+ int flush = Z_NO_FLUSH;
+
+ if (i == used - 1) {
+ flush = Z_SYNC_FLUSH;
+ }
+
+ zs->avail_in = iov[i].iov_len;
+ zs->next_in = iov[i].iov_base;
+
+ zs->avail_out = available;
+ zs->next_out = p->zbuff + out_size;
+
+ ret = deflate(zs, flush);
+ if (ret != Z_OK) {
+ printf("problem with deflate? %d\n", ret);
+ qemu_mutex_unlock(&p->mutex);
+ return -1;
+ }
+ out_size += available - zs->avail_out;
+ }
+ p->next_packet_size = out_size;
+
+ return 0;
+}
+
+static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr)
+{
+ return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size,
+ perr);
+}
+
+static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr)
+{
+ uint32_t in_size = p->next_packet_size;
+ uint32_t out_size = 0;
+ uint32_t expected_size = used * qemu_target_page_size();
+ z_stream *zs = &p->zs;
+ int ret;
+ int i;
+
+ ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr);
+
+ if (ret != 0) {
+ return ret;
+ }
+
+ zs->avail_in = in_size;
+ zs->next_in = p->zbuff;
+
+ for (i = 0; i < used; i++) {
+ struct iovec *iov = &p->pages->iov[i];
+ int flush = Z_NO_FLUSH;
+
+ if (i == used - 1) {
+ flush = Z_SYNC_FLUSH;
+ }
+
+ zs->avail_out = iov->iov_len;
+ zs->next_out = iov->iov_base;
+
+ ret = inflate(zs, flush);
+ if (ret != Z_OK) {
+ printf("%d: problem with inflate? %d\n", p->id, ret);
+ qemu_mutex_unlock(&p->mutex);
+ return ret;
+ }
+ out_size += iov->iov_len;
+ }
+ if (out_size != expected_size) {
+ printf("out size %d expected size %d\n",
+ out_size, expected_size);
+ return -1;
+ }
+ return 0;
+}
+
+MultifdMethods multifd_zlib_ops = {
+ .send_prepare = zlib_send_prepare,
+ .send_write = zlib_send_write,
+ .recv_pages = zlib_recv_pages
+};
+
static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
{
MultiFDInit_t msg;
@@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque)
/* initial packet */
p->num_packets = 1;
- multifd_send_state->ops = &multifd_none_ops;
+ if (migrate_use_multifd_zlib()) {
+ multifd_send_state->ops = &multifd_zlib_ops;
+ } else {
+ multifd_send_state->ops = &multifd_none_ops;
+ }
while (true) {
qemu_sem_wait(&p->sem);
@@ -1399,7 +1497,11 @@ static void *multifd_recv_thread(void *opaque)
trace_multifd_recv_thread_start(p->id);
rcu_register_thread();
- multifd_recv_state->ops = &multifd_none_ops;
+ if (migrate_use_multifd_zlib()) {
+ multifd_recv_state->ops = &multifd_zlib_ops;
+ } else {
+ multifd_recv_state->ops = &multifd_none_ops;
+ }
while (true) {
uint32_t used;
uint32_t flags;
--
2.21.0
On Wed, May 15, 2019 at 02:15:44PM +0200, Juan Quintela wrote:
>This is still a work in progress, but get everything sent as expected
>and it is faster than the code that is already there.
Generally, I prefer to merge this one with previous one.
>
>Signed-off-by: Juan Quintela <quintela@redhat.com>
>---
> migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++-
> 1 file changed, 104 insertions(+), 2 deletions(-)
>
>diff --git a/migration/ram.c b/migration/ram.c
>index fdb5bf07a5..efbb253c1a 100644
>--- a/migration/ram.c
>+++ b/migration/ram.c
>@@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = {
> .recv_pages = none_recv_pages
> };
>
>+/* Multifd zlib compression */
>+
>+static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used)
>+{
>+ struct iovec *iov = p->pages->iov;
>+ z_stream *zs = &p->zs;
>+ uint32_t out_size = 0;
>+ int ret;
>+ int i;
>+
>+ for (i = 0; i < used; i++) {
>+ uint32_t available = p->zbuff_len - out_size;
>+ int flush = Z_NO_FLUSH;
>+
>+ if (i == used - 1) {
>+ flush = Z_SYNC_FLUSH;
>+ }
>+
>+ zs->avail_in = iov[i].iov_len;
>+ zs->next_in = iov[i].iov_base;
>+
>+ zs->avail_out = available;
>+ zs->next_out = p->zbuff + out_size;
>+
>+ ret = deflate(zs, flush);
>+ if (ret != Z_OK) {
>+ printf("problem with deflate? %d\n", ret);
>+ qemu_mutex_unlock(&p->mutex);
>+ return -1;
>+ }
>+ out_size += available - zs->avail_out;
>+ }
>+ p->next_packet_size = out_size;
>+
>+ return 0;
>+}
>+
>+static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr)
>+{
>+ return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size,
>+ perr);
>+}
>+
>+static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr)
>+{
>+ uint32_t in_size = p->next_packet_size;
>+ uint32_t out_size = 0;
>+ uint32_t expected_size = used * qemu_target_page_size();
>+ z_stream *zs = &p->zs;
>+ int ret;
>+ int i;
>+
>+ ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr);
>+
>+ if (ret != 0) {
>+ return ret;
>+ }
>+
>+ zs->avail_in = in_size;
>+ zs->next_in = p->zbuff;
>+
>+ for (i = 0; i < used; i++) {
>+ struct iovec *iov = &p->pages->iov[i];
>+ int flush = Z_NO_FLUSH;
>+
>+ if (i == used - 1) {
>+ flush = Z_SYNC_FLUSH;
>+ }
>+
>+ zs->avail_out = iov->iov_len;
>+ zs->next_out = iov->iov_base;
>+
>+ ret = inflate(zs, flush);
>+ if (ret != Z_OK) {
>+ printf("%d: problem with inflate? %d\n", p->id, ret);
>+ qemu_mutex_unlock(&p->mutex);
>+ return ret;
>+ }
>+ out_size += iov->iov_len;
>+ }
>+ if (out_size != expected_size) {
>+ printf("out size %d expected size %d\n",
>+ out_size, expected_size);
>+ return -1;
>+ }
>+ return 0;
>+}
>+
>+MultifdMethods multifd_zlib_ops = {
>+ .send_prepare = zlib_send_prepare,
>+ .send_write = zlib_send_write,
>+ .recv_pages = zlib_recv_pages
>+};
>+
> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> {
> MultiFDInit_t msg;
>@@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque)
> /* initial packet */
> p->num_packets = 1;
>
>- multifd_send_state->ops = &multifd_none_ops;
>+ if (migrate_use_multifd_zlib()) {
>+ multifd_send_state->ops = &multifd_zlib_ops;
>+ } else {
>+ multifd_send_state->ops = &multifd_none_ops;
>+ }
Again, to manipulate a global variable in each thread is not a good idea.
This would be better to use an array to assign ops instead of *if*. In case
you would have several compress methods, the code would be difficult to read.
>
> while (true) {
> qemu_sem_wait(&p->sem);
>@@ -1399,7 +1497,11 @@ static void *multifd_recv_thread(void *opaque)
> trace_multifd_recv_thread_start(p->id);
> rcu_register_thread();
>
>- multifd_recv_state->ops = &multifd_none_ops;
>+ if (migrate_use_multifd_zlib()) {
>+ multifd_recv_state->ops = &multifd_zlib_ops;
>+ } else {
>+ multifd_recv_state->ops = &multifd_none_ops;
>+ }
> while (true) {
> uint32_t used;
> uint32_t flags;
>--
>2.21.0
>
--
Wei Yang
Help you, Help me
Wei Yang <richardw.yang@linux.intel.com> wrote:
> On Wed, May 15, 2019 at 02:15:44PM +0200, Juan Quintela wrote:
>>This is still a work in progress, but get everything sent as expected
>>and it is faster than the code that is already there.
>
> Generally, I prefer to merge this one with previous one.
Done, sir O:-)
For the WIP part, it was easier to have the bits that didn't change and
the ones that I was working with.
>>@@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque)
>> /* initial packet */
>> p->num_packets = 1;
>>
>>- multifd_send_state->ops = &multifd_none_ops;
>>+ if (migrate_use_multifd_zlib()) {
>>+ multifd_send_state->ops = &multifd_zlib_ops;
>>+ } else {
>>+ multifd_send_state->ops = &multifd_none_ops;
>>+ }
>
> Again, to manipulate a global variable in each thread is not a good idea.
Fixed.
> This would be better to use an array to assign ops instead of *if*. In case
> you would have several compress methods, the code would be difficult to read.
it is going to end:
if (migrate_use_multifd_zlib()) {
multifd_send_state->ops = &multifd_zlib_ops;
if (migrate_use_multifd_zstd()) {
multifd_send_state->ops = &multifd_zstd_ops;
} else {
multifd_send_state->ops = &multifd_none_ops;
}
We can use:
multifd_send_state->ops = multifd_ops[migrate_multifd_method(void)];
About what is easier to read ..... it depends on taste.
Will change anyways.
Thanks, Juan.
* Juan Quintela (quintela@redhat.com) wrote:
> This is still a work in progress, but get everything sent as expected
> and it is faster than the code that is already there.
>
> Signed-off-by: Juan Quintela <quintela@redhat.com>
> ---
> migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++-
> 1 file changed, 104 insertions(+), 2 deletions(-)
>
> diff --git a/migration/ram.c b/migration/ram.c
> index fdb5bf07a5..efbb253c1a 100644
> --- a/migration/ram.c
> +++ b/migration/ram.c
> @@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = {
> .recv_pages = none_recv_pages
> };
>
> +/* Multifd zlib compression */
> +
Comment the return value?
> +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used)
> +{
> + struct iovec *iov = p->pages->iov;
> + z_stream *zs = &p->zs;
> + uint32_t out_size = 0;
> + int ret;
> + int i;
uint32_t to match 'used' ?
> + for (i = 0; i < used; i++) {
> + uint32_t available = p->zbuff_len - out_size;
> + int flush = Z_NO_FLUSH;
> +
> + if (i == used - 1) {
> + flush = Z_SYNC_FLUSH;
> + }
> +
> + zs->avail_in = iov[i].iov_len;
> + zs->next_in = iov[i].iov_base;
> +
> + zs->avail_out = available;
> + zs->next_out = p->zbuff + out_size;
> +
> + ret = deflate(zs, flush);
> + if (ret != Z_OK) {
> + printf("problem with deflate? %d\n", ret);
If it's an error it should probably be at least an fprintf(stderr or
err_ something.
Should this also check that the avail_in/next_in has consumed the whole
of the input?
> + qemu_mutex_unlock(&p->mutex);
Can you explain and/or comment whyit's unlocked here in the error path?
> + return -1;
> + }
> + out_size += available - zs->avail_out;
> + }
> + p->next_packet_size = out_size;
Some traces_ wouldn't hurt.
> + return 0;
> +}
> +
> +static int zlib_send_write(MultiFDSendParams *p, uint32_t used, Error **perr)
> +{
> + return qio_channel_write_all(p->c, (void *)p->zbuff, p->next_packet_size,
> + perr);
> +}
> +
> +static int zlib_recv_pages(MultiFDRecvParams *p, uint32_t used, Error **perr)
> +{
> + uint32_t in_size = p->next_packet_size;
> + uint32_t out_size = 0;
> + uint32_t expected_size = used * qemu_target_page_size();
> + z_stream *zs = &p->zs;
> + int ret;
> + int i;
> +
> + ret = qio_channel_read_all(p->c, (void *)p->zbuff, in_size, perr);
> +
> + if (ret != 0) {
> + return ret;
> + }
> +
> + zs->avail_in = in_size;
> + zs->next_in = p->zbuff;
> +
> + for (i = 0; i < used; i++) {
> + struct iovec *iov = &p->pages->iov[i];
> + int flush = Z_NO_FLUSH;
> +
> + if (i == used - 1) {
> + flush = Z_SYNC_FLUSH;
> + }
> +
> + zs->avail_out = iov->iov_len;
> + zs->next_out = iov->iov_base;
> +
> + ret = inflate(zs, flush);
> + if (ret != Z_OK) {
> + printf("%d: problem with inflate? %d\n", p->id, ret);
> + qemu_mutex_unlock(&p->mutex);
> + return ret;
> + }
> + out_size += iov->iov_len;
> + }
> + if (out_size != expected_size) {
> + printf("out size %d expected size %d\n",
> + out_size, expected_size);
> + return -1;
> + }
> + return 0;
> +}
> +
> +MultifdMethods multifd_zlib_ops = {
> + .send_prepare = zlib_send_prepare,
> + .send_write = zlib_send_write,
> + .recv_pages = zlib_recv_pages
> +};
> +
> static int multifd_send_initial_packet(MultiFDSendParams *p, Error **errp)
> {
> MultiFDInit_t msg;
> @@ -1145,7 +1239,11 @@ static void *multifd_send_thread(void *opaque)
> /* initial packet */
> p->num_packets = 1;
>
> - multifd_send_state->ops = &multifd_none_ops;
> + if (migrate_use_multifd_zlib()) {
> + multifd_send_state->ops = &multifd_zlib_ops;
> + } else {
> + multifd_send_state->ops = &multifd_none_ops;
> + }
>
> while (true) {
> qemu_sem_wait(&p->sem);
> @@ -1399,7 +1497,11 @@ static void *multifd_recv_thread(void *opaque)
> trace_multifd_recv_thread_start(p->id);
> rcu_register_thread();
>
> - multifd_recv_state->ops = &multifd_none_ops;
> + if (migrate_use_multifd_zlib()) {
> + multifd_recv_state->ops = &multifd_zlib_ops;
> + } else {
> + multifd_recv_state->ops = &multifd_none_ops;
> + }
> while (true) {
> uint32_t used;
> uint32_t flags;
> --
> 2.21.0
>
--
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK
"Dr. David Alan Gilbert" <dgilbert@redhat.com> wrote:
> * Juan Quintela (quintela@redhat.com) wrote:
>> This is still a work in progress, but get everything sent as expected
>> and it is faster than the code that is already there.
>>
>> Signed-off-by: Juan Quintela <quintela@redhat.com>
>> ---
>> migration/ram.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++-
>> 1 file changed, 104 insertions(+), 2 deletions(-)
>>
>> diff --git a/migration/ram.c b/migration/ram.c
>> index fdb5bf07a5..efbb253c1a 100644
>> --- a/migration/ram.c
>> +++ b/migration/ram.c
>> @@ -747,6 +747,100 @@ MultifdMethods multifd_none_ops = {
>> .recv_pages = none_recv_pages
>> };
>>
>> +/* Multifd zlib compression */
>> +
>
> Comment the return value?
Once there, commented all the functions.
>> +static int zlib_send_prepare(MultiFDSendParams *p, uint32_t used)
>> +{
>> + struct iovec *iov = p->pages->iov;
>> + z_stream *zs = &p->zs;
>> + uint32_t out_size = 0;
>> + int ret;
>> + int i;
>
> uint32_t to match 'used' ?
Done
>> + for (i = 0; i < used; i++) {
>> + uint32_t available = p->zbuff_len - out_size;
>> + int flush = Z_NO_FLUSH;
>> +
>> + if (i == used - 1) {
>> + flush = Z_SYNC_FLUSH;
>> + }
>> +
>> + zs->avail_in = iov[i].iov_len;
>> + zs->next_in = iov[i].iov_base;
>> +
>> + zs->avail_out = available;
>> + zs->next_out = p->zbuff + out_size;
>> +
>> + ret = deflate(zs, flush);
>> + if (ret != Z_OK) {
>> + printf("problem with deflate? %d\n", ret);
>
> If it's an error it should probably be at least an fprintf(stderr or
> err_ something.
We don't have any error arround really, we need one. Searching for it.
> Should this also check that the avail_in/next_in has consumed the whole
> of the input?
I am not checking because _it_ is supposed to b doing it right. We can
test it through, specially in reception.
>> + qemu_mutex_unlock(&p->mutex);
>
> Can you explain and/or comment whyit's unlocked here in the error path?
Uh, oh ....
Leftover for when it was done inline inside the main function.
Removed.
>> + return -1;
>> + }
>> + out_size += available - zs->avail_out;
>> + }
>> + p->next_packet_size = out_size;
>
> Some traces_ wouldn't hurt.
Humm, you are right here.
Thanks, Juan.
© 2016 - 2026 Red Hat, Inc.