[Qemu-devel] [PATCH v3 28/38] QmpSession: return orderly

Marc-André Lureau posted 38 patches 7 years, 10 months ago
[Qemu-devel] [PATCH v3 28/38] QmpSession: return orderly
Posted by Marc-André Lureau 7 years, 10 months ago
QEMU will gain support for asynchronous commands, and may thus finish
commands in any order. However, clients expect replies in the same
order as the commands they issued. Let's enforce ordering of replies
in QmpReturn: starting from the oldest command, process each pending
QmpReturn and send its reply, stopping at the first one that is
unfinished.

The exception is an OOB command: its reply is returned immediately,
without waiting for older pending commands.

Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
---
 include/qapi/qmp/dispatch.h |  1 +
 qapi/qmp-dispatch.c         | 59 ++++++++++++++++++++++++++++++-------
 tests/test-qmp-cmds.c       | 36 ++++++++++++++++++++++
 3 files changed, 85 insertions(+), 11 deletions(-)

diff --git a/include/qapi/qmp/dispatch.h b/include/qapi/qmp/dispatch.h
index 94a272a5fb..f6db06c164 100644
--- a/include/qapi/qmp/dispatch.h
+++ b/include/qapi/qmp/dispatch.h
@@ -56,6 +56,7 @@ struct QmpSession {
 struct QmpReturn {
     QmpSession *session;
     QDict *rsp;
+    bool oob;
     QTAILQ_ENTRY(QmpReturn) entry;
 };
 
diff --git a/qapi/qmp-dispatch.c b/qapi/qmp-dispatch.c
index 2c162642cb..aa8b71a2c0 100644
--- a/qapi/qmp-dispatch.c
+++ b/qapi/qmp-dispatch.c
@@ -24,6 +24,7 @@ QmpReturn *qmp_return_new(QmpSession *session, const QDict *req)
     QmpReturn *qret = g_new0(QmpReturn, 1);
     QObject *id = req ? qdict_get(req, "id") : NULL;
 
+    qret->oob = req ? qmp_is_oob(req) : false;
     qret->session = session;
     qret->rsp = qdict_new();
     if (id) {
@@ -38,6 +39,15 @@ QmpReturn *qmp_return_new(QmpSession *session, const QDict *req)
     return qret;
 }
 
+static void qmp_return_free_with_lock(QmpReturn *qret)
+{   /* both callers hold session->pending_lock when a session is set */
+    if (qret->session) { /* unlink from the session's pending list */
+        QTAILQ_REMOVE(&qret->session->pending, qret, entry);
+    }
+    QDECREF(qret->rsp);
+    g_free(qret);
+}
+
 void qmp_return_free(QmpReturn *qret)
 {
     QmpSession *session = qret->session;
@@ -45,21 +55,51 @@ void qmp_return_free(QmpReturn *qret)
     if (session) {
         qemu_mutex_lock(&session->pending_lock);
     }
-    QTAILQ_REMOVE(&session->pending, qret, entry);
+
+    qmp_return_free_with_lock(qret);
+
     if (session) {
         qemu_mutex_unlock(&session->pending_lock);
     }
-    QDECREF(qret->rsp);
-    g_free(qret);
+}
+
+static void qmp_return_orderly(QmpReturn *qret)
+{   /* send qret's reply; non-OOB replies go out in pending-list order */
+    QmpSession *session = qret->session;
+    QmpReturn *ret, *next;
+
+    if (!session) {
+        /* the client was destroyed before the return: discard the reply */
+        qmp_return_free(qret);
+        return;
+    }
+    if (qret->oob) { /* OOB replies bypass the ordering below */
+        session->return_cb(session, qret->rsp);
+        qmp_return_free(qret);
+        return;
+    }
+
+    /* mark as finished: a NULL session flags a completed pending entry */
+    qret->session = NULL;
+
+    qemu_mutex_lock(&session->pending_lock);
+    /* walk the pending list, returning replies until an unfinished one */
+    QTAILQ_FOREACH_SAFE(ret, &session->pending, entry, next) {
+        if (ret->session) { /* non-NULL session: not finished yet, stop */
+            goto end;
+        }
+        session->return_cb(session, ret->rsp);
+        ret->session = session; /* restore so the free below unlinks it */
+        qmp_return_free_with_lock(ret);
+    }
+end:
+    qemu_mutex_unlock(&session->pending_lock);
 }
 
 void qmp_return(QmpReturn *qret, QObject *rsp)
 {
     qdict_put_obj(qret->rsp, "return", rsp ?: QOBJECT(qdict_new()));
-    if (qret->session) {
-        qret->session->return_cb(qret->session, qret->rsp);
-    }
-    qmp_return_free(qret);
+    qmp_return_orderly(qret);
 }
 
 void qmp_return_error(QmpReturn *qret, Error *err)
@@ -70,10 +110,7 @@ void qmp_return_error(QmpReturn *qret, Error *err)
     qdict_put_str(qdict, "desc", error_get_pretty(err));
     qdict_put_obj(qret->rsp, "error", QOBJECT(qdict));
     error_free(err);
-    if (qret->session) {
-        qret->session->return_cb(qret->session, qret->rsp);
-    }
-    qmp_return_free(qret);
+    qmp_return_orderly(qret);
 }
 
 QDict *qmp_dispatch_check_obj(const QObject *request, Error **errp)
diff --git a/tests/test-qmp-cmds.c b/tests/test-qmp-cmds.c
index 5246650f71..eeaac03d50 100644
--- a/tests/test-qmp-cmds.c
+++ b/tests/test-qmp-cmds.c
@@ -309,6 +309,41 @@ static void test_dispatch_cmd_id(void)
     qmp_session_destroy(&session);
 }
 
+typedef struct QmpReturnOrderly {
+    QmpSession session; /* embedded so container_of() can find the owner */
+    int returns; /* incremented once per dispatch_return_orderly() call */
+} QmpReturnOrderly;
+
+static void dispatch_return_orderly(QmpSession *session, QDict *resp)
+{
+    QmpReturnOrderly *o = container_of(session, QmpReturnOrderly, session);
+
+    o->returns++; /* only count the call; resp is not inspected */
+}
+
+static void test_qmp_return_orderly(void)
+{
+    QDict *dict = qdict_new();
+    QDict *ctrl = qdict_new();
+    QmpReturnOrderly o = { 0, };
+    QmpReturn *r1, *r2, *r3;
+
+    qmp_session_init(&o.session, &qmp_commands,
+                     qmp_dispatch, dispatch_return_orderly);
+    r1 = qmp_return_new(&o.session, NULL); /* no request: not OOB */
+    qdict_put_bool(ctrl, "run-oob", true);
+    qdict_put_obj(dict, "control", QOBJECT(ctrl));
+    r2 = qmp_return_new(&o.session, dict); /* request flagged run-oob */
+    r3 = qmp_return_new(&o.session, NULL); /* no request: not OOB */
+    qmp_return(r3, NULL);
+    g_assert_cmpint(o.returns, ==, 0); /* r3 is held back behind r1 */
+    qmp_return(r2, NULL);
+    g_assert_cmpint(o.returns, ==, 1); /* only the OOB r2 went out */
+    qmp_return(r1, NULL);
+    g_assert_cmpint(o.returns, ==, 3); /* r1 done: r1 and r3 are flushed */
+    qmp_session_destroy(&o.session);
+    QDECREF(dict);
+}
 
 int main(int argc, char **argv)
 {
@@ -322,6 +357,7 @@ int main(int argc, char **argv)
     g_test_add_func("/qmp/dispatch_cmd_id", test_dispatch_cmd_id);
     g_test_add_func("/qmp/dealloc_types", test_dealloc_types);
     g_test_add_func("/qmp/dealloc_partial", test_dealloc_partial);
+    g_test_add_func("/qmp/return_orderly", test_qmp_return_orderly);
 
     test_qmp_init_marshal(&qmp_commands);
     g_test_run();
-- 
2.17.0.rc1.1.g4c4f2b46a3