[PATCH v3 3/4] tests/functional: Adapt reverse_debugging to run w/o Avocado

Gustavo Romero posted 4 patches 6 days, 10 hours ago
Maintainers: Paolo Bonzini <pbonzini@redhat.com>, "Alex Bennée" <alex.bennee@linaro.org>, Thomas Huth <thuth@redhat.com>, "Marc-André Lureau" <marcandre.lureau@redhat.com>, "Daniel P. Berrangé" <berrange@redhat.com>, "Philippe Mathieu-Daudé" <philmd@linaro.org>, Zhao Liu <zhao1.liu@intel.com>
There is a newer version of this series
[PATCH v3 3/4] tests/functional: Adapt reverse_debugging to run w/o Avocado
Posted by Gustavo Romero 6 days, 10 hours ago
This commit removes Avocado as a dependency for running the
reverse_debugging test.

The main benefit, beyond eliminating an extra dependency, is that there
is no longer any need to handle GDB packets manually. This removes the
need for ad-hoc functions dealing with endianness and arch-specific
register numbers, making the test easier to read. The timeout variable
is also removed, since Meson now manages timeouts automatically.

reverse_debugging now uses the pygdbmi module to interact with GDB, if
it's available in the test environment, otherwise the test is skipped.
GDB is detected via the QEMU_TEST_GDB env. variable.

This commit also significantly improves the output for the test and
now prints all the GDB commands used in sequence. It also adds
some clarifications to existing comments, for example, clarifying that
once the replay-break is reached, a SIGINT is captured in GDB.

reverse_debugging is kept "skipped" for aarch64, ppc64, and x86_64, so it
won't run unless QEMU_TEST_FLAKY_TESTS=1 is set in the test environment
before running 'make check-functional' or 'meson test [...]'.

Signed-off-by: Gustavo Romero <gustavo.romero@linaro.org>
---
 tests/functional/reverse_debugging.py | 308 ++++++++++++++++----------
 1 file changed, 188 insertions(+), 120 deletions(-)

diff --git a/tests/functional/reverse_debugging.py b/tests/functional/reverse_debugging.py
index f9a1d395f1..38161beab8 100644
--- a/tests/functional/reverse_debugging.py
+++ b/tests/functional/reverse_debugging.py
@@ -1,21 +1,94 @@
-# Reverse debugging test
-#
 # SPDX-License-Identifier: GPL-2.0-or-later
 #
+# Reverse debugging test
+#
 # Copyright (c) 2020 ISP RAS
+# Copyright (c) 2025 Linaro Limited
 #
 # Author:
 #  Pavel Dovgalyuk <Pavel.Dovgalyuk@ispras.ru>
+#  Gustavo Romero <gustavo.romero@linaro.org> (Run without Avocado)
 #
 # This work is licensed under the terms of the GNU GPL, version 2 or
 # later.  See the COPYING file in the top-level directory.
-import os
+
 import logging
+import os
+import re
+import subprocess
+from pygdbmi.gdbcontroller import GdbController
+from pygdbmi.constants import GdbTimeoutError
+
 
 from qemu_test import LinuxKernelTest, get_qemu_img
 from qemu_test.ports import Ports
 
 
+class GDB:
+    def __init__(self, gdb_path, echo=True, suffix='# ', prompt="$ "):
+        gdb_cmd = [gdb_path, "-q", "--interpreter=mi2"]
+        self.gdbmi = GdbController(gdb_cmd)
+        self.echo = echo
+        self.suffix = suffix
+        self.prompt = prompt
+
+
+    def get_payload(self, response, kind):
+        output = []
+        for o in response:
+            # Unpack payloads of the same type.
+            _type, _, payload, *_ = o.values()
+            if _type == kind:
+                output += [payload]
+
+        # Some output lines do not end with \n but begin with it,
+        # so remove the leading \n and merge them with the next line
+        # that ends with \n.
+        lines = [line.lstrip('\n') for line in output]
+        lines = "".join(lines)
+        lines = lines.splitlines(keepends=True)
+
+        return lines
+
+
+    def cli(self, cmd, timeout=4.0):
+        self.response = self.gdbmi.write(cmd, timeout_sec=timeout)
+        self.cmd_output = self.get_payload(self.response, "console")
+        if self.echo:
+            print(self.suffix + self.prompt + cmd)
+
+            if len(self.cmd_output) > 0:
+                cmd_output = self.suffix.join(self.cmd_output)
+                print(self.suffix + cmd_output, end="")
+
+        return self
+
+
+    def get_addr(self):
+        pattern = r"0x[0-9A-Fa-f]+"
+        cmd_output = "".join(self.cmd_output)
+        match = re.search(pattern, cmd_output)
+
+        return int(match[0], 16) if match else None
+
+
+    def get_log(self):
+        r = self.get_payload(self.response, kind="log")
+        r = "".join(r)
+
+        return r
+
+
+    def get_console(self):
+        r = "".join(self.cmd_output)
+
+        return r
+
+
+    def exit(self):
+        self.gdbmi.exit()
+
+
 class ReverseDebugging(LinuxKernelTest):
     """
     Test GDB reverse debugging commands: reverse step and reverse continue.
@@ -28,21 +101,17 @@ class ReverseDebugging(LinuxKernelTest):
     that the execution is stopped at the last of them.
     """
 
-    timeout = 10
     STEPS = 10
-    endian_is_le = True
 
     def run_vm(self, record, shift, args, replay_path, image_path, port):
-        from avocado.utils import datadrainer
-
         logger = logging.getLogger('replay')
         vm = self.get_vm(name='record' if record else 'replay')
         vm.set_console()
         if record:
-            logger.info('recording the execution...')
+            logger.info('Recording the execution...')
             mode = 'record'
         else:
-            logger.info('replaying the execution...')
+            logger.info('Replaying the execution...')
             mode = 'replay'
             vm.add_args('-gdb', 'tcp::%d' % port, '-S')
         vm.add_args('-icount', 'shift=%s,rr=%s,rrfile=%s,rrsnapshot=init' %
@@ -52,145 +121,144 @@ def run_vm(self, record, shift, args, replay_path, image_path, port):
         if args:
             vm.add_args(*args)
         vm.launch()
-        console_drainer = datadrainer.LineLogger(vm.console_socket.fileno(),
-                                    logger=self.log.getChild('console'),
-                                    stop_check=(lambda : not vm.is_running()))
-        console_drainer.start()
-        return vm
 
-    @staticmethod
-    def get_reg_le(g, reg):
-        res = g.cmd(b'p%x' % reg)
-        num = 0
-        for i in range(len(res))[-2::-2]:
-            num = 0x100 * num + int(res[i:i + 2], 16)
-        return num
-
-    @staticmethod
-    def get_reg_be(g, reg):
-        res = g.cmd(b'p%x' % reg)
-        return int(res, 16)
-
-    def get_reg(self, g, reg):
-        # value may be encoded in BE or LE order
-        if self.endian_is_le:
-            return self.get_reg_le(g, reg)
-        else:
-            return self.get_reg_be(g, reg)
-
-    def get_pc(self, g):
-        return self.get_reg(g, self.REG_PC)
-
-    def check_pc(self, g, addr):
-        pc = self.get_pc(g)
-        if pc != addr:
-            self.fail('Invalid PC (read %x instead of %x)' % (pc, addr))
-
-    @staticmethod
-    def gdb_step(g):
-        g.cmd(b's', b'T05thread:01;')
-
-    @staticmethod
-    def gdb_bstep(g):
-        g.cmd(b'bs', b'T05thread:01;')
+        return vm
 
     @staticmethod
     def vm_get_icount(vm):
         return vm.qmp('query-replay')['return']['icount']
 
     def reverse_debugging(self, shift=7, args=None):
-        from avocado.utils import gdb
-        from avocado.utils import process
-
         logger = logging.getLogger('replay')
 
-        # create qcow2 for snapshots
-        logger.info('creating qcow2 image for VM snapshots')
+        # Create qcow2 for snapshots
+        logger.info('Creating qcow2 image for VM snapshots')
         image_path = os.path.join(self.workdir, 'disk.qcow2')
         qemu_img = get_qemu_img(self)
         if qemu_img is None:
             self.skipTest('Could not find "qemu-img", which is required to '
                           'create the temporary qcow2 image')
         cmd = '%s create -f qcow2 %s 128M' % (qemu_img, image_path)
-        process.run(cmd)
+        r = subprocess.run(cmd, capture_output=True, shell=True, text=True)
+        logger.info(r.args)
+        logger.info(r.stdout)
 
         replay_path = os.path.join(self.workdir, 'replay.bin')
 
-        # record the log
+        # Record the log.
         vm = self.run_vm(True, shift, args, replay_path, image_path, -1)
         while self.vm_get_icount(vm) <= self.STEPS:
             pass
         last_icount = self.vm_get_icount(vm)
         vm.shutdown()
 
-        logger.info("recorded log with %s+ steps" % last_icount)
+        logger.info("Recorded log with %s+ steps" % last_icount)
+
+        # Replay and run debug commands.
+        gdb_cmd = os.getenv('QEMU_TEST_GDB')
+        if not gdb_cmd:
+            test.skipTest(f"Test skipped because there is no GDB available!")
 
-        # replay and run debug commands
         with Ports() as ports:
             port = ports.find_free_port()
             vm = self.run_vm(False, shift, args, replay_path, image_path, port)
-        logger.info('connecting to gdbstub')
-        g = gdb.GDBRemote('127.0.0.1', port, False, False)
-        g.connect()
-        r = g.cmd(b'qSupported')
-        if b'qXfer:features:read+' in r:
-            g.cmd(b'qXfer:features:read:target.xml:0,ffb')
-        if b'ReverseStep+' not in r:
-            self.fail('Reverse step is not supported by QEMU')
-        if b'ReverseContinue+' not in r:
-            self.fail('Reverse continue is not supported by QEMU')
-
-        logger.info('stepping forward')
-        steps = []
-        # record first instruction addresses
-        for _ in range(self.STEPS):
-            pc = self.get_pc(g)
-            logger.info('saving position %x' % pc)
-            steps.append(pc)
-            self.gdb_step(g)
-
-        # visit the recorded instruction in reverse order
-        logger.info('stepping backward')
-        for addr in steps[::-1]:
-            self.gdb_bstep(g)
-            self.check_pc(g, addr)
-            logger.info('found position %x' % addr)
-
-        # visit the recorded instruction in forward order
-        logger.info('stepping forward')
-        for addr in steps:
-            self.check_pc(g, addr)
-            self.gdb_step(g)
-            logger.info('found position %x' % addr)
-
-        # set breakpoints for the instructions just stepped over
-        logger.info('setting breakpoints')
-        for addr in steps:
-            # hardware breakpoint at addr with len=1
-            g.cmd(b'Z1,%x,1' % addr, b'OK')
-
-        # this may hit a breakpoint if first instructions are executed
-        # again
-        logger.info('continuing execution')
-        vm.qmp('replay-break', icount=last_icount - 1)
-        # continue - will return after pausing
-        # This could stop at the end and get a T02 return, or by
-        # re-executing one of the breakpoints and get a T05 return.
-        g.cmd(b'c')
-        if self.vm_get_icount(vm) == last_icount - 1:
-            logger.info('reached the end (icount %s)' % (last_icount - 1))
-        else:
-            logger.info('hit a breakpoint again at %x (icount %s)' %
-                        (self.get_pc(g), self.vm_get_icount(vm)))
 
-        logger.info('running reverse continue to reach %x' % steps[-1])
-        # reverse continue - will return after stopping at the breakpoint
-        g.cmd(b'bc', b'T05thread:01;')
+        try:
+            gdb = GDB(gdb_cmd)
 
-        # assume that none of the first instructions is executed again
-        # breaking the order of the breakpoints
-        self.check_pc(g, steps[-1])
-        logger.info('successfully reached %x' % steps[-1])
+            logger.info('Connecting to gdbstub...')
 
-        logger.info('exiting gdb and qemu')
-        vm.shutdown()
+            gdb.cli("set debug remote 1")
+
+            c = gdb.cli(f"target remote localhost:{port}").get_console()
+            if not f"Remote debugging using localhost:{port}" in c:
+                self.fail("Could not connect to gdbstub!")
+
+            # Remote debug messages are in 'log' payloads.
+            r = gdb.get_log()
+            if 'ReverseStep+' not in r:
+                self.fail('Reverse step is not supported by QEMU')
+            if 'ReverseContinue+' not in r:
+                self.fail('Reverse continue is not supported by QEMU')
+
+            gdb.cli("set debug remote 0")
+
+            logger.info('Stepping forward')
+            steps = []
+            # Record first instruction addresses.
+            for _ in range(self.STEPS):
+                pc = gdb.cli("print $pc").get_addr()
+                logger.info('Saving position %x' % pc)
+                steps.append(pc)
+
+                gdb.cli("stepi")
+
+            # Visit the recorded instructions in reverse order.
+            logger.info('Stepping backward')
+            for saved_pc in steps[::-1]:
+                logger.info('Found position %x' % saved_pc)
+                gdb.cli("reverse-stepi")
+                pc = gdb.cli("print $pc").get_addr()
+                if pc != saved_pc:
+                    logger.info('Invalid PC (read %x instead of %x)' % (pc, saved_pc))
+                    self.fail('Reverse stepping failed!')
+
+            # Visit the recorded instructions in forward order.
+            logger.info('Stepping forward')
+            for saved_pc in steps:
+                logger.info('Found position %x' % saved_pc)
+                pc = gdb.cli("print $pc").get_addr()
+                if pc != saved_pc:
+                    logger.info('Invalid PC (read %x instead of %x)' % (pc, saved_pc))
+                    self.fail('Forward stepping failed!')
+
+                gdb.cli("stepi")
+
+            # Set breakpoints for the instructions just stepped over.
+            logger.info('Setting breakpoints')
+            for saved_pc in steps:
+                gdb.cli(f"break *{hex(saved_pc)}")
+
+            # This may hit a breakpoint if first instructions are executed again.
+            logger.info('Continuing execution')
+            vm.qmp('replay-break', icount=last_icount - 1)
+            # continue - will return after pausing.
+            # This can stop at the end of the replay-break and gdb gets a SIGINT,
+            # or by re-executing one of the breakpoints and gdb stops at a
+            # breakpoint.
+            gdb.cli("continue")
+
+            pc = gdb.cli("print $pc").get_addr()
+            current_icount = self.vm_get_icount(vm)
+            if current_icount == last_icount - 1:
+                print(f"# **** Hit replay-break at icount={current_icount}, pc={hex(pc)} ****")
+                logger.info('Reached the end (icount %s)' % (current_icount))
+            else:
+                print(f"# **** Hit breakpoint at icount={current_icount}, pc={hex(pc)} ****")
+                logger.info('Hit a breakpoint again at %x (icount %s)' %
+                            (pc, current_icount))
+
+            logger.info('Running reverse continue to reach %x' % steps[-1])
+            # reverse-continue - will return after stopping at the breakpoint.
+            gdb.cli("reverse-continue")
+
+            # Assume that none of the first instructions are executed again
+            # breaking the order of the breakpoints.
+            # steps[-1] is the first saved $pc in reverse order.
+            pc = gdb.cli("print $pc").get_addr()
+            first_pc_in_rev_order = steps[-1]
+            if pc == first_pc_in_rev_order:
+                print(f"# **** Hit breakpoint at the first PC in reverse order ({hex(pc)}) ****")
+                logger.info('Successfully reached breakpoint at %x' % first_pc_in_rev_order)
+            else:
+                logger.info('Failed to reach breakpoint at %x' % first_pc_in_rev_order)
+                self.fail("'reverse-continue' did not hit the first PC in reverse order!")
+
+            logger.info('Exiting GDB and QEMU...')
+            gdb.exit()
+            vm.shutdown()
+
+            logger.info('Test passed.')
+
+        except GdbTimeoutError:
+            self.fail("Connection to gdbstub timeouted...")
-- 
2.34.1
Re: [PATCH v3 3/4] tests/functional: Adapt reverse_debugging to run w/o Avocado
Posted by Daniel P. Berrangé 6 days, 6 hours ago
On Mon, Sep 22, 2025 at 05:43:50AM +0000, Gustavo Romero wrote:
> This commit removes Avocado as a dependency for running the
> reverse_debugging test.
> 
> The main benefit, beyond eliminating an extra dependency, is that there
> is no longer any need to handle GDB packets manually. This removes the
> need for ad-hoc functions dealing with endianness and arch-specific
> register numbers, making the test easier to read. The timeout variable
> is also removed, since Meson now manages timeouts automatically.
> 
> reverse_debugging now uses the pygdbmi module to interact with GDB, if
> it's available in the test environment, otherwise the test is skipped.
> GDB is detected via the QEMU_TEST_GDB env. variable.
> 
> This commit also significantly improves the output for the test and
> now prints all the GDB commands used in sequence. It also adds
> some clarifications to existing comments, for example, clarifying that
> once the replay-break is reached, a SIGINT is captured in GDB.
> 
> reverse_debugging is kept "skipped" for aarch64, ppc64, and x86_64, so
> won't run unless QEMU_TEST_FLAKY_TESTS=1 is set in the test environment,
> before running 'make check-functional' or 'meson test [...]'.
> 
> Signed-off-by: Gustavo Romero <gustavo.romero@linaro.org>
> ---
>  tests/functional/reverse_debugging.py | 308 ++++++++++++++++----------
>  1 file changed, 188 insertions(+), 120 deletions(-)
> 
> diff --git a/tests/functional/reverse_debugging.py b/tests/functional/reverse_debugging.py
> index f9a1d395f1..38161beab8 100644
> --- a/tests/functional/reverse_debugging.py
> +++ b/tests/functional/reverse_debugging.py
> @@ -1,21 +1,94 @@
> -# Reverse debugging test
> -#
>  # SPDX-License-Identifier: GPL-2.0-or-later
>  #
> +# Reverse debugging test
> +#
>  # Copyright (c) 2020 ISP RAS
> +# Copyright (c) 2025 Linaro Limited
>  #
>  # Author:
>  #  Pavel Dovgalyuk <Pavel.Dovgalyuk@ispras.ru>
> +#  Gustavo Romero <gustavo.romero@linaro.org> (Run without Avocado)
>  #
>  # This work is licensed under the terms of the GNU GPL, version 2 or
>  # later.  See the COPYING file in the top-level directory.
> -import os
> +
>  import logging
> +import os
> +import re
> +import subprocess
> +from pygdbmi.gdbcontroller import GdbController
> +from pygdbmi.constants import GdbTimeoutError
> +
>  
>  from qemu_test import LinuxKernelTest, get_qemu_img
>  from qemu_test.ports import Ports
>  
>  
> +class GDB:
> +    def __init__(self, gdb_path, echo=True, suffix='# ', prompt="$ "):
> +        gdb_cmd = [gdb_path, "-q", "--interpreter=mi2"]
> +        self.gdbmi = GdbController(gdb_cmd)
> +        self.echo = echo
> +        self.suffix = suffix
> +        self.prompt = prompt
> +
> +
> +    def get_payload(self, response, kind):
> +        output = []
> +        for o in response:
> +            # Unpack payloads of the same type.
> +            _type, _, payload, *_ = o.values()
> +            if _type == kind:
> +                output += [payload]
> +
> +        # Some output lines do not end with \n but begin with it,
> +        # so remove the leading \n and merge them with the next line
> +        # that ends with \n.
> +        lines = [line.lstrip('\n') for line in output]
> +        lines = "".join(lines)
> +        lines = lines.splitlines(keepends=True)
> +
> +        return lines
> +
> +
> +    def cli(self, cmd, timeout=4.0):
> +        self.response = self.gdbmi.write(cmd, timeout_sec=timeout)
> +        self.cmd_output = self.get_payload(self.response, "console")
> +        if self.echo:
> +            print(self.suffix + self.prompt + cmd)
> +
> +            if len(self.cmd_output) > 0:
> +                cmd_output = self.suffix.join(self.cmd_output)
> +                print(self.suffix + cmd_output, end="")
> +
> +        return self
> +
> +
> +    def get_addr(self):
> +        pattern = r"0x[0-9A-Fa-f]+"
> +        cmd_output = "".join(self.cmd_output)
> +        match = re.search(pattern, cmd_output)
> +
> +        return int(match[0], 16) if match else None
> +
> +
> +    def get_log(self):
> +        r = self.get_payload(self.response, kind="log")
> +        r = "".join(r)
> +
> +        return r
> +
> +
> +    def get_console(self):
> +        r = "".join(self.cmd_output)
> +
> +        return r
> +
> +
> +    def exit(self):
> +        self.gdbmi.exit()
> +

Can you put this in tests/functional/qemu_test/gdb.py as it is
generic logic not tied to this specific test.

>  class ReverseDebugging(LinuxKernelTest):
>      """
>      Test GDB reverse debugging commands: reverse step and reverse continue.
> @@ -28,21 +101,17 @@ class ReverseDebugging(LinuxKernelTest):
>      that the execution is stopped at the last of them.
>      """
>  
> -    timeout = 10
>      STEPS = 10
> -    endian_is_le = True
>  
>      def run_vm(self, record, shift, args, replay_path, image_path, port):
> -        from avocado.utils import datadrainer
> -
>          logger = logging.getLogger('replay')
>          vm = self.get_vm(name='record' if record else 'replay')
>          vm.set_console()
>          if record:
> -            logger.info('recording the execution...')
> +            logger.info('Recording the execution...')
>              mode = 'record'
>          else:
> -            logger.info('replaying the execution...')
> +            logger.info('Replaying the execution...')

This change isn't really needed imho.

>              mode = 'replay'
>              vm.add_args('-gdb', 'tcp::%d' % port, '-S')
>          vm.add_args('-icount', 'shift=%s,rr=%s,rrfile=%s,rrsnapshot=init' %
> @@ -52,145 +121,144 @@ def run_vm(self, record, shift, args, replay_path, image_path, port):
>          if args:
>              vm.add_args(*args)
>          vm.launch()
> -        console_drainer = datadrainer.LineLogger(vm.console_socket.fileno(),
> -                                    logger=self.log.getChild('console'),
> -                                    stop_check=(lambda : not vm.is_running()))
> -        console_drainer.start()

This is unrelated to the gdbmi conversion, so can you drop the data drainer
stuff on its own, e.g. as in this patch:

  https://lists.nongnu.org/archive/html/qemu-devel/2025-09/msg02501.html

> -        return vm
>  
> -    @staticmethod
> -    def get_reg_le(g, reg):
> -        res = g.cmd(b'p%x' % reg)
> -        num = 0
> -        for i in range(len(res))[-2::-2]:
> -            num = 0x100 * num + int(res[i:i + 2], 16)
> -        return num
> -
> -    @staticmethod
> -    def get_reg_be(g, reg):
> -        res = g.cmd(b'p%x' % reg)
> -        return int(res, 16)
> -
> -    def get_reg(self, g, reg):
> -        # value may be encoded in BE or LE order
> -        if self.endian_is_le:
> -            return self.get_reg_le(g, reg)
> -        else:
> -            return self.get_reg_be(g, reg)
> -
> -    def get_pc(self, g):
> -        return self.get_reg(g, self.REG_PC)
> -
> -    def check_pc(self, g, addr):
> -        pc = self.get_pc(g)
> -        if pc != addr:
> -            self.fail('Invalid PC (read %x instead of %x)' % (pc, addr))
> -
> -    @staticmethod
> -    def gdb_step(g):
> -        g.cmd(b's', b'T05thread:01;')
> -
> -    @staticmethod
> -    def gdb_bstep(g):
> -        g.cmd(b'bs', b'T05thread:01;')
> +        return vm
>  
>      @staticmethod
>      def vm_get_icount(vm):
>          return vm.qmp('query-replay')['return']['icount']
>  
>      def reverse_debugging(self, shift=7, args=None):
> -        from avocado.utils import gdb
> -        from avocado.utils import process
> -
>          logger = logging.getLogger('replay')
>  
> -        # create qcow2 for snapshots
> -        logger.info('creating qcow2 image for VM snapshots')
> +        # Create qcow2 for snapshots
> +        logger.info('Creating qcow2 image for VM snapshots')

Please avoid mixing in extra changes like this with other
functional refactoring, as it makes the diff larger and
harder to review. There are many more examples of changes
like this, but I won't point them all out.  If you still
think this is beneficial, do it as a separate commit from
the functional changes.

>          image_path = os.path.join(self.workdir, 'disk.qcow2')
>          qemu_img = get_qemu_img(self)
>          if qemu_img is None:
>              self.skipTest('Could not find "qemu-img", which is required to '
>                            'create the temporary qcow2 image')
>          cmd = '%s create -f qcow2 %s 128M' % (qemu_img, image_path)
> -        process.run(cmd)
> +        r = subprocess.run(cmd, capture_output=True, shell=True, text=True)
> +        logger.info(r.args)
> +        logger.info(r.stdout)

Can you remove the 'process.run' in a separate commit, use
check_output rather than 'run', and avoid the use of the shell?
I.e. as in this patch:

 https://lists.nongnu.org/archive/html/qemu-devel/2025-09/msg02500.html

>  
>          replay_path = os.path.join(self.workdir, 'replay.bin')
>  
> -        # record the log
> +        # Record the log.
>          vm = self.run_vm(True, shift, args, replay_path, image_path, -1)
>          while self.vm_get_icount(vm) <= self.STEPS:
>              pass
>          last_icount = self.vm_get_icount(vm)
>          vm.shutdown()
>  
> -        logger.info("recorded log with %s+ steps" % last_icount)
> +        logger.info("Recorded log with %s+ steps" % last_icount)
> +
> +        # Replay and run debug commands.
> +        gdb_cmd = os.getenv('QEMU_TEST_GDB')
> +        if not gdb_cmd:
> +            test.skipTest(f"Test skipped because there is no GDB available!")


This message doesn't tell the user which environment variable they
are missing.

Can you introduce a 'skipIfMissingEnv(envname)' decorator in
tests/functional/qemu_test/decorators.py, and then use it to
decorate the tests?

>  
> -        # replay and run debug commands
>          with Ports() as ports:
>              port = ports.find_free_port()
>              vm = self.run_vm(False, shift, args, replay_path, image_path, port)


I'd suggest starting a new method at this point:

       try:
         logger.info('connecting to gdbstub')
         self.reverse_debugging_run(vm)
         logger.info('Test passed.')
       except GdbTimeoutError:
         self.fail("Connection to gdbstub timeouted...")

So we have a separation between the test bootstrap and the test
execution, with the added benefit that this patch won't bulk
re-indent everything below this point - the bulk re-indent makes
it hard to review this

> -        logger.info('connecting to gdbstub')
> -        g = gdb.GDBRemote('127.0.0.1', port, False, False)
> -        g.connect()
> -        r = g.cmd(b'qSupported')
> -        if b'qXfer:features:read+' in r:
> -            g.cmd(b'qXfer:features:read:target.xml:0,ffb')
> -        if b'ReverseStep+' not in r:
> -            self.fail('Reverse step is not supported by QEMU')
> -        if b'ReverseContinue+' not in r:
> -            self.fail('Reverse continue is not supported by QEMU')
> -
> -        logger.info('stepping forward')
> -        steps = []
> -        # record first instruction addresses
> -        for _ in range(self.STEPS):
> -            pc = self.get_pc(g)
> -            logger.info('saving position %x' % pc)
> -            steps.append(pc)
> -            self.gdb_step(g)
> -
> -        # visit the recorded instruction in reverse order
> -        logger.info('stepping backward')
> -        for addr in steps[::-1]:
> -            self.gdb_bstep(g)
> -            self.check_pc(g, addr)
> -            logger.info('found position %x' % addr)
> -
> -        # visit the recorded instruction in forward order
> -        logger.info('stepping forward')
> -        for addr in steps:
> -            self.check_pc(g, addr)
> -            self.gdb_step(g)
> -            logger.info('found position %x' % addr)
> -
> -        # set breakpoints for the instructions just stepped over
> -        logger.info('setting breakpoints')
> -        for addr in steps:
> -            # hardware breakpoint at addr with len=1
> -            g.cmd(b'Z1,%x,1' % addr, b'OK')
> -
> -        # this may hit a breakpoint if first instructions are executed
> -        # again
> -        logger.info('continuing execution')
> -        vm.qmp('replay-break', icount=last_icount - 1)
> -        # continue - will return after pausing
> -        # This could stop at the end and get a T02 return, or by
> -        # re-executing one of the breakpoints and get a T05 return.
> -        g.cmd(b'c')
> -        if self.vm_get_icount(vm) == last_icount - 1:
> -            logger.info('reached the end (icount %s)' % (last_icount - 1))
> -        else:
> -            logger.info('hit a breakpoint again at %x (icount %s)' %
> -                        (self.get_pc(g), self.vm_get_icount(vm)))
>  
> -        logger.info('running reverse continue to reach %x' % steps[-1])
> -        # reverse continue - will return after stopping at the breakpoint
> -        g.cmd(b'bc', b'T05thread:01;')
> +        try:
> +            gdb = GDB(gdb_cmd)
>  
> -        # assume that none of the first instructions is executed again
> -        # breaking the order of the breakpoints
> -        self.check_pc(g, steps[-1])
> -        logger.info('successfully reached %x' % steps[-1])
> +            logger.info('Connecting to gdbstub...')
>  
> -        logger.info('exiting gdb and qemu')
> -        vm.shutdown()
> +            gdb.cli("set debug remote 1")
> +
> +            c = gdb.cli(f"target remote localhost:{port}").get_console()
> +            if not f"Remote debugging using localhost:{port}" in c:
> +                self.fail("Could not connect to gdbstub!")
> +
> +            # Remote debug messages are in 'log' payloads.
> +            r = gdb.get_log()
> +            if 'ReverseStep+' not in r:
> +                self.fail('Reverse step is not supported by QEMU')
> +            if 'ReverseContinue+' not in r:
> +                self.fail('Reverse continue is not supported by QEMU')
> +
> +            gdb.cli("set debug remote 0")
> +
> +            logger.info('Stepping forward')
> +            steps = []
> +            # Record first instruction addresses.
> +            for _ in range(self.STEPS):
> +                pc = gdb.cli("print $pc").get_addr()
> +                logger.info('Saving position %x' % pc)
> +                steps.append(pc)
> +
> +                gdb.cli("stepi")
> +
> +            # Visit the recorded instructions in reverse order.
> +            logger.info('Stepping backward')
> +            for saved_pc in steps[::-1]:
> +                logger.info('Found position %x' % saved_pc)
> +                gdb.cli("reverse-stepi")
> +                pc = gdb.cli("print $pc").get_addr()
> +                if pc != saved_pc:
> +                    logger.info('Invalid PC (read %x instead of %x)' % (pc, saved_pc))
> +                    self.fail('Reverse stepping failed!')
> +
> +            # Visit the recorded instructions in forward order.
> +            logger.info('Stepping forward')
> +            for saved_pc in steps:
> +                logger.info('Found position %x' % saved_pc)
> +                pc = gdb.cli("print $pc").get_addr()
> +                if pc != saved_pc:
> +                    logger.info('Invalid PC (read %x instead of %x)' % (pc, saved_pc))
> +                    self.fail('Forward stepping failed!')
> +
> +                gdb.cli("stepi")
> +
> +            # Set breakpoints for the instructions just stepped over.
> +            logger.info('Setting breakpoints')
> +            for saved_pc in steps:
> +                gdb.cli(f"break *{hex(saved_pc)}")
> +
> +            # This may hit a breakpoint if first instructions are executed again.
> +            logger.info('Continuing execution')
> +            vm.qmp('replay-break', icount=last_icount - 1)
> +            # continue - will return after pausing.
> +            # This can stop at the end of the replay-break and gdb gets a SIGINT,
> +            # or by re-executing one of the breakpoints and gdb stops at a
> +            # breakpoint.
> +            gdb.cli("continue")
> +
> +            pc = gdb.cli("print $pc").get_addr()
> +            current_icount = self.vm_get_icount(vm)
> +            if current_icount == last_icount - 1:
> +                print(f"# **** Hit replay-break at icount={current_icount}, pc={hex(pc)} ****")
> +                logger.info('Reached the end (icount %s)' % (current_icount))
> +            else:
> +                print(f"# **** Hit breakpoint at icount={current_icount}, pc={hex(pc)} ****")
> +                logger.info('Hit a breakpoint again at %x (icount %s)' %
> +                            (pc, current_icount))
> +
> +            logger.info('Running reverse continue to reach %x' % steps[-1])
> +            # reverse-continue - will return after stopping at the breakpoint.
> +            gdb.cli("reverse-continue")
> +
> +            # Assume that none of the first instructions are executed again
> +            # breaking the order of the breakpoints.
> +            # steps[-1] is the first saved $pc in reverse order.
> +            pc = gdb.cli("print $pc").get_addr()
> +            first_pc_in_rev_order = steps[-1]
> +            if pc == first_pc_in_rev_order:
> +                print(f"# **** Hit breakpoint at the first PC in reverse order ({hex(pc)}) ****")
> +                logger.info('Successfully reached breakpoint at %x' % first_pc_in_rev_order)
> +            else:
> +                logger.info('Failed to reach breakpoint at %x' % first_pc_in_rev_order)
> +                self.fail("'reverse-continue' did not hit the first PC in reverse order!")
> +
> +            logger.info('Exiting GDB and QEMU...')
> +            gdb.exit()
> +            vm.shutdown()
> +
> +            logger.info('Test passed.')
> +
> +        except GdbTimeoutError:
> +            self.fail("Connection to gdbstub timeouted...")
> -- 
> 2.34.1
> 

With regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|
Re: [PATCH v3 3/4] tests/functional: Adapt reverse_debugging to run w/o Avocado
Posted by Alex Bennée 6 days, 6 hours ago
Gustavo Romero <gustavo.romero@linaro.org> writes:

> This commit removes Avocado as a dependency for running the
> reverse_debugging test.
>
> The main benefit, beyond eliminating an extra dependency, is that there
> is no longer any need to handle GDB packets manually. This removes the
> need for ad-hoc functions dealing with endianness and arch-specific
> register numbers, making the test easier to read. The timeout variable
> is also removed, since Meson now manages timeouts automatically.
>
> reverse_debugging now uses the pygdbmi module to interact with GDB, if
> it's available in the test environment, otherwise the test is skipped.
> GDB is detected via the QEMU_TEST_GDB env. variable.
>
> This commit also significantly improves the output for the test and
> now prints all the GDB commands used in sequence. It also adds
> some clarifications to existing comments, for example, clarifying that
> once the replay-break is reached, a SIGINT is captured in GDB.
>
> reverse_debugging is kept "skipped" for aarch64, ppc64, and x86_64, so
> won't run unless QEMU_TEST_FLAKY_TESTS=1 is set in the test environment,
> before running 'make check-functional' or 'meson test [...]'.
>
> Signed-off-by: Gustavo Romero <gustavo.romero@linaro.org>
> ---
>  tests/functional/reverse_debugging.py | 308 ++++++++++++++++----------
>  1 file changed, 188 insertions(+), 120 deletions(-)
>
> diff --git a/tests/functional/reverse_debugging.py b/tests/functional/reverse_debugging.py
> index f9a1d395f1..38161beab8 100644
> --- a/tests/functional/reverse_debugging.py
> +++ b/tests/functional/reverse_debugging.py
> @@ -1,21 +1,94 @@
> -# Reverse debugging test
> -#
>  # SPDX-License-Identifier: GPL-2.0-or-later
>  #
> +# Reverse debugging test
> +#
>  # Copyright (c) 2020 ISP RAS
> +# Copyright (c) 2025 Linaro Limited
>  #
>  # Author:
>  #  Pavel Dovgalyuk <Pavel.Dovgalyuk@ispras.ru>
> +#  Gustavo Romero <gustavo.romero@linaro.org> (Run without Avocado)
>  #
>  # This work is licensed under the terms of the GNU GPL, version 2 or
>  # later.  See the COPYING file in the top-level directory.
> -import os
> +
>  import logging
> +import os
> +import re
> +import subprocess
> +from pygdbmi.gdbcontroller import GdbController
> +from pygdbmi.constants import GdbTimeoutError
> +
>  
>  from qemu_test import LinuxKernelTest, get_qemu_img
>  from qemu_test.ports import Ports
>  
>  
> +class GDB:
> +    def __init__(self, gdb_path, echo=True, suffix='# ', prompt="$ "):
> +        gdb_cmd = [gdb_path, "-q", "--interpreter=mi2"]
> +        self.gdbmi = GdbController(gdb_cmd)
> +        self.echo = echo
> +        self.suffix = suffix
> +        self.prompt = prompt
> +
> +
> +    def get_payload(self, response, kind):
> +        output = []
> +        for o in response:
> +            # Unpack payloads of the same type.
> +            _type, _, payload, *_ = o.values()
> +            if _type == kind:
> +                output += [payload]
> +
> +        # Some output lines do not end with \n but begin with it,
> +        # so remove the leading \n and merge them with the next line
> +        # that ends with \n.
> +        lines = [line.lstrip('\n') for line in output]
> +        lines = "".join(lines)
> +        lines = lines.splitlines(keepends=True)
> +
> +        return lines
> +
> +
> +    def cli(self, cmd, timeout=4.0):
> +        self.response = self.gdbmi.write(cmd, timeout_sec=timeout)
> +        self.cmd_output = self.get_payload(self.response, "console")
> +        if self.echo:
> +            print(self.suffix + self.prompt + cmd)
> +
> +            if len(self.cmd_output) > 0:
> +                cmd_output = self.suffix.join(self.cmd_output)
> +                print(self.suffix + cmd_output, end="")
> +
> +        return self
> +
> +
> +    def get_addr(self):
> +        pattern = r"0x[0-9A-Fa-f]+"
> +        cmd_output = "".join(self.cmd_output)
> +        match = re.search(pattern, cmd_output)
> +
> +        return int(match[0], 16) if match else None
> +
> +
> +    def get_log(self):
> +        r = self.get_payload(self.response, kind="log")
> +        r = "".join(r)
> +
> +        return r
> +
> +
> +    def get_console(self):
> +        r = "".join(self.cmd_output)
> +
> +        return r
> +
> +
> +    def exit(self):
> +        self.gdbmi.exit()
> +
> +

Could this re-factor into a class have been a separate commit?

>  class ReverseDebugging(LinuxKernelTest):
>      """
>      Test GDB reverse debugging commands: reverse step and reverse continue.
> @@ -28,21 +101,17 @@ class ReverseDebugging(LinuxKernelTest):
>      that the execution is stopped at the last of them.
>      """
>  
> -    timeout = 10
>      STEPS = 10
> -    endian_is_le = True
>  
>      def run_vm(self, record, shift, args, replay_path, image_path, port):
> -        from avocado.utils import datadrainer
> -
>          logger = logging.getLogger('replay')
>          vm = self.get_vm(name='record' if record else 'replay')
>          vm.set_console()
>          if record:
> -            logger.info('recording the execution...')
> +            logger.info('Recording the execution...')

Mixing capitalisation fixes with logical changes makes reviewing a pain.

>              mode = 'record'
>          else:
> -            logger.info('replaying the execution...')
> +            logger.info('Replaying the execution...')
>              mode = 'replay'
>              vm.add_args('-gdb', 'tcp::%d' % port, '-S')
>          vm.add_args('-icount', 'shift=%s,rr=%s,rrfile=%s,rrsnapshot=init' %
> @@ -52,145 +121,144 @@ def run_vm(self, record, shift, args, replay_path, image_path, port):
>          if args:
>              vm.add_args(*args)
>          vm.launch()
> -        console_drainer = datadrainer.LineLogger(vm.console_socket.fileno(),
> -                                    logger=self.log.getChild('console'),
> -                                    stop_check=(lambda : not vm.is_running()))
> -        console_drainer.start()
> -        return vm

I suspect dropping the console drainer could be a separate commit like
in Daniel's series.

>  
> -    @staticmethod
> -    def get_reg_le(g, reg):
> -        res = g.cmd(b'p%x' % reg)
> -        num = 0
> -        for i in range(len(res))[-2::-2]:
> -            num = 0x100 * num + int(res[i:i + 2], 16)
> -        return num
> -
> -    @staticmethod
> -    def get_reg_be(g, reg):
> -        res = g.cmd(b'p%x' % reg)
> -        return int(res, 16)
> -
> -    def get_reg(self, g, reg):
> -        # value may be encoded in BE or LE order
> -        if self.endian_is_le:
> -            return self.get_reg_le(g, reg)
> -        else:
> -            return self.get_reg_be(g, reg)
> -
> -    def get_pc(self, g):
> -        return self.get_reg(g, self.REG_PC)
> -
> -    def check_pc(self, g, addr):
> -        pc = self.get_pc(g)
> -        if pc != addr:
> -            self.fail('Invalid PC (read %x instead of %x)' % (pc, addr))
> -
> -    @staticmethod
> -    def gdb_step(g):
> -        g.cmd(b's', b'T05thread:01;')
> -
> -    @staticmethod
> -    def gdb_bstep(g):
> -        g.cmd(b'bs', b'T05thread:01;')
> +        return vm
>  
>      @staticmethod
>      def vm_get_icount(vm):
>          return vm.qmp('query-replay')['return']['icount']
>  
>      def reverse_debugging(self, shift=7, args=None):
> -        from avocado.utils import gdb
> -        from avocado.utils import process
> -
>          logger = logging.getLogger('replay')
>  
> -        # create qcow2 for snapshots
> -        logger.info('creating qcow2 image for VM snapshots')
> +        # Create qcow2 for snapshots
> +        logger.info('Creating qcow2 image for VM snapshots')
>          image_path = os.path.join(self.workdir, 'disk.qcow2')
>          qemu_img = get_qemu_img(self)
>          if qemu_img is None:
>              self.skipTest('Could not find "qemu-img", which is required to '
>                            'create the temporary qcow2 image')
>          cmd = '%s create -f qcow2 %s 128M' % (qemu_img, image_path)
> -        process.run(cmd)
> +        r = subprocess.run(cmd, capture_output=True, shell=True, text=True)
> +        logger.info(r.args)
> +        logger.info(r.stdout)
>  
>          replay_path = os.path.join(self.workdir, 'replay.bin')
>  
> -        # record the log
> +        # Record the log.
>          vm = self.run_vm(True, shift, args, replay_path, image_path, -1)
>          while self.vm_get_icount(vm) <= self.STEPS:
>              pass
>          last_icount = self.vm_get_icount(vm)
>          vm.shutdown()
>  
> -        logger.info("recorded log with %s+ steps" % last_icount)
> +        logger.info("Recorded log with %s+ steps" % last_icount)
> +
> +        # Replay and run debug commands.
> +        gdb_cmd = os.getenv('QEMU_TEST_GDB')
> +        if not gdb_cmd:
> +            test.skipTest(f"Test skipped because there is no GDB available!")

This fails:

  test:         qemu:func-thorough+func-aarch64-thorough+thorough / func-aarch64-reverse_debug
  start time:   09:24:25
  duration:     0.88s
  result:       exit status 1
  command:      ASAN_OPTIONS=halt_on_error=1:abort_on_error=1:print_summary=1 MALLOC_PERTURB_=220 QEMU_TEST_QEMU_BINARY=/home/alex/lsrc/qemu.git/builds/all/qemu-system-aarch64 
  LD_LIBRARY_PATH=/home/alex/lsrc/qemu.git/builds/all/contrib/plugins:/home/alex/lsrc/qemu.git/builds/all/tests/tcg/plugins UBSAN_OPTIONS=halt_on_error=1:abort_on_error=1:print
  _summary=1:print_stacktrace=1 PYTHONPATH=/home/alex/lsrc/qemu.git/python:/home/alex/lsrc/qemu.git/tests/functional RUST_BACKTRACE=1 QEMU_BUILD_ROOT=/home/alex/lsrc/qemu.git/b
  uilds/all MSAN_OPTIONS=halt_on_error=1:abort_on_error=1:print_summary=1:print_stacktrace=1 QEMU_TEST_QEMU_IMG=/home/alex/lsrc/qemu.git/builds/all/qemu-img MESON_TEST_ITERATIO
  N=1 /home/alex/lsrc/qemu.git/builds/all/pyvenv/bin/python3 /home/alex/lsrc/qemu.git/tests/functional/aarch64/test_reverse_debug.py
  ----------------------------------- stdout -----------------------------------
  TAP version 13
  not ok 1 test_reverse_debug.ReverseDebugging_AArch64.test_aarch64_virt
  1..1
  ----------------------------------- stderr -----------------------------------
  Traceback (most recent call last):
    File "/home/alex/lsrc/qemu.git/tests/functional/aarch64/test_reverse_debug.py", line 31, in test_aarch64_virt
      self.reverse_debugging(args=('-kernel', kernel_path))
      ~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    File "/home/alex/lsrc/qemu.git/tests/functional/reverse_debugging.py", line 160, in reverse_debugging
      test.skipTest(f"Test skipped because there is no GDB available!")
      ^^^^
  NameError: name 'test' is not defined

  More information on test_reverse_debug.ReverseDebugging_AArch64.test_aarch64_virt could be found here:
   /home/alex/lsrc/qemu.git/builds/all/tests/functional/aarch64/test_reverse_debug.ReverseDebugging_AArch64.test_aarch64_virt/base.log
   /home/alex/lsrc/qemu.git/builds/all/tests/functional/aarch64/test_reverse_debug.ReverseDebugging_AArch64.test_aarch64_virt/console.log

  (test program exited with status code 1)

Not sure why it took the "no GDB" path though, as GDB is set in config-host.mak:

    cat config-host.mak
  # Automatically generated by configure - do not modify

  all:
  SRC_PATH=/home/alex/lsrc/qemu.git
  TARGET_DIRS=aarch64-linux-user aarch64_be-linux-user alpha-linux-user arm-linux-user armeb-linux-user hexagon-linux-user hppa-linux-user i386-linux-user loongarch64-linux-user m68k-linux-user microblaze-linux-user microblazeel-linux-user mips-linux-user mips64-linux-user mips64el-linux-user mipsel-linux-user mipsn32-linux-user mipsn32el-linux-user or1k-linux-user ppc-linux-user ppc64-linux-user ppc64le-linux-user riscv32-linux-user riscv64-linux-user s390x-linux-user sh4-linux-user sh4eb-linux-user sparc-linux-user sparc32plus-linux-user sparc64-linux-user x86_64-linux-user xtensa-linux-user xtensaeb-linux-user aarch64-softmmu alpha-softmmu arm-softmmu avr-softmmu hppa-softmmu i386-softmmu loongarch64-softmmu m68k-softmmu microblaze-softmmu microblazeel-softmmu mips-softmmu mips64-softmmu mips64el-softmmu mipsel-softmmu or1k-softmmu ppc-softmmu ppc64-softmmu riscv32-softmmu riscv64-softmmu rx-softmmu s390x-softmmu sh4-softmmu sh4eb-softmmu sparc-softmmu sparc64-softmmu tricore-softmmu x86_64-softmmu xtensa-softmmu xtensaeb-softmmu
  GDB=/usr/bin/gdb-multiarch

>  
> -        # replay and run debug commands
>          with Ports() as ports:
>              port = ports.find_free_port()
>              vm = self.run_vm(False, shift, args, replay_path, image_path, port)
> -        logger.info('connecting to gdbstub')
> -        g = gdb.GDBRemote('127.0.0.1', port, False, False)
> -        g.connect()
> -        r = g.cmd(b'qSupported')
> -        if b'qXfer:features:read+' in r:
> -            g.cmd(b'qXfer:features:read:target.xml:0,ffb')
> -        if b'ReverseStep+' not in r:
> -            self.fail('Reverse step is not supported by QEMU')
> -        if b'ReverseContinue+' not in r:
> -            self.fail('Reverse continue is not supported by QEMU')
> -
> -        logger.info('stepping forward')
> -        steps = []
> -        # record first instruction addresses
> -        for _ in range(self.STEPS):
> -            pc = self.get_pc(g)
> -            logger.info('saving position %x' % pc)
> -            steps.append(pc)
> -            self.gdb_step(g)
> -
> -        # visit the recorded instruction in reverse order
> -        logger.info('stepping backward')
> -        for addr in steps[::-1]:
> -            self.gdb_bstep(g)
> -            self.check_pc(g, addr)
> -            logger.info('found position %x' % addr)
> -
> -        # visit the recorded instruction in forward order
> -        logger.info('stepping forward')
> -        for addr in steps:
> -            self.check_pc(g, addr)
> -            self.gdb_step(g)
> -            logger.info('found position %x' % addr)
> -
> -        # set breakpoints for the instructions just stepped over
> -        logger.info('setting breakpoints')
> -        for addr in steps:
> -            # hardware breakpoint at addr with len=1
> -            g.cmd(b'Z1,%x,1' % addr, b'OK')
> -
> -        # this may hit a breakpoint if first instructions are executed
> -        # again
> -        logger.info('continuing execution')
> -        vm.qmp('replay-break', icount=last_icount - 1)
> -        # continue - will return after pausing
> -        # This could stop at the end and get a T02 return, or by
> -        # re-executing one of the breakpoints and get a T05 return.
> -        g.cmd(b'c')
> -        if self.vm_get_icount(vm) == last_icount - 1:
> -            logger.info('reached the end (icount %s)' % (last_icount - 1))
> -        else:
> -            logger.info('hit a breakpoint again at %x (icount %s)' %
> -                        (self.get_pc(g), self.vm_get_icount(vm)))
>  
> -        logger.info('running reverse continue to reach %x' % steps[-1])
> -        # reverse continue - will return after stopping at the breakpoint
> -        g.cmd(b'bc', b'T05thread:01;')
> +        try:
> +            gdb = GDB(gdb_cmd)
>  
> -        # assume that none of the first instructions is executed again
> -        # breaking the order of the breakpoints
> -        self.check_pc(g, steps[-1])
> -        logger.info('successfully reached %x' % steps[-1])
> +            logger.info('Connecting to gdbstub...')
>  
> -        logger.info('exiting gdb and qemu')
> -        vm.shutdown()
> +            gdb.cli("set debug remote 1")
> +
> +            c = gdb.cli(f"target remote localhost:{port}").get_console()
> +            if not f"Remote debugging using localhost:{port}" in c:
> +                self.fail("Could not connect to gdbstub!")
> +
> +            # Remote debug messages are in 'log' payloads.
> +            r = gdb.get_log()
> +            if 'ReverseStep+' not in r:
> +                self.fail('Reverse step is not supported by QEMU')
> +            if 'ReverseContinue+' not in r:
> +                self.fail('Reverse continue is not supported by QEMU')
> +
> +            gdb.cli("set debug remote 0")
> +
> +            logger.info('Stepping forward')
> +            steps = []
> +            # Record first instruction addresses.
> +            for _ in range(self.STEPS):
> +                pc = gdb.cli("print $pc").get_addr()
> +                logger.info('Saving position %x' % pc)
> +                steps.append(pc)
> +
> +                gdb.cli("stepi")
> +
> +            # Visit the recorded instructions in reverse order.
> +            logger.info('Stepping backward')
> +            for saved_pc in steps[::-1]:
> +                logger.info('Found position %x' % saved_pc)
> +                gdb.cli("reverse-stepi")
> +                pc = gdb.cli("print $pc").get_addr()
> +                if pc != saved_pc:
> +                    logger.info('Invalid PC (read %x instead of %x)' % (pc, saved_pc))
> +                    self.fail('Reverse stepping failed!')
> +
> +            # Visit the recorded instructions in forward order.
> +            logger.info('Stepping forward')
> +            for saved_pc in steps:
> +                logger.info('Found position %x' % saved_pc)
> +                pc = gdb.cli("print $pc").get_addr()
> +                if pc != saved_pc:
> +                    logger.info('Invalid PC (read %x instead of %x)' % (pc, saved_pc))
> +                    self.fail('Forward stepping failed!')
> +
> +                gdb.cli("stepi")
> +
> +            # Set breakpoints for the instructions just stepped over.
> +            logger.info('Setting breakpoints')
> +            for saved_pc in steps:
> +                gdb.cli(f"break *{hex(saved_pc)}")
> +
> +            # This may hit a breakpoint if first instructions are executed again.
> +            logger.info('Continuing execution')
> +            vm.qmp('replay-break', icount=last_icount - 1)
> +            # continue - will return after pausing.
> +            # This can stop at the end of the replay-break and gdb gets a SIGINT,
> +            # or by re-executing one of the breakpoints and gdb stops at a
> +            # breakpoint.
> +            gdb.cli("continue")
> +
> +            pc = gdb.cli("print $pc").get_addr()
> +            current_icount = self.vm_get_icount(vm)
> +            if current_icount == last_icount - 1:
> +                print(f"# **** Hit replay-break at icount={current_icount}, pc={hex(pc)} ****")
> +                logger.info('Reached the end (icount %s)' % (current_icount))
> +            else:
> +                print(f"# **** Hit breakpoint at icount={current_icount}, pc={hex(pc)} ****")
> +                logger.info('Hit a breakpoint again at %x (icount %s)' %
> +                            (pc, current_icount))
> +
> +            logger.info('Running reverse continue to reach %x' % steps[-1])
> +            # reverse-continue - will return after stopping at the breakpoint.
> +            gdb.cli("reverse-continue")
> +
> +            # Assume that none of the first instructions are executed again
> +            # breaking the order of the breakpoints.
> +            # steps[-1] is the first saved $pc in reverse order.
> +            pc = gdb.cli("print $pc").get_addr()
> +            first_pc_in_rev_order = steps[-1]
> +            if pc == first_pc_in_rev_order:
> +                print(f"# **** Hit breakpoint at the first PC in reverse order ({hex(pc)}) ****")
> +                logger.info('Successfully reached breakpoint at %x' % first_pc_in_rev_order)
> +            else:
> +                logger.info('Failed to reach breakpoint at %x' % first_pc_in_rev_order)
> +                self.fail("'reverse-continue' did not hit the first PC in reverse order!")
> +
> +            logger.info('Exiting GDB and QEMU...')
> +            gdb.exit()
> +            vm.shutdown()
> +
> +            logger.info('Test passed.')
> +
> +        except GdbTimeoutError:
> +            self.fail("Connection to gdbstub timeouted...")

-- 
Alex Bennée
Virtualisation Tech Lead @ Linaro