[PATCH v3 22/24] vfio/migration: Multifd device state transfer support - receive side
Posted by Maciej S. Szmigiero 1 year, 2 months ago
From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>

The multifd received data needs to be reassembled since device state
packets sent via different multifd channels can arrive out-of-order.

Therefore, each VFIO device state packet carries a header indicating its
position in the stream.

The last such VFIO device state packet should have
VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config state.

Since it's important to finish loading the device state transferred via the
main migration channel (via the save_live_iterate SaveVMHandler) before
starting to load the data asynchronously transferred via multifd, the
thread doing the actual loading of the multifd-transferred data is only
started from the switchover_start SaveVMHandler.

The switchover_start handler is called when the MIG_CMD_SWITCHOVER_START
sub-command of QEMU_VM_COMMAND is received via the main migration channel.

This sub-command is only sent after all save_live_iterate data have already
been posted, so it is safe to commence loading of the multifd-transferred
device state upon receiving it - loading of save_live_iterate data happens
synchronously in the main migration thread (much like the processing of
MIG_CMD_SWITCHOVER_START), so by the time MIG_CMD_SWITCHOVER_START is
processed all the preceding data must have already been loaded.

Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
 hw/vfio/migration.c           | 402 ++++++++++++++++++++++++++++++++++
 hw/vfio/pci.c                 |   2 +
 hw/vfio/trace-events          |   6 +
 include/hw/vfio/vfio-common.h |  19 ++
 4 files changed, 429 insertions(+)

diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
index 683f2ae98d5e..b54879fe6209 100644
--- a/hw/vfio/migration.c
+++ b/hw/vfio/migration.c
@@ -15,6 +15,7 @@
 #include <linux/vfio.h>
 #include <sys/ioctl.h>
 
+#include "io/channel-buffer.h"
 #include "sysemu/runstate.h"
 #include "hw/vfio/vfio-common.h"
 #include "migration/misc.h"
@@ -55,6 +56,15 @@
  */
 #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
 
+#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
+
+typedef struct VFIODeviceStatePacket {
+    uint32_t version;
+    uint32_t idx;
+    uint32_t flags;
+    uint8_t data[0];
+} QEMU_PACKED VFIODeviceStatePacket;
+
 static int64_t bytes_transferred;
 
 static const char *mig_state_to_str(enum vfio_device_mig_state state)
@@ -254,6 +264,292 @@ static int vfio_load_buffer(QEMUFile *f, VFIODevice *vbasedev,
     return ret;
 }
 
+typedef struct VFIOStateBuffer {
+    bool is_present;
+    char *data;
+    size_t len;
+} VFIOStateBuffer;
+
+static void vfio_state_buffer_clear(gpointer data)
+{
+    VFIOStateBuffer *lb = data;
+
+    if (!lb->is_present) {
+        return;
+    }
+
+    g_clear_pointer(&lb->data, g_free);
+    lb->is_present = false;
+}
+
+static void vfio_state_buffers_init(VFIOStateBuffers *bufs)
+{
+    bufs->array = g_array_new(FALSE, TRUE, sizeof(VFIOStateBuffer));
+    g_array_set_clear_func(bufs->array, vfio_state_buffer_clear);
+}
+
+static void vfio_state_buffers_destroy(VFIOStateBuffers *bufs)
+{
+    g_clear_pointer(&bufs->array, g_array_unref);
+}
+
+static void vfio_state_buffers_assert_init(VFIOStateBuffers *bufs)
+{
+    assert(bufs->array);
+}
+
+static guint vfio_state_buffers_size_get(VFIOStateBuffers *bufs)
+{
+    return bufs->array->len;
+}
+
+static void vfio_state_buffers_size_set(VFIOStateBuffers *bufs, guint size)
+{
+    g_array_set_size(bufs->array, size);
+}
+
+static VFIOStateBuffer *vfio_state_buffers_at(VFIOStateBuffers *bufs, guint idx)
+{
+    return &g_array_index(bufs->array, VFIOStateBuffer, idx);
+}
+
+static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
+                                  Error **errp)
+{
+    VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
+    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
+    VFIOStateBuffer *lb;
+
+    /*
+     * Holding BQL here would violate the lock order and can cause
+     * a deadlock once we attempt to lock load_bufs_mutex below.
+     */
+    assert(!bql_locked());
+
+    if (!migration->multifd_transfer) {
+        error_setg(errp,
+                   "got device state packet but not doing multifd transfer");
+        return -1;
+    }
+
+    if (data_size < sizeof(*packet)) {
+        error_setg(errp, "packet too short at %zu (min is %zu)",
+                   data_size, sizeof(*packet));
+        return -1;
+    }
+
+    if (packet->version != 0) {
+        error_setg(errp, "packet has unknown version %" PRIu32,
+                   packet->version);
+        return -1;
+    }
+
+    if (packet->idx == UINT32_MAX) {
+        error_setg(errp, "packet has too high idx %" PRIu32,
+                   packet->idx);
+        return -1;
+    }
+
+    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
+
+    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
+
+    /* config state packet should be the last one in the stream */
+    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
+        migration->load_buf_idx_last = packet->idx;
+    }
+
+    vfio_state_buffers_assert_init(&migration->load_bufs);
+    if (packet->idx >= vfio_state_buffers_size_get(&migration->load_bufs)) {
+        vfio_state_buffers_size_set(&migration->load_bufs, packet->idx + 1);
+    }
+
+    lb = vfio_state_buffers_at(&migration->load_bufs, packet->idx);
+    if (lb->is_present) {
+        error_setg(errp, "state buffer %" PRIu32 " already filled",
+                   packet->idx);
+        return -1;
+    }
+
+    assert(packet->idx >= migration->load_buf_idx);
+
+    migration->load_buf_queued_pending_buffers++;
+    if (migration->load_buf_queued_pending_buffers >
+        vbasedev->migration_max_queued_buffers) {
+        error_setg(errp,
+                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
+                   packet->idx, vbasedev->migration_max_queued_buffers);
+        return -1;
+    }
+
+    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
+    lb->len = data_size - sizeof(*packet);
+    lb->is_present = true;
+
+    qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
+
+    return 0;
+}
+
+static int vfio_load_device_config_state(QEMUFile *f, void *opaque);
+
+static int vfio_load_bufs_thread_load_config(VFIODevice *vbasedev)
+{
+    VFIOMigration *migration = vbasedev->migration;
+    VFIOStateBuffer *lb;
+    g_autoptr(QIOChannelBuffer) bioc = NULL;
+    QEMUFile *f_out = NULL, *f_in = NULL;
+    uint64_t mig_header;
+    int ret;
+
+    assert(migration->load_buf_idx == migration->load_buf_idx_last);
+    lb = vfio_state_buffers_at(&migration->load_bufs, migration->load_buf_idx);
+    assert(lb->is_present);
+
+    bioc = qio_channel_buffer_new(lb->len);
+    qio_channel_set_name(QIO_CHANNEL(bioc), "vfio-device-config-load");
+
+    f_out = qemu_file_new_output(QIO_CHANNEL(bioc));
+    qemu_put_buffer(f_out, (uint8_t *)lb->data, lb->len);
+
+    ret = qemu_fflush(f_out);
+    if (ret) {
+        g_clear_pointer(&f_out, qemu_fclose);
+        return ret;
+    }
+
+    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
+    f_in = qemu_file_new_input(QIO_CHANNEL(bioc));
+
+    mig_header = qemu_get_be64(f_in);
+    if (mig_header != VFIO_MIG_FLAG_DEV_CONFIG_STATE) {
+        g_clear_pointer(&f_out, qemu_fclose);
+        g_clear_pointer(&f_in, qemu_fclose);
+        return -EINVAL;
+    }
+
+    bql_lock();
+    ret = vfio_load_device_config_state(f_in, vbasedev);
+    bql_unlock();
+
+    g_clear_pointer(&f_out, qemu_fclose);
+    g_clear_pointer(&f_in, qemu_fclose);
+    if (ret < 0) {
+        return ret;
+    }
+
+    return 0;
+}
+
+static bool vfio_load_bufs_thread_want_abort(VFIODevice *vbasedev,
+                                             bool *abort_flag)
+{
+    VFIOMigration *migration = vbasedev->migration;
+
+    return migration->load_bufs_thread_want_exit || qatomic_read(abort_flag);
+}
+
+static int vfio_load_bufs_thread(bool *abort_flag, void *opaque)
+{
+    VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
+    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
+    int ret;
+
+    assert(migration->load_bufs_thread_running);
+
+    while (!vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
+        VFIOStateBuffer *lb;
+        guint bufs_len;
+        bool starved;
+
+        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
+
+        bufs_len = vfio_state_buffers_size_get(&migration->load_bufs);
+        if (migration->load_buf_idx >= bufs_len) {
+            assert(migration->load_buf_idx == bufs_len);
+            starved = true;
+        } else {
+            lb = vfio_state_buffers_at(&migration->load_bufs,
+                                       migration->load_buf_idx);
+            starved = !lb->is_present;
+        }
+
+        if (starved) {
+            trace_vfio_load_state_device_buffer_starved(vbasedev->name,
+                                                        migration->load_buf_idx);
+            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond,
+                           &migration->load_bufs_mutex);
+            continue;
+        }
+
+        if (migration->load_buf_idx == migration->load_buf_idx_last) {
+            break;
+        }
+
+        if (migration->load_buf_idx == 0) {
+            trace_vfio_load_state_device_buffer_start(vbasedev->name);
+        }
+
+        if (lb->len) {
+            g_autofree char *buf = NULL;
+            size_t buf_len;
+            ssize_t wr_ret;
+            int errno_save;
+
+            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
+                                                           migration->load_buf_idx);
+
+            /* lb might become re-allocated when we drop the lock */
+            buf = g_steal_pointer(&lb->data);
+            buf_len = lb->len;
+
+            /*
+             * Loading data to the device takes a while,
+             * drop the lock during this process.
+             */
+            qemu_mutex_unlock(&migration->load_bufs_mutex);
+            wr_ret = write(migration->data_fd, buf, buf_len);
+            errno_save = errno;
+            qemu_mutex_lock(&migration->load_bufs_mutex);
+
+            if (wr_ret < 0) {
+                ret = -errno_save;
+                goto ret_signal;
+            } else if (wr_ret < buf_len) {
+                ret = -EINVAL;
+                goto ret_signal;
+            }
+
+            trace_vfio_load_state_device_buffer_load_end(vbasedev->name,
+                                                         migration->load_buf_idx);
+        }
+
+        assert(migration->load_buf_queued_pending_buffers > 0);
+        migration->load_buf_queued_pending_buffers--;
+
+        if (migration->load_buf_idx == migration->load_buf_idx_last - 1) {
+            trace_vfio_load_state_device_buffer_end(vbasedev->name);
+        }
+
+        migration->load_buf_idx++;
+    }
+
+    if (vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
+        ret = -ECANCELED;
+        goto ret_signal;
+    }
+
+    ret = vfio_load_bufs_thread_load_config(vbasedev);
+
+ret_signal:
+    migration->load_bufs_thread_running = false;
+    qemu_cond_signal(&migration->load_bufs_thread_finished_cond);
+
+    return ret;
+}
+
 static int vfio_save_device_config_state(QEMUFile *f, void *opaque,
                                          Error **errp)
 {
@@ -430,6 +726,12 @@ static bool vfio_precopy_supported(VFIODevice *vbasedev)
     return migration->mig_flags & VFIO_MIGRATION_PRE_COPY;
 }
 
+static bool vfio_multifd_transfer_supported(void)
+{
+    return migration_has_device_state_support() &&
+        migrate_send_switchover_start();
+}
+
 /* ---------------------------------------------------------------------- */
 
 static int vfio_save_prepare(void *opaque, Error **errp)
@@ -695,17 +997,73 @@ static int vfio_load_setup(QEMUFile *f, void *opaque, Error **errp)
 
     assert(!migration->load_setup);
 
+    /*
+     * Make a copy of this setting at the start in case it is changed
+     * mid-migration.
+     */
+    if (vbasedev->migration_multifd_transfer == ON_OFF_AUTO_AUTO) {
+        migration->multifd_transfer = vfio_multifd_transfer_supported();
+    } else {
+        migration->multifd_transfer =
+            vbasedev->migration_multifd_transfer == ON_OFF_AUTO_ON;
+    }
+
+    if (migration->multifd_transfer && !vfio_multifd_transfer_supported()) {
+        error_setg(errp,
+                   "%s: Multifd device transfer requested but unsupported in the current config",
+                   vbasedev->name);
+        return -EINVAL;
+    }
+
     ret = vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
                                    migration->device_state, errp);
     if (ret) {
         return ret;
     }
 
+    if (migration->multifd_transfer) {
+        assert(!migration->load_bufs.array);
+        vfio_state_buffers_init(&migration->load_bufs);
+
+        qemu_mutex_init(&migration->load_bufs_mutex);
+
+        migration->load_buf_idx = 0;
+        migration->load_buf_idx_last = UINT32_MAX;
+        migration->load_buf_queued_pending_buffers = 0;
+        qemu_cond_init(&migration->load_bufs_buffer_ready_cond);
+
+        migration->load_bufs_thread_running = false;
+        migration->load_bufs_thread_want_exit = false;
+        qemu_cond_init(&migration->load_bufs_thread_finished_cond);
+    }
+
     migration->load_setup = true;
 
     return 0;
 }
 
+static void vfio_load_cleanup_load_bufs_thread(VFIODevice *vbasedev)
+{
+    VFIOMigration *migration = vbasedev->migration;
+
+    /* The lock order is load_bufs_mutex -> BQL so unlock BQL here first */
+    bql_unlock();
+    WITH_QEMU_LOCK_GUARD(&migration->load_bufs_mutex) {
+        if (!migration->load_bufs_thread_running) {
+            break;
+        }
+
+        migration->load_bufs_thread_want_exit = true;
+
+        qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
+        qemu_cond_wait(&migration->load_bufs_thread_finished_cond,
+                       &migration->load_bufs_mutex);
+
+        assert(!migration->load_bufs_thread_running);
+    }
+    bql_lock();
+}
+
 static int vfio_load_cleanup(void *opaque)
 {
     VFIODevice *vbasedev = opaque;
@@ -715,7 +1073,19 @@ static int vfio_load_cleanup(void *opaque)
         return 0;
     }
 
+    if (migration->multifd_transfer) {
+        vfio_load_cleanup_load_bufs_thread(vbasedev);
+    }
+
     vfio_migration_cleanup(vbasedev);
+
+    if (migration->multifd_transfer) {
+        qemu_cond_destroy(&migration->load_bufs_thread_finished_cond);
+        vfio_state_buffers_destroy(&migration->load_bufs);
+        qemu_cond_destroy(&migration->load_bufs_buffer_ready_cond);
+        qemu_mutex_destroy(&migration->load_bufs_mutex);
+    }
+
     migration->load_setup = false;
     trace_vfio_load_cleanup(vbasedev->name);
 
@@ -725,6 +1095,7 @@ static int vfio_load_cleanup(void *opaque)
 static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
 {
     VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
     int ret = 0;
     uint64_t data;
 
@@ -736,6 +1107,12 @@ static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
         switch (data) {
         case VFIO_MIG_FLAG_DEV_CONFIG_STATE:
         {
+            if (migration->multifd_transfer) {
+                error_report("%s: got DEV_CONFIG_STATE but doing multifd transfer",
+                             vbasedev->name);
+                return -EINVAL;
+            }
+
             return vfio_load_device_config_state(f, opaque);
         }
         case VFIO_MIG_FLAG_DEV_SETUP_STATE:
@@ -801,6 +1178,29 @@ static bool vfio_switchover_ack_needed(void *opaque)
     return vfio_precopy_supported(vbasedev);
 }
 
+static int vfio_switchover_start(void *opaque)
+{
+    VFIODevice *vbasedev = opaque;
+    VFIOMigration *migration = vbasedev->migration;
+
+    if (!migration->multifd_transfer) {
+        /* Load thread is only used for multifd transfer */
+        return 0;
+    }
+
+    /* The lock order is load_bufs_mutex -> BQL so unlock BQL here first */
+    bql_unlock();
+    WITH_QEMU_LOCK_GUARD(&migration->load_bufs_mutex) {
+        assert(!migration->load_bufs_thread_running);
+        migration->load_bufs_thread_running = true;
+    }
+    bql_lock();
+
+    qemu_loadvm_start_load_thread(vfio_load_bufs_thread, vbasedev);
+
+    return 0;
+}
+
 static const SaveVMHandlers savevm_vfio_handlers = {
     .save_prepare = vfio_save_prepare,
     .save_setup = vfio_save_setup,
@@ -814,7 +1214,9 @@ static const SaveVMHandlers savevm_vfio_handlers = {
     .load_setup = vfio_load_setup,
     .load_cleanup = vfio_load_cleanup,
     .load_state = vfio_load_state,
+    .load_state_buffer = vfio_load_state_buffer,
     .switchover_ack_needed = vfio_switchover_ack_needed,
+    .switchover_start = vfio_switchover_start,
 };
 
 /* ---------------------------------------------------------------------- */
diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
index 9d547cb5cdff..72d62ada8a39 100644
--- a/hw/vfio/pci.c
+++ b/hw/vfio/pci.c
@@ -3384,6 +3384,8 @@ static Property vfio_pci_dev_properties[] = {
                 vbasedev.migration_multifd_transfer,
                 qdev_prop_on_off_auto_mutable, OnOffAuto,
                 .set_default = true, .defval.i = ON_OFF_AUTO_AUTO),
+    DEFINE_PROP_UINT64("x-migration-max-queued-buffers", VFIOPCIDevice,
+                       vbasedev.migration_max_queued_buffers, UINT64_MAX),
     DEFINE_PROP_BOOL("migration-events", VFIOPCIDevice,
                      vbasedev.migration_events, false),
     DEFINE_PROP_BOOL("x-no-mmap", VFIOPCIDevice, vbasedev.no_mmap, false),
diff --git a/hw/vfio/trace-events b/hw/vfio/trace-events
index 1bebe9877d88..418b378ebd29 100644
--- a/hw/vfio/trace-events
+++ b/hw/vfio/trace-events
@@ -153,6 +153,12 @@ vfio_load_device_config_state_start(const char *name) " (%s)"
 vfio_load_device_config_state_end(const char *name) " (%s)"
 vfio_load_state(const char *name, uint64_t data) " (%s) data 0x%"PRIx64
 vfio_load_state_device_data(const char *name, uint64_t data_size, int ret) " (%s) size %"PRIu64" ret %d"
+vfio_load_state_device_buffer_incoming(const char *name, uint32_t idx) " (%s) idx %"PRIu32
+vfio_load_state_device_buffer_start(const char *name) " (%s)"
+vfio_load_state_device_buffer_starved(const char *name, uint32_t idx) " (%s) idx %"PRIu32
+vfio_load_state_device_buffer_load_start(const char *name, uint32_t idx) " (%s) idx %"PRIu32
+vfio_load_state_device_buffer_load_end(const char *name, uint32_t idx) " (%s) idx %"PRIu32
+vfio_load_state_device_buffer_end(const char *name) " (%s)"
 vfio_migration_realize(const char *name) " (%s)"
 vfio_migration_set_device_state(const char *name, const char *state) " (%s) state %s"
 vfio_migration_set_state(const char *name, const char *new_state, const char *recover_state) " (%s) new state %s, recover state %s"
diff --git a/include/hw/vfio/vfio-common.h b/include/hw/vfio/vfio-common.h
index b1c03a82eec8..0954d6981a22 100644
--- a/include/hw/vfio/vfio-common.h
+++ b/include/hw/vfio/vfio-common.h
@@ -61,6 +61,11 @@ typedef struct VFIORegion {
     uint8_t nr; /* cache the region number for debug */
 } VFIORegion;
 
+/* type safety */
+typedef struct VFIOStateBuffers {
+    GArray *array;
+} VFIOStateBuffers;
+
 typedef struct VFIOMigration {
     struct VFIODevice *vbasedev;
     VMChangeStateEntry *vm_state;
@@ -73,10 +78,23 @@ typedef struct VFIOMigration {
     uint64_t mig_flags;
     uint64_t precopy_init_size;
     uint64_t precopy_dirty_size;
+    bool multifd_transfer;
     bool initial_data_sent;
 
     bool event_save_iterate_started;
     bool event_precopy_empty_hit;
+
+    QemuThread load_bufs_thread;
+    bool load_bufs_thread_running;
+    bool load_bufs_thread_want_exit;
+
+    VFIOStateBuffers load_bufs;
+    QemuCond load_bufs_buffer_ready_cond;
+    QemuCond load_bufs_thread_finished_cond;
+    QemuMutex load_bufs_mutex; /* Lock order: this lock -> BQL */
+    uint32_t load_buf_idx;
+    uint32_t load_buf_idx_last;
+    uint32_t load_buf_queued_pending_buffers;
 } VFIOMigration;
 
 struct VFIOGroup;
@@ -136,6 +154,7 @@ typedef struct VFIODevice {
     OnOffAuto enable_migration;
     OnOffAuto migration_multifd_transfer;
     bool migration_events;
+    uint64_t migration_max_queued_buffers;
     VFIODeviceOps *ops;
     unsigned int num_irqs;
     unsigned int num_regions;
Re: [PATCH v3 22/24] vfio/migration: Multifd device state transfer support - receive side
Posted by Avihai Horon 1 year, 2 months ago
Hi Maciej,

On 17/11/2024 21:20, Maciej S. Szmigiero wrote:
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>
> The multifd received data needs to be reassembled since device state
> packets sent via different multifd channels can arrive out-of-order.
>
> Therefore, each VFIO device state packet carries a header indicating its
> position in the stream.
>
> The last such VFIO device state packet should have
> VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config state.
>
> Since it's important to finish loading the device state transferred via the
> main migration channel (via the save_live_iterate SaveVMHandler) before
> starting to load the data asynchronously transferred via multifd, the
> thread doing the actual loading of the multifd-transferred data is only
> started from the switchover_start SaveVMHandler.
>
> The switchover_start handler is called when the MIG_CMD_SWITCHOVER_START
> sub-command of QEMU_VM_COMMAND is received via the main migration channel.
>
> This sub-command is only sent after all save_live_iterate data have already
> been posted, so it is safe to commence loading of the multifd-transferred
> device state upon receiving it - loading of save_live_iterate data happens
> synchronously in the main migration thread (much like the processing of
> MIG_CMD_SWITCHOVER_START), so by the time MIG_CMD_SWITCHOVER_START is
> processed all the preceding data must have already been loaded.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>   hw/vfio/migration.c           | 402 ++++++++++++++++++++++++++++++++++
>   hw/vfio/pci.c                 |   2 +
>   hw/vfio/trace-events          |   6 +
>   include/hw/vfio/vfio-common.h |  19 ++
>   4 files changed, 429 insertions(+)
>
> diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
> index 683f2ae98d5e..b54879fe6209 100644
> --- a/hw/vfio/migration.c
> +++ b/hw/vfio/migration.c
> @@ -15,6 +15,7 @@
>   #include <linux/vfio.h>
>   #include <sys/ioctl.h>
>
> +#include "io/channel-buffer.h"
>   #include "sysemu/runstate.h"
>   #include "hw/vfio/vfio-common.h"
>   #include "migration/misc.h"
> @@ -55,6 +56,15 @@
>    */
>   #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
>
> +#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
> +
> +typedef struct VFIODeviceStatePacket {
> +    uint32_t version;
> +    uint32_t idx;
> +    uint32_t flags;
> +    uint8_t data[0];
> +} QEMU_PACKED VFIODeviceStatePacket;
> +
>   static int64_t bytes_transferred;
>
>   static const char *mig_state_to_str(enum vfio_device_mig_state state)
> @@ -254,6 +264,292 @@ static int vfio_load_buffer(QEMUFile *f, VFIODevice *vbasedev,
>       return ret;
>   }
>
> +typedef struct VFIOStateBuffer {
> +    bool is_present;
> +    char *data;
> +    size_t len;
> +} VFIOStateBuffer;
> +
> +static void vfio_state_buffer_clear(gpointer data)
> +{
> +    VFIOStateBuffer *lb = data;
> +
> +    if (!lb->is_present) {
> +        return;
> +    }
> +
> +    g_clear_pointer(&lb->data, g_free);
> +    lb->is_present = false;
> +}
> +
> +static void vfio_state_buffers_init(VFIOStateBuffers *bufs)
> +{
> +    bufs->array = g_array_new(FALSE, TRUE, sizeof(VFIOStateBuffer));
> +    g_array_set_clear_func(bufs->array, vfio_state_buffer_clear);
> +}
> +
> +static void vfio_state_buffers_destroy(VFIOStateBuffers *bufs)
> +{
> +    g_clear_pointer(&bufs->array, g_array_unref);
> +}
> +
> +static void vfio_state_buffers_assert_init(VFIOStateBuffers *bufs)
> +{
> +    assert(bufs->array);
> +}
> +
> +static guint vfio_state_buffers_size_get(VFIOStateBuffers *bufs)
> +{
> +    return bufs->array->len;
> +}
> +
> +static void vfio_state_buffers_size_set(VFIOStateBuffers *bufs, guint size)
> +{
> +    g_array_set_size(bufs->array, size);
> +}

The above three functions seem a bit too specific.

How about:
Instead of size_set and assert_init, introduce a
vfio_state_buffers_insert() function that handles buffer insertion into
the array from the validated packet.

Instead of size_get, introduce vfio_state_buffers_get() that handles the 
array length and is_present checks.
We can also add a vfio_state_buffer_write() function that handles 
writing the buffer to the device.

IMHO this will also make vfio_load_state_buffer() and 
vfio_load_bufs_thread(), which are rather long, clearer.
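
Something along these lines (just an illustrative sketch, untested, with
names and error handling picked by me rather than taken from the patch):

static VFIOStateBuffer *vfio_state_buffers_insert(VFIOStateBuffers *bufs,
                                                  guint idx)
{
    /* Grow the array on demand so that idx always maps to a valid slot */
    if (idx >= bufs->array->len) {
        g_array_set_size(bufs->array, idx + 1);
    }

    return &g_array_index(bufs->array, VFIOStateBuffer, idx);
}

static VFIOStateBuffer *vfio_state_buffers_get(VFIOStateBuffers *bufs,
                                               guint idx)
{
    VFIOStateBuffer *lb;

    /* Buffer not received yet - the array hasn't grown this far */
    if (idx >= bufs->array->len) {
        return NULL;
    }

    lb = &g_array_index(bufs->array, VFIOStateBuffer, idx);
    return lb->is_present ? lb : NULL;
}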

> +
> +static VFIOStateBuffer *vfio_state_buffers_at(VFIOStateBuffers *bufs, guint idx)
> +{
> +    return &g_array_index(bufs->array, VFIOStateBuffer, idx);
> +}
> +
> +static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
> +                                  Error **errp)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
> +    VFIOStateBuffer *lb;
> +
> +    /*
> +     * Holding BQL here would violate the lock order and can cause
> +     * a deadlock once we attempt to lock load_bufs_mutex below.
> +     */
> +    assert(!bql_locked());
> +
> +    if (!migration->multifd_transfer) {
> +        error_setg(errp,
> +                   "got device state packet but not doing multifd transfer");
> +        return -1;
> +    }
> +
> +    if (data_size < sizeof(*packet)) {
> +        error_setg(errp, "packet too short at %zu (min is %zu)",
> +                   data_size, sizeof(*packet));
> +        return -1;
> +    }
> +
> +    if (packet->version != 0) {
> +        error_setg(errp, "packet has unknown version %" PRIu32,
> +                   packet->version);
> +        return -1;
> +    }
> +
> +    if (packet->idx == UINT32_MAX) {
> +        error_setg(errp, "packet has too high idx %" PRIu32,
> +                   packet->idx);
> +        return -1;
> +    }
> +
> +    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
> +
> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
> +
> +    /* config state packet should be the last one in the stream */
> +    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
> +        migration->load_buf_idx_last = packet->idx;
> +    }
> +
> +    vfio_state_buffers_assert_init(&migration->load_bufs);
> +    if (packet->idx >= vfio_state_buffers_size_get(&migration->load_bufs)) {
> +        vfio_state_buffers_size_set(&migration->load_bufs, packet->idx + 1);
> +    }
> +
> +    lb = vfio_state_buffers_at(&migration->load_bufs, packet->idx);
> +    if (lb->is_present) {
> +        error_setg(errp, "state buffer %" PRIu32 " already filled",
> +                   packet->idx);
> +        return -1;
> +    }
> +
> +    assert(packet->idx >= migration->load_buf_idx);
> +
> +    migration->load_buf_queued_pending_buffers++;
> +    if (migration->load_buf_queued_pending_buffers >
> +        vbasedev->migration_max_queued_buffers) {
> +        error_setg(errp,
> +                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
> +                   packet->idx, vbasedev->migration_max_queued_buffers);
> +        return -1;
> +    }

Copying my question from v2:

Should we count bytes instead of buffers? The current buffer size is 1MB
but this could change, and the normal user should not care or know what
the buffer size is.
So maybe rename it to migration_max_pending_bytes or such?

And Maciej replied:

Since it's Peter who asked for this limit to be introduced in the first
place, I would like to ask him what his preference is here.
@Peter: max queued buffers or bytes?

So Peter, what's your opinion here?
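
For illustration, a byte-based limit could look roughly like this (the
load_buf_queued_pending_bytes field and the migration_max_pending_bytes
property are made-up names following my suggestion above; this is not
code from the patch):

    migration->load_buf_queued_pending_bytes += data_size - sizeof(*packet);
    if (migration->load_buf_queued_pending_bytes >
        vbasedev->migration_max_pending_bytes) {
        error_setg(errp,
                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64 " bytes",
                   packet->idx, vbasedev->migration_max_pending_bytes);
        return -1;
    }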

> +
> +    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
> +    lb->len = data_size - sizeof(*packet);
> +    lb->is_present = true;
> +
> +    qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
> +
> +    return 0;
> +}
> +
> +static int vfio_load_device_config_state(QEMUFile *f, void *opaque);
> +
> +static int vfio_load_bufs_thread_load_config(VFIODevice *vbasedev)
> +{
> +    VFIOMigration *migration = vbasedev->migration;
> +    VFIOStateBuffer *lb;
> +    g_autoptr(QIOChannelBuffer) bioc = NULL;
> +    QEMUFile *f_out = NULL, *f_in = NULL;
> +    uint64_t mig_header;
> +    int ret;
> +
> +    assert(migration->load_buf_idx == migration->load_buf_idx_last);
> +    lb = vfio_state_buffers_at(&migration->load_bufs, migration->load_buf_idx);
> +    assert(lb->is_present);
> +
> +    bioc = qio_channel_buffer_new(lb->len);
> +    qio_channel_set_name(QIO_CHANNEL(bioc), "vfio-device-config-load");
> +
> +    f_out = qemu_file_new_output(QIO_CHANNEL(bioc));
> +    qemu_put_buffer(f_out, (uint8_t *)lb->data, lb->len);
> +
> +    ret = qemu_fflush(f_out);
> +    if (ret) {
> +        g_clear_pointer(&f_out, qemu_fclose);
> +        return ret;
> +    }
> +
> +    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
> +    f_in = qemu_file_new_input(QIO_CHANNEL(bioc));
> +
> +    mig_header = qemu_get_be64(f_in);
> +    if (mig_header != VFIO_MIG_FLAG_DEV_CONFIG_STATE) {
> +        g_clear_pointer(&f_out, qemu_fclose);
> +        g_clear_pointer(&f_in, qemu_fclose);
> +        return -EINVAL;
> +    }
> +
> +    bql_lock();
> +    ret = vfio_load_device_config_state(f_in, vbasedev);
> +    bql_unlock();
> +
> +    g_clear_pointer(&f_out, qemu_fclose);
> +    g_clear_pointer(&f_in, qemu_fclose);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    return 0;
> +}
> +
> +static bool vfio_load_bufs_thread_want_abort(VFIODevice *vbasedev,
> +                                             bool *abort_flag)
> +{
> +    VFIOMigration *migration = vbasedev->migration;
> +
> +    return migration->load_bufs_thread_want_exit || qatomic_read(abort_flag);
> +}
> +
> +static int vfio_load_bufs_thread(bool *abort_flag, void *opaque)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);

Move QEMU_LOCK_GUARD() below the local variable declarations?
I usually don't expect to see mutex locking as part of a local variable
declaration block, which makes it easy to miss when reading the code.
(Although QEMU_LOCK_GUARD declares a local variable under the hood, it's
implicit and not visible to the user.)
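
I.e. roughly (just to illustrate the reordering):

    VFIODevice *vbasedev = opaque;
    VFIOMigration *migration = vbasedev->migration;
    int ret;

    /* Now clearly a locking statement, not a declaration */
    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);

    assert(migration->load_bufs_thread_running);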

> +    int ret;
> +
> +    assert(migration->load_bufs_thread_running);
> +
> +    while (!vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
> +        VFIOStateBuffer *lb;
> +        guint bufs_len;
> +        bool starved;
> +
> +        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
> +
> +        bufs_len = vfio_state_buffers_size_get(&migration->load_bufs);
> +        if (migration->load_buf_idx >= bufs_len) {
> +            assert(migration->load_buf_idx == bufs_len);
> +            starved = true;
> +        } else {
> +            lb = vfio_state_buffers_at(&migration->load_bufs,
> +                                       migration->load_buf_idx);
> +            starved = !lb->is_present;
> +        }
> +
> +        if (starved) {
> +            trace_vfio_load_state_device_buffer_starved(vbasedev->name,
> +                                                        migration->load_buf_idx);
> +            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond,
> +                           &migration->load_bufs_mutex);
> +            continue;
> +        }
> +
> +        if (migration->load_buf_idx == migration->load_buf_idx_last) {
> +            break;
> +        }
> +
> +        if (migration->load_buf_idx == 0) {
> +            trace_vfio_load_state_device_buffer_start(vbasedev->name);
> +        }
> +
> +        if (lb->len) {
> +            g_autofree char *buf = NULL;
> +            size_t buf_len;
> +            ssize_t wr_ret;
> +            int errno_save;
> +
> +            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
> +                                                           migration->load_buf_idx);
> +
> +            /* lb might become re-allocated when we drop the lock */
> +            buf = g_steal_pointer(&lb->data);
> +            buf_len = lb->len;
> +
> +            /*
> +             * Loading data to the device takes a while,
> +             * drop the lock during this process.
> +             */
> +            qemu_mutex_unlock(&migration->load_bufs_mutex);
> +            wr_ret = write(migration->data_fd, buf, buf_len);
> +            errno_save = errno;
> +            qemu_mutex_lock(&migration->load_bufs_mutex);
> +
> +            if (wr_ret < 0) {
> +                ret = -errno_save;
> +                goto ret_signal;
> +            } else if (wr_ret < buf_len) {
> +                ret = -EINVAL;
> +                goto ret_signal;
> +            }

Should we loop the write until reaching buf_len bytes?
A partial write is not considered an error according to the write(2)
manpage.
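
Maybe something like this (an untested sketch; it leaves out the mutex
drop/re-take around the write that the patch does):

    const char *p = buf;
    size_t remaining = buf_len;

    while (remaining > 0) {
        ssize_t wr_ret = write(migration->data_fd, p, remaining);

        if (wr_ret < 0) {
            if (errno == EINTR) {
                /* Interrupted by a signal before any data was written */
                continue;
            }
            ret = -errno;
            goto ret_signal;
        }

        p += wr_ret;
        remaining -= wr_ret;
    }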

Thanks.

> +
> +            trace_vfio_load_state_device_buffer_load_end(vbasedev->name,
> +                                                         migration->load_buf_idx);
> +        }
> +
> +        assert(migration->load_buf_queued_pending_buffers > 0);
> +        migration->load_buf_queued_pending_buffers--;
> +
> +        if (migration->load_buf_idx == migration->load_buf_idx_last - 1) {
> +            trace_vfio_load_state_device_buffer_end(vbasedev->name);
> +        }
> +
> +        migration->load_buf_idx++;
> +    }
> +
> +    if (vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
> +        ret = -ECANCELED;
> +        goto ret_signal;
> +    }
> +
> +    ret = vfio_load_bufs_thread_load_config(vbasedev);
> +
> +ret_signal:
> +    migration->load_bufs_thread_running = false;
> +    qemu_cond_signal(&migration->load_bufs_thread_finished_cond);
> +
> +    return ret;
> +}
> +
>   static int vfio_save_device_config_state(QEMUFile *f, void *opaque,
>                                            Error **errp)
>   {
> @@ -430,6 +726,12 @@ static bool vfio_precopy_supported(VFIODevice *vbasedev)
>       return migration->mig_flags & VFIO_MIGRATION_PRE_COPY;
>   }
>
> +static bool vfio_multifd_transfer_supported(void)
> +{
> +    return migration_has_device_state_support() &&
> +        migrate_send_switchover_start();
> +}
> +
>   /* ---------------------------------------------------------------------- */
>
>   static int vfio_save_prepare(void *opaque, Error **errp)
> @@ -695,17 +997,73 @@ static int vfio_load_setup(QEMUFile *f, void *opaque, Error **errp)
>
>       assert(!migration->load_setup);
>
> +    /*
> +     * Make a copy of this setting at the start in case it is changed
> +     * mid-migration.
> +     */
> +    if (vbasedev->migration_multifd_transfer == ON_OFF_AUTO_AUTO) {
> +        migration->multifd_transfer = vfio_multifd_transfer_supported();
> +    } else {
> +        migration->multifd_transfer =
> +            vbasedev->migration_multifd_transfer == ON_OFF_AUTO_ON;
> +    }
> +
> +    if (migration->multifd_transfer && !vfio_multifd_transfer_supported()) {
> +        error_setg(errp,
> +                   "%s: Multifd device transfer requested but unsupported in the current config",
> +                   vbasedev->name);
> +        return -EINVAL;
> +    }
> +
>       ret = vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
>                                      migration->device_state, errp);
>       if (ret) {
>           return ret;
>       }
>
> +    if (migration->multifd_transfer) {
> +        assert(!migration->load_bufs.array);
> +        vfio_state_buffers_init(&migration->load_bufs);
> +
> +        qemu_mutex_init(&migration->load_bufs_mutex);
> +
> +        migration->load_buf_idx = 0;
> +        migration->load_buf_idx_last = UINT32_MAX;
> +        migration->load_buf_queued_pending_buffers = 0;
> +        qemu_cond_init(&migration->load_bufs_buffer_ready_cond);
> +
> +        migration->load_bufs_thread_running = false;
> +        migration->load_bufs_thread_want_exit = false;
> +        qemu_cond_init(&migration->load_bufs_thread_finished_cond);
> +    }
> +
>       migration->load_setup = true;
>
>       return 0;
>   }
>
> +static void vfio_load_cleanup_load_bufs_thread(VFIODevice *vbasedev)
> +{
> +    VFIOMigration *migration = vbasedev->migration;
> +
> +    /* The lock order is load_bufs_mutex -> BQL so unlock BQL here first */
> +    bql_unlock();
> +    WITH_QEMU_LOCK_GUARD(&migration->load_bufs_mutex) {
> +        if (!migration->load_bufs_thread_running) {
> +            break;
> +        }
> +
> +        migration->load_bufs_thread_want_exit = true;
> +
> +        qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
> +        qemu_cond_wait(&migration->load_bufs_thread_finished_cond,
> +                       &migration->load_bufs_mutex);
> +
> +        assert(!migration->load_bufs_thread_running);
> +    }
> +    bql_lock();
> +}
> +
>   static int vfio_load_cleanup(void *opaque)
>   {
>       VFIODevice *vbasedev = opaque;
> @@ -715,7 +1073,19 @@ static int vfio_load_cleanup(void *opaque)
>           return 0;
>       }
>
> +    if (migration->multifd_transfer) {
> +        vfio_load_cleanup_load_bufs_thread(vbasedev);
> +    }
> +
>       vfio_migration_cleanup(vbasedev);
> +
> +    if (migration->multifd_transfer) {
> +        qemu_cond_destroy(&migration->load_bufs_thread_finished_cond);
> +        vfio_state_buffers_destroy(&migration->load_bufs);
> +        qemu_cond_destroy(&migration->load_bufs_buffer_ready_cond);
> +        qemu_mutex_destroy(&migration->load_bufs_mutex);
> +    }
> +
>       migration->load_setup = false;
>       trace_vfio_load_cleanup(vbasedev->name);
>
> @@ -725,6 +1095,7 @@ static int vfio_load_cleanup(void *opaque)
>   static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>   {
>       VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
>       int ret = 0;
>       uint64_t data;
>
> @@ -736,6 +1107,12 @@ static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>           switch (data) {
>           case VFIO_MIG_FLAG_DEV_CONFIG_STATE:
>           {
> +            if (migration->multifd_transfer) {
> +                error_report("%s: got DEV_CONFIG_STATE but doing multifd transfer",
> +                             vbasedev->name);
> +                return -EINVAL;
> +            }
> +
>               return vfio_load_device_config_state(f, opaque);
>           }
>           case VFIO_MIG_FLAG_DEV_SETUP_STATE:
> @@ -801,6 +1178,29 @@ static bool vfio_switchover_ack_needed(void *opaque)
>       return vfio_precopy_supported(vbasedev);
>   }
>
> +static int vfio_switchover_start(void *opaque)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +
> +    if (!migration->multifd_transfer) {
> +        /* Load thread is only used for multifd transfer */
> +        return 0;
> +    }
> +
> +    /* The lock order is load_bufs_mutex -> BQL so unlock BQL here first */
> +    bql_unlock();
> +    WITH_QEMU_LOCK_GUARD(&migration->load_bufs_mutex) {
> +        assert(!migration->load_bufs_thread_running);
> +        migration->load_bufs_thread_running = true;
> +    }
> +    bql_lock();
> +
> +    qemu_loadvm_start_load_thread(vfio_load_bufs_thread, vbasedev);
> +
> +    return 0;
> +}
> +
>   static const SaveVMHandlers savevm_vfio_handlers = {
>       .save_prepare = vfio_save_prepare,
>       .save_setup = vfio_save_setup,
> @@ -814,7 +1214,9 @@ static const SaveVMHandlers savevm_vfio_handlers = {
>       .load_setup = vfio_load_setup,
>       .load_cleanup = vfio_load_cleanup,
>       .load_state = vfio_load_state,
> +    .load_state_buffer = vfio_load_state_buffer,
>       .switchover_ack_needed = vfio_switchover_ack_needed,
> +    .switchover_start = vfio_switchover_start,
>   };
>
>   /* ---------------------------------------------------------------------- */
> diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> index 9d547cb5cdff..72d62ada8a39 100644
> --- a/hw/vfio/pci.c
> +++ b/hw/vfio/pci.c
> @@ -3384,6 +3384,8 @@ static Property vfio_pci_dev_properties[] = {
>                   vbasedev.migration_multifd_transfer,
>                   qdev_prop_on_off_auto_mutable, OnOffAuto,
>                   .set_default = true, .defval.i = ON_OFF_AUTO_AUTO),
> +    DEFINE_PROP_UINT64("x-migration-max-queued-buffers", VFIOPCIDevice,
> +                       vbasedev.migration_max_queued_buffers, UINT64_MAX),
>       DEFINE_PROP_BOOL("migration-events", VFIOPCIDevice,
>                        vbasedev.migration_events, false),
>       DEFINE_PROP_BOOL("x-no-mmap", VFIOPCIDevice, vbasedev.no_mmap, false),
> diff --git a/hw/vfio/trace-events b/hw/vfio/trace-events
> index 1bebe9877d88..418b378ebd29 100644
> --- a/hw/vfio/trace-events
> +++ b/hw/vfio/trace-events
> @@ -153,6 +153,12 @@ vfio_load_device_config_state_start(const char *name) " (%s)"
>   vfio_load_device_config_state_end(const char *name) " (%s)"
>   vfio_load_state(const char *name, uint64_t data) " (%s) data 0x%"PRIx64
>   vfio_load_state_device_data(const char *name, uint64_t data_size, int ret) " (%s) size %"PRIu64" ret %d"
> +vfio_load_state_device_buffer_incoming(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_start(const char *name) " (%s)"
> +vfio_load_state_device_buffer_starved(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_load_start(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_load_end(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_end(const char *name) " (%s)"
>   vfio_migration_realize(const char *name) " (%s)"
>   vfio_migration_set_device_state(const char *name, const char *state) " (%s) state %s"
>   vfio_migration_set_state(const char *name, const char *new_state, const char *recover_state) " (%s) new state %s, recover state %s"
> diff --git a/include/hw/vfio/vfio-common.h b/include/hw/vfio/vfio-common.h
> index b1c03a82eec8..0954d6981a22 100644
> --- a/include/hw/vfio/vfio-common.h
> +++ b/include/hw/vfio/vfio-common.h
> @@ -61,6 +61,11 @@ typedef struct VFIORegion {
>       uint8_t nr; /* cache the region number for debug */
>   } VFIORegion;
>
> +/* type safety */
> +typedef struct VFIOStateBuffers {
> +    GArray *array;
> +} VFIOStateBuffers;
> +
>   typedef struct VFIOMigration {
>       struct VFIODevice *vbasedev;
>       VMChangeStateEntry *vm_state;
> @@ -73,10 +78,23 @@ typedef struct VFIOMigration {
>       uint64_t mig_flags;
>       uint64_t precopy_init_size;
>       uint64_t precopy_dirty_size;
> +    bool multifd_transfer;
>       bool initial_data_sent;
>
>       bool event_save_iterate_started;
>       bool event_precopy_empty_hit;
> +
> +    QemuThread load_bufs_thread;
> +    bool load_bufs_thread_running;
> +    bool load_bufs_thread_want_exit;
> +
> +    VFIOStateBuffers load_bufs;
> +    QemuCond load_bufs_buffer_ready_cond;
> +    QemuCond load_bufs_thread_finished_cond;
> +    QemuMutex load_bufs_mutex; /* Lock order: this lock -> BQL */
> +    uint32_t load_buf_idx;
> +    uint32_t load_buf_idx_last;
> +    uint32_t load_buf_queued_pending_buffers;
>   } VFIOMigration;
>
>   struct VFIOGroup;
> @@ -136,6 +154,7 @@ typedef struct VFIODevice {
>       OnOffAuto enable_migration;
>       OnOffAuto migration_multifd_transfer;
>       bool migration_events;
> +    uint64_t migration_max_queued_buffers;
>       VFIODeviceOps *ops;
>       unsigned int num_irqs;
>       unsigned int num_regions;
Re: [PATCH v3 22/24] vfio/migration: Multifd device state transfer support - receive side
Posted by Maciej S. Szmigiero 1 year, 2 months ago
Hi Avihai,

On 9.12.2024 10:13, Avihai Horon wrote:
> Hi Maciej,
> 
> On 17/11/2024 21:20, Maciej S. Szmigiero wrote:
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> The multifd received data needs to be reassembled since device state
>> packets sent via different multifd channels can arrive out-of-order.
>>
>> Therefore, each VFIO device state packet carries a header indicating its
>> position in the stream.
>>
>> The last such VFIO device state packet should have
>> VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config state.
>>
>> Since it's important to finish loading the device state transferred via the
>> main migration channel (via the save_live_iterate SaveVMHandler) before
>> starting to load the data asynchronously transferred via multifd, the
>> thread doing the actual loading of the multifd-transferred data is only
>> started from the switchover_start SaveVMHandler.
>>
>> The switchover_start handler is called when the MIG_CMD_SWITCHOVER_START
>> sub-command of QEMU_VM_COMMAND is received via the main migration channel.
>>
>> This sub-command is only sent after all save_live_iterate data have already
>> been posted, so it is safe to commence loading of the multifd-transferred
>> device state upon receiving it - loading of save_live_iterate data happens
>> synchronously in the main migration thread (much like the processing of
>> MIG_CMD_SWITCHOVER_START), so by the time MIG_CMD_SWITCHOVER_START is
>> processed all the preceding data must have already been loaded.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>   hw/vfio/migration.c           | 402 ++++++++++++++++++++++++++++++++++
>>   hw/vfio/pci.c                 |   2 +
>>   hw/vfio/trace-events          |   6 +
>>   include/hw/vfio/vfio-common.h |  19 ++
>>   4 files changed, 429 insertions(+)
>>
>> diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
>> index 683f2ae98d5e..b54879fe6209 100644
>> --- a/hw/vfio/migration.c
>> +++ b/hw/vfio/migration.c
>> @@ -15,6 +15,7 @@
>>   #include <linux/vfio.h>
>>   #include <sys/ioctl.h>
>>
>> +#include "io/channel-buffer.h"
>>   #include "sysemu/runstate.h"
>>   #include "hw/vfio/vfio-common.h"
>>   #include "migration/misc.h"
>> @@ -55,6 +56,15 @@
>>    */
>>   #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
>>
>> +#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
>> +
>> +typedef struct VFIODeviceStatePacket {
>> +    uint32_t version;
>> +    uint32_t idx;
>> +    uint32_t flags;
>> +    uint8_t data[0];
>> +} QEMU_PACKED VFIODeviceStatePacket;
>> +
>>   static int64_t bytes_transferred;
>>
>>   static const char *mig_state_to_str(enum vfio_device_mig_state state)
>> @@ -254,6 +264,292 @@ static int vfio_load_buffer(QEMUFile *f, VFIODevice *vbasedev,
>>       return ret;
>>   }
>>
>> +typedef struct VFIOStateBuffer {
>> +    bool is_present;
>> +    char *data;
>> +    size_t len;
>> +} VFIOStateBuffer;
>> +
>> +static void vfio_state_buffer_clear(gpointer data)
>> +{
>> +    VFIOStateBuffer *lb = data;
>> +
>> +    if (!lb->is_present) {
>> +        return;
>> +    }
>> +
>> +    g_clear_pointer(&lb->data, g_free);
>> +    lb->is_present = false;
>> +}
>> +
>> +static void vfio_state_buffers_init(VFIOStateBuffers *bufs)
>> +{
>> +    bufs->array = g_array_new(FALSE, TRUE, sizeof(VFIOStateBuffer));
>> +    g_array_set_clear_func(bufs->array, vfio_state_buffer_clear);
>> +}
>> +
>> +static void vfio_state_buffers_destroy(VFIOStateBuffers *bufs)
>> +{
>> +    g_clear_pointer(&bufs->array, g_array_unref);
>> +}
>> +
>> +static void vfio_state_buffers_assert_init(VFIOStateBuffers *bufs)
>> +{
>> +    assert(bufs->array);
>> +}
>> +
>> +static guint vfio_state_buffers_size_get(VFIOStateBuffers *bufs)
>> +{
>> +    return bufs->array->len;
>> +}
>> +
>> +static void vfio_state_buffers_size_set(VFIOStateBuffers *bufs, guint size)
>> +{
>> +    g_array_set_size(bufs->array, size);
>> +}
> 
> The above three functions seem a bit too specific.

You asked to have "full API for this [VFIOStateBuffers - MSS],
that wraps the g_array_* calls and holds the extra members"
during the review of the previous version of this patch set, so here it is.

> 
> How about:
> Instead of size_set and assert_init, introduce a vfio_state_buffers_insert() function that handles buffer insertion into the array from the validated packet.
> 
> Instead of size_get, introduce vfio_state_buffers_get() that handles the array length and is_present checks.
> We can also add a vfio_state_buffer_write() function that handles writing the buffer to the device.
> 
> IMHO this will also make vfio_load_state_buffer() and vfio_load_bufs_thread(), which are rather long, clearer.

I think it would be even nicer to keep the vfio_state_buffer_*() methods as
thin wrappers (a low-level API) and introduce an intermediate API doing more
or less what you have described above to simplify vfio_load_bufs_thread()
(and possibly vfio_load_state_buffer() too).
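
For example, an intermediate helper for the load thread could look roughly
like this (naming tentative, untested), built on top of the thin wrappers:

static VFIOStateBuffer *vfio_load_state_buffer_get(VFIOMigration *migration)
{
    VFIOStateBuffer *lb;
    guint bufs_len;

    bufs_len = vfio_state_buffers_size_get(&migration->load_bufs);
    if (migration->load_buf_idx >= bufs_len) {
        assert(migration->load_buf_idx == bufs_len);
        /* Not received yet - caller waits on load_bufs_buffer_ready_cond */
        return NULL;
    }

    lb = vfio_state_buffers_at(&migration->load_bufs,
                               migration->load_buf_idx);
    return lb->is_present ? lb : NULL;
}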

>> +
>> +static VFIOStateBuffer *vfio_state_buffers_at(VFIOStateBuffers *bufs, guint idx)
>> +{
>> +    return &g_array_index(bufs->array, VFIOStateBuffer, idx);
>> +}
>> +
>> +static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
>> +                                  Error **errp)
>> +{
>> +    VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
>> +    VFIOStateBuffer *lb;
>> +
>> +    /*
>> +     * Holding BQL here would violate the lock order and can cause
>> +     * a deadlock once we attempt to lock load_bufs_mutex below.
>> +     */
>> +    assert(!bql_locked());
>> +
>> +    if (!migration->multifd_transfer) {
>> +        error_setg(errp,
>> +                   "got device state packet but not doing multifd transfer");
>> +        return -1;
>> +    }
>> +
>> +    if (data_size < sizeof(*packet)) {
>> +        error_setg(errp, "packet too short at %zu (min is %zu)",
>> +                   data_size, sizeof(*packet));
>> +        return -1;
>> +    }
>> +
>> +    if (packet->version != 0) {
>> +        error_setg(errp, "packet has unknown version %" PRIu32,
>> +                   packet->version);
>> +        return -1;
>> +    }
>> +
>> +    if (packet->idx == UINT32_MAX) {
>> +        error_setg(errp, "packet has too high idx %" PRIu32,
>> +                   packet->idx);
>> +        return -1;
>> +    }
>> +
>> +    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
>> +
>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
>> +
>> +    /* config state packet should be the last one in the stream */
>> +    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
>> +        migration->load_buf_idx_last = packet->idx;
>> +    }
>> +
>> +    vfio_state_buffers_assert_init(&migration->load_bufs);
>> +    if (packet->idx >= vfio_state_buffers_size_get(&migration->load_bufs)) {
>> +        vfio_state_buffers_size_set(&migration->load_bufs, packet->idx + 1);
>> +    }
>> +
>> +    lb = vfio_state_buffers_at(&migration->load_bufs, packet->idx);
>> +    if (lb->is_present) {
>> +        error_setg(errp, "state buffer %" PRIu32 " already filled",
>> +                   packet->idx);
>> +        return -1;
>> +    }
>> +
>> +    assert(packet->idx >= migration->load_buf_idx);
>> +
>> +    migration->load_buf_queued_pending_buffers++;
>> +    if (migration->load_buf_queued_pending_buffers >
>> +        vbasedev->migration_max_queued_buffers) {
>> +        error_setg(errp,
>> +                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
>> +                   packet->idx, vbasedev->migration_max_queued_buffers);
>> +        return -1;
>> +    }
> 
> Copying my question from v2:
> 
> Should we count bytes instead of buffers? The current buffer size is 1MB but this could change, and the normal user should not care or know what the buffer size is.
> So maybe rename it to migration_max_pending_bytes or such?
> 
> And Maciej replied:
> 
> Since it's Peter who asked for this limit to be introduced in the first
> place, I would like to ask him what his preference is here.
> @Peter: max queued buffers or bytes?
> 
> So Peter, what's your opinion here?
> 
>> +
>> +    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
>> +    lb->len = data_size - sizeof(*packet);
>> +    lb->is_present = true;
>> +
>> +    qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
>> +
>> +    return 0;
>> +}
>> +
>> +static int vfio_load_device_config_state(QEMUFile *f, void *opaque);
>> +
>> +static int vfio_load_bufs_thread_load_config(VFIODevice *vbasedev)
>> +{
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    VFIOStateBuffer *lb;
>> +    g_autoptr(QIOChannelBuffer) bioc = NULL;
>> +    QEMUFile *f_out = NULL, *f_in = NULL;
>> +    uint64_t mig_header;
>> +    int ret;
>> +
>> +    assert(migration->load_buf_idx == migration->load_buf_idx_last);
>> +    lb = vfio_state_buffers_at(&migration->load_bufs, migration->load_buf_idx);
>> +    assert(lb->is_present);
>> +
>> +    bioc = qio_channel_buffer_new(lb->len);
>> +    qio_channel_set_name(QIO_CHANNEL(bioc), "vfio-device-config-load");
>> +
>> +    f_out = qemu_file_new_output(QIO_CHANNEL(bioc));
>> +    qemu_put_buffer(f_out, (uint8_t *)lb->data, lb->len);
>> +
>> +    ret = qemu_fflush(f_out);
>> +    if (ret) {
>> +        g_clear_pointer(&f_out, qemu_fclose);
>> +        return ret;
>> +    }
>> +
>> +    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
>> +    f_in = qemu_file_new_input(QIO_CHANNEL(bioc));
>> +
>> +    mig_header = qemu_get_be64(f_in);
>> +    if (mig_header != VFIO_MIG_FLAG_DEV_CONFIG_STATE) {
>> +        g_clear_pointer(&f_out, qemu_fclose);
>> +        g_clear_pointer(&f_in, qemu_fclose);
>> +        return -EINVAL;
>> +    }
>> +
>> +    bql_lock();
>> +    ret = vfio_load_device_config_state(f_in, vbasedev);
>> +    bql_unlock();
>> +
>> +    g_clear_pointer(&f_out, qemu_fclose);
>> +    g_clear_pointer(&f_in, qemu_fclose);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static bool vfio_load_bufs_thread_want_abort(VFIODevice *vbasedev,
>> +                                             bool *abort_flag)
>> +{
>> +    VFIOMigration *migration = vbasedev->migration;
>> +
>> +    return migration->load_bufs_thread_want_exit || qatomic_read(abort_flag);
>> +}
>> +
>> +static int vfio_load_bufs_thread(bool *abort_flag, void *opaque)
>> +{
>> +    VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
> 
> Move QEMU_LOCK_GUARD() below the local variable declarations?
> I usually don't expect to see mutex locking as part of a local variable declaration block, which makes it easy to miss when reading the code.
> (Although QEMU_LOCK_GUARD declares a local variable under the hood, it's implicit and not visible to the user.)

I guess you mean moving it..

>> +    int ret;

^ ..here.

Will do.

>> +    assert(migration->load_bufs_thread_running);
>> +
>> +    while (!vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
>> +        VFIOStateBuffer *lb;
>> +        guint bufs_len;
>> +        bool starved;
>> +
>> +        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
>> +
>> +        bufs_len = vfio_state_buffers_size_get(&migration->load_bufs);
>> +        if (migration->load_buf_idx >= bufs_len) {
>> +            assert(migration->load_buf_idx == bufs_len);
>> +            starved = true;
>> +        } else {
>> +            lb = vfio_state_buffers_at(&migration->load_bufs,
>> +                                       migration->load_buf_idx);
>> +            starved = !lb->is_present;
>> +        }
>> +
>> +        if (starved) {
>> +            trace_vfio_load_state_device_buffer_starved(vbasedev->name,
>> +                                                        migration->load_buf_idx);
>> +            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond,
>> +                           &migration->load_bufs_mutex);
>> +            continue;
>> +        }
>> +
>> +        if (migration->load_buf_idx == migration->load_buf_idx_last) {
>> +            break;
>> +        }
>> +
>> +        if (migration->load_buf_idx == 0) {
>> +            trace_vfio_load_state_device_buffer_start(vbasedev->name);
>> +        }
>> +
>> +        if (lb->len) {
>> +            g_autofree char *buf = NULL;
>> +            size_t buf_len;
>> +            ssize_t wr_ret;
>> +            int errno_save;
>> +
>> +            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
>> +                                                           migration->load_buf_idx);
>> +
>> +            /* lb might become re-allocated when we drop the lock */
>> +            buf = g_steal_pointer(&lb->data);
>> +            buf_len = lb->len;
>> +
>> +            /*
>> +             * Loading data to the device takes a while,
>> +             * drop the lock during this process.
>> +             */
>> +            qemu_mutex_unlock(&migration->load_bufs_mutex);
>> +            wr_ret = write(migration->data_fd, buf, buf_len);
>> +            errno_save = errno;
>> +            qemu_mutex_lock(&migration->load_bufs_mutex);
>> +
>> +            if (wr_ret < 0) {
>> +                ret = -errno_save;
>> +                goto ret_signal;
>> +            } else if (wr_ret < buf_len) {
>> +                ret = -EINVAL;
>> +                goto ret_signal;
>> +            }
> 
> Should we loop the write until reaching buf_len bytes?
> Partial write is not considered error according to write(2) manpage.

Yes, it's probably better to allow partial writes in case
some VFIO kernel driver actually makes use of them.
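
A minimal sketch of such a retry loop (the mutex handling around the
write is omitted here, and the EINTR retry is an extra assumption on top
of the original code):

    char *buf_cur = buf;
    size_t buf_rem = buf_len;

    while (buf_rem > 0) {
        ssize_t wr_ret = write(migration->data_fd, buf_cur, buf_rem);

        if (wr_ret < 0) {
            if (errno == EINTR) {
                continue; /* interrupted before making progress, retry */
            }
            ret = -errno;
            goto ret_signal;
        }

        /* accept a partial write and resume from where it ended */
        buf_cur += wr_ret;
        buf_rem -= wr_ret;
    }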

> 
> Thanks.

Thanks,
Maciej


Re: [PATCH v3 22/24] vfio/migration: Multifd device state transfer support - receive side
Posted by Avihai Horon 1 year, 1 month ago
On 11/12/2024 1:06, Maciej S. Szmigiero wrote:

>
> Hi Avihai,
>
> On 9.12.2024 10:13, Avihai Horon wrote:
>> Hi Maciej,
>>
>> On 17/11/2024 21:20, Maciej S. Szmigiero wrote:
>>>
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> The multifd received data needs to be reassembled since device state
>>> packets sent via different multifd channels can arrive out-of-order.
>>>
>>> Therefore, each VFIO device state packet carries a header indicating 
>>> its
>>> position in the stream.
>>>
>>> The last such VFIO device state packet should have
>>> VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config 
>>> state.
>>>
>>> Since it's important to finish loading device state transferred via the
>>> main migration channel (via save_live_iterate SaveVMHandler) before
>>> starting loading the data asynchronously transferred via multifd the 
>>> thread
>>> doing the actual loading of the multifd transferred data is only 
>>> started
>>> from switchover_start SaveVMHandler.
>>>
>>> switchover_start handler is called when MIG_CMD_SWITCHOVER_START
>>> sub-command of QEMU_VM_COMMAND is received via the main migration 
>>> channel.
>>>
>>> This sub-command is only sent after all save_live_iterate data have 
>>> already
>>> been posted so it is safe to commence loading of the 
>>> multifd-transferred
>>> device state upon receiving it - loading of save_live_iterate data 
>>> happens
>>> synchronously in the main migration thread (much like the processing of
>>> MIG_CMD_SWITCHOVER_START) so by the time MIG_CMD_SWITCHOVER_START is
>>> processed all the preceding data must have already been loaded.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>>   hw/vfio/migration.c           | 402 
>>> ++++++++++++++++++++++++++++++++++
>>>   hw/vfio/pci.c                 |   2 +
>>>   hw/vfio/trace-events          |   6 +
>>>   include/hw/vfio/vfio-common.h |  19 ++
>>>   4 files changed, 429 insertions(+)
>>>
>>> diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
>>> index 683f2ae98d5e..b54879fe6209 100644
>>> --- a/hw/vfio/migration.c
>>> +++ b/hw/vfio/migration.c
>>> @@ -15,6 +15,7 @@
>>>   #include <linux/vfio.h>
>>>   #include <sys/ioctl.h>
>>>
>>> +#include "io/channel-buffer.h"
>>>   #include "sysemu/runstate.h"
>>>   #include "hw/vfio/vfio-common.h"
>>>   #include "migration/misc.h"
>>> @@ -55,6 +56,15 @@
>>>    */
>>>   #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
>>>
>>> +#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
>>> +
>>> +typedef struct VFIODeviceStatePacket {
>>> +    uint32_t version;
>>> +    uint32_t idx;
>>> +    uint32_t flags;
>>> +    uint8_t data[0];
>>> +} QEMU_PACKED VFIODeviceStatePacket;
>>> +
>>>   static int64_t bytes_transferred;
>>>
>>>   static const char *mig_state_to_str(enum vfio_device_mig_state state)
>>> @@ -254,6 +264,292 @@ static int vfio_load_buffer(QEMUFile *f, 
>>> VFIODevice *vbasedev,
>>>       return ret;
>>>   }
>>>
>>> +typedef struct VFIOStateBuffer {
>>> +    bool is_present;
>>> +    char *data;
>>> +    size_t len;
>>> +} VFIOStateBuffer;
>>> +
>>> +static void vfio_state_buffer_clear(gpointer data)
>>> +{
>>> +    VFIOStateBuffer *lb = data;
>>> +
>>> +    if (!lb->is_present) {
>>> +        return;
>>> +    }
>>> +
>>> +    g_clear_pointer(&lb->data, g_free);
>>> +    lb->is_present = false;
>>> +}
>>> +
>>> +static void vfio_state_buffers_init(VFIOStateBuffers *bufs)
>>> +{
>>> +    bufs->array = g_array_new(FALSE, TRUE, sizeof(VFIOStateBuffer));
>>> +    g_array_set_clear_func(bufs->array, vfio_state_buffer_clear);
>>> +}
>>> +
>>> +static void vfio_state_buffers_destroy(VFIOStateBuffers *bufs)
>>> +{
>>> +    g_clear_pointer(&bufs->array, g_array_unref);
>>> +}
>>> +
>>> +static void vfio_state_buffers_assert_init(VFIOStateBuffers *bufs)
>>> +{
>>> +    assert(bufs->array);
>>> +}
>>> +
>>> +static guint vfio_state_buffers_size_get(VFIOStateBuffers *bufs)
>>> +{
>>> +    return bufs->array->len;
>>> +}
>>> +
>>> +static void vfio_state_buffers_size_set(VFIOStateBuffers *bufs, guint size)
>>> +{
>>> +    g_array_set_size(bufs->array, size);
>>> +}
>>
>> The above three functions seem a bit too specific.
>
> You asked to have "full API for this [VFIOStateBuffers - MSS],
> that wraps the g_array_* calls and holds the extra members"
> during the review of the previous version of this patch set so here it 
> is.
>
>>
>> How about:
>> Instead of size_set and assert_init, introduce a 
>> vfio_state_buffers_insert() function that handles buffer insertion to 
>> the array from the validated packet.
>>
>> Instead of size_get, introduce vfio_state_buffers_get() that handles 
>> the array length and is_present checks.
>> We can also add a vfio_state_buffer_write() function that handles 
>> writing the buffer to the device.
>>
>> IMHO this will also make vfio_load_state_buffer() and 
>> vfio_load_bufs_thread(), which are rather long, clearer.
>
> I think it would be even nicer to keep vfio_state_buffer_*() methods 
> as thin wrappers
> (low level API) and introduce intermediate API doing more or less what 
> you have
> described above to simplify vfio_load_bufs_thread() (and possibly 
> vfio_load_state_buffer() too).

Yes, enriching the APIs sounds good.
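
As a strawman, one such intermediate helper could look like this (the
vfio_state_buffers_get() name is just a suggestion, not part of the
patch):

/* Return the buffer at idx if it has already arrived, NULL if not yet */
static VFIOStateBuffer *vfio_state_buffers_get(VFIOStateBuffers *bufs,
                                               guint idx)
{
    VFIOStateBuffer *lb;

    /* The array not having grown to cover idx means no buffer yet */
    if (idx >= vfio_state_buffers_size_get(bufs)) {
        return NULL;
    }

    lb = vfio_state_buffers_at(bufs, idx);
    return lb->is_present ? lb : NULL;
}

The starvation check in vfio_load_bufs_thread() would then collapse into
a single NULL test.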

Thanks.

>
>>> +
>>> +static VFIOStateBuffer *vfio_state_buffers_at(VFIOStateBuffers *bufs, guint idx)
>>> +{
>>> +    return &g_array_index(bufs->array, VFIOStateBuffer, idx);
>>> +}
>>> +
>>> +static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
>>> +                                  Error **errp)
>>> +{
>>> +    VFIODevice *vbasedev = opaque;
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
>>> +    VFIOStateBuffer *lb;
>>> +
>>> +    /*
>>> +     * Holding BQL here would violate the lock order and can cause
>>> +     * a deadlock once we attempt to lock load_bufs_mutex below.
>>> +     */
>>> +    assert(!bql_locked());
>>> +
>>> +    if (!migration->multifd_transfer) {
>>> +        error_setg(errp,
>>> +                   "got device state packet but not doing multifd transfer");
>>> +        return -1;
>>> +    }
>>> +
>>> +    if (data_size < sizeof(*packet)) {
>>> +        error_setg(errp, "packet too short at %zu (min is %zu)",
>>> +                   data_size, sizeof(*packet));
>>> +        return -1;
>>> +    }
>>> +
>>> +    if (packet->version != 0) {
>>> +        error_setg(errp, "packet has unknown version %" PRIu32,
>>> +                   packet->version);
>>> +        return -1;
>>> +    }
>>> +
>>> +    if (packet->idx == UINT32_MAX) {
>>> +        error_setg(errp, "packet has too high idx %" PRIu32,
>>> +                   packet->idx);
>>> +        return -1;
>>> +    }
>>> +
>>> +    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
>>> +
>>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
>>> +
>>> +    /* config state packet should be the last one in the stream */
>>> +    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
>>> +        migration->load_buf_idx_last = packet->idx;
>>> +    }
>>> +
>>> +    vfio_state_buffers_assert_init(&migration->load_bufs);
>>> +    if (packet->idx >= vfio_state_buffers_size_get(&migration->load_bufs)) {
>>> +        vfio_state_buffers_size_set(&migration->load_bufs, packet->idx + 1);
>>> +    }
>>> +
>>> +    lb = vfio_state_buffers_at(&migration->load_bufs, packet->idx);
>>> +    if (lb->is_present) {
>>> +        error_setg(errp, "state buffer %" PRIu32 " already filled",
>>> +                   packet->idx);
>>> +        return -1;
>>> +    }
>>> +
>>> +    assert(packet->idx >= migration->load_buf_idx);
>>> +
>>> +    migration->load_buf_queued_pending_buffers++;
>>> +    if (migration->load_buf_queued_pending_buffers >
>>> +        vbasedev->migration_max_queued_buffers) {
>>> +        error_setg(errp,
>>> +                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
>>> +                   packet->idx, vbasedev->migration_max_queued_buffers);
>>> +        return -1;
>>> +    }
>>
>> Copying my question from v2:
>>
>> Should we count bytes instead of buffers? Current buffer size is 1MB 
>> but this could change, and the normal user should not care or know
>> what the buffer size is.
>> So maybe rename to migration_max_pending_bytes or such?
>>
>> And Maciej replied:
>>
>> Since it's Peter that asked for this limit to be introduced in the 
>> first place,
>> I would like to ask him what his preference is here.
>> @Peter: max queued buffers or bytes?
>>
>> So Peter, what's your opinion here?
>>
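
(For illustration, a byte-based check in vfio_load_state_buffer() could
look roughly like the following; the load_buf_pending_bytes field and
the migration_max_pending_bytes property are hypothetical names, not
part of the patch.)

    migration->load_buf_pending_bytes += data_size - sizeof(*packet);
    if (migration->load_buf_pending_bytes >
        vbasedev->migration_max_pending_bytes) {
        error_setg(errp,
                   "queuing state buffer %" PRIu32
                   " would exceed the max of %" PRIu64 " bytes",
                   packet->idx, vbasedev->migration_max_pending_bytes);
        return -1;
    }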
>>> +
>>> +    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
>>> +    lb->len = data_size - sizeof(*packet);
>>> +    lb->is_present = true;
>>> +
>>> +    qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static int vfio_load_device_config_state(QEMUFile *f, void *opaque);
>>> +
>>> +static int vfio_load_bufs_thread_load_config(VFIODevice *vbasedev)
>>> +{
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +    VFIOStateBuffer *lb;
>>> +    g_autoptr(QIOChannelBuffer) bioc = NULL;
>>> +    QEMUFile *f_out = NULL, *f_in = NULL;
>>> +    uint64_t mig_header;
>>> +    int ret;
>>> +
>>> +    assert(migration->load_buf_idx == migration->load_buf_idx_last);
>>> +    lb = vfio_state_buffers_at(&migration->load_bufs, migration->load_buf_idx);
>>> +    assert(lb->is_present);
>>> +
>>> +    bioc = qio_channel_buffer_new(lb->len);
>>> +    qio_channel_set_name(QIO_CHANNEL(bioc), "vfio-device-config-load");
>>> +
>>> +    f_out = qemu_file_new_output(QIO_CHANNEL(bioc));
>>> +    qemu_put_buffer(f_out, (uint8_t *)lb->data, lb->len);
>>> +
>>> +    ret = qemu_fflush(f_out);
>>> +    if (ret) {
>>> +        g_clear_pointer(&f_out, qemu_fclose);
>>> +        return ret;
>>> +    }
>>> +
>>> +    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
>>> +    f_in = qemu_file_new_input(QIO_CHANNEL(bioc));
>>> +
>>> +    mig_header = qemu_get_be64(f_in);
>>> +    if (mig_header != VFIO_MIG_FLAG_DEV_CONFIG_STATE) {
>>> +        g_clear_pointer(&f_out, qemu_fclose);
>>> +        g_clear_pointer(&f_in, qemu_fclose);
>>> +        return -EINVAL;
>>> +    }
>>> +
>>> +    bql_lock();
>>> +    ret = vfio_load_device_config_state(f_in, vbasedev);
>>> +    bql_unlock();
>>> +
>>> +    g_clear_pointer(&f_out, qemu_fclose);
>>> +    g_clear_pointer(&f_in, qemu_fclose);
>>> +    if (ret < 0) {
>>> +        return ret;
>>> +    }
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static bool vfio_load_bufs_thread_want_abort(VFIODevice *vbasedev,
>>> +                                             bool *abort_flag)
>>> +{
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +
>>> +    return migration->load_bufs_thread_want_exit || qatomic_read(abort_flag);
>>> +}
>>> +
>>> +static int vfio_load_bufs_thread(bool *abort_flag, void *opaque)
>>> +{
>>> +    VFIODevice *vbasedev = opaque;
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
>>
>> Move QEMU_LOCK_GUARD() below the local var declaration?
>> I usually don't expect to see mutex lockings as part of local var 
>> declaration block, which makes it easy to miss when reading the code.
>> (Although QEMU_LOCK_GUARD declares a local variable under the hood, 
>> it's implicit and not visible to the user).
>
> I guess you mean moving it..
>
>>> +    int ret;
>
> ^ ..here.
>
> Will do.
>
>>> +    assert(migration->load_bufs_thread_running);
>>> +
>>> +    while (!vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
>>> +        VFIOStateBuffer *lb;
>>> +        guint bufs_len;
>>> +        bool starved;
>>> +
>>> +        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
>>> +
>>> +        bufs_len = vfio_state_buffers_size_get(&migration->load_bufs);
>>> +        if (migration->load_buf_idx >= bufs_len) {
>>> +            assert(migration->load_buf_idx == bufs_len);
>>> +            starved = true;
>>> +        } else {
>>> +            lb = vfio_state_buffers_at(&migration->load_bufs,
>>> +                                       migration->load_buf_idx);
>>> +            starved = !lb->is_present;
>>> +        }
>>> +
>>> +        if (starved) {
>>> +            trace_vfio_load_state_device_buffer_starved(vbasedev->name,
>>> +                                                        migration->load_buf_idx);
>>> +            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond,
>>> +                           &migration->load_bufs_mutex);
>>> +            continue;
>>> +        }
>>> +
>>> +        if (migration->load_buf_idx == migration->load_buf_idx_last) {
>>> +            break;
>>> +        }
>>> +
>>> +        if (migration->load_buf_idx == 0) {
>>> +            trace_vfio_load_state_device_buffer_start(vbasedev->name);
>>> +        }
>>> +
>>> +        if (lb->len) {
>>> +            g_autofree char *buf = NULL;
>>> +            size_t buf_len;
>>> +            ssize_t wr_ret;
>>> +            int errno_save;
>>> +
>>> +            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
>>> +                                                           migration->load_buf_idx);
>>> +
>>> +            /* lb might become re-allocated when we drop the lock */
>>> +            buf = g_steal_pointer(&lb->data);
>>> +            buf_len = lb->len;
>>> +
>>> +            /*
>>> +             * Loading data to the device takes a while,
>>> +             * drop the lock during this process.
>>> +             */
>>> +            qemu_mutex_unlock(&migration->load_bufs_mutex);
>>> +            wr_ret = write(migration->data_fd, buf, buf_len);
>>> +            errno_save = errno;
>>> +            qemu_mutex_lock(&migration->load_bufs_mutex);
>>> +
>>> +            if (wr_ret < 0) {
>>> +                ret = -errno_save;
>>> +                goto ret_signal;
>>> +            } else if (wr_ret < buf_len) {
>>> +                ret = -EINVAL;
>>> +                goto ret_signal;
>>> +            }
>>
>> Should we loop the write until reaching buf_len bytes?
>> Partial write is not considered error according to write(2) manpage.
>
> Yes, it's probably better to allow partial writes in case
> some VFIO kernel driver actually makes use of them.
>
>>
>> Thanks.
>
> Thanks,
> Maciej
>

Re: [PATCH v3 22/24] vfio/migration: Multifd device state transfer support - receive side
Posted by Cédric Le Goater 1 year, 2 months ago
Hello Maciej,

On 11/17/24 20:20, Maciej S. Szmigiero wrote:
> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
> 
> The multifd received data needs to be reassembled since device state
> packets sent via different multifd channels can arrive out-of-order.
> 
> Therefore, each VFIO device state packet carries a header indicating its
> position in the stream.
> 
> The last such VFIO device state packet should have
> VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config state.
> 
> Since it's important to finish loading device state transferred via the
> main migration channel (via save_live_iterate SaveVMHandler) before
> starting loading the data asynchronously transferred via multifd the thread
> doing the actual loading of the multifd transferred data is only started
> from switchover_start SaveVMHandler.
> 
> switchover_start handler is called when MIG_CMD_SWITCHOVER_START
> sub-command of QEMU_VM_COMMAND is received via the main migration channel.
> 
> This sub-command is only sent after all save_live_iterate data have already
> been posted so it is safe to commence loading of the multifd-transferred
> device state upon receiving it - loading of save_live_iterate data happens
> synchronously in the main migration thread (much like the processing of
> MIG_CMD_SWITCHOVER_START) so by the time MIG_CMD_SWITCHOVER_START is
> processed all the preceding data must have already been loaded.
>
> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
> ---
>   hw/vfio/migration.c           | 402 ++++++++++++++++++++++++++++++++++

This is quite a significant update to introduce all at once. It lacks a
comprehensive overview of the design for those who were not involved in
the earlier discussions about adding support for multifd migration of
device state. There are multiple threads and migration streams involved
at load time which deserve some description. I think the best place
would be at the end of:

    https://qemu.readthedocs.io/en/v9.1.0/devel/migration/vfio.html

Could you please break down the patch to progressively introduce the
various elements needed for the receive sequence? Something like:

   - data structures first
   - init phase
   - run time
   - and clean up phase
   - toggles to enable/disable/tune
   - finally, documentation update (under vfio migration)

Some more below,

>   hw/vfio/pci.c                 |   2 +
>   hw/vfio/trace-events          |   6 +
>   include/hw/vfio/vfio-common.h |  19 ++
>   4 files changed, 429 insertions(+)
> 
> diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
> index 683f2ae98d5e..b54879fe6209 100644
> --- a/hw/vfio/migration.c
> +++ b/hw/vfio/migration.c
> @@ -15,6 +15,7 @@
>   #include <linux/vfio.h>
>   #include <sys/ioctl.h>
>   
> +#include "io/channel-buffer.h"
>   #include "sysemu/runstate.h"
>   #include "hw/vfio/vfio-common.h"
>   #include "migration/misc.h"
> @@ -55,6 +56,15 @@
>    */
>   #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
>   
> +#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
> +
> +typedef struct VFIODeviceStatePacket {
> +    uint32_t version;
> +    uint32_t idx;
> +    uint32_t flags;
> +    uint8_t data[0];
> +} QEMU_PACKED VFIODeviceStatePacket;
> +
>   static int64_t bytes_transferred;
>   
>   static const char *mig_state_to_str(enum vfio_device_mig_state state)
> @@ -254,6 +264,292 @@ static int vfio_load_buffer(QEMUFile *f, VFIODevice *vbasedev,
>       return ret;
>   }
>   
> +typedef struct VFIOStateBuffer {
> +    bool is_present;
> +    char *data;
> +    size_t len;
> +} VFIOStateBuffer;
> +
> +static void vfio_state_buffer_clear(gpointer data)
> +{
> +    VFIOStateBuffer *lb = data;
> +
> +    if (!lb->is_present) {
> +        return;
> +    }
> +
> +    g_clear_pointer(&lb->data, g_free);
> +    lb->is_present = false;
> +}
> +
> +static void vfio_state_buffers_init(VFIOStateBuffers *bufs)
> +{
> +    bufs->array = g_array_new(FALSE, TRUE, sizeof(VFIOStateBuffer));
> +    g_array_set_clear_func(bufs->array, vfio_state_buffer_clear);
> +}
> +
> +static void vfio_state_buffers_destroy(VFIOStateBuffers *bufs)
> +{
> +    g_clear_pointer(&bufs->array, g_array_unref);
> +}
> +
> +static void vfio_state_buffers_assert_init(VFIOStateBuffers *bufs)
> +{
> +    assert(bufs->array);
> +}
> +
> +static guint vfio_state_buffers_size_get(VFIOStateBuffers *bufs)
> +{
> +    return bufs->array->len;
> +}
> +
> +static void vfio_state_buffers_size_set(VFIOStateBuffers *bufs, guint size)
> +{
> +    g_array_set_size(bufs->array, size);
> +}
> +
> +static VFIOStateBuffer *vfio_state_buffers_at(VFIOStateBuffers *bufs, guint idx)
> +{
> +    return &g_array_index(bufs->array, VFIOStateBuffer, idx);
> +}
> +
> +static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
> +                                  Error **errp)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
> +    VFIOStateBuffer *lb;
> +
> +    /*
> +     * Holding BQL here would violate the lock order and can cause
> +     * a deadlock once we attempt to lock load_bufs_mutex below.
> +     */
> +    assert(!bql_locked());
> +
> +    if (!migration->multifd_transfer) {

Hmm, why is 'multifd_transfer' a migration attribute? Shouldn't it
be at the device level? Or should all devices of a VM support multifd
transfer? That said, I'm a bit unclear about the limitations, if there
are any. Could you please explain a bit more when the migration sequence
is set up for the device?



> +        error_setg(errp,
> +                   "got device state packet but not doing multifd transfer");
> +        return -1;
> +    }
> +
> +    if (data_size < sizeof(*packet)) {
> +        error_setg(errp, "packet too short at %zu (min is %zu)",
> +                   data_size, sizeof(*packet));
> +        return -1;
> +    }
> +
> +    if (packet->version != 0) {
> +        error_setg(errp, "packet has unknown version %" PRIu32,
> +                   packet->version);
> +        return -1;
> +    }
> +
> +    if (packet->idx == UINT32_MAX) {
> +        error_setg(errp, "packet has too high idx %" PRIu32,
> +                   packet->idx);
> +        return -1;
> +    }
> +
> +    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
> +
> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
> +
> +    /* config state packet should be the last one in the stream */
> +    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
> +        migration->load_buf_idx_last = packet->idx;
> +    }
> +
> +    vfio_state_buffers_assert_init(&migration->load_bufs);
> +    if (packet->idx >= vfio_state_buffers_size_get(&migration->load_bufs)) {
> +        vfio_state_buffers_size_set(&migration->load_bufs, packet->idx + 1);
> +    }
> +
> +    lb = vfio_state_buffers_at(&migration->load_bufs, packet->idx);
> +    if (lb->is_present) {
> +        error_setg(errp, "state buffer %" PRIu32 " already filled",
> +                   packet->idx);
> +        return -1;
> +    }
> +
> +    assert(packet->idx >= migration->load_buf_idx);
> +
> +    migration->load_buf_queued_pending_buffers++;
> +    if (migration->load_buf_queued_pending_buffers >
> +        vbasedev->migration_max_queued_buffers) {
> +        error_setg(errp,
> +                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
> +                   packet->idx, vbasedev->migration_max_queued_buffers);
> +        return -1;
> +    }
> +
> +    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
> +    lb->len = data_size - sizeof(*packet);
> +    lb->is_present = true;
> +
> +    qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
> +
> +    return 0;
> +}
> +
> +static int vfio_load_device_config_state(QEMUFile *f, void *opaque);
> +
> +static int vfio_load_bufs_thread_load_config(VFIODevice *vbasedev)
> +{
> +    VFIOMigration *migration = vbasedev->migration;
> +    VFIOStateBuffer *lb;
> +    g_autoptr(QIOChannelBuffer) bioc = NULL;
> +    QEMUFile *f_out = NULL, *f_in = NULL;
> +    uint64_t mig_header;
> +    int ret;
> +
> +    assert(migration->load_buf_idx == migration->load_buf_idx_last);
> +    lb = vfio_state_buffers_at(&migration->load_bufs, migration->load_buf_idx);
> +    assert(lb->is_present);
> +
> +    bioc = qio_channel_buffer_new(lb->len);
> +    qio_channel_set_name(QIO_CHANNEL(bioc), "vfio-device-config-load");
> +
> +    f_out = qemu_file_new_output(QIO_CHANNEL(bioc));
> +    qemu_put_buffer(f_out, (uint8_t *)lb->data, lb->len);
> +
> +    ret = qemu_fflush(f_out);
> +    if (ret) {
> +        g_clear_pointer(&f_out, qemu_fclose);
> +        return ret;
> +    }
> +
> +    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
> +    f_in = qemu_file_new_input(QIO_CHANNEL(bioc));
> +
> +    mig_header = qemu_get_be64(f_in);
> +    if (mig_header != VFIO_MIG_FLAG_DEV_CONFIG_STATE) {
> +        g_clear_pointer(&f_out, qemu_fclose);
> +        g_clear_pointer(&f_in, qemu_fclose);
> +        return -EINVAL;
> +    }

All the above code is using the QIOChannel interface, which is sort of an
internal API of the migration subsystem. Can we move it under migration?


> +
> +    bql_lock();
> +    ret = vfio_load_device_config_state(f_in, vbasedev);
> +    bql_unlock();
> +
> +    g_clear_pointer(&f_out, qemu_fclose);
> +    g_clear_pointer(&f_in, qemu_fclose);
> +    if (ret < 0) {
> +        return ret;
> +    }
> +
> +    return 0;
> +}
> +
> +static bool vfio_load_bufs_thread_want_abort(VFIODevice *vbasedev,
> +                                             bool *abort_flag)
> +{
> +    VFIOMigration *migration = vbasedev->migration;
> +
> +    return migration->load_bufs_thread_want_exit || qatomic_read(abort_flag);
> +}
> +
> +static int vfio_load_bufs_thread(bool *abort_flag, void *opaque)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
> +    int ret;
> +
> +    assert(migration->load_bufs_thread_running);
> +
> +    while (!vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
> +        VFIOStateBuffer *lb;
> +        guint bufs_len;
> +        bool starved;
> +
> +        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
> +
> +        bufs_len = vfio_state_buffers_size_get(&migration->load_bufs);
> +        if (migration->load_buf_idx >= bufs_len) {
> +            assert(migration->load_buf_idx == bufs_len);
> +            starved = true;
> +        } else {
> +            lb = vfio_state_buffers_at(&migration->load_bufs,
> +                                       migration->load_buf_idx);
> +            starved = !lb->is_present;
> +        }
> +
> +        if (starved) {
> +            trace_vfio_load_state_device_buffer_starved(vbasedev->name,
> +                                                        migration->load_buf_idx);
> +            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond,
> +                           &migration->load_bufs_mutex);
> +            continue;
> +        }
> +
> +        if (migration->load_buf_idx == migration->load_buf_idx_last) {
> +            break;
> +        }
> +
> +        if (migration->load_buf_idx == 0) {
> +            trace_vfio_load_state_device_buffer_start(vbasedev->name);
> +        }
> +
> +        if (lb->len) {
> +            g_autofree char *buf = NULL;
> +            size_t buf_len;
> +            ssize_t wr_ret;
> +            int errno_save;
> +
> +            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
> +                                                           migration->load_buf_idx);
> +
> +            /* lb might become re-allocated when we drop the lock */
> +            buf = g_steal_pointer(&lb->data);
> +            buf_len = lb->len;
> +
> +            /*
> +             * Loading data to the device takes a while,
> +             * drop the lock during this process.
> +             */
> +            qemu_mutex_unlock(&migration->load_bufs_mutex);
> +            wr_ret = write(migration->data_fd, buf, buf_len);
> +            errno_save = errno;
> +            qemu_mutex_lock(&migration->load_bufs_mutex);
> +
> +            if (wr_ret < 0) {
> +                ret = -errno_save;
> +                goto ret_signal;
> +            } else if (wr_ret < buf_len) {
> +                ret = -EINVAL;
> +                goto ret_signal;
> +            }
> +
> +            trace_vfio_load_state_device_buffer_load_end(vbasedev->name,
> +                                                         migration->load_buf_idx);
> +        }
> +
> +        assert(migration->load_buf_queued_pending_buffers > 0);
> +        migration->load_buf_queued_pending_buffers--;
> +
> +        if (migration->load_buf_idx == migration->load_buf_idx_last - 1) {
> +            trace_vfio_load_state_device_buffer_end(vbasedev->name);
> +        }
> +
> +        migration->load_buf_idx++;
> +    }
> +
> +    if (vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
> +        ret = -ECANCELED;
> +        goto ret_signal;
> +    }
> +
> +    ret = vfio_load_bufs_thread_load_config(vbasedev);
> +
> +ret_signal:
> +    migration->load_bufs_thread_running = false;
> +    qemu_cond_signal(&migration->load_bufs_thread_finished_cond);
> +
> +    return ret;

Is the error reported to the migration subsystem?

> +}
> +
>   static int vfio_save_device_config_state(QEMUFile *f, void *opaque,
>                                            Error **errp)
>   {
> @@ -430,6 +726,12 @@ static bool vfio_precopy_supported(VFIODevice *vbasedev)
>       return migration->mig_flags & VFIO_MIGRATION_PRE_COPY;
>   }
>   
> +static bool vfio_multifd_transfer_supported(void)
> +{
> +    return migration_has_device_state_support() &&
> +        migrate_send_switchover_start();
> +}
> +
>   /* ---------------------------------------------------------------------- */
>   
>   static int vfio_save_prepare(void *opaque, Error **errp)
> @@ -695,17 +997,73 @@ static int vfio_load_setup(QEMUFile *f, void *opaque, Error **errp)
>   
>       assert(!migration->load_setup);
>   
> +    /*
> +     * Make a copy of this setting at the start in case it is changed
> +     * mid-migration.
> +     */
> +    if (vbasedev->migration_multifd_transfer == ON_OFF_AUTO_AUTO) {
> +        migration->multifd_transfer = vfio_multifd_transfer_supported();
> +    } else {
> +        migration->multifd_transfer =
> +            vbasedev->migration_multifd_transfer == ON_OFF_AUTO_ON;
> +    }
> +
> +    if (migration->multifd_transfer && !vfio_multifd_transfer_supported()) {
> +        error_setg(errp,
> +                   "%s: Multifd device transfer requested but unsupported in the current config",
> +                   vbasedev->name);
> +        return -EINVAL;
> +    }

Can we move these checks earlier? In vfio_migration_realize()?
If possible, it would be good to avoid the multifd_transfer attribute also.

>       ret = vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
>                                      migration->device_state, errp);
>       if (ret) {
>           return ret;
>       }
>   
> +    if (migration->multifd_transfer) {
> +        assert(!migration->load_bufs.array);
> +        vfio_state_buffers_init(&migration->load_bufs);
> +
> +        qemu_mutex_init(&migration->load_bufs_mutex);
> +
> +        migration->load_buf_idx = 0;
> +        migration->load_buf_idx_last = UINT32_MAX;
> +        migration->load_buf_queued_pending_buffers = 0;
> +        qemu_cond_init(&migration->load_bufs_buffer_ready_cond);
> +
> +        migration->load_bufs_thread_running = false;
> +        migration->load_bufs_thread_want_exit = false;
> +        qemu_cond_init(&migration->load_bufs_thread_finished_cond);

Please provide a helper routine to initialize all the multifd transfer
attributes. We might want to add a struct to gather them all by the way.

> +    }
> +
>       migration->load_setup = true;
>   
>       return 0;
>   }
>   
> +static void vfio_load_cleanup_load_bufs_thread(VFIODevice *vbasedev)
> +{
> +    VFIOMigration *migration = vbasedev->migration;
> +
> +    /* The lock order is load_bufs_mutex -> BQL so unlock BQL here first */
> +    bql_unlock();
> +    WITH_QEMU_LOCK_GUARD(&migration->load_bufs_mutex) {
> +        if (!migration->load_bufs_thread_running) {
> +            break;
> +        }
> +
> +        migration->load_bufs_thread_want_exit = true;
> +
> +        qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
> +        qemu_cond_wait(&migration->load_bufs_thread_finished_cond,
> +                       &migration->load_bufs_mutex);
> +
> +        assert(!migration->load_bufs_thread_running);
> +    }
> +    bql_lock();
> +}
> +
>   static int vfio_load_cleanup(void *opaque)
>   {
>       VFIODevice *vbasedev = opaque;
> @@ -715,7 +1073,19 @@ static int vfio_load_cleanup(void *opaque)
>           return 0;
>       }
>   
> +    if (migration->multifd_transfer) {
> +        vfio_load_cleanup_load_bufs_thread(vbasedev);
> +    }
> +
>       vfio_migration_cleanup(vbasedev);

Why is the cleanup done in two steps?

> +
> +    if (migration->multifd_transfer) {
> +        qemu_cond_destroy(&migration->load_bufs_thread_finished_cond);
> +        vfio_state_buffers_destroy(&migration->load_bufs);
> +        qemu_cond_destroy(&migration->load_bufs_buffer_ready_cond);
> +        qemu_mutex_destroy(&migration->load_bufs_mutex);
> +    }
> +
>       migration->load_setup = false;
>       trace_vfio_load_cleanup(vbasedev->name);
>   
> @@ -725,6 +1095,7 @@ static int vfio_load_cleanup(void *opaque)
>   static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>   {
>       VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
>       int ret = 0;
>       uint64_t data;
>   
> @@ -736,6 +1107,12 @@ static int vfio_load_state(QEMUFile *f, void *opaque, int version_id)
>           switch (data) {
>           case VFIO_MIG_FLAG_DEV_CONFIG_STATE:
>           {
> +            if (migration->multifd_transfer) {
> +                error_report("%s: got DEV_CONFIG_STATE but doing multifd transfer",
> +                             vbasedev->name);
> +                return -EINVAL;
> +            }
> +
>               return vfio_load_device_config_state(f, opaque);
>           }
>           case VFIO_MIG_FLAG_DEV_SETUP_STATE:
> @@ -801,6 +1178,29 @@ static bool vfio_switchover_ack_needed(void *opaque)
>       return vfio_precopy_supported(vbasedev);
>   }
>   
> +static int vfio_switchover_start(void *opaque)
> +{
> +    VFIODevice *vbasedev = opaque;
> +    VFIOMigration *migration = vbasedev->migration;
> +
> +    if (!migration->multifd_transfer) {
> +        /* Load thread is only used for multifd transfer */
> +        return 0;
> +    }
> +
> +    /* The lock order is load_bufs_mutex -> BQL so unlock BQL here first */
> +    bql_unlock();
> +    WITH_QEMU_LOCK_GUARD(&migration->load_bufs_mutex) {
> +        assert(!migration->load_bufs_thread_running);
> +        migration->load_bufs_thread_running = true;
> +    }
> +    bql_lock();
> +
> +    qemu_loadvm_start_load_thread(vfio_load_bufs_thread, vbasedev);
> +
> +    return 0;
> +}
> +
>   static const SaveVMHandlers savevm_vfio_handlers = {
>       .save_prepare = vfio_save_prepare,
>       .save_setup = vfio_save_setup,
> @@ -814,7 +1214,9 @@ static const SaveVMHandlers savevm_vfio_handlers = {
>       .load_setup = vfio_load_setup,
>       .load_cleanup = vfio_load_cleanup,
>       .load_state = vfio_load_state,
> +    .load_state_buffer = vfio_load_state_buffer,
>       .switchover_ack_needed = vfio_switchover_ack_needed,
> +    .switchover_start = vfio_switchover_start,
>   };
>   
>   /* ---------------------------------------------------------------------- */
> diff --git a/hw/vfio/pci.c b/hw/vfio/pci.c
> index 9d547cb5cdff..72d62ada8a39 100644
> --- a/hw/vfio/pci.c
> +++ b/hw/vfio/pci.c
> @@ -3384,6 +3384,8 @@ static Property vfio_pci_dev_properties[] = {
>                   vbasedev.migration_multifd_transfer,
>                   qdev_prop_on_off_auto_mutable, OnOffAuto,
>                   .set_default = true, .defval.i = ON_OFF_AUTO_AUTO),
> +    DEFINE_PROP_UINT64("x-migration-max-queued-buffers", VFIOPCIDevice,
> +                       vbasedev.migration_max_queued_buffers, UINT64_MAX),
>       DEFINE_PROP_BOOL("migration-events", VFIOPCIDevice,
>                        vbasedev.migration_events, false),
>       DEFINE_PROP_BOOL("x-no-mmap", VFIOPCIDevice, vbasedev.no_mmap, false),
> diff --git a/hw/vfio/trace-events b/hw/vfio/trace-events
> index 1bebe9877d88..418b378ebd29 100644
> --- a/hw/vfio/trace-events
> +++ b/hw/vfio/trace-events
> @@ -153,6 +153,12 @@ vfio_load_device_config_state_start(const char *name) " (%s)"
>   vfio_load_device_config_state_end(const char *name) " (%s)"
>   vfio_load_state(const char *name, uint64_t data) " (%s) data 0x%"PRIx64
>   vfio_load_state_device_data(const char *name, uint64_t data_size, int ret) " (%s) size %"PRIu64" ret %d"
> +vfio_load_state_device_buffer_incoming(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_start(const char *name) " (%s)"
> +vfio_load_state_device_buffer_starved(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_load_start(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_load_end(const char *name, uint32_t idx) " (%s) idx %"PRIu32
> +vfio_load_state_device_buffer_end(const char *name) " (%s)"
>   vfio_migration_realize(const char *name) " (%s)"
>   vfio_migration_set_device_state(const char *name, const char *state) " (%s) state %s"
>   vfio_migration_set_state(const char *name, const char *new_state, const char *recover_state) " (%s) new state %s, recover state %s"
> diff --git a/include/hw/vfio/vfio-common.h b/include/hw/vfio/vfio-common.h
> index b1c03a82eec8..0954d6981a22 100644
> --- a/include/hw/vfio/vfio-common.h
> +++ b/include/hw/vfio/vfio-common.h
> @@ -61,6 +61,11 @@ typedef struct VFIORegion {
>       uint8_t nr; /* cache the region number for debug */
>   } VFIORegion;
>   
> +/* type safety */
> +typedef struct VFIOStateBuffers {
> +    GArray *array;
> +} VFIOStateBuffers;
> +
>   typedef struct VFIOMigration {
>       struct VFIODevice *vbasedev;
>       VMChangeStateEntry *vm_state;
> @@ -73,10 +78,23 @@ typedef struct VFIOMigration {
>       uint64_t mig_flags;
>       uint64_t precopy_init_size;
>       uint64_t precopy_dirty_size;
> +    bool multifd_transfer;
>       bool initial_data_sent;
>   
>       bool event_save_iterate_started;
>       bool event_precopy_empty_hit;
> +
> +    QemuThread load_bufs_thread;
> +    bool load_bufs_thread_running;
> +    bool load_bufs_thread_want_exit;
> +
> +    VFIOStateBuffers load_bufs;
> +    QemuCond load_bufs_buffer_ready_cond;
> +    QemuCond load_bufs_thread_finished_cond;
> +    QemuMutex load_bufs_mutex; /* Lock order: this lock -> BQL */
> +    uint32_t load_buf_idx;
> +    uint32_t load_buf_idx_last;
> +    uint32_t load_buf_queued_pending_buffers;
>   } VFIOMigration;
>   
>   struct VFIOGroup;
> @@ -136,6 +154,7 @@ typedef struct VFIODevice {
>       OnOffAuto enable_migration;
>       OnOffAuto migration_multifd_transfer;
>       bool migration_events;
> +    uint64_t migration_max_queued_buffers;
>       VFIODeviceOps *ops;
>       unsigned int num_irqs;
>       unsigned int num_regions;
> 


Thanks,

C.
Re: [PATCH v3 22/24] vfio/migration: Multifd device state transfer support - receive side
Posted by Maciej S. Szmigiero 1 year, 2 months ago
Hi Cédric,

On 2.12.2024 18:56, Cédric Le Goater wrote:
> Hello Maciej,
> 
> On 11/17/24 20:20, Maciej S. Szmigiero wrote:
>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>
>> The multifd received data needs to be reassembled since device state
>> packets sent via different multifd channels can arrive out-of-order.
>>
>> Therefore, each VFIO device state packet carries a header indicating its
>> position in the stream.
>>
>> The last such VFIO device state packet should have
>> VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config state.
>>
>> Since it's important to finish loading device state transferred via the
>> main migration channel (via save_live_iterate SaveVMHandler) before
>> starting loading the data asynchronously transferred via multifd the thread
>> doing the actual loading of the multifd transferred data is only started
>> from switchover_start SaveVMHandler.
>>
>> switchover_start handler is called when MIG_CMD_SWITCHOVER_START
>> sub-command of QEMU_VM_COMMAND is received via the main migration channel.
>>
>> This sub-command is only sent after all save_live_iterate data have already
>> been posted so it is safe to commence loading of the multifd-transferred
>> device state upon receiving it - loading of save_live_iterate data happens
>> synchronously in the main migration thread (much like the processing of
>> MIG_CMD_SWITCHOVER_START) so by the time MIG_CMD_SWITCHOVER_START is
>> processed all the preceding data must have already been loaded.
>>
>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>> ---
>>   hw/vfio/migration.c           | 402 ++++++++++++++++++++++++++++++++++
> 
> This is quite a significant update to introduce all at once. It lacks a
> comprehensive overview of the design for those who were not involved in
> the earlier discussions about adding support for multifd migration of
> device state. There are multiple threads and migration streams involved
> at load time which deserve some description. I think the best place
> would be at the end of:
> 
>     https://qemu.readthedocs.io/en/v9.1.0/devel/migration/vfio.html

Will try to add some design/implementation descriptions to
docs/devel/migration/vfio.rst.

> Could you please break down the patch to progressively introduce the
> various elements needed for the receive sequence? Something like:
> 
>    - data structures first
>    - init phase
>    - run time
>    - and clean up phase
>    - toggles to enable/disable/tune
>    - finally, documentation update (under vfio migration)

Obviously I can split the VFIO patch into smaller fragments,
but this means that the intermediate form won't be testable
(I guess that's okay).

> Some more below,
> 
>>   hw/vfio/pci.c                 |   2 +
>>   hw/vfio/trace-events          |   6 +
>>   include/hw/vfio/vfio-common.h |  19 ++
>>   4 files changed, 429 insertions(+)
>>
>> diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
>> index 683f2ae98d5e..b54879fe6209 100644
>> --- a/hw/vfio/migration.c
>> +++ b/hw/vfio/migration.c
>> @@ -15,6 +15,7 @@
>>   #include <linux/vfio.h>
>>   #include <sys/ioctl.h>
>> +#include "io/channel-buffer.h"
>>   #include "sysemu/runstate.h"
>>   #include "hw/vfio/vfio-common.h"
>>   #include "migration/misc.h"
>> @@ -55,6 +56,15 @@
>>    */
>>   #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
>> +#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
>> +
>> +typedef struct VFIODeviceStatePacket {
>> +    uint32_t version;
>> +    uint32_t idx;
>> +    uint32_t flags;
>> +    uint8_t data[0];
>> +} QEMU_PACKED VFIODeviceStatePacket;
>> +
>>   static int64_t bytes_transferred;
>>   static const char *mig_state_to_str(enum vfio_device_mig_state state)
>> @@ -254,6 +264,292 @@ static int vfio_load_buffer(QEMUFile *f, VFIODevice *vbasedev,
>>       return ret;
>>   }
>> +typedef struct VFIOStateBuffer {
>> +    bool is_present;
>> +    char *data;
>> +    size_t len;
>> +} VFIOStateBuffer;
>> +
>> +static void vfio_state_buffer_clear(gpointer data)
>> +{
>> +    VFIOStateBuffer *lb = data;
>> +
>> +    if (!lb->is_present) {
>> +        return;
>> +    }
>> +
>> +    g_clear_pointer(&lb->data, g_free);
>> +    lb->is_present = false;
>> +}
>> +
>> +static void vfio_state_buffers_init(VFIOStateBuffers *bufs)
>> +{
>> +    bufs->array = g_array_new(FALSE, TRUE, sizeof(VFIOStateBuffer));
>> +    g_array_set_clear_func(bufs->array, vfio_state_buffer_clear);
>> +}
>> +
>> +static void vfio_state_buffers_destroy(VFIOStateBuffers *bufs)
>> +{
>> +    g_clear_pointer(&bufs->array, g_array_unref);
>> +}
>> +
>> +static void vfio_state_buffers_assert_init(VFIOStateBuffers *bufs)
>> +{
>> +    assert(bufs->array);
>> +}
>> +
>> +static guint vfio_state_buffers_size_get(VFIOStateBuffers *bufs)
>> +{
>> +    return bufs->array->len;
>> +}
>> +
>> +static void vfio_state_buffers_size_set(VFIOStateBuffers *bufs, guint size)
>> +{
>> +    g_array_set_size(bufs->array, size);
>> +}
>> +
>> +static VFIOStateBuffer *vfio_state_buffers_at(VFIOStateBuffers *bufs, guint idx)
>> +{
>> +    return &g_array_index(bufs->array, VFIOStateBuffer, idx);
>> +}
>> +
>> +static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
>> +                                  Error **errp)
>> +{
>> +    VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
>> +    VFIOStateBuffer *lb;
>> +
>> +    /*
>> +     * Holding BQL here would violate the lock order and can cause
>> +     * a deadlock once we attempt to lock load_bufs_mutex below.
>> +     */
>> +    assert(!bql_locked());
>> +
>> +    if (!migration->multifd_transfer) {
> 
> Hmm, why is 'multifd_transfer' a migration attribute? Shouldn't it
> be at the device level?

I thought migration-time data goes into VFIOMigration?

I don't have any strong objections against moving it into VFIODevice though.

> Or should all devices of a VM support multifd
> transfer? That said, I'm a bit unclear about the limitations, if there
> are any. Could you please explain a bit more when the migration sequence
> is set up for the device?
> 

The reason we need this setting on the receive side is that we
need to know whether to start the load_bufs_thread (the migration
core will later wait for this thread to finish before proceeding further).

We also need to know whether to allocate multifd-related data structures
in the VFIO driver based on this setting.

This setting ultimately comes from the "x-migration-multifd-transfer"
VFIOPCIDevice property, which is an ON_OFF_AUTO setting (the "AUTO" value
means that multifd use in the driver is attempted in configurations that
otherwise support it).
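
For example, with this series applied, forcing it on for a device could
look like this on the command line (the PCI address is a placeholder):

  -device vfio-pci,host=0000:01:00.0,x-migration-multifd-transfer=on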

> 
>> +        error_setg(errp,
>> +                   "got device state packet but not doing multifd transfer");
>> +        return -1;
>> +    }
>> +
>> +    if (data_size < sizeof(*packet)) {
>> +        error_setg(errp, "packet too short at %zu (min is %zu)",
>> +                   data_size, sizeof(*packet));
>> +        return -1;
>> +    }
>> +
>> +    if (packet->version != 0) {
>> +        error_setg(errp, "packet has unknown version %" PRIu32,
>> +                   packet->version);
>> +        return -1;
>> +    }
>> +
>> +    if (packet->idx == UINT32_MAX) {
>> +        error_setg(errp, "packet has too high idx %" PRIu32,
>> +                   packet->idx);
>> +        return -1;
>> +    }
>> +
>> +    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
>> +
>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
>> +
>> +    /* config state packet should be the last one in the stream */
>> +    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
>> +        migration->load_buf_idx_last = packet->idx;
>> +    }
>> +
>> +    vfio_state_buffers_assert_init(&migration->load_bufs);
>> +    if (packet->idx >= vfio_state_buffers_size_get(&migration->load_bufs)) {
>> +        vfio_state_buffers_size_set(&migration->load_bufs, packet->idx + 1);
>> +    }
>> +
>> +    lb = vfio_state_buffers_at(&migration->load_bufs, packet->idx);
>> +    if (lb->is_present) {
>> +        error_setg(errp, "state buffer %" PRIu32 " already filled",
>> +                   packet->idx);
>> +        return -1;
>> +    }
>> +
>> +    assert(packet->idx >= migration->load_buf_idx);
>> +
>> +    migration->load_buf_queued_pending_buffers++;
>> +    if (migration->load_buf_queued_pending_buffers >
>> +        vbasedev->migration_max_queued_buffers) {
>> +        error_setg(errp,
>> +                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
>> +                   packet->idx, vbasedev->migration_max_queued_buffers);
>> +        return -1;
>> +    }
>> +
>> +    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
>> +    lb->len = data_size - sizeof(*packet);
>> +    lb->is_present = true;
>> +
>> +    qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
>> +
>> +    return 0;
>> +}
>> +
>> +static int vfio_load_device_config_state(QEMUFile *f, void *opaque);
>> +
>> +static int vfio_load_bufs_thread_load_config(VFIODevice *vbasedev)
>> +{
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    VFIOStateBuffer *lb;
>> +    g_autoptr(QIOChannelBuffer) bioc = NULL;
>> +    QEMUFile *f_out = NULL, *f_in = NULL;
>> +    uint64_t mig_header;
>> +    int ret;
>> +
>> +    assert(migration->load_buf_idx == migration->load_buf_idx_last);
>> +    lb = vfio_state_buffers_at(&migration->load_bufs, migration->load_buf_idx);
>> +    assert(lb->is_present);
>> +
>> +    bioc = qio_channel_buffer_new(lb->len);
>> +    qio_channel_set_name(QIO_CHANNEL(bioc), "vfio-device-config-load");
>> +
>> +    f_out = qemu_file_new_output(QIO_CHANNEL(bioc));
>> +    qemu_put_buffer(f_out, (uint8_t *)lb->data, lb->len);
>> +
>> +    ret = qemu_fflush(f_out);
>> +    if (ret) {
>> +        g_clear_pointer(&f_out, qemu_fclose);
>> +        return ret;
>> +    }
>> +
>> +    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
>> +    f_in = qemu_file_new_input(QIO_CHANNEL(bioc));
>> +
>> +    mig_header = qemu_get_be64(f_in);
>> +    if (mig_header != VFIO_MIG_FLAG_DEV_CONFIG_STATE) {
>> +        g_clear_pointer(&f_out, qemu_fclose);
>> +        g_clear_pointer(&f_in, qemu_fclose);
>> +        return -EINVAL;
>> +    }
> 
> All the above code is using the QIOChannel interface, which is sort of an
> internal API of the migration subsystem. Can we move it under migration?

hw/remote and hw/virtio are also using the QIOChannel API, not to mention
qemu-nbd, block/nbd and backends/tpm, so it's definitely not just the
core migration code that uses it.

I don't think introducing a tiny generic migration core helper that takes
a VFIO-specific buffer with config data and ends up calling a VFIO-specific
device config state load function really makes sense.

> 
>> +
>> +    bql_lock();
>> +    ret = vfio_load_device_config_state(f_in, vbasedev);
>> +    bql_unlock();
>> +
>> +    g_clear_pointer(&f_out, qemu_fclose);
>> +    g_clear_pointer(&f_in, qemu_fclose);
>> +    if (ret < 0) {
>> +        return ret;
>> +    }
>> +
>> +    return 0;
>> +}
>> +
>> +static bool vfio_load_bufs_thread_want_abort(VFIODevice *vbasedev,
>> +                                             bool *abort_flag)
>> +{
>> +    VFIOMigration *migration = vbasedev->migration;
>> +
>> +    return migration->load_bufs_thread_want_exit || qatomic_read(abort_flag);
>> +}
>> +
>> +static int vfio_load_bufs_thread(bool *abort_flag, void *opaque)
>> +{
>> +    VFIODevice *vbasedev = opaque;
>> +    VFIOMigration *migration = vbasedev->migration;
>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
>> +    int ret;
>> +
>> +    assert(migration->load_bufs_thread_running);
>> +
>> +    while (!vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
>> +        VFIOStateBuffer *lb;
>> +        guint bufs_len;
>> +        bool starved;
>> +
>> +        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
>> +
>> +        bufs_len = vfio_state_buffers_size_get(&migration->load_bufs);
>> +        if (migration->load_buf_idx >= bufs_len) {
>> +            assert(migration->load_buf_idx == bufs_len);
>> +            starved = true;
>> +        } else {
>> +            lb = vfio_state_buffers_at(&migration->load_bufs,
>> +                                       migration->load_buf_idx);
>> +            starved = !lb->is_present;
>> +        }
>> +
>> +        if (starved) {
>> +            trace_vfio_load_state_device_buffer_starved(vbasedev->name,
>> +                                                        migration->load_buf_idx);
>> +            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond,
>> +                           &migration->load_bufs_mutex);
>> +            continue;
>> +        }
>> +
>> +        if (migration->load_buf_idx == migration->load_buf_idx_last) {
>> +            break;
>> +        }
>> +
>> +        if (migration->load_buf_idx == 0) {
>> +            trace_vfio_load_state_device_buffer_start(vbasedev->name);
>> +        }
>> +
>> +        if (lb->len) {
>> +            g_autofree char *buf = NULL;
>> +            size_t buf_len;
>> +            ssize_t wr_ret;
>> +            int errno_save;
>> +
>> +            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
>> +                                                           migration->load_buf_idx);
>> +
>> +            /* lb might become re-allocated when we drop the lock */
>> +            buf = g_steal_pointer(&lb->data);
>> +            buf_len = lb->len;
>> +
>> +            /*
>> +             * Loading data to the device takes a while,
>> +             * drop the lock during this process.
>> +             */
>> +            qemu_mutex_unlock(&migration->load_bufs_mutex);
>> +            wr_ret = write(migration->data_fd, buf, buf_len);
>> +            errno_save = errno;
>> +            qemu_mutex_lock(&migration->load_bufs_mutex);
>> +
>> +            if (wr_ret < 0) {
>> +                ret = -errno_save;
>> +                goto ret_signal;
>> +            } else if (wr_ret < buf_len) {
>> +                ret = -EINVAL;
>> +                goto ret_signal;
>> +            }
>> +
>> +            trace_vfio_load_state_device_buffer_load_end(vbasedev->name,
>> +                                                         migration->load_buf_idx);
>> +        }
>> +
>> +        assert(migration->load_buf_queued_pending_buffers > 0);
>> +        migration->load_buf_queued_pending_buffers--;
>> +
>> +        if (migration->load_buf_idx == migration->load_buf_idx_last - 1) {
>> +            trace_vfio_load_state_device_buffer_end(vbasedev->name);
>> +        }
>> +
>> +        migration->load_buf_idx++;
>> +    }
>> +
>> +    if (vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
>> +        ret = -ECANCELED;
>> +        goto ret_signal;
>> +    }
>> +
>> +    ret = vfio_load_bufs_thread_load_config(vbasedev);
>> +
>> +ret_signal:
>> +    migration->load_bufs_thread_running = false;
>> +    qemu_cond_signal(&migration->load_bufs_thread_finished_cond);
>> +
>> +    return ret;
> 
> Is the error reported to the migration subsystem?

Yes, via setting "load_threads_ret" in qemu_loadvm_load_thread().

>> +}
>> +
>>   static int vfio_save_device_config_state(QEMUFile *f, void *opaque,
>>                                            Error **errp)
>>   {
>> @@ -430,6 +726,12 @@ static bool vfio_precopy_supported(VFIODevice *vbasedev)
>>       return migration->mig_flags & VFIO_MIGRATION_PRE_COPY;
>>   }
>> +static bool vfio_multifd_transfer_supported(void)
>> +{
>> +    return migration_has_device_state_support() &&
>> +        migrate_send_switchover_start();
>> +}
>> +
>>   /* ---------------------------------------------------------------------- */
>>   static int vfio_save_prepare(void *opaque, Error **errp)
>> @@ -695,17 +997,73 @@ static int vfio_load_setup(QEMUFile *f, void *opaque, Error **errp)
>>       assert(!migration->load_setup);
>> +    /*
>> +     * Make a copy of this setting at the start in case it is changed
>> +     * mid-migration.
>> +     */
>> +    if (vbasedev->migration_multifd_transfer == ON_OFF_AUTO_AUTO) {
>> +        migration->multifd_transfer = vfio_multifd_transfer_supported();
>> +    } else {
>> +        migration->multifd_transfer =
>> +            vbasedev->migration_multifd_transfer == ON_OFF_AUTO_ON;
>> +    }
>> +
>> +    if (migration->multifd_transfer && !vfio_multifd_transfer_supported()) {
>> +        error_setg(errp,
>> +                   "%s: Multifd device transfer requested but unsupported in the current config",
>> +                   vbasedev->name);
>> +        return -EINVAL;
>> +    }
> 
> Can we move these checks earlier? In vfio_migration_realize()?
> If possible, it would be good to avoid the multifd_transfer attribute also.

We can't since the value is changeable at runtime, so it could have been
changed after the VFIO device got realized.

>>       ret = vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
>>                                      migration->device_state, errp);
>>       if (ret) {
>>           return ret;
>>       }
>> +    if (migration->multifd_transfer) {
>> +        assert(!migration->load_bufs.array);
>> +        vfio_state_buffers_init(&migration->load_bufs);
>> +
>> +        qemu_mutex_init(&migration->load_bufs_mutex);
>> +
>> +        migration->load_buf_idx = 0;
>> +        migration->load_buf_idx_last = UINT32_MAX;
>> +        migration->load_buf_queued_pending_buffers = 0;
>> +        qemu_cond_init(&migration->load_bufs_buffer_ready_cond);
>> +
>> +        migration->load_bufs_thread_running = false;
>> +        migration->load_bufs_thread_want_exit = false;
>> +        qemu_cond_init(&migration->load_bufs_thread_finished_cond);
> 
> Please provide a helper routine to initialize all the multifd transfer
> attributes. We might also want to add a struct to gather them all.

Will move these to a new helper.
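
Something along these lines (the helper name is just an example):

static void vfio_multifd_load_state_init(VFIOMigration *migration)
{
    vfio_state_buffers_init(&migration->load_bufs);

    qemu_mutex_init(&migration->load_bufs_mutex);

    migration->load_buf_idx = 0;
    migration->load_buf_idx_last = UINT32_MAX;
    migration->load_buf_queued_pending_buffers = 0;
    qemu_cond_init(&migration->load_bufs_buffer_ready_cond);

    migration->load_bufs_thread_running = false;
    migration->load_bufs_thread_want_exit = false;
    qemu_cond_init(&migration->load_bufs_thread_finished_cond);
}

with vfio_load_setup() then just calling it from its multifd branch.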

>> +    }
>> +
>>       migration->load_setup = true;
>>       return 0;
>>   }
>> +static void vfio_load_cleanup_load_bufs_thread(VFIODevice *vbasedev)
>> +{
>> +    VFIOMigration *migration = vbasedev->migration;
>> +
>> +    /* The lock order is load_bufs_mutex -> BQL so unlock BQL here first */
>> +    bql_unlock();
>> +    WITH_QEMU_LOCK_GUARD(&migration->load_bufs_mutex) {
>> +        if (!migration->load_bufs_thread_running) {
>> +            break;
>> +        }
>> +
>> +        migration->load_bufs_thread_want_exit = true;
>> +
>> +        qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
>> +        qemu_cond_wait(&migration->load_bufs_thread_finished_cond,
>> +                       &migration->load_bufs_mutex);
>> +
>> +        assert(!migration->load_bufs_thread_running);
>> +    }
>> +    bql_lock();
>> +}
>> +
>>   static int vfio_load_cleanup(void *opaque)
>>   {
>>       VFIODevice *vbasedev = opaque;
>> @@ -715,7 +1073,19 @@ static int vfio_load_cleanup(void *opaque)
>>           return 0;
>>       }
>> +    if (migration->multifd_transfer) {
>> +        vfio_load_cleanup_load_bufs_thread(vbasedev);
>> +    }
>> +
>>       vfio_migration_cleanup(vbasedev);
> 
> Why is the cleanup done in two steps ?

I'm not sure what "two steps" refers to here, but
if you mean moving the "if (migration->multifd_transfer)"
block below into the similar one above then it should be possible.
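
That is, roughly (the load_bufs thread is already stopped by the time
the condvars and the mutex get destroyed, so the ordering stays safe):

    if (migration->multifd_transfer) {
        vfio_load_cleanup_load_bufs_thread(vbasedev);

        qemu_cond_destroy(&migration->load_bufs_thread_finished_cond);
        vfio_state_buffers_destroy(&migration->load_bufs);
        qemu_cond_destroy(&migration->load_bufs_buffer_ready_cond);
        qemu_mutex_destroy(&migration->load_bufs_mutex);
    }

    vfio_migration_cleanup(vbasedev);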

>> +
>> +    if (migration->multifd_transfer) {
>> +        qemu_cond_destroy(&migration->load_bufs_thread_finished_cond);
>> +        vfio_state_buffers_destroy(&migration->load_bufs);
>> +        qemu_cond_destroy(&migration->load_bufs_buffer_ready_cond);
>> +        qemu_mutex_destroy(&migration->load_bufs_mutex);
>> +    }
>> +
>>       migration->load_setup = false;
>>       trace_vfio_load_cleanup(vbasedev->name);
(..)

> Thanks,
> 
> C.
> 

Thanks,
Maciej


Re: [PATCH v3 22/24] vfio/migration: Multifd device state transfer support - receive side
Posted by Cédric Le Goater 1 year, 1 month ago
On 12/11/24 00:04, Maciej S. Szmigiero wrote:
> Hi Cédric,
> 
> On 2.12.2024 18:56, Cédric Le Goater wrote:
>> Hello Maciej,
>>
>> On 11/17/24 20:20, Maciej S. Szmigiero wrote:
>>> From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
>>>
>>> The multifd received data needs to be reassembled since device state
>>> packets sent via different multifd channels can arrive out-of-order.
>>>
>>> Therefore, each VFIO device state packet carries a header indicating its
>>> position in the stream.
>>>
>>> The last such VFIO device state packet should have
>>> VFIO_DEVICE_STATE_CONFIG_STATE flag set and carry the device config state.
>>>
>>> Since it's important to finish loading device state transferred via the
>>> main migration channel (via save_live_iterate SaveVMHandler) before
>>> starting loading the data asynchronously transferred via multifd the thread
>>> doing the actual loading of the multifd transferred data is only started
>>> from switchover_start SaveVMHandler.
>>>
>>> switchover_start handler is called when MIG_CMD_SWITCHOVER_START
>>> sub-command of QEMU_VM_COMMAND is received via the main migration channel.
>>>
>>> This sub-command is only sent after all save_live_iterate data have already
>>> been posted so it is safe to commence loading of the multifd-transferred
>>> device state upon receiving it - loading of save_live_iterate data happens
>>> synchronously in the main migration thread (much like the processing of
>>> MIG_CMD_SWITCHOVER_START) so by the time MIG_CMD_SWITCHOVER_START is
>>> processed all the proceeding data must have already been loaded.
>>>
>>> Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
>>> ---
>>>   hw/vfio/migration.c           | 402 ++++++++++++++++++++++++++++++++++
>>
>> This is quite a significant update to introduce all at once. It lacks a
>> comprehensive overview of the design for those who were not involved in
>> the earlier discussions adding support for multifd migration of device
>> state. There are multiple threads and migration streams involved at
>> load time which deserve some description. I think the best place
>> would be at the end of:
>>
>>     https://qemu.readthedocs.io/en/v9.1.0/devel/migration/vfio.html
> 
> Will try to add some design/implementations descriptions to
> docs/devel/migration/vfio.rst.
> 
>> Could you please break down the patch to progressively introduce the
>> various elements needed for the receive sequence ? Something like :
>>
>>    - data structures first
>>    - init phase
>>    - run time
>>    - and clean up phase
>>    - toggles to enable/disable/tune
>>    - finaly, documentation update (under vfio migration)
> 
> Obviously I can split the VFIO patch into smaller fragments,
> but this means that the intermediate form won't be testable
> (I guess that's okay).

As long as bisect is not broken, it is fine. Typically, the last patch
of a series is the one activating the new proposed feature.

> 
>> Some more below,
>>
>>>   hw/vfio/pci.c                 |   2 +
>>>   hw/vfio/trace-events          |   6 +
>>>   include/hw/vfio/vfio-common.h |  19 ++
>>>   4 files changed, 429 insertions(+)
>>>
>>> diff --git a/hw/vfio/migration.c b/hw/vfio/migration.c
>>> index 683f2ae98d5e..b54879fe6209 100644
>>> --- a/hw/vfio/migration.c
>>> +++ b/hw/vfio/migration.c
>>> @@ -15,6 +15,7 @@
>>>   #include <linux/vfio.h>
>>>   #include <sys/ioctl.h>
>>> +#include "io/channel-buffer.h"
>>>   #include "sysemu/runstate.h"
>>>   #include "hw/vfio/vfio-common.h"
>>>   #include "migration/misc.h"
>>> @@ -55,6 +56,15 @@
>>>    */
>>>   #define VFIO_MIG_DEFAULT_DATA_BUFFER_SIZE (1 * MiB)
>>> +#define VFIO_DEVICE_STATE_CONFIG_STATE (1)
>>> +
>>> +typedef struct VFIODeviceStatePacket {
>>> +    uint32_t version;
>>> +    uint32_t idx;
>>> +    uint32_t flags;
>>> +    uint8_t data[0];
>>> +} QEMU_PACKED VFIODeviceStatePacket;
>>> +
>>>   static int64_t bytes_transferred;
>>>   static const char *mig_state_to_str(enum vfio_device_mig_state state)
>>> @@ -254,6 +264,292 @@ static int vfio_load_buffer(QEMUFile *f, VFIODevice *vbasedev,
>>>       return ret;
>>>   }
>>> +typedef struct VFIOStateBuffer {
>>> +    bool is_present;
>>> +    char *data;
>>> +    size_t len;
>>> +} VFIOStateBuffer;
>>> +
>>> +static void vfio_state_buffer_clear(gpointer data)
>>> +{
>>> +    VFIOStateBuffer *lb = data;
>>> +
>>> +    if (!lb->is_present) {
>>> +        return;
>>> +    }
>>> +
>>> +    g_clear_pointer(&lb->data, g_free);
>>> +    lb->is_present = false;
>>> +}
>>> +
>>> +static void vfio_state_buffers_init(VFIOStateBuffers *bufs)
>>> +{
>>> +    bufs->array = g_array_new(FALSE, TRUE, sizeof(VFIOStateBuffer));
>>> +    g_array_set_clear_func(bufs->array, vfio_state_buffer_clear);
>>> +}
>>> +
>>> +static void vfio_state_buffers_destroy(VFIOStateBuffers *bufs)
>>> +{
>>> +    g_clear_pointer(&bufs->array, g_array_unref);
>>> +}
>>> +
>>> +static void vfio_state_buffers_assert_init(VFIOStateBuffers *bufs)
>>> +{
>>> +    assert(bufs->array);
>>> +}
>>> +
>>> +static guint vfio_state_buffers_size_get(VFIOStateBuffers *bufs)
>>> +{
>>> +    return bufs->array->len;
>>> +}
>>> +
>>> +static void vfio_state_buffers_size_set(VFIOStateBuffers *bufs, guint size)
>>> +{
>>> +    g_array_set_size(bufs->array, size);
>>> +}
>>> +
>>> +static VFIOStateBuffer *vfio_state_buffers_at(VFIOStateBuffers *bufs, guint idx)
>>> +{
>>> +    return &g_array_index(bufs->array, VFIOStateBuffer, idx);
>>> +}
>>> +
>>> +static int vfio_load_state_buffer(void *opaque, char *data, size_t data_size,
>>> +                                  Error **errp)
>>> +{
>>> +    VFIODevice *vbasedev = opaque;
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +    VFIODeviceStatePacket *packet = (VFIODeviceStatePacket *)data;
>>> +    VFIOStateBuffer *lb;
>>> +
>>> +    /*
>>> +     * Holding BQL here would violate the lock order and can cause
>>> +     * a deadlock once we attempt to lock load_bufs_mutex below.
>>> +     */
>>> +    assert(!bql_locked());
>>> +
>>> +    if (!migration->multifd_transfer) {
>>
>> Hmm, why is 'multifd_transfer' a migration attribute ? Shouldn't it
>> be at the device level ? 
> 
> I thought migration-time data goes into VFIOMigration?

Yes. Sorry, I was confused by the MigrationState object, which is global.
VFIOMigration is a device level object. We are fine.

AFAICT, this supports hybrid configs: some devices using multifd
migration and others using standard migration.

> I don't have any strong objections against moving it into VFIODevice though.
> 
>> Or should all devices of a VM support multifd
>> transfer ? That said, I'm a bit unclear about the limitations, if there
>> are any. Could you please explain a bit more when the migration sequence
>> is set up for the device ?
>>
> 
> The reason we need this setting on the receive side is that we
> need to know whether to start the load_bufs_thread (the migration
> core will later wait for this thread to finish before proceeding further).
> 
> We also need to know whether to allocate multifd-related data structures
> in the VFIO driver based on this setting.
> 
> This setting ultimately comes from the "x-migration-multifd-transfer"
> VFIOPCIDevice property, which is an ON_OFF_AUTO setting (the "AUTO" value
> means that multifd use in the driver is attempted in configurations that
> otherwise support it).
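> 
> For example (an illustrative command line, with a made-up host address):
> 
>   -device vfio-pci,host=0000:01:00.0,x-migration-multifd-transfer=on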
> 
>>
>>> +        error_setg(errp,
>>> +                   "got device state packet but not doing multifd transfer");
>>> +        return -1;
>>> +    }
>>> +
>>> +    if (data_size < sizeof(*packet)) {
>>> +        error_setg(errp, "packet too short at %zu (min is %zu)",
>>> +                   data_size, sizeof(*packet));
>>> +        return -1;
>>> +    }
>>> +
>>> +    if (packet->version != 0) {
>>> +        error_setg(errp, "packet has unknown version %" PRIu32,
>>> +                   packet->version);
>>> +        return -1;
>>> +    }
>>> +
>>> +    if (packet->idx == UINT32_MAX) {
>>> +        error_setg(errp, "packet has too high idx %" PRIu32,
>>> +                   packet->idx);
>>> +        return -1;
>>> +    }
>>> +
>>> +    trace_vfio_load_state_device_buffer_incoming(vbasedev->name, packet->idx);
>>> +
>>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
>>> +
>>> +    /* config state packet should be the last one in the stream */
>>> +    if (packet->flags & VFIO_DEVICE_STATE_CONFIG_STATE) {
>>> +        migration->load_buf_idx_last = packet->idx;
>>> +    }
>>> +
>>> +    vfio_state_buffers_assert_init(&migration->load_bufs);
>>> +    if (packet->idx >= vfio_state_buffers_size_get(&migration->load_bufs)) {
>>> +        vfio_state_buffers_size_set(&migration->load_bufs, packet->idx + 1);
>>> +    }
>>> +
>>> +    lb = vfio_state_buffers_at(&migration->load_bufs, packet->idx);
>>> +    if (lb->is_present) {
>>> +        error_setg(errp, "state buffer %" PRIu32 " already filled",
>>> +                   packet->idx);
>>> +        return -1;
>>> +    }
>>> +
>>> +    assert(packet->idx >= migration->load_buf_idx);
>>> +
>>> +    migration->load_buf_queued_pending_buffers++;
>>> +    if (migration->load_buf_queued_pending_buffers >
>>> +        vbasedev->migration_max_queued_buffers) {
>>> +        error_setg(errp,
>>> +                   "queuing state buffer %" PRIu32 " would exceed the max of %" PRIu64,
>>> +                   packet->idx, vbasedev->migration_max_queued_buffers);
>>> +        return -1;
>>> +    }
>>> +
>>> +    lb->data = g_memdup2(&packet->data, data_size - sizeof(*packet));
>>> +    lb->len = data_size - sizeof(*packet);
>>> +    lb->is_present = true;
>>> +
>>> +    qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static int vfio_load_device_config_state(QEMUFile *f, void *opaque);
>>> +
>>> +static int vfio_load_bufs_thread_load_config(VFIODevice *vbasedev)
>>> +{
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +    VFIOStateBuffer *lb;
>>> +    g_autoptr(QIOChannelBuffer) bioc = NULL;
>>> +    QEMUFile *f_out = NULL, *f_in = NULL;
>>> +    uint64_t mig_header;
>>> +    int ret;
>>> +
>>> +    assert(migration->load_buf_idx == migration->load_buf_idx_last);
>>> +    lb = vfio_state_buffers_at(&migration->load_bufs, migration->load_buf_idx);
>>> +    assert(lb->is_present);
>>> +
>>> +    bioc = qio_channel_buffer_new(lb->len);
>>> +    qio_channel_set_name(QIO_CHANNEL(bioc), "vfio-device-config-load");
>>> +
>>> +    f_out = qemu_file_new_output(QIO_CHANNEL(bioc));
>>> +    qemu_put_buffer(f_out, (uint8_t *)lb->data, lb->len);
>>> +
>>> +    ret = qemu_fflush(f_out);
>>> +    if (ret) {
>>> +        g_clear_pointer(&f_out, qemu_fclose);
>>> +        return ret;
>>> +    }
>>> +
>>> +    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
>>> +    f_in = qemu_file_new_input(QIO_CHANNEL(bioc));
>>> +
>>> +    mig_header = qemu_get_be64(f_in);
>>> +    if (mig_header != VFIO_MIG_FLAG_DEV_CONFIG_STATE) {
>>> +        g_clear_pointer(&f_out, qemu_fclose);
>>> +        g_clear_pointer(&f_in, qemu_fclose);
>>> +        return -EINVAL;
>>> +    }
>>
>> All the above code is using the QIOChannel interface which is sort of an
>> internal API of the migration subsystem. Can we move it under migration ?
> 
> hw/remote and hw/virtio are also using the QIOChannel API, not to mention
> qemu-nbd, block/nbd and backends/tpm, so it's definitely not just the
> core migration code that uses it.

These examples are not device models.

> I don't think introducing a tiny generic migration core helper which takes
> a VFIO-specific buffer with config data and ends up calling a VFIO-specific
> device config state load function really makes sense.

qemu_file_new_input/output, qio_channel_buffer_new and qio_channel_io_seek
are solely used in migration. That's why I am reluctant to use them
directly in VFIO.

I agree it is small, for now.
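
If this grows, a small helper along these lines could live under
migration/ instead (purely a sketch, the helper name is invented):

static QEMUFile *qemu_file_new_input_from_buffer(char *data, size_t len)
{
    g_autoptr(QIOChannelBuffer) bioc = qio_channel_buffer_new(len);
    QEMUFile *f_out = qemu_file_new_output(QIO_CHANNEL(bioc));

    /* copy the buffer into the channel, then rewind it for reading */
    qemu_put_buffer(f_out, (uint8_t *)data, len);
    if (qemu_fflush(f_out)) {
        qemu_fclose(f_out);
        return NULL;
    }
    qemu_fclose(f_out);

    qio_channel_io_seek(QIO_CHANNEL(bioc), 0, 0, NULL);
    return qemu_file_new_input(QIO_CHANNEL(bioc));
}

Device code would then only ever see a QEMUFile.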

> 
>>
>>> +
>>> +    bql_lock();
>>> +    ret = vfio_load_device_config_state(f_in, vbasedev);
>>> +    bql_unlock();
>>> +
>>> +    g_clear_pointer(&f_out, qemu_fclose);
>>> +    g_clear_pointer(&f_in, qemu_fclose);
>>> +    if (ret < 0) {
>>> +        return ret;
>>> +    }
>>> +
>>> +    return 0;
>>> +}
>>> +
>>> +static bool vfio_load_bufs_thread_want_abort(VFIODevice *vbasedev,
>>> +                                             bool *abort_flag)
>>> +{
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +
>>> +    return migration->load_bufs_thread_want_exit || qatomic_read(abort_flag);
>>> +}
>>> +
>>> +static int vfio_load_bufs_thread(bool *abort_flag, void *opaque)
>>> +{
>>> +    VFIODevice *vbasedev = opaque;
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +    QEMU_LOCK_GUARD(&migration->load_bufs_mutex);
>>> +    int ret;
>>> +
>>> +    assert(migration->load_bufs_thread_running);
>>> +
>>> +    while (!vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
>>> +        VFIOStateBuffer *lb;
>>> +        guint bufs_len;
>>> +        bool starved;
>>> +
>>> +        assert(migration->load_buf_idx <= migration->load_buf_idx_last);
>>> +
>>> +        bufs_len = vfio_state_buffers_size_get(&migration->load_bufs);
>>> +        if (migration->load_buf_idx >= bufs_len) {
>>> +            assert(migration->load_buf_idx == bufs_len);
>>> +            starved = true;
>>> +        } else {
>>> +            lb = vfio_state_buffers_at(&migration->load_bufs,
>>> +                                       migration->load_buf_idx);
>>> +            starved = !lb->is_present;
>>> +        }
>>> +
>>> +        if (starved) {
>>> +            trace_vfio_load_state_device_buffer_starved(vbasedev->name,
>>> +                                                        migration->load_buf_idx);
>>> +            qemu_cond_wait(&migration->load_bufs_buffer_ready_cond,
>>> +                           &migration->load_bufs_mutex);
>>> +            continue;
>>> +        }
>>> +
>>> +        if (migration->load_buf_idx == migration->load_buf_idx_last) {
>>> +            break;
>>> +        }
>>> +
>>> +        if (migration->load_buf_idx == 0) {
>>> +            trace_vfio_load_state_device_buffer_start(vbasedev->name);
>>> +        }
>>> +
>>> +        if (lb->len) {
>>> +            g_autofree char *buf = NULL;
>>> +            size_t buf_len;
>>> +            ssize_t wr_ret;
>>> +            int errno_save;
>>> +
>>> +            trace_vfio_load_state_device_buffer_load_start(vbasedev->name,
>>> +                                                           migration->load_buf_idx);
>>> +
>>> +            /* lb might become re-allocated when we drop the lock */
>>> +            buf = g_steal_pointer(&lb->data);
>>> +            buf_len = lb->len;
>>> +
>>> +            /*
>>> +             * Loading data to the device takes a while,
>>> +             * drop the lock during this process.
>>> +             */
>>> +            qemu_mutex_unlock(&migration->load_bufs_mutex);
>>> +            wr_ret = write(migration->data_fd, buf, buf_len);
>>> +            errno_save = errno;
>>> +            qemu_mutex_lock(&migration->load_bufs_mutex);
>>> +
>>> +            if (wr_ret < 0) {
>>> +                ret = -errno_save;
>>> +                goto ret_signal;
>>> +            } else if (wr_ret < buf_len) {
>>> +                ret = -EINVAL;
>>> +                goto ret_signal;
>>> +            }
>>> +
>>> +            trace_vfio_load_state_device_buffer_load_end(vbasedev->name,
>>> +                                                         migration->load_buf_idx);
>>> +        }
>>> +
>>> +        assert(migration->load_buf_queued_pending_buffers > 0);
>>> +        migration->load_buf_queued_pending_buffers--;
>>> +
>>> +        if (migration->load_buf_idx == migration->load_buf_idx_last - 1) {
>>> +            trace_vfio_load_state_device_buffer_end(vbasedev->name);
>>> +        }
>>> +
>>> +        migration->load_buf_idx++;
>>> +    }
>>> +
>>> +    if (vfio_load_bufs_thread_want_abort(vbasedev, abort_flag)) {
>>> +        ret = -ECANCELED;
>>> +        goto ret_signal;
>>> +    }
>>> +
>>> +    ret = vfio_load_bufs_thread_load_config(vbasedev);
>>> +
>>> +ret_signal:
>>> +    migration->load_bufs_thread_running = false;
>>> +    qemu_cond_signal(&migration->load_bufs_thread_finished_cond);
>>> +
>>> +    return ret;
>>
>> Is the error reported to the migration subsystem ?
> 
> Yes, via setting "load_threads_ret" in qemu_loadvm_load_thread().
> 
>>> +}
>>> +
>>>   static int vfio_save_device_config_state(QEMUFile *f, void *opaque,
>>>                                            Error **errp)
>>>   {
>>> @@ -430,6 +726,12 @@ static bool vfio_precopy_supported(VFIODevice *vbasedev)
>>>       return migration->mig_flags & VFIO_MIGRATION_PRE_COPY;
>>>   }
>>> +static bool vfio_multifd_transfer_supported(void)
>>> +{
>>> +    return migration_has_device_state_support() &&
>>> +        migrate_send_switchover_start();
>>> +}
>>> +
>>>   /* ---------------------------------------------------------------------- */
>>>   static int vfio_save_prepare(void *opaque, Error **errp)
>>> @@ -695,17 +997,73 @@ static int vfio_load_setup(QEMUFile *f, void *opaque, Error **errp)
>>>       assert(!migration->load_setup);
>>> +    /*
>>> +     * Make a copy of this setting at the start in case it is changed
>>> +     * mid-migration.
>>> +     */
>>> +    if (vbasedev->migration_multifd_transfer == ON_OFF_AUTO_AUTO) {
>>> +        migration->multifd_transfer = vfio_multifd_transfer_supported();
>>> +    } else {
>>> +        migration->multifd_transfer =
>>> +            vbasedev->migration_multifd_transfer == ON_OFF_AUTO_ON;
>>> +    }
>>> +
>>> +    if (migration->multifd_transfer && !vfio_multifd_transfer_supported()) {
>>> +        error_setg(errp,
>>> +                   "%s: Multifd device transfer requested but unsupported in the current config",
>>> +                   vbasedev->name);
>>> +        return -EINVAL;
>>> +    }
>>
>> Can we move these checks earlier ? In vfio_migration_realize() ?
>> If possible, it would also be good to avoid the multifd_transfer attribute.
> 
> We can't since the value is changeable at runtime, so it could have been
> changed after the VFIO device got realized.

We will need to discuss this part again. Let's keep it that way for now.

>>>       ret = vfio_migration_set_state(vbasedev, VFIO_DEVICE_STATE_RESUMING,
>>>                                      migration->device_state, errp);
>>>       if (ret) {
>>>           return ret;
>>>       }
>>> +    if (migration->multifd_transfer) {
>>> +        assert(!migration->load_bufs.array);
>>> +        vfio_state_buffers_init(&migration->load_bufs);
>>> +
>>> +        qemu_mutex_init(&migration->load_bufs_mutex);
>>> +
>>> +        migration->load_buf_idx = 0;
>>> +        migration->load_buf_idx_last = UINT32_MAX;
>>> +        migration->load_buf_queued_pending_buffers = 0;
>>> +        qemu_cond_init(&migration->load_bufs_buffer_ready_cond);
>>> +
>>> +        migration->load_bufs_thread_running = false;
>>> +        migration->load_bufs_thread_want_exit = false;
>>> +        qemu_cond_init(&migration->load_bufs_thread_finished_cond);
>>
>> Please provide a helper routine to initialize all the multifd transfer
>> attributes. We might also want to add a struct to gather them all.
> 
> Will move these to a new helper.
> 
>>> +    }
>>> +
>>>       migration->load_setup = true;
>>>       return 0;
>>>   }
>>> +static void vfio_load_cleanup_load_bufs_thread(VFIODevice *vbasedev)
>>> +{
>>> +    VFIOMigration *migration = vbasedev->migration;
>>> +
>>> +    /* The lock order is load_bufs_mutex -> BQL so unlock BQL here first */
>>> +    bql_unlock();
>>> +    WITH_QEMU_LOCK_GUARD(&migration->load_bufs_mutex) {
>>> +        if (!migration->load_bufs_thread_running) {
>>> +            break;
>>> +        }
>>> +
>>> +        migration->load_bufs_thread_want_exit = true;
>>> +
>>> +        qemu_cond_signal(&migration->load_bufs_buffer_ready_cond);
>>> +        qemu_cond_wait(&migration->load_bufs_thread_finished_cond,
>>> +                       &migration->load_bufs_mutex);
>>> +
>>> +        assert(!migration->load_bufs_thread_running);
>>> +    }
>>> +    bql_lock();
>>> +}
>>> +
>>>   static int vfio_load_cleanup(void *opaque)
>>>   {
>>>       VFIODevice *vbasedev = opaque;
>>> @@ -715,7 +1073,19 @@ static int vfio_load_cleanup(void *opaque)
>>>           return 0;
>>>       }
>>> +    if (migration->multifd_transfer) {
>>> +        vfio_load_cleanup_load_bufs_thread(vbasedev);
>>> +    }
>>> +
>>>       vfio_migration_cleanup(vbasedev);
>>
>> Why is the cleanup done in two steps ?
> 
> I'm not sure what "two steps" refers to here, but
> if you mean moving the "if (migration->multifd_transfer)"
> block below into the similar one above then it should be possible.

Good. It is preferable.


Thanks,

C.




> 
>>> +
>>> +    if (migration->multifd_transfer) {
>>> +        qemu_cond_destroy(&migration->load_bufs_thread_finished_cond);
>>> +        vfio_state_buffers_destroy(&migration->load_bufs);
>>> +        qemu_cond_destroy(&migration->load_bufs_buffer_ready_cond);
>>> +        qemu_mutex_destroy(&migration->load_bufs_mutex);
>>> +    }
>>> +
>>>       migration->load_setup = false;
>>>       trace_vfio_load_cleanup(vbasedev->name);
> (..)
> 
> 
>> Thanks,
>>
>> C.
>>
> 
> Thanks,
> Maciej
>