[PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests

John Snow posted 24 patches 4 years, 6 months ago
Maintainers: Eduardo Habkost <ehabkost@redhat.com>, Cleber Rosa <crosa@redhat.com>, John Snow <jsnow@redhat.com>
There is a newer version of this series
[PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests
Posted by John Snow 4 years, 6 months ago
This tests most of protocol.py -- from a hacked-up Coverage.py run, it's
at about 86%. There are a few error cases that aren't very well tested
yet; they're hard to induce artificially so far. I'm working on it.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 python/tests/null_proto.py |  67 ++++++
 python/tests/protocol.py   | 458 +++++++++++++++++++++++++++++++++++++
 2 files changed, 525 insertions(+)
 create mode 100644 python/tests/null_proto.py
 create mode 100644 python/tests/protocol.py

diff --git a/python/tests/null_proto.py b/python/tests/null_proto.py
new file mode 100644
index 00000000000..c697efc0001
--- /dev/null
+++ b/python/tests/null_proto.py
@@ -0,0 +1,67 @@
+import asyncio
+
+from qemu.aqmp.protocol import AsyncProtocol
+
+
+class NullProtocol(AsyncProtocol[None]):
+    """
+    NullProtocol is a test mockup of an AsyncProtocol implementation.
+
+    It adds a fake_session instance variable that enables a code path
+    that bypasses the actual connection logic, but still allows the
+    reader/writers to start.
+
+    Because the message type is defined as None, an asyncio.Event named
+    'trigger_input' is created that prohibits the reader from
+    incessantly being able to yield None; this input can be poked to
+    simulate an incoming message.
+
+    For testing symmetry with do_recv, an interface is added to "send" a
+    Null message.
+
+    For testing purposes, a "simulate_disconnect" method is also
+    added which allows us to trigger a bottom half disconnect without
+    injecting any real errors into the reader/writer loops; in essence
+    it performs exactly half of what disconnect() normally does.
+    """
+    def __init__(self, name=None):
+        self.fake_session = False
+        self.trigger_input: asyncio.Event
+        super().__init__(name)
+
+    async def _establish_session(self):
+        self.trigger_input = asyncio.Event()
+        await super()._establish_session()
+
+    async def _do_accept(self, address, ssl=None):
+        if not self.fake_session:
+            await super()._do_accept(address, ssl)
+
+    async def _do_connect(self, address, ssl=None):
+        if not self.fake_session:
+            await super()._do_connect(address, ssl)
+
+    async def _do_recv(self) -> None:
+        await self.trigger_input.wait()
+        self.trigger_input.clear()
+
+    def _do_send(self, msg: None) -> None:
+        pass
+
+    async def send_msg(self) -> None:
+        await self._outgoing.put(None)
+
+    async def simulate_disconnect(self) -> None:
+        # Simulates a bottom half disconnect, i.e. schedules a
+        # disconnection but does not wait for it to complete. This is
+        # used to put the loop into the DISCONNECTING state without
+        # fully quiescing it back to IDLE; this is normally something
+        # you cannot coax AsyncProtocol to do on purpose, but it will be
+        # similar to what happens with an unhandled Exception in the
+        # reader/writer.
+        #
+        # Under normal circumstances, the library design requires you to
+        # await on disconnect(), which awaits the disconnect task and
+        # returns bottom half errors as a pre-condition to allowing the
+        # loop to return back to IDLE.
+        self._schedule_disconnect()
diff --git a/python/tests/protocol.py b/python/tests/protocol.py
new file mode 100644
index 00000000000..2374d01365e
--- /dev/null
+++ b/python/tests/protocol.py
@@ -0,0 +1,458 @@
+import asyncio
+from contextlib import contextmanager
+import os
+import socket
+from tempfile import TemporaryDirectory
+
+import avocado
+
+from qemu.aqmp import ConnectError, Runstate
+from qemu.aqmp.protocol import StateError
+from qemu.aqmp.util import asyncio_run, create_task
+
+# An Avocado bug prevents us from defining this testing class in-line here:
+from null_proto import NullProtocol
+
+
+def run_as_task(coro, allow_cancellation=False):
+    # This helper runs a given coroutine as a task, wrapping it in a
+    # try...except that allows it to be cancelled gracefully.
+    async def _runner():
+        try:
+            await coro
+        except asyncio.CancelledError:
+            if allow_cancellation:
+                return
+            raise
+    return create_task(_runner())
+
+
+@contextmanager
+def jammed_socket():
+    # This function opens up a random TCP port on localhost, then jams it.
+    socks = []
+
+    try:
+        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+        sock.bind(('127.0.0.1', 0))
+        sock.listen(1)
+        address = sock.getsockname()
+
+        socks.append(sock)
+
+        # I don't *fully* understand why, but it takes *two* un-accepted
+        # connections to start jamming the socket.
+        for _ in range(2):
+            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+            sock.connect(address)
+            socks.append(sock)
+
+        yield address
+
+    finally:
+        for sock in socks:
+            sock.close()
+
+
+class Smoke(avocado.Test):
+
+    def setUp(self):
+        self.proto = NullProtocol()
+
+    def test__repr__(self):
+        self.assertEqual(
+            repr(self.proto),
+            "<NullProtocol runstate=IDLE>"
+        )
+
+    def testRunstate(self):
+        self.assertEqual(
+            self.proto.runstate,
+            Runstate.IDLE
+        )
+
+    def testDefaultName(self):
+        self.assertEqual(
+            self.proto.name,
+            None
+        )
+
+    def testLogger(self):
+        self.assertEqual(
+            self.proto.logger.name,
+            'qemu.aqmp.protocol'
+        )
+
+    def testName(self):
+        self.proto = NullProtocol('Steve')
+
+        self.assertEqual(
+            self.proto.name,
+            'Steve'
+        )
+
+        self.assertEqual(
+            self.proto.logger.name,
+            'qemu.aqmp.protocol.Steve'
+        )
+
+        self.assertEqual(
+            repr(self.proto),
+            "<NullProtocol name='Steve' runstate=IDLE>"
+        )
+
+
+class TestBase(avocado.Test):
+
+    def setUp(self):
+        self.proto = NullProtocol(type(self).__name__)
+        self.assertEqual(self.proto.runstate, Runstate.IDLE)
+        self.runstate_watcher = None
+
+    def tearDown(self):
+        self.assertEqual(self.proto.runstate, Runstate.IDLE)
+
+    async def _asyncSetUp(self):
+        pass
+
+    async def _asyncTearDown(self):
+        if self.runstate_watcher:
+            await self.runstate_watcher
+
+    def _asyncRunner(self, test_coroutine):
+        async def coroutine():
+            await self._asyncSetUp()
+            await test_coroutine
+            await self._asyncTearDown()
+
+        asyncio_run(coroutine(), debug=True)
+
+    # Definitions
+
+    # The states we expect a "bad" connect/accept attempt to transition through
+    BAD_CONNECTION_STATES = (
+        Runstate.CONNECTING,
+        Runstate.DISCONNECTING,
+        Runstate.IDLE,
+    )
+
+    # The states we expect a "good" session to transition through
+    GOOD_CONNECTION_STATES = (
+        Runstate.CONNECTING,
+        Runstate.RUNNING,
+        Runstate.DISCONNECTING,
+        Runstate.IDLE,
+    )
+
+    # Helpers
+
+    async def _watch_runstates(self, *states):
+        # This launches a task alongside most tests below to confirm that the
+        # sequence of runstate changes is exactly as anticipated.
+
+        async def _watcher():
+            for state in states:
+                new_state = await self.proto.runstate_changed()
+                self.assertEqual(
+                    new_state,
+                    state,
+                    msg=f"Expected state '{state.name}'",
+                )
+
+        self.runstate_watcher = create_task(_watcher())
+        # Kick the loop and force the task to block on the event.
+        await asyncio.sleep(0)
+
+
+class State(TestBase):
+
+    async def testSuperfluousDisconnect_(self):
+        await self._watch_runstates(
+            Runstate.DISCONNECTING,
+            Runstate.IDLE,
+        )
+        await self.proto.disconnect()
+
+    def testSuperfluousDisconnect(self):
+        self._asyncRunner(self.testSuperfluousDisconnect_())
+
+
+class Connect(TestBase):
+
+    async def _bad_connection(self, family: str):
+        assert family in ('INET', 'UNIX')
+
+        if family == 'INET':
+            await self.proto.connect(('127.0.0.1', 0))
+        elif family == 'UNIX':
+            await self.proto.connect('/dev/null')
+
+    async def _hanging_connection(self):
+        with jammed_socket() as addr:
+            await self.proto.connect(addr)
+
+    async def _bad_connection_test(self, family: str):
+        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+
+        with self.assertRaises(ConnectError) as context:
+            await self._bad_connection(family)
+
+        self.assertIsInstance(context.exception.exc, OSError)
+        self.assertEqual(
+            context.exception.error_message,
+            "Failed to establish connection"
+        )
+
+    def testBadINET(self):
+        self._asyncRunner(self._bad_connection_test('INET'))
+        # self.assertIsInstance(err.exc, ConnectionRefusedError)
+
+    def testBadUNIX(self):
+        self._asyncRunner(self._bad_connection_test('UNIX'))
+        # self.assertIsInstance(err.exc, ConnectionRefusedError)
+
+    async def testCancellation_(self):
+        # Note that connect()/accept() cannot be cancelled outright, as they
+        # are not tasks. However, we can wrap them in a task and cancel *that*.
+        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+        state = await self.proto.runstate_changed()
+        self.assertEqual(state, Runstate.CONNECTING)
+
+        # This is insider baseball, but the connection attempt has
+        # yielded *just* before the actual connection attempt, so kick
+        # the loop to make sure it's truly wedged.
+        await asyncio.sleep(0)
+
+        task.cancel()
+        await task
+
+    def testCancellation(self):
+        self._asyncRunner(self.testCancellation_())
+
+    async def testTimeout_(self):
+        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+        task = run_as_task(self._hanging_connection())
+
+        # More insider baseball: to improve the speed of this test while
+        # guaranteeing that the connection even gets a chance to start,
+        # verify that the connection hangs *first*, then await the
+        # result of the task with a nearly-zero timeout.
+
+        state = await self.proto.runstate_changed()
+        self.assertEqual(state, Runstate.CONNECTING)
+        await asyncio.sleep(0)
+
+        with self.assertRaises(asyncio.TimeoutError):
+            await asyncio.wait_for(task, timeout=0)
+
+    def testTimeout(self):
+        self._asyncRunner(self.testTimeout_())
+
+    async def testRequire_(self):
+        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+        state = await self.proto.runstate_changed()
+        self.assertEqual(state, Runstate.CONNECTING)
+
+        with self.assertRaises(StateError) as context:
+            await self._bad_connection('UNIX')
+
+        self.assertEqual(
+            context.exception.error_message,
+            "NullProtocol is currently connecting."
+        )
+        self.assertEqual(context.exception.state, Runstate.CONNECTING)
+        self.assertEqual(context.exception.required, Runstate.IDLE)
+
+        task.cancel()
+        await task
+
+    def testRequire(self):
+        self._asyncRunner(self.testRequire_())
+
+    async def testImplicitRunstateInit_(self):
+        # This tests what happens if we do not wait on the
+        # runstate until AFTER we connect, i.e., connect()/accept()
+        # themselves initialize the runstate event. All of the above
+        # tests force the initialization by waiting on the runstate
+        # *first*.
+        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+        # Kick the loop to coerce the state change
+        await asyncio.sleep(0)
+        assert self.proto.runstate == Runstate.CONNECTING
+
+        # We already missed the transition to CONNECTING
+        await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)
+
+        task.cancel()
+        await task
+
+    def testImplicitRunstateInit(self):
+        self._asyncRunner(self.testImplicitRunstateInit_())
+
+
+class Accept(Connect):
+
+    async def _bad_connection(self, family: str):
+        assert family in ('INET', 'UNIX')
+
+        if family == 'INET':
+            await self.proto.accept(('example.com', 1))
+        elif family == 'UNIX':
+            await self.proto.accept('/dev/null')
+
+    async def _hanging_connection(self):
+        with TemporaryDirectory(suffix='.aqmp') as tmpdir:
+            sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
+            await self.proto.accept(sock)
+
+
+class FakeSession(TestBase):
+
+    def setUp(self):
+        super().setUp()
+        self.proto.fake_session = True
+
+    async def _asyncSetUp(self):
+        await super()._asyncSetUp()
+        await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
+
+    async def _asyncTearDown(self):
+        await self.proto.disconnect()
+        await super()._asyncTearDown()
+
+    ####
+
+    async def testFakeConnect_(self):
+        await self.proto.connect('/not/a/real/path')
+        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
+
+    def testFakeConnect(self):
+        """Test the full state lifecycle (via connect) with a no-op session."""
+        self._asyncRunner(self.testFakeConnect_())
+
+    async def testFakeAccept_(self):
+        await self.proto.accept('/not/a/real/path')
+        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
+
+    def testFakeAccept(self):
+        """Test the full state lifecycle (via accept) with a no-op session."""
+        self._asyncRunner(self.testFakeAccept_())
+
+    async def testFakeRecv_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        logname = self.proto.logger.name
+        with self.assertLogs(logname, level='DEBUG') as context:
+            self.proto.trigger_input.set()
+            self.proto.trigger_input.clear()
+            await asyncio.sleep(0)  # Kick reader.
+
+        self.assertEqual(
+            context.output,
+            [f"DEBUG:{logname}:<-- None"],
+        )
+
+    def testFakeRecv(self):
+        """Test receiving a fake/null message."""
+        self._asyncRunner(self.testFakeRecv_())
+
+    async def testFakeSend_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        logname = self.proto.logger.name
+        with self.assertLogs(logname, level='DEBUG') as context:
+            # Cheat: Send a Null message to nobody.
+            await self.proto.send_msg()
+            # Kick writer; awaiting on a queue.put isn't sufficient to yield.
+            await asyncio.sleep(0)
+
+        self.assertEqual(
+            context.output,
+            [f"DEBUG:{logname}:--> None"],
+        )
+
+    def testFakeSend(self):
+        """Test sending a fake/null message."""
+        self._asyncRunner(self.testFakeSend_())
+
+    async def _prod_session_api(
+            self,
+            current_state: Runstate,
+            error_message: str,
+            accept: bool = True
+    ):
+        with self.assertRaises(StateError) as context:
+            if accept:
+                await self.proto.accept('/not/a/real/path')
+            else:
+                await self.proto.connect('/not/a/real/path')
+
+        self.assertEqual(context.exception.error_message, error_message)
+        self.assertEqual(context.exception.state, current_state)
+        self.assertEqual(context.exception.required, Runstate.IDLE)
+
+    async def testAcceptRequireRunning_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        await self._prod_session_api(
+            Runstate.RUNNING,
+            "NullProtocol is already connected and running.",
+            accept=True,
+        )
+
+    def testAcceptRequireRunning(self):
+        """Test that accept() cannot be called when Runstate=RUNNING"""
+        self._asyncRunner(self.testAcceptRequireRunning_())
+
+    async def testConnectRequireRunning_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        await self._prod_session_api(
+            Runstate.RUNNING,
+            "NullProtocol is already connected and running.",
+            accept=False,
+        )
+
+    def testConnectRequireRunning(self):
+        """Test that connect() cannot be called when Runstate=RUNNING"""
+        self._asyncRunner(self.testConnectRequireRunning_())
+
+    async def testAcceptRequireDisconnecting_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        # Cheat: force a disconnect.
+        await self.proto.simulate_disconnect()
+
+        await self._prod_session_api(
+            Runstate.DISCONNECTING,
+            ("NullProtocol is disconnecting."
+             " Call disconnect() to return to IDLE state."),
+            accept=True,
+        )
+
+    def testAcceptRequireDisconnecting(self):
+        """Test that accept() cannot be called when Runstate=DISCONNECTING"""
+        self._asyncRunner(self.testAcceptRequireDisconnecting_())
+
+    async def testConnectRequireDisconnecting_(self):
+        await self.proto.accept('/not/a/real/path')
+
+        # Cheat: force a disconnect.
+        await self.proto.simulate_disconnect()
+
+        await self._prod_session_api(
+            Runstate.DISCONNECTING,
+            ("NullProtocol is disconnecting."
+             " Call disconnect() to return to IDLE state."),
+            accept=False,
+        )
+
+    def testConnectRequireDisconnecting(self):
+        """Test that connect() cannot be called when Runstate=DISCONNECTING"""
+        self._asyncRunner(self.testConnectRequireDisconnecting_())
-- 
2.31.1


Re: [PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests
Posted by Beraldo Leal 4 years, 6 months ago
On Fri, Jul 16, 2021 at 08:32:53PM -0400, John Snow wrote:
> This tests most of protocol.py -- From a hacked up Coverage.py run, it's
> at about 86%. There's a few error cases that aren't very well tested
> yet, they're hard to induce artificially so far. I'm working on it.
> 
> Signed-off-by: John Snow <jsnow@redhat.com>
> ---
>  python/tests/null_proto.py |  67 ++++++
>  python/tests/protocol.py   | 458 +++++++++++++++++++++++++++++++++++++
>  2 files changed, 525 insertions(+)
>  create mode 100644 python/tests/null_proto.py
>  create mode 100644 python/tests/protocol.py
> 
> diff --git a/python/tests/null_proto.py b/python/tests/null_proto.py
> new file mode 100644
> index 00000000000..c697efc0001
> --- /dev/null
> +++ b/python/tests/null_proto.py
> @@ -0,0 +1,67 @@
> +import asyncio
> +
> +from qemu.aqmp.protocol import AsyncProtocol
> +
> +
> +class NullProtocol(AsyncProtocol[None]):
> +    """
> +    NullProtocol is a test mockup of an AsyncProtocol implementation.
> +
> +    It adds a fake_session instance variable that enables a code path
> +    that bypasses the actual connection logic, but still allows the
> +    reader/writers to start.
> +
> +    Because the message type is defined as None, an asyncio.Event named
> +    'trigger_input' is created that prohibits the reader from
> +    incessantly being able to yield None; this input can be poked to
> +    simulate an incoming message.
> +
> +    For testing symmetry with do_recv, an interface is added to "send" a
> +    Null message.
> +
> +    For testing purposes, a "simulate_disconnection" method is also
> +    added which allows us to trigger a bottom half disconnect without
> +    injecting any real errors into the reader/writer loops; in essence
> +    it performs exactly half of what disconnect() normally does.
> +    """
> +    def __init__(self, name=None):
> +        self.fake_session = False
> +        self.trigger_input: asyncio.Event
> +        super().__init__(name)
> +
> +    async def _establish_session(self):
> +        self.trigger_input = asyncio.Event()
> +        await super()._establish_session()
> +
> +    async def _do_accept(self, address, ssl=None):
> +        if not self.fake_session:
> +            await super()._do_accept(address, ssl)
> +
> +    async def _do_connect(self, address, ssl=None):
> +        if not self.fake_session:
> +            await super()._do_connect(address, ssl)
> +
> +    async def _do_recv(self) -> None:
> +        await self.trigger_input.wait()
> +        self.trigger_input.clear()
> +
> +    def _do_send(self, msg: None) -> None:
> +        pass
> +
> +    async def send_msg(self) -> None:
> +        await self._outgoing.put(None)
> +
> +    async def simulate_disconnect(self) -> None:
> +        # Simulates a bottom half disconnect, e.g. schedules a
> +        # disconnection but does not wait for it to complete. This is
> +        # used to put the loop into the DISCONNECTING state without
> +        # fully quiescing it back to IDLE; this is normally something
> +        # you cannot coax AsyncProtocol to do on purpose, but it will be
> +        # similar to what happens with an unhandled Exception in the
> +        # reader/writer.
> +        #
> +        # Under normal circumstances, the library design requires you to
> +        # await on disconnect(), which awaits the disconnect task and
> +        # returns bottom half errors as a pre-condition to allowing the
> +        # loop to return back to IDLE.
> +        self._schedule_disconnect()

Nitpick: Any reason for not using a docstring? I wouldn't mind if it was
a docstring instead. ;)

> diff --git a/python/tests/protocol.py b/python/tests/protocol.py
> new file mode 100644
> index 00000000000..2374d01365e
> --- /dev/null
> +++ b/python/tests/protocol.py
> @@ -0,0 +1,458 @@
> +import asyncio
> +from contextlib import contextmanager
> +import os
> +import socket
> +from tempfile import TemporaryDirectory
> +
> +import avocado
> +
> +from qemu.aqmp import ConnectError, Runstate
> +from qemu.aqmp.protocol import StateError
> +from qemu.aqmp.util import asyncio_run, create_task

Nitpick: Maybe an isort?

> +# An Avocado bug prevents us from defining this testing class in-line here:
> +from null_proto import NullProtocol

Is this what you are looking for?

https://github.com/avocado-framework/avocado/pull/4764

If not, can you point to the right issue, please?

> +@contextmanager
> +def jammed_socket():
> +    # This method opens up a random TCP port on localhost, then jams it.
> +    socks = []
> +
> +    try:
> +        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> +        sock.bind(('127.0.0.1', 0))
> +        sock.listen(1)
> +        address = sock.getsockname()
> +
> +        socks.append(sock)
> +
> +        # I don't *fully* understand why, but it takes *two* un-accepted
> +        # connections to start jamming the socket.
> +        for _ in range(2):
> +            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> +            sock.connect(address)
> +            socks.append(sock)
> +
> +        yield address
> +
> +    finally:
> +        for sock in socks:
> +            sock.close()
> +
> +
> +class Smoke(avocado.Test):
> +
> +    def setUp(self):
> +        self.proto = NullProtocol()
> +
> +    def test__repr__(self):
> +        self.assertEqual(
> +            repr(self.proto),
> +            "<NullProtocol runstate=IDLE>"
> +        )
> +
> +    def testRunstate(self):
> +        self.assertEqual(
> +            self.proto.runstate,
> +            Runstate.IDLE
> +        )
> +
> +    def testDefaultName(self):
> +        self.assertEqual(
> +            self.proto.name,
> +            None
> +        )
> +
> +    def testLogger(self):
> +        self.assertEqual(
> +            self.proto.logger.name,
> +            'qemu.aqmp.protocol'
> +        )
> +
> +    def testName(self):
> +        self.proto = NullProtocol('Steve')
> +
> +        self.assertEqual(
> +            self.proto.name,
> +            'Steve'
> +        )
> +
> +        self.assertEqual(
> +            self.proto.logger.name,
> +            'qemu.aqmp.protocol.Steve'
> +        )
> +
> +        self.assertEqual(
> +            repr(self.proto),
> +            "<NullProtocol name='Steve' runstate=IDLE>"
> +        )
> +
> +
> +class TestBase(avocado.Test):
> +
> +    def setUp(self):
> +        self.proto = NullProtocol(type(self).__name__)
> +        self.assertEqual(self.proto.runstate, Runstate.IDLE)
> +        self.runstate_watcher = None
> +
> +    def tearDown(self):
> +        self.assertEqual(self.proto.runstate, Runstate.IDLE)
> +
> +    async def _asyncSetUp(self):
> +        pass
> +
> +    async def _asyncTearDown(self):
> +        if self.runstate_watcher:
> +            await self.runstate_watcher
> +
> +    def _asyncRunner(self, test_coroutine):
> +        async def coroutine():
> +            await self._asyncSetUp()
> +            await test_coroutine
> +            await self._asyncTearDown()
> +
> +        asyncio_run(coroutine(), debug=True)
> +
> +    # Definitions
> +
> +    # The states we expect a "bad" connect/accept attempt to transition through
> +    BAD_CONNECTION_STATES = (
> +        Runstate.CONNECTING,
> +        Runstate.DISCONNECTING,
> +        Runstate.IDLE,
> +    )
> +
> +    # The states we expect a "good" session to transition through
> +    GOOD_CONNECTION_STATES = (
> +        Runstate.CONNECTING,
> +        Runstate.RUNNING,
> +        Runstate.DISCONNECTING,
> +        Runstate.IDLE,
> +    )
> +
> +    # Helpers
> +
> +    async def _watch_runstates(self, *states):
> +        # This launches a task alongside most tests below to confirm that the
> +        # sequence of runstate changes is exactly as anticipated.
> +
> +        async def _watcher():
> +            for state in states:
> +                new_state = await self.proto.runstate_changed()
> +                self.assertEqual(
> +                    new_state,
> +                    state,
> +                    msg=f"Expected state '{state.name}'",
> +                )
> +
> +        self.runstate_watcher = create_task(_watcher())
> +        # Kick the loop and force the task to block on the event.
> +        await asyncio.sleep(0)
> +
> +
> +class State(TestBase):
> +
> +    async def testSuperfluousDisconnect_(self):
> +        await self._watch_runstates(
> +            Runstate.DISCONNECTING,
> +            Runstate.IDLE,
> +        )
> +        await self.proto.disconnect()
> +
> +    def testSuperfluousDisconnect(self):
> +        self._asyncRunner(self.testSuperfluousDisconnect_())
> +
> +
> +class Connect(TestBase):
> +
> +    async def _bad_connection(self, family: str):
> +        assert family in ('INET', 'UNIX')
> +
> +        if family == 'INET':
> +            await self.proto.connect(('127.0.0.1', 0))
> +        elif family == 'UNIX':
> +            await self.proto.connect('/dev/null')
> +
> +    async def _hanging_connection(self):
> +        with jammed_socket() as addr:
> +            await self.proto.connect(addr)
> +
> +    async def _bad_connection_test(self, family: str):
> +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> +
> +        with self.assertRaises(ConnectError) as context:
> +            await self._bad_connection(family)
> +
> +        self.assertIsInstance(context.exception.exc, OSError)
> +        self.assertEqual(
> +            context.exception.error_message,
> +            "Failed to establish connection"
> +        )
> +
> +    def testBadINET(self):
> +        self._asyncRunner(self._bad_connection_test('INET'))
> +        # self.assertIsInstance(err.exc, ConnectionRefusedError)
> +
> +    def testBadUNIX(self):
> +        self._asyncRunner(self._bad_connection_test('UNIX'))
> +        # self.assertIsInstance(err.exc, ConnectionRefusedError)
> +
> +    async def testCancellation_(self):
> +        # Note that accept() cannot be cancelled outright, as it isn't a task.
> +        # However, we can wrap it in a task and cancel *that*.
> +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> +        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
> +
> +        state = await self.proto.runstate_changed()
> +        self.assertEqual(state, Runstate.CONNECTING)
> +
> +        # This is insider baseball, but the connection attempt has
> +        # yielded *just* before the actual connection attempt, so kick
> +        # the loop to make sure it's truly wedged.
> +        await asyncio.sleep(0)
> +
> +        task.cancel()
> +        await task
> +
> +    def testCancellation(self):
> +        self._asyncRunner(self.testCancellation_())
> +
> +    async def testTimeout_(self):
> +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> +        task = run_as_task(self._hanging_connection())
> +
> +        # More insider baseball: to improve the speed of this test while
> +        # guaranteeing that the connection even gets a chance to start,
> +        # verify that the connection hangs *first*, then await the
> +        # result of the task with a nearly-zero timeout.
> +
> +        state = await self.proto.runstate_changed()
> +        self.assertEqual(state, Runstate.CONNECTING)
> +        await asyncio.sleep(0)
> +
> +        with self.assertRaises(asyncio.TimeoutError):
> +            await asyncio.wait_for(task, timeout=0)
> +
> +    def testTimeout(self):
> +        self._asyncRunner(self.testTimeout_())
> +
> +    async def testRequire_(self):
> +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> +        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
> +
> +        state = await self.proto.runstate_changed()
> +        self.assertEqual(state, Runstate.CONNECTING)
> +
> +        with self.assertRaises(StateError) as context:
> +            await self._bad_connection('UNIX')
> +
> +        self.assertEqual(
> +            context.exception.error_message,
> +            "NullProtocol is currently connecting."
> +        )
> +        self.assertEqual(context.exception.state, Runstate.CONNECTING)
> +        self.assertEqual(context.exception.required, Runstate.IDLE)
> +
> +        task.cancel()
> +        await task
> +
> +    def testRequire(self):
> +        self._asyncRunner(self.testRequire_())
> +
> +    async def testImplicitRunstateInit_(self):
> +        # This tests what happens if we do not wait on the
> +        # runstate until AFTER we connect, i.e., connect()/accept()
> +        # themselves initialize the runstate event. All of the above
> +        # tests force the initialization by waiting on the runstate
> +        # *first*.
> +        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
> +
> +        # Kick the loop to coerce the state change
> +        await asyncio.sleep(0)
> +        assert self.proto.runstate == Runstate.CONNECTING
> +
> +        # We already missed the transition to CONNECTING
> +        await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)
> +
> +        task.cancel()
> +        await task
> +
> +    def testImplicitRunstateInit(self):
> +        self._asyncRunner(self.testImplicitRunstateInit_())
> +
> +
> +class Accept(Connect):
> +
> +    async def _bad_connection(self, family: str):
> +        assert family in ('INET', 'UNIX')
> +
> +        if family == 'INET':
> +            await self.proto.accept(('example.com', 1))
> +        elif family == 'UNIX':
> +            await self.proto.accept('/dev/null')
> +
> +    async def _hanging_connection(self):
> +        with TemporaryDirectory(suffix='.aqmp') as tmpdir:
> +            sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
> +            await self.proto.accept(sock)
> +
> +
> +class FakeSession(TestBase):
> +
> +    def setUp(self):
> +        super().setUp()
> +        self.proto.fake_session = True
> +
> +    async def _asyncSetUp(self):
> +        await super()._asyncSetUp()
> +        await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
> +
> +    async def _asyncTearDown(self):
> +        await self.proto.disconnect()
> +        await super()._asyncTearDown()
> +
> +    ####
> +
> +    async def testFakeConnect_(self):
> +        await self.proto.connect('/not/a/real/path')
> +        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
> +
> +    def testFakeConnect(self):
> +        """Test the full state lifecycle (via connect) with a no-op session."""
> +        self._asyncRunner(self.testFakeConnect_())
> +
> +    async def testFakeAccept_(self):
> +        await self.proto.accept('/not/a/real/path')
> +        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
> +
> +    def testFakeAccept(self):
> +        """Test the full state lifecycle (via accept) with a no-op session."""
> +        self._asyncRunner(self.testFakeAccept_())
> +
> +    async def testFakeRecv_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        logname = self.proto.logger.name
> +        with self.assertLogs(logname, level='DEBUG') as context:
> +            self.proto.trigger_input.set()
> +            self.proto.trigger_input.clear()
> +            await asyncio.sleep(0)  # Kick reader.
> +
> +        self.assertEqual(
> +            context.output,
> +            [f"DEBUG:{logname}:<-- None"],
> +        )
> +
> +    def testFakeRecv(self):
> +        """Test receiving a fake/null message."""
> +        self._asyncRunner(self.testFakeRecv_())
> +
> +    async def testFakeSend_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        logname = self.proto.logger.name
> +        with self.assertLogs(logname, level='DEBUG') as context:
> +            # Cheat: Send a Null message to nobody.
> +            await self.proto.send_msg()
> +            # Kick writer; awaiting on a queue.put isn't sufficient to yield.
> +            await asyncio.sleep(0)
> +
> +        self.assertEqual(
> +            context.output,
> +            [f"DEBUG:{logname}:--> None"],
> +        )
> +
> +    def testFakeSend(self):
> +        """Test sending a fake/null message."""
> +        self._asyncRunner(self.testFakeSend_())
> +
> +    async def _prod_session_api(
> +            self,
> +            current_state: Runstate,
> +            error_message: str,
> +            accept: bool = True
> +    ):
> +        with self.assertRaises(StateError) as context:
> +            if accept:
> +                await self.proto.accept('/not/a/real/path')
> +            else:
> +                await self.proto.connect('/not/a/real/path')
> +
> +        self.assertEqual(context.exception.error_message, error_message)
> +        self.assertEqual(context.exception.state, current_state)
> +        self.assertEqual(context.exception.required, Runstate.IDLE)
> +
> +    async def testAcceptRequireRunning_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        await self._prod_session_api(
> +            Runstate.RUNNING,
> +            "NullProtocol is already connected and running.",
> +            accept=True,
> +        )
> +
> +    def testAcceptRequireRunning(self):
> +        """Test that accept() cannot be called when Runstate=RUNNING"""
> +        self._asyncRunner(self.testAcceptRequireRunning_())
> +
> +    async def testConnectRequireRunning_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        await self._prod_session_api(
> +            Runstate.RUNNING,
> +            "NullProtocol is already connected and running.",
> +            accept=False,
> +        )
> +
> +    def testConnectRequireRunning(self):
> +        """Test that connect() cannot be called when Runstate=RUNNING"""
> +        self._asyncRunner(self.testConnectRequireRunning_())
> +
> +    async def testAcceptRequireDisconnecting_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        # Cheat: force a disconnect.
> +        await self.proto.simulate_disconnect()
> +
> +        await self._prod_session_api(
> +            Runstate.DISCONNECTING,
> +            ("NullProtocol is disconnecting."
> +             " Call disconnect() to return to IDLE state."),
> +            accept=True,
> +        )
> +
> +    def testAcceptRequireDisconnecting(self):
> +        """Test that accept() cannot be called when Runstate=DISCONNECTING"""
> +        self._asyncRunner(self.testAcceptRequireDisconnecting_())
> +
> +    async def testConnectRequireDisconnecting_(self):
> +        await self.proto.accept('/not/a/real/path')
> +
> +        # Cheat: force a disconnect.
> +        await self.proto.simulate_disconnect()
> +
> +        await self._prod_session_api(
> +            Runstate.DISCONNECTING,
> +            ("NullProtocol is disconnecting."
> +             " Call disconnect() to return to IDLE state."),
> +            accept=False,
> +        )
> +
> +    def testConnectRequireDisconnecting(self):
> +        """Test that connect() cannot be called when Runstate=DISCONNECTING"""
> +        self._asyncRunner(self.testConnectRequireDisconnecting_())
> -- 
> 2.31.1

Besides that, I just would like to bring to the table that Avocado has
now a basic support for coroutines as tests that might help here. IIUC,
some of the boilerplate code (and duplicated methods) could be removed
with this:

https://github.com/avocado-framework/avocado/pull/4788

In any case, I understand if the latest version is not an option here,
so:

Reviewed-by: Beraldo Leal <bleal@redhat.com>

Thanks,
--
Beraldo


Re: [PATCH v2 24/24] python/aqmp: add AsyncProtocol unit tests
Posted by John Snow 4 years, 6 months ago
On Tue, Jul 20, 2021 at 4:34 PM Beraldo Leal <bleal@redhat.com> wrote:

> On Fri, Jul 16, 2021 at 08:32:53PM -0400, John Snow wrote:
> > This tests most of protocol.py -- From a hacked up Coverage.py run, it's
> > at about 86%. There's a few error cases that aren't very well tested
> > yet, they're hard to induce artificially so far. I'm working on it.
> >
> > Signed-off-by: John Snow <jsnow@redhat.com>
> > ---
> >  python/tests/null_proto.py |  67 ++++++
> >  python/tests/protocol.py   | 458 +++++++++++++++++++++++++++++++++++++
> >  2 files changed, 525 insertions(+)
> >  create mode 100644 python/tests/null_proto.py
> >  create mode 100644 python/tests/protocol.py
> >
> > diff --git a/python/tests/null_proto.py b/python/tests/null_proto.py
> > new file mode 100644
> > index 00000000000..c697efc0001
> > --- /dev/null
> > +++ b/python/tests/null_proto.py
> > @@ -0,0 +1,67 @@
> > +import asyncio
> > +
> > +from qemu.aqmp.protocol import AsyncProtocol
> > +
> > +
> > +class NullProtocol(AsyncProtocol[None]):
> > +    """
> > +    NullProtocol is a test mockup of an AsyncProtocol implementation.
> > +
> > +    It adds a fake_session instance variable that enables a code path
> > +    that bypasses the actual connection logic, but still allows the
> > +    reader/writers to start.
> > +
> > +    Because the message type is defined as None, an asyncio.Event named
> > +    'trigger_input' is created that prohibits the reader from
> > +    incessantly being able to yield None; this input can be poked to
> > +    simulate an incoming message.
> > +
> > +    For testing symmetry with do_recv, an interface is added to "send" a
> > +    Null message.
> > +
> > +    For testing purposes, a "simulate_disconnection" method is also
> > +    added which allows us to trigger a bottom half disconnect without
> > +    injecting any real errors into the reader/writer loops; in essence
> > +    it performs exactly half of what disconnect() normally does.
> > +    """
> > +    def __init__(self, name=None):
> > +        self.fake_session = False
> > +        self.trigger_input: asyncio.Event
> > +        super().__init__(name)
> > +
> > +    async def _establish_session(self):
> > +        self.trigger_input = asyncio.Event()
> > +        await super()._establish_session()
> > +
> > +    async def _do_accept(self, address, ssl=None):
> > +        if not self.fake_session:
> > +            await super()._do_accept(address, ssl)
> > +
> > +    async def _do_connect(self, address, ssl=None):
> > +        if not self.fake_session:
> > +            await super()._do_connect(address, ssl)
> > +
> > +    async def _do_recv(self) -> None:
> > +        await self.trigger_input.wait()
> > +        self.trigger_input.clear()
> > +
> > +    def _do_send(self, msg: None) -> None:
> > +        pass
> > +
> > +    async def send_msg(self) -> None:
> > +        await self._outgoing.put(None)
> > +
> > +    async def simulate_disconnect(self) -> None:
> > +        # Simulates a bottom half disconnect, e.g. schedules a
> > +        # disconnection but does not wait for it to complete. This is
> > +        # used to put the loop into the DISCONNECTING state without
> > +        # fully quiescing it back to IDLE; this is normally something
> > +        # you cannot coax AsyncProtocol to do on purpose, but it will be
> > +        # similar to what happens with an unhandled Exception in the
> > +        # reader/writer.
> > +        #
> > +        # Under normal circumstances, the library design requires you to
> > +        # await on disconnect(), which awaits the disconnect task and
> > +        # returns bottom half errors as a pre-condition to allowing the
> > +        # loop to return back to IDLE.
> > +        self._schedule_disconnect()
>
> Nitpick: Any reason for not using a docstring? I wouldn't mind if it was
> a docstring instead. ;)
>
>
Nope. I've changed it.


> > diff --git a/python/tests/protocol.py b/python/tests/protocol.py
> > new file mode 100644
> > index 00000000000..2374d01365e
> > --- /dev/null
> > +++ b/python/tests/protocol.py
> > @@ -0,0 +1,458 @@
> > +import asyncio
> > +from contextlib import contextmanager
> > +import os
> > +import socket
> > +from tempfile import TemporaryDirectory
> > +
> > +import avocado
> > +
> > +from qemu.aqmp import ConnectError, Runstate
> > +from qemu.aqmp.protocol import StateError
> > +from qemu.aqmp.util import asyncio_run, create_task
>
> Nitpick: Maybe an isort?
>
>
It actually is isorted, just using some different settings than you're used
to seeing in Avocado.


> > +# An Avocado bug prevents us from defining this testing class in-line here:
> > +from null_proto import NullProtocol
>
> Is this what you are looking for?
>
> https://github.com/avocado-framework/avocado/pull/4764
>
> If not, can you point to the right issue, please?
>
>
That's the one. I don't have time to update to v90 right now, so I'm going
to leave it as a todo item, please forgive me! I'll update the comment,
though.


> > +@contextmanager
> > +def jammed_socket():
> > +    # This method opens up a random TCP port on localhost, then jams it.
> > +    socks = []
> > +
> > +    try:
> > +        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > +        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
> > +        sock.bind(('127.0.0.1', 0))
> > +        sock.listen(1)
> > +        address = sock.getsockname()
> > +
> > +        socks.append(sock)
> > +
> > +        # I don't *fully* understand why, but it takes *two* un-accepted
> > +        # connections to start jamming the socket.
> > +        for _ in range(2):
> > +            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
> > +            sock.connect(address)
> > +            socks.append(sock)
> > +
> > +        yield address
> > +
> > +    finally:
> > +        for sock in socks:
> > +            sock.close()
> > +
> > +
> > +class Smoke(avocado.Test):
> > +
> > +    def setUp(self):
> > +        self.proto = NullProtocol()
> > +
> > +    def test__repr__(self):
> > +        self.assertEqual(
> > +            repr(self.proto),
> > +            "<NullProtocol runstate=IDLE>"
> > +        )
> > +
> > +    def testRunstate(self):
> > +        self.assertEqual(
> > +            self.proto.runstate,
> > +            Runstate.IDLE
> > +        )
> > +
> > +    def testDefaultName(self):
> > +        self.assertEqual(
> > +            self.proto.name,
> > +            None
> > +        )
> > +
> > +    def testLogger(self):
> > +        self.assertEqual(
> > +            self.proto.logger.name,
> > +            'qemu.aqmp.protocol'
> > +        )
> > +
> > +    def testName(self):
> > +        self.proto = NullProtocol('Steve')
> > +
> > +        self.assertEqual(
> > +            self.proto.name,
> > +            'Steve'
> > +        )
> > +
> > +        self.assertEqual(
> > +            self.proto.logger.name,
> > +            'qemu.aqmp.protocol.Steve'
> > +        )
> > +
> > +        self.assertEqual(
> > +            repr(self.proto),
> > +            "<NullProtocol name='Steve' runstate=IDLE>"
> > +        )
> > +
> > +
> > +class TestBase(avocado.Test):
> > +
> > +    def setUp(self):
> > +        self.proto = NullProtocol(type(self).__name__)
> > +        self.assertEqual(self.proto.runstate, Runstate.IDLE)
> > +        self.runstate_watcher = None
> > +
> > +    def tearDown(self):
> > +        self.assertEqual(self.proto.runstate, Runstate.IDLE)
> > +
> > +    async def _asyncSetUp(self):
> > +        pass
> > +
> > +    async def _asyncTearDown(self):
> > +        if self.runstate_watcher:
> > +            await self.runstate_watcher
> > +
> > +    def _asyncRunner(self, test_coroutine):
> > +        async def coroutine():
> > +            await self._asyncSetUp()
> > +            await test_coroutine
> > +            await self._asyncTearDown()
> > +
> > +        asyncio_run(coroutine(), debug=True)
> > +
> > +    # Definitions
> > +
> > +    # The states we expect a "bad" connect/accept attempt to transition through
> > +    BAD_CONNECTION_STATES = (
> > +        Runstate.CONNECTING,
> > +        Runstate.DISCONNECTING,
> > +        Runstate.IDLE,
> > +    )
> > +
> > +    # The states we expect a "good" session to transition through
> > +    GOOD_CONNECTION_STATES = (
> > +        Runstate.CONNECTING,
> > +        Runstate.RUNNING,
> > +        Runstate.DISCONNECTING,
> > +        Runstate.IDLE,
> > +    )
> > +
> > +    # Helpers
> > +
> > +    async def _watch_runstates(self, *states):
> > +        # This launches a task alongside most tests below to confirm that the
> > +        # sequence of runstate changes is exactly as anticipated.
> > +
> > +        async def _watcher():
> > +            for state in states:
> > +                new_state = await self.proto.runstate_changed()
> > +                self.assertEqual(
> > +                    new_state,
> > +                    state,
> > +                    msg=f"Expected state '{state.name}'",
> > +                )
> > +
> > +        self.runstate_watcher = create_task(_watcher())
> > +        # Kick the loop and force the task to block on the event.
> > +        await asyncio.sleep(0)
> > +
> > +
> > +class State(TestBase):
> > +
> > +    async def testSuperfluousDisconnect_(self):
> > +        await self._watch_runstates(
> > +            Runstate.DISCONNECTING,
> > +            Runstate.IDLE,
> > +        )
> > +        await self.proto.disconnect()
> > +
> > +    def testSuperfluousDisconnect(self):
> > +        self._asyncRunner(self.testSuperfluousDisconnect_())
> > +
> > +
> > +class Connect(TestBase):
> > +
> > +    async def _bad_connection(self, family: str):
> > +        assert family in ('INET', 'UNIX')
> > +
> > +        if family == 'INET':
> > +            await self.proto.connect(('127.0.0.1', 0))
> > +        elif family == 'UNIX':
> > +            await self.proto.connect('/dev/null')
> > +
> > +    async def _hanging_connection(self):
> > +        with jammed_socket() as addr:
> > +            await self.proto.connect(addr)
> > +
> > +    async def _bad_connection_test(self, family: str):
> > +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> > +
> > +        with self.assertRaises(ConnectError) as context:
> > +            await self._bad_connection(family)
> > +
> > +        self.assertIsInstance(context.exception.exc, OSError)
> > +        self.assertEqual(
> > +            context.exception.error_message,
> > +            "Failed to establish connection"
> > +        )
> > +
> > +    def testBadINET(self):
> > +        self._asyncRunner(self._bad_connection_test('INET'))
> > +        # self.assertIsInstance(err.exc, ConnectionRefusedError)
> > +
> > +    def testBadUNIX(self):
> > +        self._asyncRunner(self._bad_connection_test('UNIX'))
> > +        # self.assertIsInstance(err.exc, ConnectionRefusedError)
> > +
> > +    async def testCancellation_(self):
> > +        # Note that accept() cannot be cancelled outright, as it isn't a task.
> > +        # However, we can wrap it in a task and cancel *that*.
> > +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> > +        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
> > +
> > +        state = await self.proto.runstate_changed()
> > +        self.assertEqual(state, Runstate.CONNECTING)
> > +
> > +        # This is insider baseball, but the connection attempt has
> > +        # yielded *just* before the actual connection attempt, so kick
> > +        # the loop to make sure it's truly wedged.
> > +        await asyncio.sleep(0)
> > +
> > +        task.cancel()
> > +        await task
> > +
> > +    def testCancellation(self):
> > +        self._asyncRunner(self.testCancellation_())
> > +
> > +    async def testTimeout_(self):
> > +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> > +        task = run_as_task(self._hanging_connection())
> > +
> > +        # More insider baseball: to improve the speed of this test while
> > +        # guaranteeing that the connection even gets a chance to start,
> > +        # verify that the connection hangs *first*, then await the
> > +        # result of the task with a nearly-zero timeout.
> > +
> > +        state = await self.proto.runstate_changed()
> > +        self.assertEqual(state, Runstate.CONNECTING)
> > +        await asyncio.sleep(0)
> > +
> > +        with self.assertRaises(asyncio.TimeoutError):
> > +            await asyncio.wait_for(task, timeout=0)
> > +
> > +    def testTimeout(self):
> > +        self._asyncRunner(self.testTimeout_())
> > +
> > +    async def testRequire_(self):
> > +        await self._watch_runstates(*self.BAD_CONNECTION_STATES)
> > +        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
> > +
> > +        state = await self.proto.runstate_changed()
> > +        self.assertEqual(state, Runstate.CONNECTING)
> > +
> > +        with self.assertRaises(StateError) as context:
> > +            await self._bad_connection('UNIX')
> > +
> > +        self.assertEqual(
> > +            context.exception.error_message,
> > +            "NullProtocol is currently connecting."
> > +        )
> > +        self.assertEqual(context.exception.state, Runstate.CONNECTING)
> > +        self.assertEqual(context.exception.required, Runstate.IDLE)
> > +
> > +        task.cancel()
> > +        await task
> > +
> > +    def testRequire(self):
> > +        self._asyncRunner(self.testRequire_())
> > +
> > +    async def testImplicitRunstateInit_(self):
> > +        # This tests what happens if we do not wait on the
> > +        # runstate until AFTER we connect, i.e., connect()/accept()
> > +        # themselves initialize the runstate event. All of the above
> > +        # tests force the initialization by waiting on the runstate
> > +        # *first*.
> > +        task = run_as_task(self._hanging_connection(), allow_cancellation=True)
> > +
> > +        # Kick the loop to coerce the state change
> > +        await asyncio.sleep(0)
> > +        assert self.proto.runstate == Runstate.CONNECTING
> > +
> > +        # We already missed the transition to CONNECTING
> > +        await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)
> > +
> > +        task.cancel()
> > +        await task
> > +
> > +    def testImplicitRunstateInit(self):
> > +        self._asyncRunner(self.testImplicitRunstateInit_())
> > +
> > +
> > +class Accept(Connect):
> > +
> > +    async def _bad_connection(self, family: str):
> > +        assert family in ('INET', 'UNIX')
> > +
> > +        if family == 'INET':
> > +            await self.proto.accept(('example.com', 1))
> > +        elif family == 'UNIX':
> > +            await self.proto.accept('/dev/null')
> > +
> > +    async def _hanging_connection(self):
> > +        with TemporaryDirectory(suffix='.aqmp') as tmpdir:
> > +            sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
> > +            await self.proto.accept(sock)
> > +
> > +
> > +class FakeSession(TestBase):
> > +
> > +    def setUp(self):
> > +        super().setUp()
> > +        self.proto.fake_session = True
> > +
> > +    async def _asyncSetUp(self):
> > +        await super()._asyncSetUp()
> > +        await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
> > +
> > +    async def _asyncTearDown(self):
> > +        await self.proto.disconnect()
> > +        await super()._asyncTearDown()
> > +
> > +    ####
> > +
> > +    async def testFakeConnect_(self):
> > +        await self.proto.connect('/not/a/real/path')
> > +        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
> > +
> > +    def testFakeConnect(self):
> > +        """Test the full state lifecycle (via connect) with a no-op session."""
> > +        self._asyncRunner(self.testFakeConnect_())
> > +
> > +    async def testFakeAccept_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +        self.assertEqual(self.proto.runstate, Runstate.RUNNING)
> > +
> > +    def testFakeAccept(self):
> > +        """Test the full state lifecycle (via accept) with a no-op session."""
> > +        self._asyncRunner(self.testFakeAccept_())
> > +
> > +    async def testFakeRecv_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        logname = self.proto.logger.name
> > +        with self.assertLogs(logname, level='DEBUG') as context:
> > +            self.proto.trigger_input.set()
> > +            self.proto.trigger_input.clear()
> > +            await asyncio.sleep(0)  # Kick reader.
> > +
> > +        self.assertEqual(
> > +            context.output,
> > +            [f"DEBUG:{logname}:<-- None"],
> > +        )
> > +
> > +    def testFakeRecv(self):
> > +        """Test receiving a fake/null message."""
> > +        self._asyncRunner(self.testFakeRecv_())
> > +
> > +    async def testFakeSend_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        logname = self.proto.logger.name
> > +        with self.assertLogs(logname, level='DEBUG') as context:
> > +            # Cheat: Send a Null message to nobody.
> > +            await self.proto.send_msg()
> > +            # Kick writer; awaiting on a queue.put isn't sufficient to yield.
> > +            await asyncio.sleep(0)
> > +
> > +        self.assertEqual(
> > +            context.output,
> > +            [f"DEBUG:{logname}:--> None"],
> > +        )
> > +
> > +    def testFakeSend(self):
> > +        """Test sending a fake/null message."""
> > +        self._asyncRunner(self.testFakeSend_())
> > +
> > +    async def _prod_session_api(
> > +            self,
> > +            current_state: Runstate,
> > +            error_message: str,
> > +            accept: bool = True
> > +    ):
> > +        with self.assertRaises(StateError) as context:
> > +            if accept:
> > +                await self.proto.accept('/not/a/real/path')
> > +            else:
> > +                await self.proto.connect('/not/a/real/path')
> > +
> > +        self.assertEqual(context.exception.error_message, error_message)
> > +        self.assertEqual(context.exception.state, current_state)
> > +        self.assertEqual(context.exception.required, Runstate.IDLE)
> > +
> > +    async def testAcceptRequireRunning_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        await self._prod_session_api(
> > +            Runstate.RUNNING,
> > +            "NullProtocol is already connected and running.",
> > +            accept=True,
> > +        )
> > +
> > +    def testAcceptRequireRunning(self):
> > +        """Test that accept() cannot be called when Runstate=RUNNING"""
> > +        self._asyncRunner(self.testAcceptRequireRunning_())
> > +
> > +    async def testConnectRequireRunning_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        await self._prod_session_api(
> > +            Runstate.RUNNING,
> > +            "NullProtocol is already connected and running.",
> > +            accept=False,
> > +        )
> > +
> > +    def testConnectRequireRunning(self):
> > +        """Test that connect() cannot be called when Runstate=RUNNING"""
> > +        self._asyncRunner(self.testConnectRequireRunning_())
> > +
> > +    async def testAcceptRequireDisconnecting_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        # Cheat: force a disconnect.
> > +        await self.proto.simulate_disconnect()
> > +
> > +        await self._prod_session_api(
> > +            Runstate.DISCONNECTING,
> > +            ("NullProtocol is disconnecting."
> > +             " Call disconnect() to return to IDLE state."),
> > +            accept=True,
> > +        )
> > +
> > +    def testAcceptRequireDisconnecting(self):
> > +        """Test that accept() cannot be called when Runstate=DISCONNECTING"""
> > +        self._asyncRunner(self.testAcceptRequireDisconnecting_())
> > +
> > +    async def testConnectRequireDisconnecting_(self):
> > +        await self.proto.accept('/not/a/real/path')
> > +
> > +        # Cheat: force a disconnect.
> > +        await self.proto.simulate_disconnect()
> > +
> > +        await self._prod_session_api(
> > +            Runstate.DISCONNECTING,
> > +            ("NullProtocol is disconnecting."
> > +             " Call disconnect() to return to IDLE state."),
> > +            accept=False,
> > +        )
> > +
> > +    def testConnectRequireDisconnecting(self):
> > +        """Test that connect() cannot be called when Runstate=DISCONNECTING"""
> > +        self._asyncRunner(self.testConnectRequireDisconnecting_())
> > --
> > 2.31.1
>
> Besides that, I just would like to bring to the table that Avocado has
> now a basic support for coroutines as tests that might help here. IIUC,
> some of the boilerplate code (and duplicated methods) could be removed
> with this:
>
> https://github.com/avocado-framework/avocado/pull/4788
>
> In any case, I understand if the latest version is not an option here,
> so:
>
>
It's an option, it's time that is the harsh master.


> Reviewed-by: Beraldo Leal <bleal@redhat.com>
>
>
Thanks! I updated a few bits and pieces and added the other items to a list
of things to do "later".


> Thanks,
> --
> Beraldo
>
>
--js