Originally QMP goes through these steps:
  JSON Parser --> QMP Dispatcher --> Respond
      /|\    (2)                (3)     |
   (1) |                               \|/ (4)
       +---------  main thread  --------+
This patch does this:
  JSON Parser     QMP Dispatcher --> Respond
      /|\ |           /|\       (4)     |
       |  | (2)        | (3)            |  (5)
   (1) |  +----->      |               \|/
       +---------  main thread  <-------+
So the parsing job and the dispatching job are isolated now. This gives us
a chance in follow-up patches to move the parser outside the main thread.
The isolation is done using one QEMUBH. Only one dispatcher QEMUBH is
used for all the monitors.
Signed-off-by: Peter Xu <peterx@redhat.com>
---
monitor.c | 179 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 157 insertions(+), 22 deletions(-)
diff --git a/monitor.c b/monitor.c
index 6ac1b2065d..c20e659740 100644
--- a/monitor.c
+++ b/monitor.c
@@ -168,6 +168,10 @@ typedef struct {
*/
QmpCommandList *commands;
bool qmp_caps[QMP_CAPABILITY__MAX];
+ /* Protects qmp request/response queue. */
+ QemuMutex qmp_queue_lock;
+ /* Input queue that holds all the parsed QMP requests */
+ GQueue *qmp_requests;
} MonitorQMP;
/*
@@ -213,6 +217,8 @@ struct Monitor {
struct MonitorGlobal {
IOThread *mon_iothread;
+ /* Bottom half to dispatch the requests received from IO thread */
+ QEMUBH *qmp_dispatcher_bh;
};
static struct MonitorGlobal mon_global;
@@ -582,11 +588,13 @@ static void monitor_data_init(Monitor *mon, bool skip_flush,
{
memset(mon, 0, sizeof(Monitor));
qemu_mutex_init(&mon->out_lock);
+ qemu_mutex_init(&mon->qmp.qmp_queue_lock);
mon->outbuf = qstring_new();
/* Use *mon_cmds by default. */
mon->cmd_table = mon_cmds;
mon->skip_flush = skip_flush;
mon->use_io_thr = use_io_thr;
+ mon->qmp.qmp_requests = g_queue_new();
}
static void monitor_data_destroy(Monitor *mon)
@@ -599,6 +607,8 @@ static void monitor_data_destroy(Monitor *mon)
g_free(mon->rs);
QDECREF(mon->outbuf);
qemu_mutex_destroy(&mon->out_lock);
+ qemu_mutex_destroy(&mon->qmp.qmp_queue_lock);
+ g_queue_free(mon->qmp.qmp_requests);
}
char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
@@ -3907,29 +3917,31 @@ static void monitor_qmp_respond(Monitor *mon, QObject *rsp,
qobject_decref(rsp);
}
-static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
- void *opaque)
+struct QMPRequest {
+ /* Owner of the request */
+ Monitor *mon;
+ /* "id" field of the request */
+ QObject *id;
+ /* Request object to be handled */
+ QObject *req;
+};
+typedef struct QMPRequest QMPRequest;
+
+/*
+ * Dispatch one single QMP request. The function will free the req_obj
+ * and objects inside it before return.
+ */
+static void monitor_qmp_dispatch_one(QMPRequest *req_obj)
{
- QObject *req, *rsp = NULL, *id = NULL;
+ Monitor *mon, *old_mon;
+ QObject *req, *rsp = NULL, *id;
QDict *qdict = NULL;
- Monitor *mon = opaque, *old_mon;
- Error *err = NULL;
- req = json_parser_parse_err(tokens, NULL, &err);
- if (!req && !err) {
- /* json_parser_parse_err() sucks: can fail without setting @err */
- error_setg(&err, QERR_JSON_PARSING);
- }
- if (err) {
- goto err_out;
- }
+ req = req_obj->req;
+ mon = req_obj->mon;
+ id = req_obj->id;
- qdict = qobject_to_qdict(req);
- if (qdict) {
- id = qdict_get(qdict, "id");
- qobject_incref(id);
- qdict_del(qdict, "id");
- } /* else will fail qmp_dispatch() */
+ g_free(req_obj);
if (trace_event_get_state_backends(TRACE_HANDLE_QMP_COMMAND)) {
QString *req_json = qobject_to_json(req);
@@ -3940,7 +3952,7 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
old_mon = cur_mon;
cur_mon = mon;
- rsp = qmp_dispatch(cur_mon->qmp.commands, req);
+ rsp = qmp_dispatch(mon->qmp.commands, req);
cur_mon = old_mon;
@@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
}
}
-err_out:
- monitor_qmp_respond(mon, rsp, err, id);
+ /* Respond if necessary */
+ monitor_qmp_respond(mon, rsp, NULL, id);
+
+ /* This pairs with the monitor_suspend() in handle_qmp_command(). */
+ if (!qmp_oob_enabled(mon)) {
+ monitor_resume(mon);
+ }
qobject_decref(req);
}
+/*
+ * Pop one QMP request from monitor queues, return NULL if not found.
+ * We are using round-robin fasion to pop the request, to avoid
+ * processing command only on a very busy monitor. To achieve that,
+ * when we processed one request on specific monitor, we put that
+ * monitor to the end of mon_list queue.
+ */
+static QMPRequest *monitor_qmp_requests_pop_one(void)
+{
+ QMPRequest *req_obj = NULL;
+ Monitor *mon;
+
+ qemu_mutex_lock(&monitor_lock);
+
+ QTAILQ_FOREACH(mon, &mon_list, entry) {
+ qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+ req_obj = g_queue_pop_head(mon->qmp.qmp_requests);
+ qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+ if (req_obj) {
+ break;
+ }
+ }
+
+ if (req_obj) {
+ /*
+ * We found one request on the monitor. Degrade this monitor's
+ * priority to lowest by re-inserting it to end of queue.
+ */
+ QTAILQ_REMOVE(&mon_list, mon, entry);
+ QTAILQ_INSERT_TAIL(&mon_list, mon, entry);
+ }
+
+ qemu_mutex_unlock(&monitor_lock);
+
+ return req_obj;
+}
+
+static void monitor_qmp_bh_dispatcher(void *data)
+{
+ QMPRequest *req_obj;
+
+ while (true) {
+ req_obj = monitor_qmp_requests_pop_one();
+ if (!req_obj) {
+ break;
+ }
+ monitor_qmp_dispatch_one(req_obj);
+ }
+}
+
+static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
+ void *opaque)
+{
+ QObject *req, *id = NULL;
+ QDict *qdict = NULL;
+ Monitor *mon = opaque;
+ Error *err = NULL;
+ QMPRequest *req_obj;
+
+ req = json_parser_parse_err(tokens, NULL, &err);
+ if (!req && !err) {
+ /* json_parser_parse_err() sucks: can fail without setting @err */
+ error_setg(&err, QERR_JSON_PARSING);
+ }
+ if (err) {
+ monitor_qmp_respond(mon, NULL, err, NULL);
+ qobject_decref(req);
+ return;
+ }
+
+ qdict = qobject_to_qdict(req);
+ if (qdict) {
+ id = qdict_get(qdict, "id");
+ qobject_incref(id);
+ qdict_del(qdict, "id");
+ } /* else will fail qmp_dispatch() */
+
+ req_obj = g_new0(QMPRequest, 1);
+ req_obj->mon = mon;
+ req_obj->id = id;
+ req_obj->req = req;
+
+ /*
+ * Put the request to the end of queue so that requests will be
+ * handled in time order. Ownership for req_obj, req, id,
+ * etc. will be delivered to the handler side.
+ */
+ qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
+ g_queue_push_tail(mon->qmp.qmp_requests, req_obj);
+ qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
+
+ /* Kick the dispatcher routine */
+ qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
+
+ /*
+ * If OOB is not enabled on current monitor, we'll emulate the old
+ * behavior that we won't process current monitor any more until
+ * it is responded. This helps make sure that as long as OOB is
+ * not enabled, the server will never drop any command.
+ */
+ if (!qmp_oob_enabled(mon)) {
+ monitor_suspend(mon);
+ }
+}
+
static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
{
Monitor *mon = opaque;
@@ -4144,6 +4266,15 @@ static void monitor_iothread_init(void)
{
mon_global.mon_iothread = iothread_create("mon_iothread",
&error_abort);
+
+ /*
+ * This MUST be on main loop thread since we have commands that
+ * have assumption to be run on main loop thread (Yeah, we'd
+ * better remove this assumption in the future).
+ */
+ mon_global.qmp_dispatcher_bh = aio_bh_new(qemu_get_aio_context(),
+ monitor_qmp_bh_dispatcher,
+ NULL);
}
void monitor_init_globals(void)
@@ -4267,6 +4398,10 @@ void monitor_cleanup(void)
}
qemu_mutex_unlock(&monitor_lock);
+ /* QEMUBHs needs to be deleted before destroying the IOThread. */
+ qemu_bh_delete(mon_global.qmp_dispatcher_bh);
+ mon_global.qmp_dispatcher_bh = NULL;
+
iothread_destroy(mon_global.mon_iothread);
mon_global.mon_iothread = NULL;
}
--
2.14.3
On Tue, Dec 05, 2017 at 01:51:50PM +0800, Peter Xu wrote:
> @@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
> }
> }
>
> -err_out:
> - monitor_qmp_respond(mon, rsp, err, id);
> + /* Respond if necessary */
> + monitor_qmp_respond(mon, rsp, NULL, id);
> +
> + /* This pairs with the monitor_suspend() in handle_qmp_command(). */
> + if (!qmp_oob_enabled(mon)) {
> + monitor_resume(mon);
monitor_resume() does not work between threads: if the event loop is
currently blocked in poll() it won't notice that the monitor fd should
be watched again.
Please add aio_notify() to monitor_resume() and monitor_suspend(). That
way the event loop is forced to check can_read() again.
> + }
>
> qobject_decref(req);
> }
>
> +/*
> + * Pop one QMP request from monitor queues, return NULL if not found.
> + * We are using round-robin fasion to pop the request, to avoid
> + * processing command only on a very busy monitor. To achieve that,
> + * when we processed one request on specific monitor, we put that
> + * monitor to the end of mon_list queue.
> + */
> +static QMPRequest *monitor_qmp_requests_pop_one(void)
> +{
> + QMPRequest *req_obj = NULL;
> + Monitor *mon;
> +
> + qemu_mutex_lock(&monitor_lock);
> +
> + QTAILQ_FOREACH(mon, &mon_list, entry) {
> + qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
Please add a doc comment about the monitor_lock < qmp_queue_lock lock
ordering in the qmp_queue_lock field declaration so that deadlocks can
be prevented.
> + req_obj = g_queue_pop_head(mon->qmp.qmp_requests);
> + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> + if (req_obj) {
> + break;
> + }
> + }
> +
> + if (req_obj) {
> + /*
> + * We found one request on the monitor. Degrade this monitor's
> + * priority to lowest by re-inserting it to end of queue.
> + */
> + QTAILQ_REMOVE(&mon_list, mon, entry);
> + QTAILQ_INSERT_TAIL(&mon_list, mon, entry);
> + }
> +
> + qemu_mutex_unlock(&monitor_lock);
> +
> + return req_obj;
> +}
> +
> +static void monitor_qmp_bh_dispatcher(void *data)
> +{
> + QMPRequest *req_obj;
> +
> + while (true) {
Previously QEMU could only dispatch 1 QMP command per main loop
iteration. Now multiple requests can be processed in a single main loop
iteration.
If a producer enqueues requests faster than the dispatcher can dequeue
them then this infinite loop will prevent the caller (i.e. QEMU main
loop) from making progress.
The following keeps 1 QMP command per main loop iteration and avoids the
infinite loop:
static void monitor_qmp_bh_dispatcher(void *data)
{
QMPRequest *req_obj = monitor_qmp_requests_pop_one();
if (req_obj) {
monitor_qmp_dispatch_one(req_obj);
/* Reschedule instead of looping so the main loop stays responsive */
qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
}
}
> + /*
> + * Put the request to the end of queue so that requests will be
> + * handled in time order. Ownership for req_obj, req, id,
> + * etc. will be delivered to the handler side.
> + */
> + qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> + g_queue_push_tail(mon->qmp.qmp_requests, req_obj);
> + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> +
> + /* Kick the dispatcher routine */
> + qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
> +
> + /*
> + * If OOB is not enabled on current monitor, we'll emulate the old
> + * behavior that we won't process current monitor any more until
> + * it is responded. This helps make sure that as long as OOB is
> + * not enabled, the server will never drop any command.
> + */
> + if (!qmp_oob_enabled(mon)) {
> + monitor_suspend(mon);
> + }
monitor_suspend() must be called before g_queue_push_tail(). Otherwise
the other thread might complete the request and call monitor_resume()
before we call monitor_suspend().
On Wed, Dec 13, 2017 at 08:09:38PM +0000, Stefan Hajnoczi wrote:
> On Tue, Dec 05, 2017 at 01:51:50PM +0800, Peter Xu wrote:
> > @@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
> > }
> > }
> >
> > -err_out:
> > - monitor_qmp_respond(mon, rsp, err, id);
> > + /* Respond if necessary */
> > + monitor_qmp_respond(mon, rsp, NULL, id);
> > +
> > + /* This pairs with the monitor_suspend() in handle_qmp_command(). */
> > + if (!qmp_oob_enabled(mon)) {
> > + monitor_resume(mon);
>
> monitor_resume() does not work between threads: if the event loop is
> currently blocked in poll() it won't notice that the monitor fd should
> be watched again.
>
> Please add aio_notify() to monitor_resume() and monitor_suspend(). That
> way the event loop is forced to check can_read() again.
Ah, yes. I think monitor_suspend() does not need the notify? Since
if it's sleeping it won't miss the next check in can_read() after all?
Regarding monitor_resume(), I noticed that I had missed the fact that
it was previously tailored only for HMP, which seems to be a bigger problem.
Do you agree with a change like this?
diff --git a/monitor.c b/monitor.c
index 9970418d6f..8f96880ad7 100644
--- a/monitor.c
+++ b/monitor.c
@@ -4244,10 +4244,12 @@ int monitor_suspend(Monitor *mon)
void monitor_resume(Monitor *mon)
{
- if (!mon->rs)
- return;
if (atomic_dec_fetch(&mon->suspend_cnt) == 0) {
- readline_show_prompt(mon->rs);
+ if (monitor_is_qmp(mon)) {
+ aio_notify(mon_global.mon_iothread->ctx);
+ } else {
+ assert(mon->rs);
+ readline_show_prompt(mon->rs);
+ }
}
}
Even, I'm thinking about whether I should start to introduce
iothread_notify() now to mask out the IOThread.ctx details.
>
> > + }
> >
> > qobject_decref(req);
> > }
> >
> > +/*
> > + * Pop one QMP request from monitor queues, return NULL if not found.
> > + * We are using round-robin fasion to pop the request, to avoid
> > + * processing command only on a very busy monitor. To achieve that,
> > + * when we processed one request on specific monitor, we put that
> > + * monitor to the end of mon_list queue.
> > + */
> > +static QMPRequest *monitor_qmp_requests_pop_one(void)
> > +{
> > + QMPRequest *req_obj = NULL;
> > + Monitor *mon;
> > +
> > + qemu_mutex_lock(&monitor_lock);
> > +
> > + QTAILQ_FOREACH(mon, &mon_list, entry) {
> > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
>
> Please add a doc comment about the monitor_lock < qmp_queue_lock lock
> ordering in the qmp_queue_lock field declaration so that deadlocks can
> be prevented.
Will do.
>
> > + req_obj = g_queue_pop_head(mon->qmp.qmp_requests);
> > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> > + if (req_obj) {
> > + break;
> > + }
> > + }
> > +
> > + if (req_obj) {
> > + /*
> > + * We found one request on the monitor. Degrade this monitor's
> > + * priority to lowest by re-inserting it to end of queue.
> > + */
> > + QTAILQ_REMOVE(&mon_list, mon, entry);
> > + QTAILQ_INSERT_TAIL(&mon_list, mon, entry);
> > + }
> > +
> > + qemu_mutex_unlock(&monitor_lock);
> > +
> > + return req_obj;
> > +}
> > +
> > +static void monitor_qmp_bh_dispatcher(void *data)
> > +{
> > + QMPRequest *req_obj;
> > +
> > + while (true) {
>
> Previously QEMU could only dispatch 1 QMP command per main loop
> iteration. Now multiple requests can be processed in a single main loop
> iteration.
>
> If a producer enqueues requests faster than the dispatcher can dequeue
> them then this is infinite loop will prevent the caller (i.e. QEMU main
> loop) from making progress.
>
> The following keeps 1 QMP command per main loop iteration and avoids the
> infinite loop:
>
> static void monitor_qmp_bh_dispatcher(void *data)
> {
> QMPRequest *req_obj = monitor_qmp_requests_pop_one();
>
> if (req_obj) {
> monitor_qmp_dispatch_one(req_obj);
>
> /* Reschedule instead of looping so the main loop stays responsive */
> qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
> }
> }
Yes, this sounds better!
>
> > + /*
> > + * Put the request to the end of queue so that requests will be
> > + * handled in time order. Ownership for req_obj, req, id,
> > + * etc. will be delivered to the handler side.
> > + */
> > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> > + g_queue_push_tail(mon->qmp.qmp_requests, req_obj);
> > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> > +
> > + /* Kick the dispatcher routine */
> > + qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
> > +
> > + /*
> > + * If OOB is not enabled on current monitor, we'll emulate the old
> > + * behavior that we won't process current monitor any more until
> > + * it is responded. This helps make sure that as long as OOB is
> > + * not enabled, the server will never drop any command.
> > + */
> > + if (!qmp_oob_enabled(mon)) {
> > + monitor_suspend(mon);
> > + }
>
> monitor_suspend() must be called before g_queue_push_tail(). Otherwise
> the other thread might complete the request and call monitor_resume()
> before we call monitor_suspend().
Reasonable. Thanks,
--
Peter Xu
On Sat, Dec 16, 2017 at 02:37:03PM +0800, Peter Xu wrote:
> On Wed, Dec 13, 2017 at 08:09:38PM +0000, Stefan Hajnoczi wrote:
> > On Tue, Dec 05, 2017 at 01:51:50PM +0800, Peter Xu wrote:
> > > @@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
> > > }
> > > }
> > >
> > > -err_out:
> > > - monitor_qmp_respond(mon, rsp, err, id);
> > > + /* Respond if necessary */
> > > + monitor_qmp_respond(mon, rsp, NULL, id);
> > > +
> > > + /* This pairs with the monitor_suspend() in handle_qmp_command(). */
> > > + if (!qmp_oob_enabled(mon)) {
> > > + monitor_resume(mon);
> >
> > monitor_resume() does not work between threads: if the event loop is
> > currently blocked in poll() it won't notice that the monitor fd should
> > be watched again.
> >
> > Please add aio_notify() to monitor_resume() and monitor_suspend(). That
> > way the event loop is forced to check can_read() again.
>
> Ah, yes. I think monitor_suspend() does not need the notify? Since
> if it's sleeping it won't miss the next check in can_read() after all?
>
> Regarding to monitor_resume(), I noticed that I missed the fact that
> it's only tailored for HMP before, which seems to be a bigger problem.
> Do you agree with a change like this?
>
> diff --git a/monitor.c b/monitor.c
> index 9970418d6f..8f96880ad7 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -4244,10 +4244,12 @@ int monitor_suspend(Monitor *mon)
>
> void monitor_resume(Monitor *mon)
> {
> - if (!mon->rs)
> - return;
> if (atomic_dec_fetch(&mon->suspend_cnt) == 0) {
> - readline_show_prompt(mon->rs);
> + if (monitor_is_qmp(mon)) {
> + aio_notify(mon_global.mon_iothread->ctx);
> + } else {
> + assert(mon->rs);
> + readline_show_prompt(mon->rs);
> + }
> }
> }
And monitor_suspend() needs to be touched too to support QMP, which I
missed for sure... I plan to add a new patch for this:
From d020db4529deccf6777f6b5a0f7ff9330d4de42c Mon Sep 17 00:00:00 2001
From: Peter Xu <peterx@redhat.com>
Date: Sat, 16 Dec 2017 14:42:04 +0800
Subject: [PATCH] monitor: let suspend/resume work even with QMPs
One thing to mention is that for QMPs that are using IOThreads, we need
an explicit kick for the IOThread in case it is sleeping.
Signed-off-by: Peter Xu <peterx@redhat.com>
---
monitor.c | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git a/monitor.c b/monitor.c
index 9970418d6f..085146e84c 100644
--- a/monitor.c
+++ b/monitor.c
@@ -4236,18 +4236,24 @@ static void monitor_command_cb(void *opaque, const char *cmdline,
int monitor_suspend(Monitor *mon)
{
- if (!mon->rs)
- return -ENOTTY;
+ /* This will be effective in the next monitor_can_read() check */
atomic_inc(&mon->suspend_cnt);
return 0;
}
void monitor_resume(Monitor *mon)
{
- if (!mon->rs)
- return;
if (atomic_dec_fetch(&mon->suspend_cnt) == 0) {
- readline_show_prompt(mon->rs);
+ if (monitor_is_qmp(mon) && mon->use_io_thr) {
+ /*
+ * For QMP monitors that are running in IOThread, let's
+ * kick the thread in case it's sleeping.
+ */
+ aio_notify(mon_global.mon_iothread->ctx);
+ } else {
+ assert(mon->rs);
+ readline_show_prompt(mon->rs);
+ }
}
}
Thanks,
>
> Even, I'm thinking about whether I should start to introduce
> iothread_notify() now to mask out the IOThread.ctx details.
>
> >
> > > + }
> > >
> > > qobject_decref(req);
> > > }
> > >
> > > +/*
> > > + * Pop one QMP request from monitor queues, return NULL if not found.
> > > + * We are using round-robin fasion to pop the request, to avoid
> > > + * processing command only on a very busy monitor. To achieve that,
> > > + * when we processed one request on specific monitor, we put that
> > > + * monitor to the end of mon_list queue.
> > > + */
> > > +static QMPRequest *monitor_qmp_requests_pop_one(void)
> > > +{
> > > + QMPRequest *req_obj = NULL;
> > > + Monitor *mon;
> > > +
> > > + qemu_mutex_lock(&monitor_lock);
> > > +
> > > + QTAILQ_FOREACH(mon, &mon_list, entry) {
> > > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> >
> > Please add a doc comment about the monitor_lock < qmp_queue_lock lock
> > ordering in the qmp_queue_lock field declaration so that deadlocks can
> > be prevented.
>
> Will do.
>
> >
> > > + req_obj = g_queue_pop_head(mon->qmp.qmp_requests);
> > > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> > > + if (req_obj) {
> > > + break;
> > > + }
> > > + }
> > > +
> > > + if (req_obj) {
> > > + /*
> > > + * We found one request on the monitor. Degrade this monitor's
> > > + * priority to lowest by re-inserting it to end of queue.
> > > + */
> > > + QTAILQ_REMOVE(&mon_list, mon, entry);
> > > + QTAILQ_INSERT_TAIL(&mon_list, mon, entry);
> > > + }
> > > +
> > > + qemu_mutex_unlock(&monitor_lock);
> > > +
> > > + return req_obj;
> > > +}
> > > +
> > > +static void monitor_qmp_bh_dispatcher(void *data)
> > > +{
> > > + QMPRequest *req_obj;
> > > +
> > > + while (true) {
> >
> > Previously QEMU could only dispatch 1 QMP command per main loop
> > iteration. Now multiple requests can be processed in a single main loop
> > iteration.
> >
> > If a producer enqueues requests faster than the dispatcher can dequeue
> > them then this is infinite loop will prevent the caller (i.e. QEMU main
> > loop) from making progress.
> >
> > The following keeps 1 QMP command per main loop iteration and avoids the
> > infinite loop:
> >
> > static void monitor_qmp_bh_dispatcher(void *data)
> > {
> > QMPRequest *req_obj = monitor_qmp_requests_pop_one();
> >
> > if (req_obj) {
> > monitor_qmp_dispatch_one(req_obj);
> >
> > /* Reschedule instead of looping so the main loop stays responsive */
> > qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
> > }
> > }
>
> Yes, this sounds better!
>
> >
> > > + /*
> > > + * Put the request to the end of queue so that requests will be
> > > + * handled in time order. Ownership for req_obj, req, id,
> > > + * etc. will be delivered to the handler side.
> > > + */
> > > + qemu_mutex_lock(&mon->qmp.qmp_queue_lock);
> > > + g_queue_push_tail(mon->qmp.qmp_requests, req_obj);
> > > + qemu_mutex_unlock(&mon->qmp.qmp_queue_lock);
> > > +
> > > + /* Kick the dispatcher routine */
> > > + qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
> > > +
> > > + /*
> > > + * If OOB is not enabled on current monitor, we'll emulate the old
> > > + * behavior that we won't process current monitor any more until
> > > + * it is responded. This helps make sure that as long as OOB is
> > > + * not enabled, the server will never drop any command.
> > > + */
> > > + if (!qmp_oob_enabled(mon)) {
> > > + monitor_suspend(mon);
> > > + }
> >
> > monitor_suspend() must be called before g_queue_push_tail(). Otherwise
> > the other thread might complete the request and call monitor_resume()
> > before we call monitor_suspend().
>
> Reasonable. Thanks,
>
> --
> Peter Xu
--
Peter Xu
On Sat, Dec 16, 2017 at 02:37:03PM +0800, Peter Xu wrote:
> On Wed, Dec 13, 2017 at 08:09:38PM +0000, Stefan Hajnoczi wrote:
> > On Tue, Dec 05, 2017 at 01:51:50PM +0800, Peter Xu wrote:
> > > @@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
> > > }
> > > }
> > >
> > > -err_out:
> > > - monitor_qmp_respond(mon, rsp, err, id);
> > > + /* Respond if necessary */
> > > + monitor_qmp_respond(mon, rsp, NULL, id);
> > > +
> > > + /* This pairs with the monitor_suspend() in handle_qmp_command(). */
> > > + if (!qmp_oob_enabled(mon)) {
> > > + monitor_resume(mon);
> >
> > monitor_resume() does not work between threads: if the event loop is
> > currently blocked in poll() it won't notice that the monitor fd should
> > be watched again.
> >
> > Please add aio_notify() to monitor_resume() and monitor_suspend(). That
> > way the event loop is forced to check can_read() again.
>
> Ah, yes. I think monitor_suspend() does not need the notify? Since
> if it's sleeping it won't miss the next check in can_read() after all?
No, that would be a bug. Imagine the IOThread is blocked in poll
monitoring the chardev file descriptor when the main loop calls
monitor_suspend(). If the file descriptor becomes readable then the
handler function executes even though the monitor is supposed to be
suspended!
> Regarding to monitor_resume(), I noticed that I missed the fact that
> it's only tailored for HMP before, which seems to be a bigger problem.
> Do you agree with a change like this?
>
> diff --git a/monitor.c b/monitor.c
> index 9970418d6f..8f96880ad7 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -4244,10 +4244,12 @@ int monitor_suspend(Monitor *mon)
>
> void monitor_resume(Monitor *mon)
> {
> - if (!mon->rs)
> - return;
> if (atomic_dec_fetch(&mon->suspend_cnt) == 0) {
> - readline_show_prompt(mon->rs);
> + if (monitor_is_qmp(mon)) {
> + aio_notify(mon_global.mon_iothread->ctx);
Please use iothread_get_aio_context() instead of accessing struct
members.
> + } else {
> + assert(mon->rs);
> + readline_show_prompt(mon->rs);
I haven't studied the HMP and ->rs code. I'll do that when reviewing
the next revision of this series.
> + }
> }
> }
>
> Even, I'm thinking about whether I should start to introduce
> iothread_notify() now to mask out the IOThread.ctx details.
aio_notify() is a low-level AioContext operation that is somewhat
bug-prone. I think it's better to leave it directly exposed so callers
have to think about what they are doing.
Stefan
On Sat, Dec 16, 2017 at 09:23:22AM +0000, Stefan Hajnoczi wrote:
> On Sat, Dec 16, 2017 at 02:37:03PM +0800, Peter Xu wrote:
> > On Wed, Dec 13, 2017 at 08:09:38PM +0000, Stefan Hajnoczi wrote:
> > > On Tue, Dec 05, 2017 at 01:51:50PM +0800, Peter Xu wrote:
> > > > @@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
> > > > }
> > > > }
> > > >
> > > > -err_out:
> > > > - monitor_qmp_respond(mon, rsp, err, id);
> > > > + /* Respond if necessary */
> > > > + monitor_qmp_respond(mon, rsp, NULL, id);
> > > > +
> > > > + /* This pairs with the monitor_suspend() in handle_qmp_command(). */
> > > > + if (!qmp_oob_enabled(mon)) {
> > > > + monitor_resume(mon);
> > >
> > > monitor_resume() does not work between threads: if the event loop is
> > > currently blocked in poll() it won't notice that the monitor fd should
> > > be watched again.
> > >
> > > Please add aio_notify() to monitor_resume() and monitor_suspend(). That
> > > way the event loop is forced to check can_read() again.
> >
> > Ah, yes. I think monitor_suspend() does not need the notify? Since
> > if it's sleeping it won't miss the next check in can_read() after all?
>
> No, that would be a bug. Imagine the IOThread is blocked in poll
> monitoring the chardev file descriptor when the main loop calls
> monitor_suspend(). If the file descriptors becomes readable then the
> handler function executes even though the monitor is supposed to be
> suspended!
When you say "the handler function executes", do you mean the handler
that has already been added to the qmp request queue, or the one that
hasn't yet been parsed by the parser?
For the former case (the handler that has been queued already): IMHO
that's how we expect it to behave, say, when we call
monitor_suspend(), we only stop accepting and parsing new inputs from
the user, but the requests on the queue should still be processed.
For the latter (the handler of a newly typed command):
monitor_suspend() should suspend the parser already, so
monitor_can_read() check should fail, then that command should never
be queued until we call another monitor_resume().
>
> > Regarding to monitor_resume(), I noticed that I missed the fact that
> > it's only tailored for HMP before, which seems to be a bigger problem.
> > Do you agree with a change like this?
> >
> > diff --git a/monitor.c b/monitor.c
> > index 9970418d6f..8f96880ad7 100644
> > --- a/monitor.c
> > +++ b/monitor.c
> > @@ -4244,10 +4244,12 @@ int monitor_suspend(Monitor *mon)
> >
> > void monitor_resume(Monitor *mon)
> > {
> > - if (!mon->rs)
> > - return;
> > if (atomic_dec_fetch(&mon->suspend_cnt) == 0) {
> > - readline_show_prompt(mon->rs);
> > + if (monitor_is_qmp(mon)) {
> > + aio_notify(mon_global.mon_iothread->ctx);
>
> Please use iothread_get_aio_context() instead of accessing struct
> members.
Ah, yes!
>
> > + } else {
> > + assert(mon->rs);
> > + readline_show_prompt(mon->rs);
>
> I haven't studied the HMP and ->rs code. I'll do that when reviewing
> the next revision of this series.
Sure.
>
> > + }
> > }
> > }
> >
> > Even, I'm thinking about whether I should start to introduce
> > iothread_notify() now to mask out the IOThread.ctx details.
>
> aio_notify() is a low-level AioContext operation that is somewhat
> bug-prone. I think it's better to leave it directly exposed so callers
> have to think about what they are doing.
Sure. Thanks!
--
Peter Xu
On Mon, Dec 18, 2017 at 01:26:49PM +0800, Peter Xu wrote:
> On Sat, Dec 16, 2017 at 09:23:22AM +0000, Stefan Hajnoczi wrote:
> > On Sat, Dec 16, 2017 at 02:37:03PM +0800, Peter Xu wrote:
> > > On Wed, Dec 13, 2017 at 08:09:38PM +0000, Stefan Hajnoczi wrote:
> > > > On Tue, Dec 05, 2017 at 01:51:50PM +0800, Peter Xu wrote:
> > > > > @@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
> > > > > }
> > > > > }
> > > > >
> > > > > -err_out:
> > > > > - monitor_qmp_respond(mon, rsp, err, id);
> > > > > + /* Respond if necessary */
> > > > > + monitor_qmp_respond(mon, rsp, NULL, id);
> > > > > +
> > > > > + /* This pairs with the monitor_suspend() in handle_qmp_command(). */
> > > > > + if (!qmp_oob_enabled(mon)) {
> > > > > + monitor_resume(mon);
> > > >
> > > > monitor_resume() does not work between threads: if the event loop is
> > > > currently blocked in poll() it won't notice that the monitor fd should
> > > > be watched again.
> > > >
> > > > Please add aio_notify() to monitor_resume() and monitor_suspend(). That
> > > > way the event loop is forced to check can_read() again.
> > >
> > > Ah, yes. I think monitor_suspend() does not need the notify? Since
> > > if it's sleeping it won't miss the next check in can_read() after all?
> >
> > No, that would be a bug. Imagine the IOThread is blocked in poll
> > monitoring the chardev file descriptor when the main loop calls
> > monitor_suspend(). If the file descriptors becomes readable then the
> > handler function executes even though the monitor is supposed to be
> > suspended!
>
> When you say "the handler function executes", do you mean the handler
> that has already added to the qmp request queue, or the one that
> hasn't yet parsed by the parser?
The chardev file descriptor handler function (the QMP parser).
> For the previous case (the handler that has queued already): IMHO
> that's what we expect it to behave, say, when we call
> monitor_suspend(), we only stop accepting and parsing new inputs from
> the user, but the requests on the queue should still be processed.
>
> For the latter (the handler of a newly typed command):
> monitor_suspend() should suspend the parser already, so
> monitor_can_read() check should fail, then that command should never
> be queued until we call another monitor_resume().
You are assuming that monitor_can_read() is called *after* poll()
returns. This is what I tried to explain in the previous reply.
The monitor_can_read() function is called *before* the blocking
poll() syscall. If something changes the monitor_can_read() return
value, you must* kick the event loop to ensure that the event loop
reflects this change.
If you want to check how this works, see chardev/char-io.c for how
fd_can_read() is used.
* Currently monitor.c doesn't need to kick the event loop explicitly
because it runs within the main loop thread. Therefore the event loop
always calls monitor_can_read() again before entering poll().
On Mon, Dec 18, 2017 at 09:10:53AM +0000, Stefan Hajnoczi wrote:
> On Mon, Dec 18, 2017 at 01:26:49PM +0800, Peter Xu wrote:
> > On Sat, Dec 16, 2017 at 09:23:22AM +0000, Stefan Hajnoczi wrote:
> > > On Sat, Dec 16, 2017 at 02:37:03PM +0800, Peter Xu wrote:
> > > > On Wed, Dec 13, 2017 at 08:09:38PM +0000, Stefan Hajnoczi wrote:
> > > > > On Tue, Dec 05, 2017 at 01:51:50PM +0800, Peter Xu wrote:
> > > > > > @@ -3956,12 +3968,122 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
> > > > > > }
> > > > > > }
> > > > > >
> > > > > > -err_out:
> > > > > > - monitor_qmp_respond(mon, rsp, err, id);
> > > > > > + /* Respond if necessary */
> > > > > > + monitor_qmp_respond(mon, rsp, NULL, id);
> > > > > > +
> > > > > > + /* This pairs with the monitor_suspend() in handle_qmp_command(). */
> > > > > > + if (!qmp_oob_enabled(mon)) {
> > > > > > + monitor_resume(mon);
> > > > >
> > > > > monitor_resume() does not work between threads: if the event loop is
> > > > > currently blocked in poll() it won't notice that the monitor fd should
> > > > > be watched again.
> > > > >
> > > > > Please add aio_notify() to monitor_resume() and monitor_suspend(). That
> > > > > way the event loop is forced to check can_read() again.
> > > >
> > > > Ah, yes. I think monitor_suspend() does not need the notify? Since
> > > > if it's sleeping it won't miss the next check in can_read() after all?
> > >
> > > No, that would be a bug. Imagine the IOThread is blocked in poll
> > > monitoring the chardev file descriptor when the main loop calls
> > > monitor_suspend(). If the file descriptors becomes readable then the
> > > handler function executes even though the monitor is supposed to be
> > > suspended!
> >
> > When you say "the handler function executes", do you mean the handler
> > that has already added to the qmp request queue, or the one that
> > hasn't yet parsed by the parser?
>
> The chardev file descriptor handler function (the QMP parser).
>
> > For the previous case (the handler that has queued already): IMHO
> > that's what we expect it to behave, say, when we call
> > monitor_suspend(), we only stop accepting and parsing new inputs from
> > the user, but the requests on the queue should still be processed.
> >
> > For the latter (the handler of a newly typed command):
> > monitor_suspend() should suspend the parser already, so
> > monitor_can_read() check should fail, then that command should never
> > be queued until we call another monitor_resume().
>
> You are assuming that monitor_can_read() is called *after* poll()
> returns. This is what I tried to explain in the previous reply.
>
> The the monitor_can_read() function is called *before* the blocking
> poll() syscall. If something changes the monitor_can_read() return
> value, you must* kick the event loop to ensure that the event loop
> reflects this change.
>
> If you want to check how this works, see chardev/char-io.c for how
> fd_can_read() is used.
>
> * Currently monitor.c doesn't need to kick the event loop explicitly
> because it runs within the main loop thread. Therefore the event loop
> always calls monitor_can_read() again before entering poll().
Yeah I got it. I was assuming the check is done in check() of the
watcher, but obviously I was wrong. :(
Thanks for clarifying this. I'll do proper kicking.
--
Peter Xu
© 2016 - 2026 Red Hat, Inc.