[PATCH RFC 6/7] qmp_protocol: add QMP client implementation

John Snow posted 7 patches 4 years, 7 months ago
[PATCH RFC 6/7] qmp_protocol: add QMP client implementation
Posted by John Snow 4 years, 7 months ago
Using everything added so far, add the QMP client itself.

So far, this QMP object cannot actually pretend to be a server; it only
implements the client logic (receiving events and sending commands.)
Future work may involve implementing the ability to send events and
receive RPC commands, so that we can create a QMP test server for unit
test purposes.

(It can, however, both connect to or receive a connection from QEMU so
that it can be used to instrument iotests.)

Note: the event handling is a total hack; I need to figure out the most
delightful way to create an interface to consume these easily, as I
think it's one of the biggest shortcomings of the synchronous library so
far. Consider that part very much a work-in-progress.

Signed-off-by: John Snow <jsnow@redhat.com>
---
 qmp_protocol.py | 420 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 420 insertions(+)
 create mode 100644 qmp_protocol.py

diff --git a/qmp_protocol.py b/qmp_protocol.py
new file mode 100644
index 0000000..6e6ac25
--- /dev/null
+++ b/qmp_protocol.py
@@ -0,0 +1,420 @@
+"""
+QMP Client Implementation
+
+This module provides the QMP class, which can be used to connect and
+send commands to a QMP server such as QEMU. The QMP class can be used to
+either connect to a listening server, or used to listen and accept an
+incoming connection from the server.
+"""
+
+import asyncio
+import logging
+from typing import (
+    Awaitable,
+    Callable,
+    Dict,
+    List,
+    Mapping,
+    Optional,
+    Tuple,
+    cast,
+)
+
+from error import (
+    AQMPError,
+    DisconnectedError,
+    DeserializationError,
+    GreetingError,
+    NegotiationError,
+    StateError,
+    UnexpectedTypeError,
+)
+from message import (
+    Message,
+    ObjectTypeError,
+    ServerParseError,
+)
+from models import (
+    ErrorInfo,
+    ErrorResponse,
+    Greeting,
+    ParsingError,
+    ServerResponse,
+    SuccessResponse,
+)
+from protocol import AsyncProtocol
+from util import create_task, pretty_traceback
+
+
+class ExecuteError(AQMPError):
+    """Execution statement returned failure."""
+    def __init__(self,
+                 sent: Message,
+                 received: Message,
+                 error: ErrorInfo):
+        super().__init__()
+        self.sent = sent
+        self.received = received
+        self.error = error
+
+    def __str__(self) -> str:
+        return self.error.desc
+
+
+_EventCallbackFn = Callable[['QMP', Message], Awaitable[None]]
+
+
+class QMP(AsyncProtocol[Message]):
+    """
+    Implements a QMP connection to/from the server.
+
+    Basic usage looks like this::
+
+      qmp = QMP('my_virtual_machine_name')
+      await qmp.connect(('127.0.0.1', 1234))
+      ...
+      res = await qmp.execute('block-query')
+      ...
+      await qmp.disconnect()
+
+    :param name: Optional nickname for the connection, used for logging.
+    """
+    #: Logger object for debugging messages
+    logger = logging.getLogger(__name__)
+
+    def __init__(self, name: Optional[str] = None) -> None:
+        super().__init__(name)
+
+        # Greeting
+        self.await_greeting = True
+        self._greeting: Optional[Greeting] = None
+        self.greeting_timeout = 5  # (In seconds)
+
+        # RFC: Do I even want to use any timeouts internally? They're
+        # not defined in the protocol itself. Theoretically, a client
+        # could simply use asyncio.wait_for(qmp.connect(...), timeout=5)
+        # and then I don't have to support this interface at all.
+        #
+        # We don't need to support any timeouts so long as we never initiate
+        # any long-term wait that wasn't in direct response to a user action.
+
+        # Command ID counter
+        self._execute_id = 0
+
+        # Event handling
+        self._event_queue: asyncio.Queue[Message] = asyncio.Queue()
+        self._event_callbacks: List[_EventCallbackFn] = []
+
+        # Incoming RPC reply messages
+        self._pending: Dict[str, Tuple[
+            asyncio.Future[object],
+            asyncio.Queue[Message]]] = {}
+
+    def on_event(self, func: _EventCallbackFn) -> _EventCallbackFn:
+        """
+        FIXME: Quick hack: decorator to register event handlers.
+
+        Use it like this::
+
+          @qmp.on_event
+          async def my_event_handler(qmp, event: Message) -> None:
+            print(f"Received event: {event['event']}")
+
+        RFC: What kind of event handler would be the most useful in
+        practical terms? In tests, we are usually waiting for an
+        event with some criteria to occur; maybe it would be useful
+        to allow "coroutine" style functions where we can block
+        until a certain event shows up?
+        """
+        if func not in self._event_callbacks:
+            self._event_callbacks.append(func)
+        return func
+
+    async def _new_session(self, coro: Awaitable[None]) -> None:
+        self._event_queue = asyncio.Queue()
+        await super()._new_session(coro)
+
+    async def _on_connect(self) -> None:
+        """
+        Wait for the QMP greeting prior to the engagement of the full loop.
+
+        :raise: GreetingError when the greeting is not understood.
+        """
+        if self.await_greeting:
+            self._greeting = await self._get_greeting()
+
+    async def _on_start(self) -> None:
+        """
+        Perform QMP negotiation right after the loop starts.
+
+        Negotiation is performed afterwards so that the implementation
+        can simply use `execute()`, which relies on the loop machinery
+        to be running.
+
+        :raise: NegotiationError if the negotiation fails in some way.
+        """
+        await self._negotiate()
+
+    async def _get_greeting(self) -> Greeting:
+        """
+        :raise: GreetingError  (Many causes.)
+        """
+        self.logger.debug("Awaiting greeting ...")
+        try:
+            msg = await asyncio.wait_for(self._recv(), self.greeting_timeout)
+            return Greeting.parse_msg(msg)
+        except Exception as err:
+            if isinstance(err, (asyncio.TimeoutError, OSError, EOFError)):
+                emsg = "Failed to receive Greeting"
+            elif isinstance(err, (DeserializationError, UnexpectedTypeError)):
+                emsg = "Failed to understand Greeting"
+            elif isinstance(err, ObjectTypeError):
+                emsg = "Failed to validate Greeting"
+            else:
+                emsg = "Unknown failure acquiring Greeting"
+
+            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+            raise GreetingError(emsg, err) from err
+
+    async def _negotiate(self) -> None:
+        """
+        :raise: NegotiationError  (Many causes.)
+        """
+        self.logger.debug("Negotiating capabilities ...")
+        arguments: Dict[str, List[str]] = {'enable': []}
+        if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+            arguments['enable'].append('oob')
+        try:
+            await self.execute('qmp_capabilities', arguments=arguments)
+        except Exception as err:
+            # FIXME: what exceptions do we actually expect execute to raise?
+            emsg = "Failure negotiating capabilities"
+            self.logger.error("%s:\n%s\n", emsg, pretty_traceback())
+            raise NegotiationError(emsg, err) from err
+
+    async def _bh_disconnect(self) -> None:
+        # See AsyncProtocol._bh_disconnect().
+        await super()._bh_disconnect()
+
+        if self._pending:
+            self.logger.debug("Cancelling pending executions")
+        for key in self._pending:
+            self.logger.debug("Cancelling execution %s", key)
+            # NB: This signals cancellation, but doesn't fully quiesce;
+            # it merely requests the cancellation; it will be thrown into
+            # that tasks's context on the next event loop cycle.
+            #
+            # This task is being awaited on by `_execute()`, which will
+            # exist in the user's callstack in the upper-half. Since
+            # we're here, we know it isn't running! It won't have a
+            # chance to run again except to receive a cancellation.
+            #
+            # NB: Python 3.9 adds a msg= parameter to cancel that would
+            # be useful for debugging the 'cause' of cancellations.
+            self._pending[key][0].cancel()
+
+        self.logger.debug("QMP Disconnected.")
+
+    async def _on_message(self, msg: Message) -> None:
+        """
+        Add an incoming message to the appropriate queue/handler.
+
+        :raise: RawProtocolError     (`_recv` via `Message._deserialize`)
+        :raise: ServerParseError     (Message has no 'event' nor 'id' field)
+        """
+        # Incoming messages are not fully parsed/validated here;
+        # do only light peeking to know how to route the messages.
+
+        if 'event' in msg:
+            await self._event_queue.put(msg)
+            # FIXME: quick hack; event queue handling.
+            for func in self._event_callbacks:
+                await func(self, msg)
+            return
+
+        # Below, we assume everything left is an execute/exec-oob response.
+
+        if 'id' in msg:
+            exec_id = str(msg['id'])
+            if exec_id not in self._pending:
+                # qmp-spec.txt, section 2.4:
+                # 'Clients should drop all the responses
+                #  that have an unknown "id" field.'
+                self.logger.warning("Unknown ID '%s', response dropped.",
+                                    exec_id)
+                return
+        else:
+            # This is a server parsing error;
+            # It inherently does not "belong" to any pending execution.
+            # Instead of performing clever recovery, just terminate.
+            raise ServerParseError(
+                "Server sent a message without an ID,"
+                " indicating parse failure.", msg)
+
+        _, queue = self._pending[exec_id]
+        await queue.put(msg)
+
+    async def _do_recv(self) -> Message:
+        """
+        :raise: OSError            (Stream errors)
+        :raise: `EOFError`         (When the stream is at EOF)
+        :raise: `RawProtocolError` (via `Message._deserialize`)
+
+        :return: A single QMP `Message`.
+        """
+        msg_bytes = await self._readline()
+        msg = Message(msg_bytes, eager=True)
+        return msg
+
+    def _do_send(self, msg: Message) -> None:
+        """
+        :raise: ValueError  (JSON serialization failure)
+        :raise: TypeError   (JSON serialization failure)
+        :raise: OSError     (Stream errors)
+        """
+        assert self._writer is not None
+        self._writer.write(bytes(msg))
+
+    def _cleanup(self) -> None:
+        super()._cleanup()
+        self._greeting = None
+        assert self._pending == {}
+        self._event_queue = asyncio.Queue()
+
+    @classmethod
+    def make_execute_msg(cls, cmd: str,
+                         arguments: Optional[Mapping[str, object]] = None,
+                         oob: bool = False) -> Message:
+        """
+        Create an executable message to be sent by `execute_msg` later.
+
+        :param cmd: QMP command name.
+        :param arguments: Arguments (if any). Must be JSON-serializable.
+        :param oob: If true, execute "out of band".
+
+        :return: An executable QMP message.
+        """
+        msg = Message({'exec-oob' if oob else 'execute': cmd})
+        if arguments is not None:
+            msg['arguments'] = arguments
+        return msg
+
+    async def _bh_execute(self, msg: Message,
+                          queue: 'asyncio.Queue[Message]') -> object:
+        """
+        Execute a QMP Message and wait for the result.
+
+        :param msg: Message to execute.
+        :param queue: The queue we should expect to see a reply delivered to.
+
+        :return: Execution result from the server.
+                 The type depends on the command sent.
+        """
+        if not self.running:
+            raise StateError("QMP is not running.")
+        assert self._outgoing
+
+        self._outgoing.put_nowait(msg)
+        reply_msg = await queue.get()
+
+        # May raise ObjectTypeError (Unlikely - only if it has missing keys.)
+        reply = ServerResponse.parse_msg(reply_msg).__root__
+        assert not isinstance(reply, ParsingError)  # Handled by BH
+
+        if isinstance(reply, ErrorResponse):
+            # Server indicated execution failure.
+            raise ExecuteError(msg, reply_msg, reply.error)
+
+        assert isinstance(reply, SuccessResponse)
+        return reply.return_
+
+    async def _execute(self, msg: Message) -> object:
+        """
+        The same as `execute_msg()`, but without safety mechanisms.
+
+        Does not assign an execution ID and does not check that the form
+        of the message being sent is valid.
+
+        This method *Requires* an 'id' parameter to be set on the
+        message, it will not set one for you like `execute()` or
+        `execute_msg()`.
+
+        Do not use "__aqmp#00000" style IDs, use something else to avoid
+        potential clashes. If this ID clashes with an ID presently
+        in-use or otherwise clashes with the auto-generated IDs, the
+        response routing mechanisms in _on_message may very well fail
+        loudly enough to cause the entire loop to crash.
+
+        The ID should be a str; or at least something JSON
+        serializable. It *must* be hashable.
+        """
+        exec_id = cast(str, msg['id'])
+        self.logger.debug("Execute(%s): '%s'", exec_id,
+                          msg.get('execute', msg.get('exec-oob')))
+
+        queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1)
+        task = create_task(self._bh_execute(msg, queue))
+        self._pending[exec_id] = (task, queue)
+
+        try:
+            result = await task
+        except asyncio.CancelledError as err:
+            raise DisconnectedError("Disconnected") from err
+        finally:
+            del self._pending[exec_id]
+
+        return result
+
+    async def execute_msg(self, msg: Message) -> object:
+        """
+        Execute a QMP message and return the response.
+
+        :param msg: The QMP `Message` to execute.
+        :raises: ValueError if the QMP `Message` does not have either the
+                 'execute' or 'exec-oob' fields set.
+        :raises: ExecuteError if the server returns an error response.
+        :raises: DisconnectedError if the connection was terminated early.
+
+        :return: Execution response from the server. The type of object depends
+                 on the command that was issued, though most return a dict.
+        """
+        if not ('execute' in msg or 'exec-oob' in msg):
+            raise ValueError("Requires 'execute' or 'exec-oob' message")
+        if self.disconnecting:
+            raise StateError("QMP is disconnecting/disconnected."
+                             " Call disconnect() to fully disconnect.")
+
+        # FIXME: Copy the message here, to avoid leaking the ID back out.
+
+        exec_id = f"__aqmp#{self._execute_id:05d}"
+        msg['id'] = exec_id
+        self._execute_id += 1
+
+        return await self._execute(msg)
+
+    async def execute(self, cmd: str,
+                      arguments: Optional[Mapping[str, object]] = None,
+                      oob: bool = False) -> object:
+        """
+        Execute a QMP command and return the response.
+
+        :param cmd: QMP command name.
+        :param arguments: Arguments (if any). Must be JSON-serializable.
+        :param oob: If true, execute "out of band".
+
+        :raise: ExecuteError if the server returns an error response.
+        :raise: DisconnectedError if the connection was terminated early.
+
+        :return: Execution response from the server. The type of object depends
+                 on the command that was issued, though most return a dict.
+        """
+        # Note: I designed arguments to be its own argument instead of
+        # kwparams so that we are able to add other modifiers that
+        # change execution parameters later on. A theoretical
+        # higher-level API that is generated against a particular QAPI
+        # Schema should generate function signatures the way we want at
+        # that point; modifying those commands to behave differently
+        # could be performed using context managers that alter the QMP
+        # loop for any commands that occur within that block.
+        msg = self.make_execute_msg(cmd, arguments, oob=oob)
+        return await self.execute_msg(msg)
-- 
2.30.2


Re: [PATCH RFC 6/7] qmp_protocol: add QMP client implementation
Posted by Stefan Hajnoczi 4 years, 7 months ago
On Tue, Apr 13, 2021 at 11:55:52AM -0400, John Snow wrote:
> +    async def _execute(self, msg: Message) -> object:
> +        """
> +        The same as `execute_msg()`, but without safety mechanisms.
> +
> +        Does not assign an execution ID and does not check that the form
> +        of the message being sent is valid.
> +
> +        This method *Requires* an 'id' parameter to be set on the
> +        message, it will not set one for you like `execute()` or
> +        `execute_msg()`.
> +
> +        Do not use "__aqmp#00000" style IDs, use something else to avoid
> +        potential clashes. If this ID clashes with an ID presently
> +        in-use or otherwise clashes with the auto-generated IDs, the
> +        response routing mechanisms in _on_message may very well fail
> +        loudly enough to cause the entire loop to crash.
> +
> +        The ID should be a str; or at least something JSON
> +        serializable. It *must* be hashable.
> +        """
> +        exec_id = cast(str, msg['id'])
> +        self.logger.debug("Execute(%s): '%s'", exec_id,
> +                          msg.get('execute', msg.get('exec-oob')))
> +
> +        queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1)
> +        task = create_task(self._bh_execute(msg, queue))

We're already in a coroutine, can we await queue.get() ourselves instead
of creating a new task?

I guess this is done in order to use Task.cancel() in _bh_disconnect()
but it seems simpler to use queue both for success and cancellation.
Fewer tasks are easier to reason about.
Re: [PATCH RFC 6/7] qmp_protocol: add QMP client implementation
Posted by John Snow 4 years, 7 months ago
On 4/14/21 1:44 AM, Stefan Hajnoczi wrote:
> On Tue, Apr 13, 2021 at 11:55:52AM -0400, John Snow wrote:
>> +    async def _execute(self, msg: Message) -> object:
>> +        """
>> +        The same as `execute_msg()`, but without safety mechanisms.
>> +
>> +        Does not assign an execution ID and does not check that the form
>> +        of the message being sent is valid.
>> +
>> +        This method *Requires* an 'id' parameter to be set on the
>> +        message, it will not set one for you like `execute()` or
>> +        `execute_msg()`.
>> +
>> +        Do not use "__aqmp#00000" style IDs, use something else to avoid
>> +        potential clashes. If this ID clashes with an ID presently
>> +        in-use or otherwise clashes with the auto-generated IDs, the
>> +        response routing mechanisms in _on_message may very well fail
>> +        loudly enough to cause the entire loop to crash.
>> +
>> +        The ID should be a str; or at least something JSON
>> +        serializable. It *must* be hashable.
>> +        """
>> +        exec_id = cast(str, msg['id'])
>> +        self.logger.debug("Execute(%s): '%s'", exec_id,
>> +                          msg.get('execute', msg.get('exec-oob')))
>> +
>> +        queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1)
>> +        task = create_task(self._bh_execute(msg, queue))
> 
> We're already in a coroutine, can we await queue.get() ourselves instead
> of creating a new task?
> 
> I guess this is done in order to use Task.cancel() in _bh_disconnect()
> but it seems simpler to use queue both for success and cancellation.
> Fewer tasks are easier to reason about.
> 

...queues do not have a cancellation signal :( :( :( :(

There's no way to "cancel" a queue:
https://docs.python.org/3/library/asyncio-queue.html#queue

You *could* craft a special message and inject an exception into the 
queue to notify the reader that the message will never arrive, but it 
feels like working against the intended mechanism of that primitive. It 
really feels like it wants to be wrapped in a *task*.

An earlier draft used an approach where it crafted a special "mailbox" 
object, comprised of message, event, and error fields. The waiter sets 
up a mailbox and then blocks on the event. Upon being notified of an 
event, the caller checks to see if the message OR the error field was 
filled.

I wound up removing it, because I felt it added too much custom 
machinery/terminology and instead went with Tasks and a queue with a 
depth of one.

Both feel like they are working against the intended mechanisms to a 
degree. I am open to suggestions here!

(It's also worth noting that iotests will want the ability to separate 
the queueing of a message and the waiting for that message. The current 
design only allows for send-and-wait, and not separate send-then-wait 
semantics. Tasks do provide a rather convenient handle if I want to 
split that mechanism out.)

All of the above options are a little hacky to me. Any thoughts or 
preferences?

--js


Re: [PATCH RFC 6/7] qmp_protocol: add QMP client implementation
Posted by Stefan Hajnoczi 4 years, 7 months ago
On Wed, Apr 14, 2021 at 01:50:37PM -0400, John Snow wrote:
> On 4/14/21 1:44 AM, Stefan Hajnoczi wrote:
> > On Tue, Apr 13, 2021 at 11:55:52AM -0400, John Snow wrote:
> > > +    async def _execute(self, msg: Message) -> object:
> > > +        """
> > > +        The same as `execute_msg()`, but without safety mechanisms.
> > > +
> > > +        Does not assign an execution ID and does not check that the form
> > > +        of the message being sent is valid.
> > > +
> > > +        This method *Requires* an 'id' parameter to be set on the
> > > +        message, it will not set one for you like `execute()` or
> > > +        `execute_msg()`.
> > > +
> > > +        Do not use "__aqmp#00000" style IDs, use something else to avoid
> > > +        potential clashes. If this ID clashes with an ID presently
> > > +        in-use or otherwise clashes with the auto-generated IDs, the
> > > +        response routing mechanisms in _on_message may very well fail
> > > +        loudly enough to cause the entire loop to crash.
> > > +
> > > +        The ID should be a str; or at least something JSON
> > > +        serializable. It *must* be hashable.
> > > +        """
> > > +        exec_id = cast(str, msg['id'])
> > > +        self.logger.debug("Execute(%s): '%s'", exec_id,
> > > +                          msg.get('execute', msg.get('exec-oob')))
> > > +
> > > +        queue: asyncio.Queue[Message] = asyncio.Queue(maxsize=1)
> > > +        task = create_task(self._bh_execute(msg, queue))
> > 
> > We're already in a coroutine, can we await queue.get() ourselves instead
> > of creating a new task?
> > 
> > I guess this is done in order to use Task.cancel() in _bh_disconnect()
> > but it seems simpler to use queue both for success and cancellation.
> > Fewer tasks are easier to reason about.
> > 
> 
> ...queues do not have a cancellation signal :( :( :( :(
> 
> There's no way to "cancel" a queue:
> https://docs.python.org/3/library/asyncio-queue.html#queue
> 
> You *could* craft a special message and inject an exception into the queue
> to notify the reader that the message will never arrive, but it feels like
> working against the intended mechanism of that primitive. It really feels
> like it wants to be wrapped in a *task*.

That's what I meant by "it seems simpler to use the queue both for
success and cancellation". Just queue a message that says the execution
has been cancelled.

Stefan