From: Julien Grall <jgrall@amazon.com>
Currently, the restore code assumes the stream will contain at
most one in-flight request per connection. In follow-up changes, we
will want to transfer multiple in-flight requests.
The function read_state_buffered_data() is now extended to restore
multiple in-flight requests. Complete requests will be queued as delayed
requests; if there is a partial request (only the last one can be
partial), then it will be used as the current in-flight request.
Note that we want to bypass the quota check for delayed requests as
the new Xenstore may have a lower limit.
Lastly, there is no need to change the specification as there was
no restriction on the number of in-flight requests preserved.
Signed-off-by: Julien Grall <jgrall@amazon.com>
---
tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
1 file changed, 48 insertions(+), 8 deletions(-)
diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c
index a5084a5b173d..5b7ab7f74013 100644
--- a/tools/xenstore/xenstored_core.c
+++ b/tools/xenstore/xenstored_core.c
@@ -1486,6 +1486,10 @@ static void process_message(struct connection *conn, struct buffered_data *in)
enum xsd_sockmsg_type type = in->hdr.msg.type;
int ret;
+ /* At least send_error() and send_reply() expects conn->in == in */
+ assert(conn->in == in);
+ trace_io(conn, in, 0);
+
if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) {
eprintf("Client unknown operation %i", type);
send_error(conn, ENOSYS);
@@ -1515,6 +1519,23 @@ static void process_message(struct connection *conn, struct buffered_data *in)
conn->transaction = NULL;
}
+static bool process_delayed_message(struct delayed_request *req)
+{
+ struct connection *conn = req->data;
+ struct buffered_data *saved_in = conn->in;
+
+ /*
+ * Part of process_message() expects conn->in to contains the
+ * processed response. So save the current conn->in and restore it
+ * afterwards.
+ */
+ conn->in = req->in;
+ process_message(req->data, req->in);
+ conn->in = saved_in;
+
+ return true;
+}
+
static void consider_message(struct connection *conn)
{
if (verbose)
@@ -1582,7 +1603,6 @@ static void handle_input(struct connection *conn)
if (in->used != in->hdr.msg.len)
return;
- trace_io(conn, in, 0);
consider_message(conn);
return;
@@ -2611,14 +2631,20 @@ void read_state_buffered_data(const void *ctx, struct connection *conn,
unsigned int len;
bool partial = sc->data_resp_len;
- if (sc->data_in_len) {
+ for (data = sc->data; data < sc->data + sc->data_in_len; data += len) {
bdata = new_buffer(conn);
if (!bdata)
barf("error restoring read data");
- if (sc->data_in_len < sizeof(bdata->hdr)) {
+
+ /*
+ * We don't know yet if there is more than one message
+ * to process. So the len is the size of the leftover data.
+ */
+ len = sc->data_in_len - (data - sc->data);
+ if (len < sizeof(bdata->hdr)) {
bdata->inhdr = true;
- memcpy(&bdata->hdr, sc->data, sc->data_in_len);
- bdata->used = sc->data_in_len;
+ memcpy(&bdata->hdr, sc->data, len);
+ bdata->used = len;
} else {
bdata->inhdr = false;
memcpy(&bdata->hdr, sc->data, sizeof(bdata->hdr));
@@ -2629,12 +2655,26 @@ void read_state_buffered_data(const void *ctx, struct connection *conn,
bdata->hdr.msg.len);
if (!bdata->buffer)
barf("Error allocating in buffer");
- bdata->used = sc->data_in_len - sizeof(bdata->hdr);
- memcpy(bdata->buffer, sc->data + sizeof(bdata->hdr),
+ bdata->used = min_t(unsigned int,
+ len - sizeof(bdata->hdr),
+ bdata->hdr.msg.len);
+ memcpy(bdata->buffer, data + sizeof(bdata->hdr),
bdata->used);
+ /* Update len to match the size of the message. */
+ len = bdata->used + sizeof(bdata->hdr);
}
- conn->in = bdata;
+ /*
+ * If the message is not complete, then it means this was
+ * the current processed message. All the other messages
+ * will be queued to be handled after restoring.
+ */
+ if (bdata->inhdr || bdata->used != bdata->hdr.msg.len) {
+ assert(conn->in == NULL);
+ conn->in = bdata;
+ } else if (delay_request(conn, bdata, process_delayed_message,
+ conn, true))
+ barf("Unable to delay the request");
}
for (data = sc->data + sc->data_in_len;
--
2.17.1
> On 16 Jun 2021, at 15:43, Julien Grall <julien@xen.org> wrote:
>
> From: Julien Grall <jgrall@amazon.com>
>
> Currently, the restore code is considering the stream will contain at
> most one in-flight request per connection. In a follow-up changes, we
> will want to transfer multiple in-flight requests.
>
> The function read_state_buffered() is now extended to restore multiple
> in-flight request. Complete requests will be queued as delayed
> requests, if there a partial request (only the last one can) then it
> will used as the current in-flight request.
>
> Note that we want to bypass the quota check for delayed requests as
> the new Xenstore may have a lower limit.
>
> Lastly, there is no need to change the specification as there was
> no restriction on the number of in-flight requests preserved.
>
> Signed-off-by: Julien Grall <jgrall@amazon.com>
Reviewed-by: Luca Fancellu <luca.fancellu@arm.com>
> ---
> tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
> 1 file changed, 48 insertions(+), 8 deletions(-)
>
> diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c
> index a5084a5b173d..5b7ab7f74013 100644
> --- a/tools/xenstore/xenstored_core.c
> +++ b/tools/xenstore/xenstored_core.c
> @@ -1486,6 +1486,10 @@ static void process_message(struct connection *conn, struct buffered_data *in)
> enum xsd_sockmsg_type type = in->hdr.msg.type;
> int ret;
>
> + /* At least send_error() and send_reply() expects conn->in == in */
> + assert(conn->in == in);
> + trace_io(conn, in, 0);
> +
> if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func) {
> eprintf("Client unknown operation %i", type);
> send_error(conn, ENOSYS);
> @@ -1515,6 +1519,23 @@ static void process_message(struct connection *conn, struct buffered_data *in)
> conn->transaction = NULL;
> }
>
> +static bool process_delayed_message(struct delayed_request *req)
> +{
> + struct connection *conn = req->data;
> + struct buffered_data *saved_in = conn->in;
> +
> + /*
> + * Part of process_message() expects conn->in to contains the
> + * processed response. So save the current conn->in and restore it
> + * afterwards.
> + */
> + conn->in = req->in;
> + process_message(req->data, req->in);
> + conn->in = saved_in;
> +
> + return true;
> +}
> +
> static void consider_message(struct connection *conn)
> {
> if (verbose)
> @@ -1582,7 +1603,6 @@ static void handle_input(struct connection *conn)
> if (in->used != in->hdr.msg.len)
> return;
>
> - trace_io(conn, in, 0);
> consider_message(conn);
> return;
>
> @@ -2611,14 +2631,20 @@ void read_state_buffered_data(const void *ctx, struct connection *conn,
> unsigned int len;
> bool partial = sc->data_resp_len;
>
> - if (sc->data_in_len) {
> + for (data = sc->data; data < sc->data + sc->data_in_len; data += len) {
> bdata = new_buffer(conn);
> if (!bdata)
> barf("error restoring read data");
> - if (sc->data_in_len < sizeof(bdata->hdr)) {
> +
> + /*
> + * We don't know yet if there is more than one message
> + * to process. So the len is the size of the leftover data.
> + */
> + len = sc->data_in_len - (data - sc->data);
> + if (len < sizeof(bdata->hdr)) {
> bdata->inhdr = true;
> - memcpy(&bdata->hdr, sc->data, sc->data_in_len);
> - bdata->used = sc->data_in_len;
> + memcpy(&bdata->hdr, sc->data, len);
> + bdata->used = len;
> } else {
> bdata->inhdr = false;
> memcpy(&bdata->hdr, sc->data, sizeof(bdata->hdr));
> @@ -2629,12 +2655,26 @@ void read_state_buffered_data(const void *ctx, struct connection *conn,
> bdata->hdr.msg.len);
> if (!bdata->buffer)
> barf("Error allocating in buffer");
> - bdata->used = sc->data_in_len - sizeof(bdata->hdr);
> - memcpy(bdata->buffer, sc->data + sizeof(bdata->hdr),
> + bdata->used = min_t(unsigned int,
> + len - sizeof(bdata->hdr),
> + bdata->hdr.msg.len);
> + memcpy(bdata->buffer, data + sizeof(bdata->hdr),
> bdata->used);
> + /* Update len to match the size of the message. */
> + len = bdata->used + sizeof(bdata->hdr);
> }
>
> - conn->in = bdata;
> + /*
> + * If the message is not complete, then it means this was
> + * the current processed message. All the other messages
> + * will be queued to be handled after restoring.
> + */
> + if (bdata->inhdr || bdata->used != bdata->hdr.msg.len) {
> + assert(conn->in == NULL);
> + conn->in = bdata;
> + } else if (delay_request(conn, bdata, process_delayed_message,
> + conn, true))
> + barf("Unable to delay the request");
> }
>
> for (data = sc->data + sc->data_in_len;
> --
> 2.17.1
>
>
On 16.06.21 16:43, Julien Grall wrote:
> From: Julien Grall <jgrall@amazon.com>
>
> Currently, the restore code is considering the stream will contain at
> most one in-flight request per connection. In a follow-up changes, we
> will want to transfer multiple in-flight requests.
>
> The function read_state_buffered() is now extended to restore multiple
> in-flight request. Complete requests will be queued as delayed
> requests, if there a partial request (only the last one can) then it
> will used as the current in-flight request.
>
> Note that we want to bypass the quota check for delayed requests as
> the new Xenstore may have a lower limit.
>
> Lastly, there is no need to change the specification as there was
> no restriction on the number of in-flight requests preserved.
>
> Signed-off-by: Julien Grall <jgrall@amazon.com>
> ---
> tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
> 1 file changed, 48 insertions(+), 8 deletions(-)
>
> diff --git a/tools/xenstore/xenstored_core.c b/tools/xenstore/xenstored_core.c
> index a5084a5b173d..5b7ab7f74013 100644
> --- a/tools/xenstore/xenstored_core.c
> +++ b/tools/xenstore/xenstored_core.c
> @@ -1486,6 +1486,10 @@ static void process_message(struct connection *conn, struct buffered_data *in)
> enum xsd_sockmsg_type type = in->hdr.msg.type;
> int ret;
>
> + /* At least send_error() and send_reply() expects conn->in == in */
> + assert(conn->in == in);
> + trace_io(conn, in, 0);
> +
> if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func)
{
> eprintf("Client unknown operation %i", type);
> send_error(conn, ENOSYS);
> @@ -1515,6 +1519,23 @@ static void process_message(struct connection *conn, struct buffered_data *in)
> conn->transaction = NULL;
> }
>
> +static bool process_delayed_message(struct delayed_request *req)
> +{
> + struct connection *conn = req->data;
> + struct buffered_data *saved_in = conn->in;
> +
> + /*
> + * Part of process_message() expects conn->in to contains the
> + * processed response. So save the current conn->in and restore it
> + * afterwards.
> + */
> + conn->in = req->in;
> + process_message(req->data, req->in);
> + conn->in = saved_in;
This pattern was added to do_lu_start() already, and it will be needed
for each other function being called via call_delayed().
Maybe it would be better to add conn explicitly to struct
delayed_request (or even better: replace data with conn) and to do the
conn->in saving/setting/restoring in call_delayed() instead?
Other than that:
Reviewed-by: Juergen Gross <jgross@suse.com>
Juergen
Hi Juergen,
On 24/06/2021 10:30, Juergen Gross wrote:
> On 16.06.21 16:43, Julien Grall wrote:
>> From: Julien Grall <jgrall@amazon.com>
>>
>> Currently, the restore code is considering the stream will contain at
>> most one in-flight request per connection. In a follow-up changes, we
>> will want to transfer multiple in-flight requests.
>>
>> The function read_state_buffered() is now extended to restore multiple
>> in-flight request. Complete requests will be queued as delayed
>> requests, if there a partial request (only the last one can) then it
>> will used as the current in-flight request.
>>
>> Note that we want to bypass the quota check for delayed requests as
>> the new Xenstore may have a lower limit.
>>
>> Lastly, there is no need to change the specification as there was
>> no restriction on the number of in-flight requests preserved.
>>
>> Signed-off-by: Julien Grall <jgrall@amazon.com>
>> ---
>> tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
>> 1 file changed, 48 insertions(+), 8 deletions(-)
>>
>> diff --git a/tools/xenstore/xenstored_core.c
>> b/tools/xenstore/xenstored_core.c
>> index a5084a5b173d..5b7ab7f74013 100644
>> --- a/tools/xenstore/xenstored_core.c
>> +++ b/tools/xenstore/xenstored_core.c
>> @@ -1486,6 +1486,10 @@ static void process_message(struct connection
>> *conn, struct buffered_data *in)
>> enum xsd_sockmsg_type type = in->hdr.msg.type;
>> int ret;
>> + /* At least send_error() and send_reply() expects conn->in == in */
>> + assert(conn->in == in);
>> + trace_io(conn, in, 0);
>> +
>> if ((unsigned int)type >= XS_TYPE_COUNT || !wire_funcs[type].func)
> {
>> eprintf("Client unknown operation %i", type);
>> send_error(conn, ENOSYS);
>> @@ -1515,6 +1519,23 @@ static void process_message(struct connection
>> *conn, struct buffered_data *in)
>> conn->transaction = NULL;
>> }
>> +static bool process_delayed_message(struct delayed_request *req)
>> +{
>> + struct connection *conn = req->data;
>> + struct buffered_data *saved_in = conn->in;
>> +
>> + /*
>> + * Part of process_message() expects conn->in to contains the
>> + * processed response. So save the current conn->in and restore it
>> + * afterwards.
>> + */
>> + conn->in = req->in;
>> + process_message(req->data, req->in);
>> + conn->in = saved_in;
>
> This pattern was added to do_lu_start() already, and it will be needed
> for each other function being called via call_delayed().
>
> Maybe it would be better to add conn explicitly to struct
> delayed_request (or even better: replace data with conn) and to do the
> conn->in saving/setting/restoring in call_delayed() instead?
This was my original approach, but I abandoned it because in the case of
do_lu_start() we need to save the original conn->in in the stream (see
patch #3 for more details).
If we overwrite conn->in in call_delayed(), then we need a different way
to find the original conn->in. I couldn't find a nice way and decided to
settle with the duplication.
Cheers,
--
Julien Grall
On 24.06.21 10:42, Julien Grall wrote:
> Hi Juergen,
>
> On 24/06/2021 10:30, Juergen Gross wrote:
>> On 16.06.21 16:43, Julien Grall wrote:
>>> From: Julien Grall <jgrall@amazon.com>
>>>
>>> Currently, the restore code is considering the stream will contain at
>>> most one in-flight request per connection. In a follow-up changes, we
>>> will want to transfer multiple in-flight requests.
>>>
>>> The function read_state_buffered() is now extended to restore multiple
>>> in-flight request. Complete requests will be queued as delayed
>>> requests, if there a partial request (only the last one can) then it
>>> will used as the current in-flight request.
>>>
>>> Note that we want to bypass the quota check for delayed requests as
>>> the new Xenstore may have a lower limit.
>>>
>>> Lastly, there is no need to change the specification as there was
>>> no restriction on the number of in-flight requests preserved.
>>>
>>> Signed-off-by: Julien Grall <jgrall@amazon.com>
>>> ---
>>> tools/xenstore/xenstored_core.c | 56 ++++++++++++++++++++++++++++-----
>>> 1 file changed, 48 insertions(+), 8 deletions(-)
>>>
>>> diff --git a/tools/xenstore/xenstored_core.c
>>> b/tools/xenstore/xenstored_core.c
>>> index a5084a5b173d..5b7ab7f74013 100644
>>> --- a/tools/xenstore/xenstored_core.c
>>> +++ b/tools/xenstore/xenstored_core.c
>>> @@ -1486,6 +1486,10 @@ static void process_message(struct connection
>>> *conn, struct buffered_data *in)
>>> enum xsd_sockmsg_type type = in->hdr.msg.type;
>>> int ret;
>>> + /* At least send_error() and send_reply() expects
conn->in == in */
>>> + assert(conn->in == in);
>>> + trace_io(conn, in, 0);
>>> +
>>> if ((unsigned int)type >= XS_TYPE_COUNT|| !wire_funcs[type].func)
>> {
>>> eprintf("Client unknown operation %i", type);
>>> send_error(conn, ENOSYS);
>>> @@ -1515,6 +1519,23 @@ static void process_message(struct connection
>>> *conn, struct buffered_data *in)
>>> conn->transaction = NULL;
>>> }
>>> +static bool process_delayed_message(struct delayed_request *req)
>>> +{
>>> + struct connection *conn = req->data;
>>> + struct buffered_data *saved_in = conn->in;
>>> +
>>> + /*
>>> + * Part of process_message() expects conn->in to contains the
>>> + * processed response. So save the current conn->in and restore it
>>> + * afterwards.
>>> + */
>>> + conn->in = req->in;
>>> + process_message(req->data, req->in);
>>> + conn->in = saved_in;
>>
>> This pattern was added to do_lu_start() already, and it will be needed
>> for each other function being called via call_delayed().
>>
>> Maybe it would be better to add conn explicitly to struct
>> delayed_request (or even better: replace data with conn) and to do the
>> conn->in saving/setting/restoring in call_delayed() instead?
>
> This was my original approach, but I abandoned it because in the case of
> do_lu_start() we need to save the original conn->in in the stream (see
> patch #3 for more details).
>
> If we overwrite conn->in in call_delayed(), then we need a different way
> to find the original conn->in. I couldn't find a nice way and decided to
> settle with the duplication.
Ah, okay, understood. Even in case we'd be able to restore conn->in it
would be the same amount of code again, but harder to follow.
Juergen
© 2016 - 2026 Red Hat, Inc.