[PATCH v2 8/8] tests/functional: add test_x86_64_tap_fd_migration

Vladimir Sementsov-Ogievskiy posted 8 patches 3 days, 1 hour ago
There is a newer version of this series
[PATCH v2 8/8] tests/functional: add test_x86_64_tap_fd_migration
Posted by Vladimir Sementsov-Ogievskiy 3 days, 1 hour ago
Add test for a new feature of local TAP migration with fd passing
through unix socket.

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>
---
 .../test_x86_64_tap_fd_migration.py           | 347 ++++++++++++++++++
 1 file changed, 347 insertions(+)
 create mode 100644 tests/functional/test_x86_64_tap_fd_migration.py

diff --git a/tests/functional/test_x86_64_tap_fd_migration.py b/tests/functional/test_x86_64_tap_fd_migration.py
new file mode 100644
index 0000000000..f6d18fe39f
--- /dev/null
+++ b/tests/functional/test_x86_64_tap_fd_migration.py
@@ -0,0 +1,347 @@
+#!/usr/bin/env python3
+#
+# Functional test that tests TAP local migration
+# with fd passing
+#
+# Copyright (c) Yandex
+#
+# SPDX-License-Identifier: GPL-2.0-or-later
+
+import os
+import time
+import subprocess
+import signal
+from typing import Tuple
+
+from qemu_test import (
+    LinuxKernelTest,
+    Asset,
+    exec_command_and_wait_for_pattern,
+)
+
+GUEST_IP = "10.0.1.2"
+GUEST_IP_MASK = f"{GUEST_IP}/24"
+GUEST_MAC = "d6:0d:75:f8:0f:b7"
+HOST_IP = "10.0.1.1"
+HOST_IP_MASK = f"{HOST_IP}/24"
+TAP_ID = "tap0"
+TAP_MAC = "e6:1d:44:b5:03:5d"
+
+
+def run(cmd: str, check: bool = True) -> None:
+    subprocess.run(cmd, check=check, shell=True)
+
+
+def fetch(cmd: str, check: bool = True) -> str:
+    return subprocess.run(
+        cmd, check=check, shell=True, stdout=subprocess.PIPE, text=True
+    ).stdout
+
+
+def del_tap() -> None:
+    run(f"ip tuntap del {TAP_ID} mode tap multi_queue", check=False)
+
+
+def init_tap() -> None:
+    run(f"ip tuntap add dev {TAP_ID} mode tap multi_queue")
+    run(f"ip link set dev {TAP_ID} address {TAP_MAC}")
+    run(f"ip addr add {HOST_IP_MASK} dev {TAP_ID}")
+    run(f"ip link set {TAP_ID} up")
+
+
+def parse_ping_line(line: str) -> float:
+    # suspect lines like
+    # [1748524876.590509] 64 bytes from 94.245.155.3 \
+    #      (94.245.155.3): icmp_seq=1 ttl=250 time=101 ms
+    spl = line.split()
+    return float(spl[0][1:-1])
+
+
+def parse_ping_output(out) -> Tuple[bool, float, float]:
+    lines = [x for x in out.split("\n") if x.startswith("[")]
+
+    try:
+        first_no_ans = next(
+            (ind for ind in range(len(lines)) if lines[ind][20:26] == "no ans")
+        )
+    except StopIteration:
+        return False, parse_ping_line(lines[0]), parse_ping_line(lines[-1])
+
+    last_no_ans = next(
+        (ind for ind in range(len(lines) - 1, -1, -1) if lines[ind][20:26] == "no ans")
+    )
+
+    return (
+        True,
+        parse_ping_line(lines[first_no_ans]),
+        parse_ping_line(lines[last_no_ans]),
+    )
+
+
+def wait_migration_finish(source_vm, target_vm):
+    migr_events = (
+        ("MIGRATION", {"data": {"status": "completed"}}),
+        ("MIGRATION", {"data": {"status": "failed"}}),
+    )
+
+    source_e = source_vm.events_wait(migr_events)["data"]
+    target_e = target_vm.events_wait(migr_events)["data"]
+
+    source_s = source_vm.cmd("query-status")["status"]
+    target_s = target_vm.cmd("query-status")["status"]
+
+    assert (
+        source_e["status"] == "completed"
+        and target_e["status"] == "completed"
+        and source_s == "postmigrate"
+        and target_s == "paused"
+    ), f"""Migration failed:
+    SRC status: {source_s}
+    SRC event: {source_e}
+    TGT status: {target_s}
+    TGT event:{target_e}"""
+
+
+class VhostUserBlkFdMigration(LinuxKernelTest):
+
+    ASSET_KERNEL = Asset(
+        (
+            "https://archives.fedoraproject.org/pub/archive/fedora/linux/releases"
+            "/31/Server/x86_64/os/images/pxeboot/vmlinuz"
+        ),
+        "d4738d03dbbe083ca610d0821d0a8f1488bebbdccef54ce33e3adb35fda00129",
+    )
+
+    ASSET_INITRD = Asset(
+        (
+            "https://archives.fedoraproject.org/pub/archive/fedora/linux/releases"
+            "/31/Server/x86_64/os/images/pxeboot/initrd.img"
+        ),
+        "277cd6c7adf77c7e63d73bbb2cded8ef9e2d3a2f100000e92ff1f8396513cd8b",
+    )
+
+    ASSET_ALPINE_ISO = Asset(
+        (
+            "https://dl-cdn.alpinelinux.org/"
+            "alpine/v3.22/releases/x86_64/alpine-standard-3.22.1-x86_64.iso"
+        ),
+        "96d1b44ea1b8a5a884f193526d92edb4676054e9fa903ad2f016441a0fe13089",
+    )
+
+    def setUp(self):
+        super().setUp()
+
+        init_tap()
+
+        self.outer_ping_proc = None
+
+    def tearDown(self):
+        del_tap()
+
+        if self.outer_ping_proc:
+            self.stop_outer_ping()
+
+        super().tearDown()
+
+    def start_outer_ping(self) -> None:
+        assert self.outer_ping_proc is None
+        self.outer_ping_log = open("/tmp/ping.log", "w")
+        self.outer_ping_proc = subprocess.Popen(
+            ["ping", "-i", "0", "-O", "-D", GUEST_IP],
+            text=True,
+            stdout=self.outer_ping_log,
+        )
+
+    def stop_outer_ping(self) -> str:
+        assert self.outer_ping_proc
+        self.outer_ping_proc.send_signal(signal.SIGINT)
+
+        self.outer_ping_proc.communicate(timeout=5)
+        self.outer_ping_proc = None
+        self.outer_ping_log.close()
+
+        # We need the start, the end and several lines around "no answer"
+        cmd = "cat /tmp/ping.log | grep -A 4 -B 4 'PING\\|packets\\|no ans'"
+        return fetch(cmd)
+
+    def stop_ping_and_check(self, stop_time, resume_time):
+        ping_res = self.stop_outer_ping()
+
+        discon, a, b = parse_ping_output(ping_res)
+
+        if not discon:
+            text = f"STOP: {stop_time}, RESUME: {resume_time}," f"PING: {a} - {b}"
+            if a > stop_time or b < resume_time:
+                self.fail(f"PING failed: {text}")
+            self.log.info(f"PING: no packets lost: {text}")
+            return
+
+        text = (
+            f"STOP: {stop_time}, RESUME: {resume_time}," f"PING: disconnect: {a} - {b}"
+        )
+        self.log.info(text)
+        eps = 0.01
+        if a < stop_time - eps or b > resume_time + eps:
+            self.fail(text)
+
+    def one_ping_from_guest(self, vm) -> None:
+        exec_command_and_wait_for_pattern(
+            self,
+            f"ping -c 1 -W 1 {HOST_IP}",
+            "1 packets transmitted, 1 packets received",
+            "1 packets transmitted, 0 packets received",
+            vm=vm,
+        )
+        self.wait_for_console_pattern("# ", vm=vm)
+
+    def one_ping_from_host(self) -> None:
+        run(f"ping -c 1 -W 1 {GUEST_IP}")
+
+    def setup_shared_memory(self):
+        shm_path = f"/dev/shm/qemu_test_{os.getpid()}"
+
+        try:
+            with open(shm_path, "wb") as f:
+                f.write(b"\0" * (1024 * 1024 * 1024))  # 1GB
+        except Exception as e:
+            self.fail(f"Failed to create shared memory file: {e}")
+
+        return shm_path
+
+    def prepare_and_launch_vm(self, shm_path, vhost, incoming=False, vm=None):
+        if not vm:
+            vm = self.vm
+
+        vm.set_console()
+        vm.add_args("-accel", "kvm")
+        vm.add_args("-device", "pcie-pci-bridge,id=pci.1,bus=pcie.0")
+        vm.add_args("-m", "1G")
+
+        vm.add_args(
+            "-object",
+            f"memory-backend-file,id=ram0,size=1G,mem-path={shm_path},share=on",
+        )
+        vm.add_args("-machine", "memory-backend=ram0")
+
+        vm.add_args(
+            "-drive", f"file={self.ASSET_ALPINE_ISO.fetch()},media=cdrom,format=raw"
+        )
+
+        vm.add_args("-S")
+
+        if incoming:
+            vm.add_args("-incoming", "defer")
+
+        vm_s = "target" if incoming else "source"
+        self.log.info(f"Launching {vm_s} VM")
+        vm.launch()
+
+        self.set_migration_capabilities(vm)
+        self.add_virtio_net(vm, vhost, incoming)
+
+    def add_virtio_net(self, vm, vhost: bool, incoming: bool = False):
+        netdev_params = {
+            "id": "netdev.1",
+            "vhost": vhost,
+            "type": "tap",
+            "ifname": "tap0",
+            "downscript": "no",
+            "queues": 4,
+        }
+
+        if incoming:
+            netdev_params["local-incoming"] = True
+        else:
+            netdev_params["script"] = "no"
+
+        vm.cmd("netdev_add", netdev_params)
+
+        vm.cmd(
+            "device_add",
+            driver="virtio-net-pci",
+            romfile="",
+            id="vnet.1",
+            netdev="netdev.1",
+            mq=True,
+            vectors=18,
+            bus="pci.1",
+            mac=GUEST_MAC,
+            disable_legacy="off",
+        )
+
+    def set_migration_capabilities(self, vm):
+        capabilities = [
+            {"capability": "events", "state": True},
+            {"capability": "x-ignore-shared", "state": True},
+            {"capability": "local-tap", "state": True},
+        ]
+        vm.cmd("migrate-set-capabilities", {"capabilities": capabilities})
+
+    def setup_guest_network(self) -> None:
+        exec_command_and_wait_for_pattern(self, "ip addr", "# ")
+        exec_command_and_wait_for_pattern(
+            self,
+            f"ip addr add {GUEST_IP_MASK} dev eth0 && ip link set eth0 up && echo OK",
+            "OK",
+        )
+        self.wait_for_console_pattern("# ")
+
+    def do_test_tap_fd_migration(self, vhost):
+        self.require_accelerator("kvm")
+        self.set_machine("q35")
+
+        socket_dir = self.socket_dir()
+        migration_socket = os.path.join(socket_dir.name, "migration.sock")
+
+        shm_path = self.setup_shared_memory()
+
+        self.prepare_and_launch_vm(shm_path, vhost)
+        self.vm.cmd("cont")
+        self.wait_for_console_pattern("login:")
+        exec_command_and_wait_for_pattern(self, "root", "# ")
+
+        self.setup_guest_network()
+
+        self.one_ping_from_guest(self.vm)
+        self.one_ping_from_host()
+        self.start_outer_ping()
+
+        # Get some successful pings before migration
+        time.sleep(0.5)
+
+        target_vm = self.get_vm(name="target")
+        self.prepare_and_launch_vm(shm_path, vhost, incoming=True, vm=target_vm)
+
+        target_vm.cmd("migrate-incoming", {"uri": f"unix:{migration_socket}"})
+
+        self.log.info("Starting migration")
+        freeze_start = time.time()
+        self.vm.cmd("migrate", {"uri": f"unix:{migration_socket}"})
+
+        self.log.info("Waiting for migration completion")
+        wait_migration_finish(self.vm, target_vm)
+
+        target_vm.cmd("cont")
+        freeze_end = time.time()
+
+        self.vm.shutdown()
+
+        self.log.info("Verifying PING on target VM after migration")
+        self.one_ping_from_guest(target_vm)
+        self.one_ping_from_host()
+
+        # And a bit more pings after source shutdown
+        time.sleep(0.3)
+        self.stop_ping_and_check(freeze_start, freeze_end)
+
+        target_vm.shutdown()
+
+    def test_tap_fd_migration(self):
+        self.do_test_tap_fd_migration(False)
+
+    def test_tap_fd_migration_vhost(self):
+        self.do_test_tap_fd_migration(True)
+
+
+if __name__ == "__main__":
+    LinuxKernelTest.main()
-- 
2.48.1
Re: [PATCH v2 8/8] tests/functional: add test_x86_64_tap_fd_migration
Posted by Daniel P. Berrangé 2 days, 23 hours ago
On Wed, Sep 03, 2025 at 04:37:05PM +0300, Vladimir Sementsov-Ogievskiy wrote:
> Add test for a new feature of local TAP migration with fd passing
> through unix socket.
> 
> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>
> ---
>  .../test_x86_64_tap_fd_migration.py           | 347 ++++++++++++++++++
>  1 file changed, 347 insertions(+)
>  create mode 100644 tests/functional/test_x86_64_tap_fd_migration.py
> 
> diff --git a/tests/functional/test_x86_64_tap_fd_migration.py b/tests/functional/test_x86_64_tap_fd_migration.py
> new file mode 100644
> index 0000000000..f6d18fe39f
> --- /dev/null
> +++ b/tests/functional/test_x86_64_tap_fd_migration.py
> @@ -0,0 +1,347 @@
> +#!/usr/bin/env python3
> +#
> +# Functional test that tests TAP local migration
> +# with fd passing
> +#
> +# Copyright (c) Yandex
> +#
> +# SPDX-License-Identifier: GPL-2.0-or-later
> +
> +import os
> +import time
> +import subprocess
> +import signal
> +from typing import Tuple
> +
> +from qemu_test import (
> +    LinuxKernelTest,
> +    Asset,
> +    exec_command_and_wait_for_pattern,
> +)
> +
> +GUEST_IP = "10.0.1.2"
> +GUEST_IP_MASK = f"{GUEST_IP}/24"
> +GUEST_MAC = "d6:0d:75:f8:0f:b7"
> +HOST_IP = "10.0.1.1"
> +HOST_IP_MASK = f"{HOST_IP}/24"
> +TAP_ID = "tap0"
> +TAP_MAC = "e6:1d:44:b5:03:5d"
> +
> +
> +def run(cmd: str, check: bool = True) -> None:
> +    subprocess.run(cmd, check=check, shell=True)

I don't see need to be using shell here - just use
"check_call()" and pass the argv as an array instead
of a string at which point this 'run' helper can be
removed.

> +
> +
> +def fetch(cmd: str, check: bool = True) -> str:
> +    return subprocess.run(
> +        cmd, check=check, shell=True, stdout=subprocess.PIPE, text=True
> +    ).stdout
> +
> +
> +def del_tap() -> None:
> +    run(f"ip tuntap del {TAP_ID} mode tap multi_queue", check=False)
> +
> +
> +def init_tap() -> None:
> +    run(f"ip tuntap add dev {TAP_ID} mode tap multi_queue")
> +    run(f"ip link set dev {TAP_ID} address {TAP_MAC}")
> +    run(f"ip addr add {HOST_IP_MASK} dev {TAP_ID}")
> +    run(f"ip link set {TAP_ID} up")

$ ip tuntap add dev foo mode tap multi_queue
ioctl(TUNSETIFF): Operation not permitted


The functional tests run as the developer's normal unprivileged user
account, so it doesn't look like this can work ?

Were you testing this as root ?

> +
> +
> +def parse_ping_line(line: str) -> float:
> +    # suspect lines like
> +    # [1748524876.590509] 64 bytes from 94.245.155.3 \
> +    #      (94.245.155.3): icmp_seq=1 ttl=250 time=101 ms
> +    spl = line.split()
> +    return float(spl[0][1:-1])
> +
> +
> +def parse_ping_output(out) -> Tuple[bool, float, float]:
> +    lines = [x for x in out.split("\n") if x.startswith("[")]
> +
> +    try:
> +        first_no_ans = next(
> +            (ind for ind in range(len(lines)) if lines[ind][20:26] == "no ans")
> +        )
> +    except StopIteration:
> +        return False, parse_ping_line(lines[0]), parse_ping_line(lines[-1])
> +
> +    last_no_ans = next(
> +        (ind for ind in range(len(lines) - 1, -1, -1) if lines[ind][20:26] == "no ans")
> +    )
> +
> +    return (
> +        True,
> +        parse_ping_line(lines[first_no_ans]),
> +        parse_ping_line(lines[last_no_ans]),
> +    )
> +
> +
> +def wait_migration_finish(source_vm, target_vm):
> +    migr_events = (
> +        ("MIGRATION", {"data": {"status": "completed"}}),
> +        ("MIGRATION", {"data": {"status": "failed"}}),
> +    )
> +
> +    source_e = source_vm.events_wait(migr_events)["data"]
> +    target_e = target_vm.events_wait(migr_events)["data"]
> +
> +    source_s = source_vm.cmd("query-status")["status"]
> +    target_s = target_vm.cmd("query-status")["status"]
> +
> +    assert (
> +        source_e["status"] == "completed"
> +        and target_e["status"] == "completed"
> +        and source_s == "postmigrate"
> +        and target_s == "paused"
> +    ), f"""Migration failed:
> +    SRC status: {source_s}
> +    SRC event: {source_e}
> +    TGT status: {target_s}
> +    TGT event:{target_e}"""
> +
> +
> +class VhostUserBlkFdMigration(LinuxKernelTest):
> +
> +    ASSET_KERNEL = Asset(
> +        (
> +            "https://archives.fedoraproject.org/pub/archive/fedora/linux/releases"
> +            "/31/Server/x86_64/os/images/pxeboot/vmlinuz"
> +        ),
> +        "d4738d03dbbe083ca610d0821d0a8f1488bebbdccef54ce33e3adb35fda00129",
> +    )
> +
> +    ASSET_INITRD = Asset(
> +        (
> +            "https://archives.fedoraproject.org/pub/archive/fedora/linux/releases"
> +            "/31/Server/x86_64/os/images/pxeboot/initrd.img"
> +        ),
> +        "277cd6c7adf77c7e63d73bbb2cded8ef9e2d3a2f100000e92ff1f8396513cd8b",
> +    )
> +
> +    ASSET_ALPINE_ISO = Asset(
> +        (
> +            "https://dl-cdn.alpinelinux.org/"
> +            "alpine/v3.22/releases/x86_64/alpine-standard-3.22.1-x86_64.iso"
> +        ),
> +        "96d1b44ea1b8a5a884f193526d92edb4676054e9fa903ad2f016441a0fe13089",
> +    )
> +
> +    def setUp(self):
> +        super().setUp()
> +
> +        init_tap()
> +
> +        self.outer_ping_proc = None
> +
> +    def tearDown(self):
> +        del_tap()
> +
> +        if self.outer_ping_proc:
> +            self.stop_outer_ping()
> +
> +        super().tearDown()
> +
> +    def start_outer_ping(self) -> None:
> +        assert self.outer_ping_proc is None
> +        self.outer_ping_log = open("/tmp/ping.log", "w")
> +        self.outer_ping_proc = subprocess.Popen(
> +            ["ping", "-i", "0", "-O", "-D", GUEST_IP],
> +            text=True,
> +            stdout=self.outer_ping_log,
> +        )

Surely outer_ping_log can be closed immediately as the child
process will keep the FD open ?

> +
> +    def stop_outer_ping(self) -> str:
> +        assert self.outer_ping_proc
> +        self.outer_ping_proc.send_signal(signal.SIGINT)
> +
> +        self.outer_ping_proc.communicate(timeout=5)
> +        self.outer_ping_proc = None
> +        self.outer_ping_log.close()
> +
> +        # We need the start, the end and several lines around "no answer"
> +        cmd = "cat /tmp/ping.log | grep -A 4 -B 4 'PING\\|packets\\|no ans'"
> +        return fetch(cmd)

IMHO this can just read the whole of /tmp/ping.log directly into memory
and then the parse_ping_output can jjust match on it with a regex,
avoiding the pre-processing with grep.


> +
> +    def stop_ping_and_check(self, stop_time, resume_time):
> +        ping_res = self.stop_outer_ping()
> +
> +        discon, a, b = parse_ping_output(ping_res)
> +
> +        if not discon:
> +            text = f"STOP: {stop_time}, RESUME: {resume_time}," f"PING: {a} - {b}"
> +            if a > stop_time or b < resume_time:
> +                self.fail(f"PING failed: {text}")
> +            self.log.info(f"PING: no packets lost: {text}")
> +            return
> +
> +        text = (
> +            f"STOP: {stop_time}, RESUME: {resume_time}," f"PING: disconnect: {a} - {b}"
> +        )
> +        self.log.info(text)
> +        eps = 0.01
> +        if a < stop_time - eps or b > resume_time + eps:
> +            self.fail(text)
> +
> +    def one_ping_from_guest(self, vm) -> None:
> +        exec_command_and_wait_for_pattern(
> +            self,
> +            f"ping -c 1 -W 1 {HOST_IP}",
> +            "1 packets transmitted, 1 packets received",
> +            "1 packets transmitted, 0 packets received",
> +            vm=vm,
> +        )
> +        self.wait_for_console_pattern("# ", vm=vm)
> +
> +    def one_ping_from_host(self) -> None:
> +        run(f"ping -c 1 -W 1 {GUEST_IP}")
> +
> +    def setup_shared_memory(self):
> +        shm_path = f"/dev/shm/qemu_test_{os.getpid()}"
> +
> +        try:
> +            with open(shm_path, "wb") as f:
> +                f.write(b"\0" * (1024 * 1024 * 1024))  # 1GB
> +        except Exception as e:
> +            self.fail(f"Failed to create shared memory file: {e}")
> +
> +        return shm_path
> +
> +    def prepare_and_launch_vm(self, shm_path, vhost, incoming=False, vm=None):
> +        if not vm:
> +            vm = self.vm
> +
> +        vm.set_console()
> +        vm.add_args("-accel", "kvm")
> +        vm.add_args("-device", "pcie-pci-bridge,id=pci.1,bus=pcie.0")
> +        vm.add_args("-m", "1G")
> +
> +        vm.add_args(
> +            "-object",
> +            f"memory-backend-file,id=ram0,size=1G,mem-path={shm_path},share=on",
> +        )
> +        vm.add_args("-machine", "memory-backend=ram0")
> +
> +        vm.add_args(
> +            "-drive", f"file={self.ASSET_ALPINE_ISO.fetch()},media=cdrom,format=raw"
> +        )
> +
> +        vm.add_args("-S")
> +
> +        if incoming:
> +            vm.add_args("-incoming", "defer")
> +
> +        vm_s = "target" if incoming else "source"
> +        self.log.info(f"Launching {vm_s} VM")
> +        vm.launch()
> +
> +        self.set_migration_capabilities(vm)
> +        self.add_virtio_net(vm, vhost, incoming)
> +
> +    def add_virtio_net(self, vm, vhost: bool, incoming: bool = False):
> +        netdev_params = {
> +            "id": "netdev.1",
> +            "vhost": vhost,
> +            "type": "tap",
> +            "ifname": "tap0",
> +            "downscript": "no",
> +            "queues": 4,
> +        }
> +
> +        if incoming:
> +            netdev_params["local-incoming"] = True
> +        else:
> +            netdev_params["script"] = "no"
> +
> +        vm.cmd("netdev_add", netdev_params)
> +
> +        vm.cmd(
> +            "device_add",
> +            driver="virtio-net-pci",
> +            romfile="",
> +            id="vnet.1",
> +            netdev="netdev.1",
> +            mq=True,
> +            vectors=18,
> +            bus="pci.1",
> +            mac=GUEST_MAC,
> +            disable_legacy="off",
> +        )
> +
> +    def set_migration_capabilities(self, vm):
> +        capabilities = [
> +            {"capability": "events", "state": True},
> +            {"capability": "x-ignore-shared", "state": True},
> +            {"capability": "local-tap", "state": True},
> +        ]
> +        vm.cmd("migrate-set-capabilities", {"capabilities": capabilities})
> +
> +    def setup_guest_network(self) -> None:
> +        exec_command_and_wait_for_pattern(self, "ip addr", "# ")
> +        exec_command_and_wait_for_pattern(
> +            self,
> +            f"ip addr add {GUEST_IP_MASK} dev eth0 && ip link set eth0 up && echo OK",
> +            "OK",
> +        )
> +        self.wait_for_console_pattern("# ")
> +
> +    def do_test_tap_fd_migration(self, vhost):
> +        self.require_accelerator("kvm")
> +        self.set_machine("q35")
> +
> +        socket_dir = self.socket_dir()
> +        migration_socket = os.path.join(socket_dir.name, "migration.sock")
> +
> +        shm_path = self.setup_shared_memory()
> +
> +        self.prepare_and_launch_vm(shm_path, vhost)
> +        self.vm.cmd("cont")
> +        self.wait_for_console_pattern("login:")
> +        exec_command_and_wait_for_pattern(self, "root", "# ")
> +
> +        self.setup_guest_network()
> +
> +        self.one_ping_from_guest(self.vm)
> +        self.one_ping_from_host()
> +        self.start_outer_ping()
> +
> +        # Get some successful pings before migration
> +        time.sleep(0.5)
> +
> +        target_vm = self.get_vm(name="target")
> +        self.prepare_and_launch_vm(shm_path, vhost, incoming=True, vm=target_vm)
> +
> +        target_vm.cmd("migrate-incoming", {"uri": f"unix:{migration_socket}"})
> +
> +        self.log.info("Starting migration")
> +        freeze_start = time.time()
> +        self.vm.cmd("migrate", {"uri": f"unix:{migration_socket}"})
> +
> +        self.log.info("Waiting for migration completion")
> +        wait_migration_finish(self.vm, target_vm)
> +
> +        target_vm.cmd("cont")
> +        freeze_end = time.time()
> +
> +        self.vm.shutdown()
> +
> +        self.log.info("Verifying PING on target VM after migration")
> +        self.one_ping_from_guest(target_vm)
> +        self.one_ping_from_host()
> +
> +        # And a bit more pings after source shutdown
> +        time.sleep(0.3)
> +        self.stop_ping_and_check(freeze_start, freeze_end)
> +
> +        target_vm.shutdown()
> +
> +    def test_tap_fd_migration(self):
> +        self.do_test_tap_fd_migration(False)
> +
> +    def test_tap_fd_migration_vhost(self):
> +        self.do_test_tap_fd_migration(True)
> +
> +
> +if __name__ == "__main__":
> +    LinuxKernelTest.main()
> -- 
> 2.48.1
> 

With regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|
Re: [PATCH v2 8/8] tests/functional: add test_x86_64_tap_fd_migration
Posted by Vladimir Sementsov-Ogievskiy 2 days, 23 hours ago
On 03.09.25 17:47, Daniel P. Berrangé wrote:
> On Wed, Sep 03, 2025 at 04:37:05PM +0300, Vladimir Sementsov-Ogievskiy wrote:
>> Add test for a new feature of local TAP migration with fd passing
>> through unix socket.
>>
>> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>
>> ---
>>   .../test_x86_64_tap_fd_migration.py           | 347 ++++++++++++++++++
>>   1 file changed, 347 insertions(+)
>>   create mode 100644 tests/functional/test_x86_64_tap_fd_migration.py
>>
>> diff --git a/tests/functional/test_x86_64_tap_fd_migration.py b/tests/functional/test_x86_64_tap_fd_migration.py
>> new file mode 100644
>> index 0000000000..f6d18fe39f
>> --- /dev/null
>> +++ b/tests/functional/test_x86_64_tap_fd_migration.py
>> @@ -0,0 +1,347 @@
>> +#!/usr/bin/env python3
>> +#
>> +# Functional test that tests TAP local migration
>> +# with fd passing
>> +#
>> +# Copyright (c) Yandex
>> +#
>> +# SPDX-License-Identifier: GPL-2.0-or-later
>> +
>> +import os
>> +import time
>> +import subprocess
>> +import signal
>> +from typing import Tuple
>> +
>> +from qemu_test import (
>> +    LinuxKernelTest,
>> +    Asset,
>> +    exec_command_and_wait_for_pattern,
>> +)
>> +
>> +GUEST_IP = "10.0.1.2"
>> +GUEST_IP_MASK = f"{GUEST_IP}/24"
>> +GUEST_MAC = "d6:0d:75:f8:0f:b7"
>> +HOST_IP = "10.0.1.1"
>> +HOST_IP_MASK = f"{HOST_IP}/24"
>> +TAP_ID = "tap0"
>> +TAP_MAC = "e6:1d:44:b5:03:5d"
>> +
>> +
>> +def run(cmd: str, check: bool = True) -> None:
>> +    subprocess.run(cmd, check=check, shell=True)
> 
> I don't see need to be using shell here - just use
> "check_call()" and pass the argv as an array instead
> of a string at which point this 'run' helper can be
> removed.

I just like how commands look in one string more than array. But array is probably better choice for production. Not problem to change, if we I understand what to do with access denied)

> 
>> +
>> +
>> +def fetch(cmd: str, check: bool = True) -> str:
>> +    return subprocess.run(
>> +        cmd, check=check, shell=True, stdout=subprocess.PIPE, text=True
>> +    ).stdout
>> +
>> +
>> +def del_tap() -> None:
>> +    run(f"ip tuntap del {TAP_ID} mode tap multi_queue", check=False)
>> +
>> +
>> +def init_tap() -> None:
>> +    run(f"ip tuntap add dev {TAP_ID} mode tap multi_queue")
>> +    run(f"ip link set dev {TAP_ID} address {TAP_MAC}")
>> +    run(f"ip addr add {HOST_IP_MASK} dev {TAP_ID}")
>> +    run(f"ip link set {TAP_ID} up")
> 
> $ ip tuntap add dev foo mode tap multi_queue
> ioctl(TUNSETIFF): Operation not permitted
> 
> 
> The functional tests run as the developer's normal unprivileged user
> account, so it doesn't look like this can work ?
> 
> Were you testing this as root ?

Yes I run the test with sudo..

Do we have any tests in QEMU that requires root? Or anyway to do this for functional tests? I'm afraid there is no way to setup TAP interface in unprivileged user account :(

> 
>> +
>> +
>> +def parse_ping_line(line: str) -> float:
>> +    # suspect lines like
>> +    # [1748524876.590509] 64 bytes from 94.245.155.3 \
>> +    #      (94.245.155.3): icmp_seq=1 ttl=250 time=101 ms
>> +    spl = line.split()
>> +    return float(spl[0][1:-1])
>> +
>> +
>> +def parse_ping_output(out) -> Tuple[bool, float, float]:
>> +    lines = [x for x in out.split("\n") if x.startswith("[")]
>> +
>> +    try:
>> +        first_no_ans = next(
>> +            (ind for ind in range(len(lines)) if lines[ind][20:26] == "no ans")
>> +        )
>> +    except StopIteration:
>> +        return False, parse_ping_line(lines[0]), parse_ping_line(lines[-1])
>> +
>> +    last_no_ans = next(
>> +        (ind for ind in range(len(lines) - 1, -1, -1) if lines[ind][20:26] == "no ans")
>> +    )
>> +
>> +    return (
>> +        True,
>> +        parse_ping_line(lines[first_no_ans]),
>> +        parse_ping_line(lines[last_no_ans]),
>> +    )
>> +
>> +
>> +def wait_migration_finish(source_vm, target_vm):
>> +    migr_events = (
>> +        ("MIGRATION", {"data": {"status": "completed"}}),
>> +        ("MIGRATION", {"data": {"status": "failed"}}),
>> +    )
>> +
>> +    source_e = source_vm.events_wait(migr_events)["data"]
>> +    target_e = target_vm.events_wait(migr_events)["data"]
>> +
>> +    source_s = source_vm.cmd("query-status")["status"]
>> +    target_s = target_vm.cmd("query-status")["status"]
>> +
>> +    assert (
>> +        source_e["status"] == "completed"
>> +        and target_e["status"] == "completed"
>> +        and source_s == "postmigrate"
>> +        and target_s == "paused"
>> +    ), f"""Migration failed:
>> +    SRC status: {source_s}
>> +    SRC event: {source_e}
>> +    TGT status: {target_s}
>> +    TGT event:{target_e}"""
>> +
>> +
>> +class VhostUserBlkFdMigration(LinuxKernelTest):
>> +
>> +    ASSET_KERNEL = Asset(
>> +        (
>> +            "https://archives.fedoraproject.org/pub/archive/fedora/linux/releases"
>> +            "/31/Server/x86_64/os/images/pxeboot/vmlinuz"
>> +        ),
>> +        "d4738d03dbbe083ca610d0821d0a8f1488bebbdccef54ce33e3adb35fda00129",
>> +    )
>> +
>> +    ASSET_INITRD = Asset(
>> +        (
>> +            "https://archives.fedoraproject.org/pub/archive/fedora/linux/releases"
>> +            "/31/Server/x86_64/os/images/pxeboot/initrd.img"
>> +        ),
>> +        "277cd6c7adf77c7e63d73bbb2cded8ef9e2d3a2f100000e92ff1f8396513cd8b",
>> +    )
>> +
>> +    ASSET_ALPINE_ISO = Asset(
>> +        (
>> +            "https://dl-cdn.alpinelinux.org/"
>> +            "alpine/v3.22/releases/x86_64/alpine-standard-3.22.1-x86_64.iso"
>> +        ),
>> +        "96d1b44ea1b8a5a884f193526d92edb4676054e9fa903ad2f016441a0fe13089",
>> +    )
>> +
>> +    def setUp(self):
>> +        super().setUp()
>> +
>> +        init_tap()
>> +
>> +        self.outer_ping_proc = None
>> +
>> +    def tearDown(self):
>> +        del_tap()
>> +
>> +        if self.outer_ping_proc:
>> +            self.stop_outer_ping()
>> +
>> +        super().tearDown()
>> +
>> +    def start_outer_ping(self) -> None:
>> +        assert self.outer_ping_proc is None
>> +        self.outer_ping_log = open("/tmp/ping.log", "w")
>> +        self.outer_ping_proc = subprocess.Popen(
>> +            ["ping", "-i", "0", "-O", "-D", GUEST_IP],
>> +            text=True,
>> +            stdout=self.outer_ping_log,
>> +        )
> 
> Surely outer_ping_log can be closed immediately as the child
> process will keep the FD open ?

Will try.

> 
>> +
>> +    def stop_outer_ping(self) -> str:
>> +        assert self.outer_ping_proc
>> +        self.outer_ping_proc.send_signal(signal.SIGINT)
>> +
>> +        self.outer_ping_proc.communicate(timeout=5)
>> +        self.outer_ping_proc = None
>> +        self.outer_ping_log.close()
>> +
>> +        # We need the start, the end and several lines around "no answer"
>> +        cmd = "cat /tmp/ping.log | grep -A 4 -B 4 'PING\\|packets\\|no ans'"
>> +        return fetch(cmd)
> 
> IMHO this can just read the whole of /tmp/ping.log directly into memory
> and then the parse_ping_output can jjust match on it with a regex,
> avoiding the pre-processing with grep.
> 

Agree, that would be better. Will do.

> 
>> +
>> +    def stop_ping_and_check(self, stop_time, resume_time):
>> +        ping_res = self.stop_outer_ping()
>> +
>> +        discon, a, b = parse_ping_output(ping_res)
>> +
>> +        if not discon:
>> +            text = f"STOP: {stop_time}, RESUME: {resume_time}," f"PING: {a} - {b}"
>> +            if a > stop_time or b < resume_time:
>> +                self.fail(f"PING failed: {text}")
>> +            self.log.info(f"PING: no packets lost: {text}")
>> +            return
>> +
>> +        text = (
>> +            f"STOP: {stop_time}, RESUME: {resume_time}," f"PING: disconnect: {a} - {b}"
>> +        )
>> +        self.log.info(text)
>> +        eps = 0.01
>> +        if a < stop_time - eps or b > resume_time + eps:
>> +            self.fail(text)
>> +
>> +    def one_ping_from_guest(self, vm) -> None:
>> +        exec_command_and_wait_for_pattern(
>> +            self,
>> +            f"ping -c 1 -W 1 {HOST_IP}",
>> +            "1 packets transmitted, 1 packets received",
>> +            "1 packets transmitted, 0 packets received",
>> +            vm=vm,
>> +        )
>> +        self.wait_for_console_pattern("# ", vm=vm)
>> +
>> +    def one_ping_from_host(self) -> None:
>> +        run(f"ping -c 1 -W 1 {GUEST_IP}")
>> +
>> +    def setup_shared_memory(self):
>> +        shm_path = f"/dev/shm/qemu_test_{os.getpid()}"
>> +
>> +        try:
>> +            with open(shm_path, "wb") as f:
>> +                f.write(b"\0" * (1024 * 1024 * 1024))  # 1GB
>> +        except Exception as e:
>> +            self.fail(f"Failed to create shared memory file: {e}")
>> +
>> +        return shm_path
>> +
>> +    def prepare_and_launch_vm(self, shm_path, vhost, incoming=False, vm=None):
>> +        if not vm:
>> +            vm = self.vm
>> +
>> +        vm.set_console()
>> +        vm.add_args("-accel", "kvm")
>> +        vm.add_args("-device", "pcie-pci-bridge,id=pci.1,bus=pcie.0")
>> +        vm.add_args("-m", "1G")
>> +
>> +        vm.add_args(
>> +            "-object",
>> +            f"memory-backend-file,id=ram0,size=1G,mem-path={shm_path},share=on",
>> +        )
>> +        vm.add_args("-machine", "memory-backend=ram0")
>> +
>> +        vm.add_args(
>> +            "-drive", f"file={self.ASSET_ALPINE_ISO.fetch()},media=cdrom,format=raw"
>> +        )
>> +
>> +        vm.add_args("-S")
>> +
>> +        if incoming:
>> +            vm.add_args("-incoming", "defer")
>> +
>> +        vm_s = "target" if incoming else "source"
>> +        self.log.info(f"Launching {vm_s} VM")
>> +        vm.launch()
>> +
>> +        self.set_migration_capabilities(vm)
>> +        self.add_virtio_net(vm, vhost, incoming)
>> +
>> +    def add_virtio_net(self, vm, vhost: bool, incoming: bool = False):
>> +        netdev_params = {
>> +            "id": "netdev.1",
>> +            "vhost": vhost,
>> +            "type": "tap",
>> +            "ifname": "tap0",
>> +            "downscript": "no",
>> +            "queues": 4,
>> +        }
>> +
>> +        if incoming:
>> +            netdev_params["local-incoming"] = True
>> +        else:
>> +            netdev_params["script"] = "no"
>> +
>> +        vm.cmd("netdev_add", netdev_params)
>> +
>> +        vm.cmd(
>> +            "device_add",
>> +            driver="virtio-net-pci",
>> +            romfile="",
>> +            id="vnet.1",
>> +            netdev="netdev.1",
>> +            mq=True,
>> +            vectors=18,
>> +            bus="pci.1",
>> +            mac=GUEST_MAC,
>> +            disable_legacy="off",
>> +        )
>> +
>> +    def set_migration_capabilities(self, vm):
>> +        capabilities = [
>> +            {"capability": "events", "state": True},
>> +            {"capability": "x-ignore-shared", "state": True},
>> +            {"capability": "local-tap", "state": True},
>> +        ]
>> +        vm.cmd("migrate-set-capabilities", {"capabilities": capabilities})
>> +
>> +    def setup_guest_network(self) -> None:
>> +        exec_command_and_wait_for_pattern(self, "ip addr", "# ")
>> +        exec_command_and_wait_for_pattern(
>> +            self,
>> +            f"ip addr add {GUEST_IP_MASK} dev eth0 && ip link set eth0 up && echo OK",
>> +            "OK",
>> +        )
>> +        self.wait_for_console_pattern("# ")
>> +
>> +    def do_test_tap_fd_migration(self, vhost):
>> +        self.require_accelerator("kvm")
>> +        self.set_machine("q35")
>> +
>> +        socket_dir = self.socket_dir()
>> +        migration_socket = os.path.join(socket_dir.name, "migration.sock")
>> +
>> +        shm_path = self.setup_shared_memory()
>> +
>> +        self.prepare_and_launch_vm(shm_path, vhost)
>> +        self.vm.cmd("cont")
>> +        self.wait_for_console_pattern("login:")
>> +        exec_command_and_wait_for_pattern(self, "root", "# ")
>> +
>> +        self.setup_guest_network()
>> +
>> +        self.one_ping_from_guest(self.vm)
>> +        self.one_ping_from_host()
>> +        self.start_outer_ping()
>> +
>> +        # Get some successful pings before migration
>> +        time.sleep(0.5)
>> +
>> +        target_vm = self.get_vm(name="target")
>> +        self.prepare_and_launch_vm(shm_path, vhost, incoming=True, vm=target_vm)
>> +
>> +        target_vm.cmd("migrate-incoming", {"uri": f"unix:{migration_socket}"})
>> +
>> +        self.log.info("Starting migration")
>> +        freeze_start = time.time()
>> +        self.vm.cmd("migrate", {"uri": f"unix:{migration_socket}"})
>> +
>> +        self.log.info("Waiting for migration completion")
>> +        wait_migration_finish(self.vm, target_vm)
>> +
>> +        target_vm.cmd("cont")
>> +        freeze_end = time.time()
>> +
>> +        self.vm.shutdown()
>> +
>> +        self.log.info("Verifying PING on target VM after migration")
>> +        self.one_ping_from_guest(target_vm)
>> +        self.one_ping_from_host()
>> +
>> +        # And a bit more pings after source shutdown
>> +        time.sleep(0.3)
>> +        self.stop_ping_and_check(freeze_start, freeze_end)
>> +
>> +        target_vm.shutdown()
>> +
>> +    def test_tap_fd_migration(self):
>> +        self.do_test_tap_fd_migration(False)
>> +
>> +    def test_tap_fd_migration_vhost(self):
>> +        self.do_test_tap_fd_migration(True)
>> +
>> +
>> +if __name__ == "__main__":
>> +    LinuxKernelTest.main()
>> -- 
>> 2.48.1
>>
> 
> With regards,
> Daniel


-- 
Best regards,
Vladimir

Re: [PATCH v2 8/8] tests/functional: add test_x86_64_tap_fd_migration
Posted by Daniel P. Berrangé 2 days, 23 hours ago
On Wed, Sep 03, 2025 at 06:14:36PM +0300, Vladimir Sementsov-Ogievskiy wrote:
> On 03.09.25 17:47, Daniel P. Berrangé wrote:
> > On Wed, Sep 03, 2025 at 04:37:05PM +0300, Vladimir Sementsov-Ogievskiy wrote:
> > > Add test for a new feature of local TAP migration with fd passing
> > > through unix socket.
> > > 
> > > Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>
> > > ---
> > >   .../test_x86_64_tap_fd_migration.py           | 347 ++++++++++++++++++
> > >   1 file changed, 347 insertions(+)
> > >   create mode 100644 tests/functional/test_x86_64_tap_fd_migration.py
> > > 
> > > diff --git a/tests/functional/test_x86_64_tap_fd_migration.py b/tests/functional/test_x86_64_tap_fd_migration.py
> > > new file mode 100644
> > > index 0000000000..f6d18fe39f
> > > --- /dev/null
> > > +++ b/tests/functional/test_x86_64_tap_fd_migration.py


> > > +def init_tap() -> None:
> > > +    run(f"ip tuntap add dev {TAP_ID} mode tap multi_queue")
> > > +    run(f"ip link set dev {TAP_ID} address {TAP_MAC}")
> > > +    run(f"ip addr add {HOST_IP_MASK} dev {TAP_ID}")
> > > +    run(f"ip link set {TAP_ID} up")
> > 
> > $ ip tuntap add dev foo mode tap multi_queue
> > ioctl(TUNSETIFF): Operation not permitted
> > 
> > 
> > The functional tests run as the developer's normal unprivileged user
> > account, so it doesn't look like this can work ?
> > 
> > Were you testing this as root ?
> 
> Yes I run the test with sudo..
> 
> Do we have any tests in QEMU that requires root? Or anyway to do this for functional tests? I'm afraid there is no way to setup TAP interface in unprivileged user account :(

There are a variety of iotests that include calls to sudo.

These automatically mark themselves as skipped if
password-less sudo fails to work. So in practice the tests
will rarely get run, but its better than nothing I guess.

We should add a skipUnlesPasswordlessSudo() method to the
tests/functional/qemu_test/decorators.py class, and you
then then annotate this test with that. Then just add sudo
to the "ip" calls.

With regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|


Re: [PATCH v2 8/8] tests/functional: add test_x86_64_tap_fd_migration
Posted by Vladimir Sementsov-Ogievskiy 2 days, 23 hours ago
On 03.09.25 18:19, Daniel P. Berrangé wrote:
> On Wed, Sep 03, 2025 at 06:14:36PM +0300, Vladimir Sementsov-Ogievskiy wrote:
>> On 03.09.25 17:47, Daniel P. Berrangé wrote:
>>> On Wed, Sep 03, 2025 at 04:37:05PM +0300, Vladimir Sementsov-Ogievskiy wrote:
>>>> Add test for a new feature of local TAP migration with fd passing
>>>> through unix socket.
>>>>
>>>> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>
>>>> ---
>>>>    .../test_x86_64_tap_fd_migration.py           | 347 ++++++++++++++++++
>>>>    1 file changed, 347 insertions(+)
>>>>    create mode 100644 tests/functional/test_x86_64_tap_fd_migration.py
>>>>
>>>> diff --git a/tests/functional/test_x86_64_tap_fd_migration.py b/tests/functional/test_x86_64_tap_fd_migration.py
>>>> new file mode 100644
>>>> index 0000000000..f6d18fe39f
>>>> --- /dev/null
>>>> +++ b/tests/functional/test_x86_64_tap_fd_migration.py
> 
> 
>>>> +def init_tap() -> None:
>>>> +    run(f"ip tuntap add dev {TAP_ID} mode tap multi_queue")
>>>> +    run(f"ip link set dev {TAP_ID} address {TAP_MAC}")
>>>> +    run(f"ip addr add {HOST_IP_MASK} dev {TAP_ID}")
>>>> +    run(f"ip link set {TAP_ID} up")
>>>
>>> $ ip tuntap add dev foo mode tap multi_queue
>>> ioctl(TUNSETIFF): Operation not permitted
>>>
>>>
>>> The functional tests run as the developer's normal unprivileged user
>>> account, so it doesn't look like this can work ?
>>>
>>> Were you testing this as root ?
>>
>> Yes I run the test with sudo..
>>
>> Do we have any tests in QEMU that requires root? Or anyway to do this for functional tests? I'm afraid there is no way to setup TAP interface in unprivileged user account :(
> 
> There are a variety of iotests that include calls to sudo.
> 
> These automatically mark themselves as skipped if
> password-less sudo fails to work. So in practice the tests
> will rarely get run, but its better than nothing I guess.
> 
> We should add a skipUnlesPasswordlessSudo() method to the
> tests/functional/qemu_test/decorators.py class, and you
> then then annotate this test with that. Then just add sudo
> to the "ip" calls.
> 

Good, that works for me.

Thanks!

-- 
Best regards,
Vladimir