[PATCH v4 14/18] migration/rdma: register memory for multifd RDMA channels

Chuan Zheng posted 18 patches 5 years ago
Maintainers: "Dr. David Alan Gilbert" <dgilbert@redhat.com>, Juan Quintela <quintela@redhat.com>
Posted by Chuan Zheng 5 years ago
Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
---
 migration/multifd.c |  3 ++
 migration/rdma.c    | 92 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 93 insertions(+), 2 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 919a414..1186246 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -537,6 +537,9 @@ void multifd_send_terminate_threads(Error *err)
         qemu_mutex_lock(&p->mutex);
         p->quit = true;
         qemu_sem_post(&p->sem);
+        if (migrate_use_rdma()) {
+            qemu_sem_post(&p->sem_sync);
+        }
         qemu_mutex_unlock(&p->mutex);
     }
 }
diff --git a/migration/rdma.c b/migration/rdma.c
index 1095a8f..c906cc7 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -3838,6 +3838,19 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, uint64_t flags, void *data)
         return rdma_block_notification_handle(opaque, data);
 
     case RAM_CONTROL_HOOK:
+        if (migrate_use_multifd()) {
+            int i;
+            MultiFDRecvParams *multifd_recv_param = NULL;
+            int thread_count = migrate_multifd_channels();
+            /* Inform dest recv_thread to poll */
+            for (i = 0; i < thread_count; i++) {
+                if (get_multifd_recv_param(i, &multifd_recv_param)) {
+                    return -1;
+                }
+                qemu_sem_post(&multifd_recv_param->sem_sync);
+            }
+        }
+
         return qemu_rdma_registration_handle(f, opaque);
 
     default:
@@ -3910,6 +3923,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
         trace_qemu_rdma_registration_stop_ram();
 
+        if (migrate_use_multifd()) {
+            /*
+             * Inform the multifd channels to register memory
+             */
+            int i;
+            int thread_count = migrate_multifd_channels();
+            MultiFDSendParams *multifd_send_param = NULL;
+            for (i = 0; i < thread_count; i++) {
+                ret = get_multifd_send_param(i, &multifd_send_param);
+                if (ret) {
+                    error_report("rdma: error getting multifd(%d)", i);
+                    return ret;
+                }
+
+                qemu_sem_post(&multifd_send_param->sem_sync);
+            }
+        }
+
         /*
          * Make sure that we parallelize the pinning on both sides.
          * For very large guests, doing this serially takes a really
@@ -3968,6 +3999,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
                     rdma->dest_blocks[i].remote_host_addr;
             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
         }
+        /* Wait for all multifd channels to complete registration */
+        if (migrate_use_multifd()) {
+            int i;
+            int thread_count = migrate_multifd_channels();
+            MultiFDSendParams *multifd_send_param = NULL;
+            for (i = 0; i < thread_count; i++) {
+                ret = get_multifd_send_param(i, &multifd_send_param);
+                if (ret) {
+                    error_report("rdma: error getting multifd(%d)", i);
+                    return ret;
+                }
+
+                qemu_sem_wait(&multifd_send_param->sem);
+            }
+        }
     }
 
     trace_qemu_rdma_registration_stop(flags);
@@ -3979,6 +4025,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void *opaque,
         goto err;
     }
 
+    if (migrate_use_multifd()) {
+        /*
+     * Inform the src send_thread to send the FINISHED signal.
+     * Wait for the multifd RDMA send threads to poll the CQE.
+         */
+        int i;
+        int thread_count = migrate_multifd_channels();
+        MultiFDSendParams *multifd_send_param = NULL;
+        for (i = 0; i < thread_count; i++) {
+            ret = get_multifd_send_param(i, &multifd_send_param);
+            if (ret < 0) {
+                goto err;
+            }
+
+            qemu_sem_post(&multifd_send_param->sem_sync);
+        }
+    }
+
     return 0;
 err:
     rdma->error_state = ret;
@@ -4355,19 +4419,37 @@ static void *multifd_rdma_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
+    int ret = 0;
+    RDMAControlHeader head = { .len = 0, .repeat = 1 };
 
     trace_multifd_send_thread_start(p->id);
     if (multifd_send_initial_packet(p, &local_err) < 0) {
         goto out;
     }
 
+    /* wait for semaphore notification to register memory */
+    qemu_sem_wait(&p->sem_sync);
+    if (qemu_rdma_registration(p->rdma) < 0) {
+        goto out;
+    }
+    /*
+     * Inform the main RDMA thread to run once the multifd
+     * RDMA send thread has completed registration.
+     */
+    qemu_sem_post(&p->sem);
     while (true) {
+        qemu_sem_wait(&p->sem_sync);
         WITH_QEMU_LOCK_GUARD(&p->mutex) {
             if (p->quit) {
                 break;
             }
         }
-        qemu_sem_wait(&p->sem);
+        /* Send FINISHED to the destination */
+        head.type = RDMA_CONTROL_REGISTER_FINISHED;
+        ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
+        if (ret < 0) {
+            return NULL;
+        }
     }
 
 out:
@@ -4406,14 +4488,20 @@ static void multifd_rdma_send_channel_setup(MultiFDSendParams *p)
 static void *multifd_rdma_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
+    int ret = 0;
 
     while (true) {
+        qemu_sem_wait(&p->sem_sync);
         WITH_QEMU_LOCK_GUARD(&p->mutex) {
             if (p->quit) {
                 break;
             }
         }
-        qemu_sem_wait(&p->sem_sync);
+        ret = qemu_rdma_registration_handle(p->file, p->c);
+        if (ret < 0) {
+            qemu_file_set_error(p->file, ret);
+            break;
+        }
     }
 
     WITH_QEMU_LOCK_GUARD(&p->mutex) {
-- 
1.8.3.1
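

As a rough illustration, the handshake this patch sets up between the
main migration thread and the multifd send threads can be reduced to
the standalone sketch below.  POSIX semaphores and pthreads stand in
for QEMU's QemuSemaphore/QemuThread, and printf calls stand in for
qemu_rdma_registration() and the RDMA_CONTROL_REGISTER_FINISHED
exchange; none of the names below are QEMU APIs and error checking is
omitted.  Builds with "cc -pthread" on Linux.

#include <pthread.h>
#include <semaphore.h>
#include <stdbool.h>
#include <stdio.h>

#define NCHANNELS 4

typedef struct {
    int id;
    sem_t sem;        /* worker -> main: "registration done" */
    sem_t sem_sync;   /* main -> worker: "start next phase"  */
    bool quit;
    pthread_mutex_t mutex;
} Channel;

static void *send_thread(void *opaque)
{
    Channel *p = opaque;

    /* Phase 1: wait for the kick, then register memory */
    sem_wait(&p->sem_sync);
    printf("channel %d: registering memory\n", p->id);
    sem_post(&p->sem);                   /* report back to the main thread */

    for (;;) {
        /* Phase 2: wait to be told to send FINISHED (or to quit) */
        sem_wait(&p->sem_sync);
        pthread_mutex_lock(&p->mutex);
        bool quit = p->quit;
        pthread_mutex_unlock(&p->mutex);
        if (quit) {
            break;
        }
        printf("channel %d: sending REGISTER_FINISHED\n", p->id);
    }
    return NULL;
}

int main(void)
{
    Channel chan[NCHANNELS];
    pthread_t tid[NCHANNELS];

    for (int i = 0; i < NCHANNELS; i++) {
        chan[i].id = i;
        chan[i].quit = false;
        sem_init(&chan[i].sem, 0, 0);
        sem_init(&chan[i].sem_sync, 0, 0);
        pthread_mutex_init(&chan[i].mutex, NULL);
        pthread_create(&tid[i], NULL, send_thread, &chan[i]);
    }

    /* qemu_rdma_registration_stop(): kick every channel ... */
    for (int i = 0; i < NCHANNELS; i++) {
        sem_post(&chan[i].sem_sync);
    }
    /* ... and wait until every channel has registered its memory */
    for (int i = 0; i < NCHANNELS; i++) {
        sem_wait(&chan[i].sem);
    }
    /* after the registration round-trip: ask every channel for FINISHED */
    for (int i = 0; i < NCHANNELS; i++) {
        sem_post(&chan[i].sem_sync);
    }

    /* shutdown: set quit, then post sem_sync so no thread stays blocked */
    for (int i = 0; i < NCHANNELS; i++) {
        pthread_mutex_lock(&chan[i].mutex);
        chan[i].quit = true;
        pthread_mutex_unlock(&chan[i].mutex);
        sem_post(&chan[i].sem_sync);
    }
    for (int i = 0; i < NCHANNELS; i++) {
        pthread_join(tid[i], NULL);
    }
    return 0;
}

The final round of sem_sync posts with quit set mirrors the
multifd_send_terminate_threads() hunk above: a send thread blocked on
sem_sync would otherwise never observe quit.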


Re: [PATCH v4 14/18] migration/rdma: register memory for multifd RDMA channels
Posted by Dr. David Alan Gilbert 5 years ago
* Chuan Zheng (zhengchuan@huawei.com) wrote:
> Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
> Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>

This could do with a description of the sequence in the commit message;
I think you're waiting for the semaphore, doing the registration, then
waiting again to say that everyone has finished?

> [...]
>      while (true) {
> +        qemu_sem_wait(&p->sem_sync);
>          WITH_QEMU_LOCK_GUARD(&p->mutex) {
>              if (p->quit) {
>                  break;
>              }
>          }
> -        qemu_sem_wait(&p->sem);

Is this a line that you added just a few patches earlier? If so, put it
in the right place in that original patch.

Dave

-- 
Dr. David Alan Gilbert / dgilbert@redhat.com / Manchester, UK


Re: [PATCH v4 14/18] migration/rdma: register memory for multifd RDMA channels
Posted by Zheng Chuan 4 years, 11 months ago

On 2021/2/4 4:12, Dr. David Alan Gilbert wrote:
> * Chuan Zheng (zhengchuan@huawei.com) wrote:
>> Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
>> Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
> 
> This could do with a description of the sequence in the commit message;
> I think you're waiting for the semaphore, doing the registration, then
> waiting again to say that everyone has finished?
> 
Yes. The detailed description will be added in v5.
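Roughly, the sequence this patch implements is:

  source: qemu_rdma_registration_stop() posts sem_sync to every multifd
          send thread; each send thread calls qemu_rdma_registration()
          and posts sem back; the main thread waits on sem for every
          channel, and later posts sem_sync again so that each send
          thread sends RDMA_CONTROL_REGISTER_FINISHED.
  dest:   rdma_load_hook(RAM_CONTROL_HOOK) posts sem_sync to every recv
          thread; each recv thread then runs
          qemu_rdma_registration_handle().

The recv-thread loop reduces to the pattern below (a fragment in the
same POSIX stand-in style as the sketch after the patch; Channel and
handle_registration() are illustrative placeholders, not QEMU code):

static void *recv_thread(void *opaque)
{
    Channel *p = opaque;

    for (;;) {
        sem_wait(&p->sem_sync);           /* kicked from RAM_CONTROL_HOOK */
        pthread_mutex_lock(&p->mutex);
        bool quit = p->quit;
        pthread_mutex_unlock(&p->mutex);
        if (quit) {                       /* set by terminate_threads()   */
            break;
        }
        if (handle_registration(p) < 0) { /* one registration round       */
            break;                        /* stop on error                */
        }
    }
    return NULL;
}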
>> [...]
>>      while (true) {
>> +        qemu_sem_wait(&p->sem_sync);
>>          WITH_QEMU_LOCK_GUARD(&p->mutex) {
>>              if (p->quit) {
>>                  break;
>>              }
>>          }
>> -        qemu_sem_wait(&p->sem);
> 
> Is this a line that you added just a few patches earlier? If so, put it
> in the right place in that original patch.
> 
> Dave
> 
My mistake, this was put in the wrong place in patch-004; I will correct it. Thanks.


-- 
Regards.
Chuan