Originally QMP is going throw these steps:
    JSON Parser --> QMP Dispatcher --> Respond
        /|\    (2)                (3)     |
     (1) |                               \|/ (4)
         +--------- main thread ----------+
This patch does this:
    JSON Parser     QMP Dispatcher --> Respond
        /|\ |           /|\       (4)     |
         |  | (2)        | (3)            | (5)
     (1) |  +----->      |               \|/
         +--------- main thread <---------+
So the parsing job and the dispatching job are isolated now. This gives
us a chance in follow-up patches to move the parser out of the main
thread completely.

The isolation is done using one QEMUBH. Only one dispatcher QEMUBH is
used for all the monitors.
Signed-off-by: Peter Xu <peterx@redhat.com>
---
monitor.c | 156 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------
1 file changed, 133 insertions(+), 23 deletions(-)
diff --git a/monitor.c b/monitor.c
index 7b76dff5ad..1e9a6cb6a5 100644
--- a/monitor.c
+++ b/monitor.c
@@ -208,10 +208,14 @@ struct Monitor {
mon_cmd_t *cmd_table;
QLIST_HEAD(,mon_fd_t) fds;
QTAILQ_ENTRY(Monitor) entry;
+ /* Input queue that hangs all the parsed QMP requests */
+ GQueue *qmp_requests;
};
struct MonitorGlobal {
IOThread *mon_io_thread;
+ /* Bottom half to dispatch the requests received from IO thread */
+ QEMUBH *qmp_dispatcher_bh;
};
static struct MonitorGlobal mon_global;
@@ -586,6 +590,7 @@ static void monitor_data_init(Monitor *mon, bool skip_flush,
mon->cmd_table = mon_cmds;
mon->skip_flush = skip_flush;
mon->use_io_thr = use_io_thr;
+ mon->qmp_requests = g_queue_new();
}
static void monitor_data_destroy(Monitor *mon)
@@ -597,6 +602,7 @@ static void monitor_data_destroy(Monitor *mon)
g_free(mon->rs);
QDECREF(mon->outbuf);
qemu_mutex_destroy(&mon->out_lock);
+ g_queue_free(mon->qmp_requests);
}
char *qmp_human_monitor_command(const char *command_line, bool has_cpu_index,
@@ -3861,29 +3867,31 @@ static void monitor_qmp_respond(Monitor *mon, QObject *rsp,
qobject_decref(rsp);
}
-static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
- void *opaque)
+struct QMPRequest {
+ /* Owner of the request */
+ Monitor *mon;
+ /* "id" field of the request */
+ QObject *id;
+ /* Request object to be handled */
+ QObject *req;
+};
+typedef struct QMPRequest QMPRequest;
+
+/*
+ * Dispatch a single QMP request. The function will free req_obj and
+ * the objects inside it before returning.
+ */
+static void monitor_qmp_dispatch_one(QMPRequest *req_obj)
{
- QObject *req, *rsp = NULL, *id = NULL;
+ Monitor *mon, *old_mon;
+ QObject *req, *rsp = NULL, *id;
QDict *qdict = NULL;
- Monitor *mon = opaque, *old_mon;
- Error *err = NULL;
- req = json_parser_parse_err(tokens, NULL, &err);
- if (!req && !err) {
- /* json_parser_parse_err() sucks: can fail without setting @err */
- error_setg(&err, QERR_JSON_PARSING);
- }
- if (err) {
- goto err_out;
- }
+ req = req_obj->req;
+ mon = req_obj->mon;
+ id = req_obj->id;
- qdict = qobject_to_qdict(req);
- if (qdict) {
- id = qdict_get(qdict, "id");
- qobject_incref(id);
- qdict_del(qdict, "id");
- } /* else will fail qmp_dispatch() */
+ g_free(req_obj);
if (trace_event_get_state_backends(TRACE_HANDLE_QMP_COMMAND)) {
QString *req_json = qobject_to_json(req);
@@ -3894,7 +3902,7 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
old_mon = cur_mon;
cur_mon = mon;
- rsp = qmp_dispatch(cur_mon->qmp.commands, req);
+ rsp = qmp_dispatch(mon->qmp.commands, req);
cur_mon = old_mon;
@@ -3910,12 +3918,101 @@ static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
}
}
-err_out:
- monitor_qmp_respond(mon, rsp, err, id);
-
+ /* Respond if necessary */
+ monitor_qmp_respond(mon, rsp, NULL, id);
qobject_decref(req);
}
+/*
+ * Pop one QMP request from the monitor queues; return NULL if none is
+ * found. We pop requests in a round-robin fashion, so that we don't
+ * only process commands from a very busy monitor. To achieve that,
+ * once we have processed a request from a specific monitor, we move
+ * that monitor to the end of the mon_list queue.
+ */
+static QMPRequest *monitor_qmp_requests_pop_one(void)
+{
+ QMPRequest *req_obj = NULL;
+ Monitor *mon;
+
+ qemu_mutex_lock(&monitor_lock);
+
+ QTAILQ_FOREACH(mon, &mon_list, entry) {
+ req_obj = g_queue_pop_head(mon->qmp_requests);
+ if (req_obj) {
+ break;
+ }
+ }
+
+ if (req_obj) {
+ /*
+ * We found one request on the monitor. Degrade this monitor's
+ * priority to lowest by re-inserting it to end of queue.
+ */
+ QTAILQ_REMOVE(&mon_list, mon, entry);
+ QTAILQ_INSERT_TAIL(&mon_list, mon, entry);
+ }
+
+ qemu_mutex_unlock(&monitor_lock);
+
+ return req_obj;
+}
+
+static void monitor_qmp_bh_dispatcher(void *data)
+{
+ QMPRequest *req_obj;
+
+ while (true) {
+ req_obj = monitor_qmp_requests_pop_one();
+ if (!req_obj) {
+ break;
+ }
+ monitor_qmp_dispatch_one(req_obj);
+ }
+}
+
+static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
+ void *opaque)
+{
+ QObject *req, *id = NULL;
+ QDict *qdict = NULL;
+ Monitor *mon = opaque;
+ Error *err = NULL;
+ QMPRequest *req_obj;
+
+ req = json_parser_parse_err(tokens, NULL, &err);
+ if (!req && !err) {
+ /* json_parser_parse_err() sucks: can fail without setting @err */
+ error_setg(&err, QERR_JSON_PARSING);
+ }
+ if (err) {
+ monitor_qmp_respond(mon, NULL, err, NULL);
+ qobject_decref(req);
+ }
+
+ qdict = qobject_to_qdict(req);
+ if (qdict) {
+ id = qdict_get(qdict, "id");
+ qobject_incref(id);
+ qdict_del(qdict, "id");
+ } /* else will fail qmp_dispatch() */
+
+ req_obj = g_new0(QMPRequest, 1);
+ req_obj->mon = mon;
+ req_obj->id = id;
+ req_obj->req = req;
+
+ /*
+ * Put the request to the end of queue so that requests will be
+ * handled in time order. Ownership for req_obj, req, id,
+ * etc. will be delivered to the handler side.
+ */
+ g_queue_push_tail(mon->qmp_requests, req_obj);
+
+ /* Kick the dispatcher routine */
+ qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
+}
+
static void monitor_qmp_read(void *opaque, const uint8_t *buf, int size)
{
Monitor *mon = opaque;
@@ -4085,6 +4182,15 @@ static void monitor_io_thread_init(void)
* fetch the context we'll have that initialized.
*/
monitor_io_context_get();
+
+ /*
+ * This MUST be on the main loop thread, since we have commands that
+ * assume they run on the main loop thread (yeah, we'd better remove
+ * this assumption in the future).
+ */
+ mon_global.qmp_dispatcher_bh = aio_bh_new(qemu_get_aio_context(),
+ monitor_qmp_bh_dispatcher,
+ NULL);
}
void monitor_init_globals(void)
@@ -4188,6 +4294,10 @@ void monitor_init(Chardev *chr, int flags)
static void monitor_io_thread_destroy(void)
{
+ /* QEMUBHs need to be deleted before destroying the IOThread. */
+ qemu_bh_delete(mon_global.qmp_dispatcher_bh);
+ mon_global.qmp_dispatcher_bh = NULL;
+
iothread_destroy(mon_global.mon_io_thread);
mon_global.mon_io_thread = NULL;
}
--
2.13.5
On Fri, Sep 29, 2017 at 11:38:35AM +0800, Peter Xu wrote:
> Originally QMP is going throw these steps:
s/is going throw/goes through/
>
>     JSON Parser --> QMP Dispatcher --> Respond
>         /|\    (2)                (3)     |
>      (1) |                               \|/ (4)
>          +--------- main thread ----------+
>
> This patch does this:
>
>     JSON Parser     QMP Dispatcher --> Respond
>         /|\ |           /|\       (4)     |
>          |  | (2)        | (3)            | (5)
>      (1) |  +----->      |               \|/
>          +--------- main thread <---------+
>
> So the parsing job and the dispatching job is isolated now. It gives us
> a chance in following up patches to totally move the parser outside.
>
> The isloation is done using one QEMUBH. Only one dispatcher QEMUBH is
> used for all the monitors.
>
> Signed-off-by: Peter Xu <peterx@redhat.com>
> ---
> monitor.c | 156 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------
> 1 file changed, 133 insertions(+), 23 deletions(-)
>
> diff --git a/monitor.c b/monitor.c
> index 7b76dff5ad..1e9a6cb6a5 100644
> --- a/monitor.c
> +++ b/monitor.c
> @@ -208,10 +208,14 @@ struct Monitor {
> mon_cmd_t *cmd_table;
> QLIST_HEAD(,mon_fd_t) fds;
> QTAILQ_ENTRY(Monitor) entry;
> + /* Input queue that hangs all the parsed QMP requests */
s/hangs/holds/
> +static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
> + void *opaque)
> +{
> + QObject *req, *id = NULL;
> + QDict *qdict = NULL;
> + Monitor *mon = opaque;
> + Error *err = NULL;
> + QMPRequest *req_obj;
> +
> + req = json_parser_parse_err(tokens, NULL, &err);
> + if (!req && !err) {
> + /* json_parser_parse_err() sucks: can fail without setting @err */
> + error_setg(&err, QERR_JSON_PARSING);
> + }
> + if (err) {
> + monitor_qmp_respond(mon, NULL, err, NULL);
> + qobject_decref(req);
Is there a return statement missing here?
> + }
> +
> + qdict = qobject_to_qdict(req);
> + if (qdict) {
> + id = qdict_get(qdict, "id");
> + qobject_incref(id);
> + qdict_del(qdict, "id");
> + } /* else will fail qmp_dispatch() */
> +
> + req_obj = g_new0(QMPRequest, 1);
> + req_obj->mon = mon;
> + req_obj->id = id;
> + req_obj->req = req;
> +
> + /*
> + * Put the request to the end of queue so that requests will be
> + * handled in time order. Ownership for req_obj, req, id,
> + * etc. will be delivered to the handler side.
> + */
> + g_queue_push_tail(mon->qmp_requests, req_obj);
> +
> + /* Kick the dispatcher routine */
> + qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
How is thread-safety ensured when accessing qmp_requests?
On Thu, Oct 12, 2017 at 01:50:45PM +0100, Stefan Hajnoczi wrote:
> On Fri, Sep 29, 2017 at 11:38:35AM +0800, Peter Xu wrote:
> > Originally QMP is going throw these steps:
>
> s/is going throw/goes through/
Fixed.
>
> >
> >     JSON Parser --> QMP Dispatcher --> Respond
> >         /|\    (2)                (3)     |
> >      (1) |                               \|/ (4)
> >          +--------- main thread ----------+
> >
> > This patch does this:
> >
> >     JSON Parser     QMP Dispatcher --> Respond
> >         /|\ |           /|\       (4)     |
> >          |  | (2)        | (3)            | (5)
> >      (1) |  +----->      |               \|/
> >          +--------- main thread <---------+
> >
> > So the parsing job and the dispatching job is isolated now. It gives us
> > a chance in following up patches to totally move the parser outside.
> >
> > The isloation is done using one QEMUBH. Only one dispatcher QEMUBH is
> > used for all the monitors.
> >
> > Signed-off-by: Peter Xu <peterx@redhat.com>
> > ---
> > monitor.c | 156 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------
> > 1 file changed, 133 insertions(+), 23 deletions(-)
> >
> > diff --git a/monitor.c b/monitor.c
> > index 7b76dff5ad..1e9a6cb6a5 100644
> > --- a/monitor.c
> > +++ b/monitor.c
> > @@ -208,10 +208,14 @@ struct Monitor {
> > mon_cmd_t *cmd_table;
> > QLIST_HEAD(,mon_fd_t) fds;
> > QTAILQ_ENTRY(Monitor) entry;
> > + /* Input queue that hangs all the parsed QMP requests */
>
> s/hangs/holds/
Fixed.
>
> > +static void handle_qmp_command(JSONMessageParser *parser, GQueue *tokens,
> > + void *opaque)
> > +{
> > + QObject *req, *id = NULL;
> > + QDict *qdict = NULL;
> > + Monitor *mon = opaque;
> > + Error *err = NULL;
> > + QMPRequest *req_obj;
> > +
> > + req = json_parser_parse_err(tokens, NULL, &err);
> > + if (!req && !err) {
> > + /* json_parser_parse_err() sucks: can fail without setting @err */
> > + error_setg(&err, QERR_JSON_PARSING);
> > + }
> > + if (err) {
> > + monitor_qmp_respond(mon, NULL, err, NULL);
> > + qobject_decref(req);
>
> Is there a return statement missing here?
Hmm... Very possible!
Fixed.
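
Presumably the fix is just an early return once the parse error has been
reported and the (possibly NULL) req has been released; a sketch of the
assumed corrected hunk, not the actual v2 code:

    if (err) {
        monitor_qmp_respond(mon, NULL, err, NULL);
        qobject_decref(req);
        return;    /* don't queue anything for an unparseable request */
    }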
>
> > + }
> > +
> > + qdict = qobject_to_qdict(req);
> > + if (qdict) {
> > + id = qdict_get(qdict, "id");
> > + qobject_incref(id);
> > + qdict_del(qdict, "id");
> > + } /* else will fail qmp_dispatch() */
> > +
> > + req_obj = g_new0(QMPRequest, 1);
> > + req_obj->mon = mon;
> > + req_obj->id = id;
> > + req_obj->req = req;
> > +
> > + /*
> > + * Put the request to the end of queue so that requests will be
> > + * handled in time order. Ownership for req_obj, req, id,
> > + * etc. will be delivered to the handler side.
> > + */
> > + g_queue_push_tail(mon->qmp_requests, req_obj);
> > +
> > + /* Kick the dispatcher routine */
> > + qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
>
> How is thread-safety ensured when accessing qmp_requests?
It's a GQueue. I assume GQueue is thread safe itself as long as
g_thread_init() is called?
Thanks,
--
Peter Xu
On Mon, Oct 16, 2017 at 03:50:39PM +0800, Peter Xu wrote:
> On Thu, Oct 12, 2017 at 01:50:45PM +0100, Stefan Hajnoczi wrote:
> > On Fri, Sep 29, 2017 at 11:38:35AM +0800, Peter Xu wrote:
> > > + qdict = qobject_to_qdict(req);
> > > + if (qdict) {
> > > + id = qdict_get(qdict, "id");
> > > + qobject_incref(id);
> > > + qdict_del(qdict, "id");
> > > + } /* else will fail qmp_dispatch() */
> > > +
> > > + req_obj = g_new0(QMPRequest, 1);
> > > + req_obj->mon = mon;
> > > + req_obj->id = id;
> > > + req_obj->req = req;
> > > +
> > > + /*
> > > + * Put the request to the end of queue so that requests will be
> > > + * handled in time order. Ownership for req_obj, req, id,
> > > + * etc. will be delivered to the handler side.
> > > + */
> > > + g_queue_push_tail(mon->qmp_requests, req_obj);
> > > +
> > > + /* Kick the dispatcher routine */
> > > + qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
> >
> > How is thread-safety ensured when accessing qmp_requests?
>
> It's a GQueue. I assume GQueue is thread safe itself as long as
> g_thread_init() is called?
No, glib data structures are not automatically thread-safe unless the
documentation says so.
Here is the implementation where you can see that no locking is
performed:
void
g_queue_push_tail (GQueue *queue,
                   gpointer data)
{
  g_return_if_fail (queue != NULL);

  queue->tail = g_list_append (queue->tail, data);
  if (queue->tail->next)
    queue->tail = queue->tail->next;
  else
    queue->head = queue->tail;
  queue->length++;
}
Stefan
On Wed, Oct 18, 2017 at 05:31:15PM +0200, Stefan Hajnoczi wrote:
> On Mon, Oct 16, 2017 at 03:50:39PM +0800, Peter Xu wrote:
> > On Thu, Oct 12, 2017 at 01:50:45PM +0100, Stefan Hajnoczi wrote:
> > > On Fri, Sep 29, 2017 at 11:38:35AM +0800, Peter Xu wrote:
> > > > + qdict = qobject_to_qdict(req);
> > > > + if (qdict) {
> > > > + id = qdict_get(qdict, "id");
> > > > + qobject_incref(id);
> > > > + qdict_del(qdict, "id");
> > > > + } /* else will fail qmp_dispatch() */
> > > > +
> > > > + req_obj = g_new0(QMPRequest, 1);
> > > > + req_obj->mon = mon;
> > > > + req_obj->id = id;
> > > > + req_obj->req = req;
> > > > +
> > > > + /*
> > > > + * Put the request to the end of queue so that requests will be
> > > > + * handled in time order. Ownership for req_obj, req, id,
> > > > + * etc. will be delivered to the handler side.
> > > > + */
> > > > + g_queue_push_tail(mon->qmp_requests, req_obj);
> > > > +
> > > > + /* Kick the dispatcher routine */
> > > > + qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
> > >
> > > How is thread-safety ensured when accessing qmp_requests?
> >
> > It's a GQueue. I assume GQueue is thread safe itself as long as
> > g_thread_init() is called?
>
> No, glib data structures are not automatically thread-safe unless the
> documentation says so.
>
> Here is the implementation where you can see that no locking is
> performed:
>
> void
> g_queue_push_tail (GQueue *queue,
>                    gpointer data)
> {
>   g_return_if_fail (queue != NULL);
>
>   queue->tail = g_list_append (queue->tail, data);
>   if (queue->tail->next)
>     queue->tail = queue->tail->next;
>   else
>     queue->head = queue->tail;
>   queue->length++;
> }
Oops. Yes then I possibly need a lock... Thanks.
I think maybe I can rename the Monitor.out_lock into a more general
name, then to use it as a per-monitor lock (instead of introducing
another one).
--
Peter Xu
On Thu, Oct 19, 2017 at 02:36:49PM +0800, Peter Xu wrote:
> On Wed, Oct 18, 2017 at 05:31:15PM +0200, Stefan Hajnoczi wrote:
> > On Mon, Oct 16, 2017 at 03:50:39PM +0800, Peter Xu wrote:
> > > On Thu, Oct 12, 2017 at 01:50:45PM +0100, Stefan Hajnoczi wrote:
> > > > On Fri, Sep 29, 2017 at 11:38:35AM +0800, Peter Xu wrote:
> > > > > + qdict = qobject_to_qdict(req);
> > > > > + if (qdict) {
> > > > > + id = qdict_get(qdict, "id");
> > > > > + qobject_incref(id);
> > > > > + qdict_del(qdict, "id");
> > > > > + } /* else will fail qmp_dispatch() */
> > > > > +
> > > > > + req_obj = g_new0(QMPRequest, 1);
> > > > > + req_obj->mon = mon;
> > > > > + req_obj->id = id;
> > > > > + req_obj->req = req;
> > > > > +
> > > > > + /*
> > > > > + * Put the request to the end of queue so that requests will be
> > > > > + * handled in time order. Ownership for req_obj, req, id,
> > > > > + * etc. will be delivered to the handler side.
> > > > > + */
> > > > > + g_queue_push_tail(mon->qmp_requests, req_obj);
> > > > > +
> > > > > + /* Kick the dispatcher routine */
> > > > > + qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
> > > >
> > > > How is thread-safety ensured when accessing qmp_requests?
> > >
> > > It's a GQueue. I assume GQueue is thread safe itself as long as
> > > g_thread_init() is called?
> >
> > No, glib data structures are not automatically thread-safe unless the
> > documentation says so.
> >
> > Here is the implementation where you can see that no locking is
> > performed:
> >
> > void
> > g_queue_push_tail (GQueue *queue,
> >                    gpointer data)
> > {
> >   g_return_if_fail (queue != NULL);
> >
> >   queue->tail = g_list_append (queue->tail, data);
> >   if (queue->tail->next)
> >     queue->tail = queue->tail->next;
> >   else
> >     queue->head = queue->tail;
> >   queue->length++;
> > }
>
> Oops. Yes then I possibly need a lock... Thanks.
>
> I think maybe I can rename the Monitor.out_lock into a more general
> name, then to use it as a per-monitor lock (instead of introducing
> another one).
Up to you. I don't remember the details of out_lock usage well enough
to know whether using the lock for the queues is good or bad.
Stefan
On 19/10/2017 15:13, Stefan Hajnoczi wrote:
> Up to you. I don't remember the details of out_lock usage well enough
> to know whether using the lock for the queues is good or bad.

out_lock is called like that because it's only writes that can be done
from multiple threads (at both the chardev and monitor level).
IOThreads in particular want to generate QMP events.

out_lock protects the monitor's own output buffer, while the character
device layer has its own locking (chr_write_lock). chr_write_lock
protects calls to the chr_write method of the Chardev object and writes
to the character device's logfd.

Renaming out_lock and reusing it is just fine if the lock is only held
for short periods of time. But maybe it's simpler to just introduce
another lock, since the out_lock rules are very simple.

Paolo
On Fri, Oct 20, 2017 at 11:19:44AM +0200, Paolo Bonzini wrote:
> On 19/10/2017 15:13, Stefan Hajnoczi wrote:
> > Up to you. I don't remember the details of out_lock usage well enough
> > to know whether using the lock for the queues is good or bad.
>
> out_lock is called like that because it's only writes that can be done
> from multiple threads (at both the chardev and monitor level).
> IOThreads in particular want to generate QMP events.
>
> out_lock protects the monitor's own output buffer, while the character
> device layer has its own locking (chr_write_lock). chr_write_lock
> protects calls to the chr_write method of the Chardev object and writes
> to the character device's logfd.
>
> Renaming out_lock and reusing it is just fine if the lock is only held
> for short periods of time. But maybe it's simpler to just introduce
> another lock, since the out_lock rules are very simple.

OK, let me use a new lock. Thanks for explaining!

--
Peter Xu
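
For reference, here is a minimal sketch of the approach the thread settles
on, i.e. a dedicated lock protecting each monitor's qmp_requests queue.
The field and helper names (qmp_queue_lock, qmp_request_push,
qmp_request_pop) are illustrative assumptions, not the code that was
eventually merged:

    /* In struct Monitor, next to the queue it protects: */
    QemuMutex qmp_queue_lock;       /* protects qmp_requests */
    GQueue *qmp_requests;

    /* Producer side, called from handle_qmp_command() in the IO thread */
    static void qmp_request_push(Monitor *mon, QMPRequest *req_obj)
    {
        qemu_mutex_lock(&mon->qmp_queue_lock);
        g_queue_push_tail(mon->qmp_requests, req_obj);
        qemu_mutex_unlock(&mon->qmp_queue_lock);

        /* Kick the dispatcher once the request is visible in the queue */
        qemu_bh_schedule(mon_global.qmp_dispatcher_bh);
    }

    /* Consumer side, called from the dispatcher bottom half (main thread) */
    static QMPRequest *qmp_request_pop(Monitor *mon)
    {
        QMPRequest *req_obj;

        qemu_mutex_lock(&mon->qmp_queue_lock);
        req_obj = g_queue_pop_head(mon->qmp_requests);
        qemu_mutex_unlock(&mon->qmp_queue_lock);

        return req_obj;
    }

The mutex would be initialized with qemu_mutex_init() in
monitor_data_init() and destroyed in monitor_data_destroy(), mirroring
the existing out_lock handling.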