[PATCH] mini-os: xenbus: support large messages

Juergen Gross posted 1 patch 2 years, 8 months ago
Failed in applying to current master (apply log)
xenbus/xenbus.c | 212 ++++++++++++++++++++++++++++--------------------
1 file changed, 124 insertions(+), 88 deletions(-)
[PATCH] mini-os: xenbus: support large messages
Posted by Juergen Gross 2 years, 8 months ago
Today the implementation of the xenbus protocol in Mini-OS only allows
transferring a complete message to or from the ring page buffer. This
limits the maximum message size to a lower value than the xenbus
protocol would normally allow.

Change that by transferring the xenbus message in chunks, as soon as
data (when reading) or free ring space (when writing) is available.

Avoid crashing Mini-OS in case of illegal data read from the ring
buffer.

Signed-off-by: Juergen Gross <jgross@suse.com>
---
 xenbus/xenbus.c | 212 ++++++++++++++++++++++++++++--------------------
 1 file changed, 124 insertions(+), 88 deletions(-)

diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
index 23de61e..3fbb122 100644
--- a/xenbus/xenbus.c
+++ b/xenbus/xenbus.c
@@ -29,6 +29,7 @@
 #include <xen/hvm/params.h>
 #include <mini-os/spinlock.h>
 #include <mini-os/xmalloc.h>
+#include <mini-os/semaphore.h>
 
 #define min(x,y) ({                       \
         typeof(x) tmpx = (x);                 \
@@ -46,6 +47,7 @@
 static struct xenstore_domain_interface *xenstore_buf;
 static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
 DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
+static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
 
 xenbus_event_queue xenbus_events;
 static struct watch {
@@ -231,75 +233,105 @@ char *xenbus_wait_for_state_change(const char* path, XenbusState *state, xenbus_
 }
 
 
+static void xenbus_read_data(char *buf, unsigned int len)
+{
+    unsigned int off = 0;
+    unsigned int prod;
+    unsigned int size;
+    int notify;
+
+    while (off != len)
+    {
+        if (xenstore_buf->rsp_prod == xenstore_buf->rsp_cons)
+            wait_event(xb_waitq,
+                       xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
+
+        prod = xenstore_buf->rsp_prod;
+        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
+        size = min(len - off, prod - xenstore_buf->rsp_cons);
+        memcpy_from_ring(xenstore_buf->rsp, buf + off,
+                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
+        off += size;
+        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
+                  xenstore_buf->rsp_prod);
+        rmb();
+        xenstore_buf->rsp_cons += size;
+        wmb();
+        if (notify)
+            notify_remote_via_evtchn(xenbus_evtchn);
+    }
+}
+
 static void xenbus_thread_func(void *ign)
 {
     struct xsd_sockmsg msg;
-    unsigned prod = xenstore_buf->rsp_prod;
+    char *data;
 
     for (;;) {
-        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
-        while (1) {
-            prod = xenstore_buf->rsp_prod;
-            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
-                  xenstore_buf->rsp_prod);
-            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))
-                break;
-            rmb();
-            memcpy_from_ring(xenstore_buf->rsp, &msg,
-                             MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
-                             sizeof(msg));
-            DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
-                  xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
-
-            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
-                sizeof(msg) + msg.len)
-                break;
-
-            DEBUG("Message is good.\n");
-
-            if (msg.type == XS_WATCH_EVENT) {
-                struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
-                xenbus_event_queue *events = NULL;
-                char *data = (char*)event + sizeof(*event);
-                struct watch *watch;
-
-                memcpy_from_ring(xenstore_buf->rsp, data,
-                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),
-                    msg.len);
-
-                event->path = data;
-                event->token = event->path + strlen(event->path) + 1;
-
-                mb();
-                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
-
-                for (watch = watches; watch; watch = watch->next)
-                    if (!strcmp(watch->token, event->token)) {
-                        events = watch->events;
-                        break;
-                    }
-
-                if (events) {
-                    event->next = *events;
-                    *events = event;
-                    wake_up(&xenbus_watch_queue);
-                } else {
-                    printk("unexpected watch token %s\n", event->token);
-                    free(event);
+        xenbus_read_data((char *)&msg, sizeof(msg));
+        DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
+              xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
+
+        if (msg.len > XENSTORE_PAYLOAD_MAX) {
+            printk("Xenstore violates protocol, message longer than allowed.\n");
+            return;
+        }
+
+        if (msg.type == XS_WATCH_EVENT) {
+            struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
+            xenbus_event_queue *events = NULL;
+            struct watch *watch;
+            char *c;
+            int zeroes = 0;
+
+            data = (char*)event + sizeof(*event);
+            xenbus_read_data(data, msg.len);
+
+            for (c = data; c < data + msg.len; c++)
+                if (!*c)
+                    zeroes++;
+            if (zeroes != 2) {
+                printk("Xenstore: illegal watch event data\n");
+                free(event);
+                continue;
+            }
+
+            event->path = data;
+            event->token = event->path + strlen(event->path) + 1;
+
+            for (watch = watches; watch; watch = watch->next)
+                if (!strcmp(watch->token, event->token)) {
+                    events = watch->events;
+                    break;
                 }
+
+            if (events) {
+                event->next = *events;
+                *events = event;
+                wake_up(&xenbus_watch_queue);
             } else {
-                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
-                memcpy_from_ring(xenstore_buf->rsp, req_info[msg.req_id].reply,
-                                 MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
-                                 msg.len + sizeof(msg));
-                mb();
-                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
-                wake_up(&req_info[msg.req_id].waitq);
+                printk("Xenstore: unexpected watch token %s\n", event->token);
+                free(event);
             }
 
-            wmb();
-            notify_remote_via_evtchn(xenbus_evtchn);
+            continue;
         }
+
+        data = malloc(sizeof(msg) + msg.len);
+        memcpy(data, &msg, sizeof(msg));
+        xenbus_read_data(data + sizeof(msg), msg.len);
+
+        if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) {
+            printk("Xenstore: illegal request id %d\n", msg.req_id);
+            free(data);
+            continue;
+        }
+
+        DEBUG("Message is good.\n");
+
+        req_info[msg.req_id].reply = data;
+
+        wake_up(&req_info[msg.req_id].waitq);
     }
 }
 
@@ -451,36 +483,40 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
 
     cur_req = &header_req;
 
-    BUG_ON(len > XENSTORE_RING_SIZE);
-    /* Wait for the ring to drain to the point where we can send the
-       message. */
-    prod = xenstore_buf->req_prod;
-    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE) 
-    {
-        /* Wait for there to be space on the ring */
-        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
-                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
-        wait_event(xb_waitq,
-                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
-                XENSTORE_RING_SIZE);
-        DEBUG("Back from wait.\n");
-        prod = xenstore_buf->req_prod;
-    }
+    BUG_ON(len > XENSTORE_PAYLOAD_MAX);
 
-    /* We're now guaranteed to be able to send the message without
-       overflowing the ring.  Do so. */
+    /* Make sure we are the only thread trying to write. */
+    down(&xb_write_sem);
+
+    /* Send the message in chunks using free ring space when available. */
     total_off = 0;
     req_off = 0;
-    while (total_off < len) 
+    while (total_off < len)
     {
+        prod = xenstore_buf->req_prod;
+        if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
+        {
+            /* Send evtchn to notify remote */
+            notify_remote_via_evtchn(xenbus_evtchn);
+
+            /* Wait for there to be space on the ring */
+            DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
+                  len - total_off, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
+            wait_event(xb_waitq,
+                       prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
+            DEBUG("Back from wait.\n");
+        }
+
         this_chunk = min(cur_req->len - req_off,
-                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
+                         XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
+        this_chunk = min(this_chunk,
+                         xenstore_buf->req_cons + XENSTORE_RING_SIZE - prod);
         memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
-                (char *)cur_req->data + req_off, this_chunk);
+               (char *)cur_req->data + req_off, this_chunk);
         prod += this_chunk;
         req_off += this_chunk;
         total_off += this_chunk;
-        if (req_off == cur_req->len) 
+        if (req_off == cur_req->len)
         {
             req_off = 0;
             if (cur_req == &header_req)
@@ -488,20 +524,20 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
             else
                 cur_req++;
         }
+
+        /* Remote must see entire message before updating indexes */
+        wmb();
+        xenstore_buf->req_prod = prod;
     }
 
+    /* Send evtchn to notify remote */
+    notify_remote_via_evtchn(xenbus_evtchn);
+
     DEBUG("Complete main loop of xb_write.\n");
     BUG_ON(req_off != 0);
     BUG_ON(total_off != len);
-    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
-
-    /* Remote must see entire message before updating indexes */
-    wmb();
-
-    xenstore_buf->req_prod += len;
 
-    /* Send evtchn to notify remote */
-    notify_remote_via_evtchn(xenbus_evtchn);
+    up(&xb_write_sem);
 }
 
 /* Send a mesasge to xenbus, in the same fashion as xb_write, and
-- 
2.26.2


Re: [PATCH] mini-os: xenbus: support large messages
Posted by Juergen Gross 2 years, 7 months ago
On 18.08.21 17:26, Juergen Gross wrote:
> Today the implementation of the xenbus protocol in Mini-OS will only
> allow to transfer the complete message to or from the ring page buffer.
> This is limiting the maximum message size to lower values as the xenbus
> protocol normally would allow.
> 
> Change that by allowing to transfer the xenbus message in chunks as
> soon as they are available.
> 
> Avoid crashing Mini-OS in case of illegal data read from the ring
> buffer.
> 
> Signed-off-by: Juergen Gross <jgross@suse.com>

Ping?


Juergen

> ---
>   xenbus/xenbus.c | 212 ++++++++++++++++++++++++++++--------------------
>   1 file changed, 124 insertions(+), 88 deletions(-)
> 
> diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
> index 23de61e..3fbb122 100644
> --- a/xenbus/xenbus.c
> +++ b/xenbus/xenbus.c
> @@ -29,6 +29,7 @@
>   #include <xen/hvm/params.h>
>   #include <mini-os/spinlock.h>
>   #include <mini-os/xmalloc.h>
> +#include <mini-os/semaphore.h>
>   
>   #define min(x,y) ({                       \
>           typeof(x) tmpx = (x);                 \
> @@ -46,6 +47,7 @@
>   static struct xenstore_domain_interface *xenstore_buf;
>   static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
>   DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
> +static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
>   
>   xenbus_event_queue xenbus_events;
>   static struct watch {
> @@ -231,75 +233,105 @@ char *xenbus_wait_for_state_change(const char* path, XenbusState *state, xenbus_
>   }
>   
>   
> +static void xenbus_read_data(char *buf, unsigned int len)
> +{
> +    unsigned int off = 0;
> +    unsigned int prod;
> +    unsigned int size;
> +    int notify;
> +
> +    while (off != len)
> +    {
> +        if (xenstore_buf->rsp_prod == xenstore_buf->rsp_cons)
> +            wait_event(xb_waitq,
> +                       xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
> +
> +        prod = xenstore_buf->rsp_prod;
> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
> +        size = min(len - off, prod - xenstore_buf->rsp_cons);
> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
> +        off += size;
> +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
> +                  xenstore_buf->rsp_prod);
> +        rmb();
> +        xenstore_buf->rsp_cons += size;
> +        wmb();
> +        if (notify)
> +            notify_remote_via_evtchn(xenbus_evtchn);
> +    }
> +}
> +
>   static void xenbus_thread_func(void *ign)
>   {
>       struct xsd_sockmsg msg;
> -    unsigned prod = xenstore_buf->rsp_prod;
> +    char *data;
>   
>       for (;;) {
> -        wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
> -        while (1) {
> -            prod = xenstore_buf->rsp_prod;
> -            DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
> -                  xenstore_buf->rsp_prod);
> -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))
> -                break;
> -            rmb();
> -            memcpy_from_ring(xenstore_buf->rsp, &msg,
> -                             MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> -                             sizeof(msg));
> -            DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> -                  xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> -
> -            if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
> -                sizeof(msg) + msg.len)
> -                break;
> -
> -            DEBUG("Message is good.\n");
> -
> -            if (msg.type == XS_WATCH_EVENT) {
> -                struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> -                xenbus_event_queue *events = NULL;
> -                char *data = (char*)event + sizeof(*event);
> -                struct watch *watch;
> -
> -                memcpy_from_ring(xenstore_buf->rsp, data,
> -                    MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),
> -                    msg.len);
> -
> -                event->path = data;
> -                event->token = event->path + strlen(event->path) + 1;
> -
> -                mb();
> -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> -
> -                for (watch = watches; watch; watch = watch->next)
> -                    if (!strcmp(watch->token, event->token)) {
> -                        events = watch->events;
> -                        break;
> -                    }
> -
> -                if (events) {
> -                    event->next = *events;
> -                    *events = event;
> -                    wake_up(&xenbus_watch_queue);
> -                } else {
> -                    printk("unexpected watch token %s\n", event->token);
> -                    free(event);
> +        xenbus_read_data((char *)&msg, sizeof(msg));
> +        DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
> +              xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
> +
> +        if (msg.len > XENSTORE_PAYLOAD_MAX) {
> +            printk("Xenstore violates protocol, message longer than allowed.\n");
> +            return;
> +        }
> +
> +        if (msg.type == XS_WATCH_EVENT) {
> +            struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
> +            xenbus_event_queue *events = NULL;
> +            struct watch *watch;
> +            char *c;
> +            int zeroes = 0;
> +
> +            data = (char*)event + sizeof(*event);
> +            xenbus_read_data(data, msg.len);
> +
> +            for (c = data; c < data + msg.len; c++)
> +                if (!*c)
> +                    zeroes++;
> +            if (zeroes != 2) {
> +                printk("Xenstore: illegal watch event data\n");
> +                free(event);
> +                continue;
> +            }
> +
> +            event->path = data;
> +            event->token = event->path + strlen(event->path) + 1;
> +
> +            for (watch = watches; watch; watch = watch->next)
> +                if (!strcmp(watch->token, event->token)) {
> +                    events = watch->events;
> +                    break;
>                   }
> +
> +            if (events) {
> +                event->next = *events;
> +                *events = event;
> +                wake_up(&xenbus_watch_queue);
>               } else {
> -                req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
> -                memcpy_from_ring(xenstore_buf->rsp, req_info[msg.req_id].reply,
> -                                 MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
> -                                 msg.len + sizeof(msg));
> -                mb();
> -                xenstore_buf->rsp_cons += msg.len + sizeof(msg);
> -                wake_up(&req_info[msg.req_id].waitq);
> +                printk("Xenstore: unexpected watch token %s\n", event->token);
> +                free(event);
>               }
>   
> -            wmb();
> -            notify_remote_via_evtchn(xenbus_evtchn);
> +            continue;
>           }
> +
> +        data = malloc(sizeof(msg) + msg.len);
> +        memcpy(data, &msg, sizeof(msg));
> +        xenbus_read_data(data + sizeof(msg), msg.len);
> +
> +        if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) {
> +            printk("Xenstore: illegal request id %d\n", msg.req_id);
> +            free(data);
> +            continue;
> +        }
> +
> +        DEBUG("Message is good.\n");
> +
> +        req_info[msg.req_id].reply = data;
> +
> +        wake_up(&req_info[msg.req_id].waitq);
>       }
>   }
>   
> @@ -451,36 +483,40 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
>   
>       cur_req = &header_req;
>   
> -    BUG_ON(len > XENSTORE_RING_SIZE);
> -    /* Wait for the ring to drain to the point where we can send the
> -       message. */
> -    prod = xenstore_buf->req_prod;
> -    if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE)
> -    {
> -        /* Wait for there to be space on the ring */
> -        DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
> -                prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> -        wait_event(xb_waitq,
> -                xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
> -                XENSTORE_RING_SIZE);
> -        DEBUG("Back from wait.\n");
> -        prod = xenstore_buf->req_prod;
> -    }
> +    BUG_ON(len > XENSTORE_PAYLOAD_MAX);
>   
> -    /* We're now guaranteed to be able to send the message without
> -       overflowing the ring.  Do so. */
> +    /* Make sure we are the only thread trying to write. */
> +    down(&xb_write_sem);
> +
> +    /* Send the message in chunks using free ring space when available. */
>       total_off = 0;
>       req_off = 0;
> -    while (total_off < len)
> +    while (total_off < len)
>       {
> +        prod = xenstore_buf->req_prod;
> +        if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
> +        {
> +            /* Send evtchn to notify remote */
> +            notify_remote_via_evtchn(xenbus_evtchn);
> +
> +            /* Wait for there to be space on the ring */
> +            DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
> +                  len - total_off, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
> +            wait_event(xb_waitq,
> +                       prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
> +            DEBUG("Back from wait.\n");
> +        }
> +
>           this_chunk = min(cur_req->len - req_off,
> -                XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> +                         XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
> +        this_chunk = min(this_chunk,
> +                         xenstore_buf->req_cons + XENSTORE_RING_SIZE - prod);
>           memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
> -                (char *)cur_req->data + req_off, this_chunk);
> +               (char *)cur_req->data + req_off, this_chunk);
>           prod += this_chunk;
>           req_off += this_chunk;
>           total_off += this_chunk;
> -        if (req_off == cur_req->len)
> +        if (req_off == cur_req->len)
>           {
>               req_off = 0;
>               if (cur_req == &header_req)
> @@ -488,20 +524,20 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
>               else
>                   cur_req++;
>           }
> +
> +        /* Remote must see entire message before updating indexes */
> +        wmb();
> +        xenstore_buf->req_prod = prod;
>       }
>   
> +    /* Send evtchn to notify remote */
> +    notify_remote_via_evtchn(xenbus_evtchn);
> +
>       DEBUG("Complete main loop of xb_write.\n");
>       BUG_ON(req_off != 0);
>       BUG_ON(total_off != len);
> -    BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
> -
> -    /* Remote must see entire message before updating indexes */
> -    wmb();
> -
> -    xenstore_buf->req_prod += len;
>   
> -    /* Send evtchn to notify remote */
> -    notify_remote_via_evtchn(xenbus_evtchn);
> +    up(&xb_write_sem);
>   }
>   
>   /* Send a mesasge to xenbus, in the same fashion as xb_write, and
> 

Re: [PATCH] mini-os: xenbus: support large messages
Posted by Samuel Thibault 2 years, 7 months ago
Hello,

Thanks for having worked on this!

Juergen Gross, le mer. 18 août 2021 17:26:10 +0200, a ecrit:
> +static void xenbus_read_data(char *buf, unsigned int len)
> +{
> +    unsigned int off = 0;
> +    unsigned int prod;
> +    unsigned int size;
> +    int notify;
> +
> +    while (off != len)
> +    {
> +        if (xenstore_buf->rsp_prod == xenstore_buf->rsp_cons)
> +            wait_event(xb_waitq,
> +                       xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);

The `if` is redundant, since wait_event() already tests the condition itself.

> +        prod = xenstore_buf->rsp_prod;
> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
> +        size = min(len - off, prod - xenstore_buf->rsp_cons);
> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
> +        off += size;
> +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
> +                  xenstore_buf->rsp_prod);

This looks odd to me?  We want to notify as soon as the ring is empty,
which can happen at any place in the ring right?

Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
the rsp_cons increase.

> +        rmb();

rmb() must be placed before memcpy_from_ring, so that the ring data is
read only after rsp_prod has been read and is therefore at least as
recent as the rsp_prod value we based the copy size on.

The rest seems ok to me.

Samuel

Re: [PATCH] mini-os: xenbus: support large messages
Posted by Juergen Gross 2 years, 7 months ago
On 15.09.21 01:17, Samuel Thibault wrote:
> Hello,
> 
> Thanks for having worked on this!
> 
> Juergen Gross, le mer. 18 août 2021 17:26:10 +0200, a ecrit:
>> +static void xenbus_read_data(char *buf, unsigned int len)
>> +{
>> +    unsigned int off = 0;
>> +    unsigned int prod;
>> +    unsigned int size;
>> +    int notify;
>> +
>> +    while (off != len)
>> +    {
>> +        if (xenstore_buf->rsp_prod == xenstore_buf->rsp_cons)
>> +            wait_event(xb_waitq,
>> +                       xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
> 
> The if is redundant since wait_event already tests it.

Ah, right.

> 
>> +        prod = xenstore_buf->rsp_prod;
>> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
>> +        size = min(len - off, prod - xenstore_buf->rsp_cons);
>> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
>> +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
>> +        off += size;
>> +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
>> +                  xenstore_buf->rsp_prod);
> 
> This looks odd to me?  We want to notify as soon as the ring is empty,
> which can happen at any place in the ring right?

No, we want to notify if the ring was full and is about to gain some
space again, as the other side was probably not able to put all data
in and is now waiting for more space to become available.

> 
> Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
> the rsp_cons increase.
> 
>> +        rmb();
> 
> rmb() must be placed before memcpy_from_ring, to make sure that the data
> we read from the buffer is up-to-date according to the read we made from
> rsp_prod.

Ah, yes. Thanks for spotting that one!

> 
> The rest seems ok to me.

Thanks,


Juergen
Re: [PATCH] mini-os: xenbus: support large messages
Posted by Samuel Thibault 2 years, 7 months ago
Juergen Gross, le mer. 15 sept. 2021 12:48:44 +0200, a ecrit:
> On 15.09.21 01:17, Samuel Thibault wrote:
> > > +        prod = xenstore_buf->rsp_prod;
> > > +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
> > > +        size = min(len - off, prod - xenstore_buf->rsp_cons);
> > > +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> > > +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
> > > +        off += size;
> > > +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
> > > +                  xenstore_buf->rsp_prod);
> > 
> > This looks odd to me?  We want to notify as soon as the ring is empty,
> > which can happen at any place in the ring right?
> 
> No, we want to notify if the ring was full and is about to gain some
> space again, as the other side was probably not able to put all data
> in and is now waiting for more space to become available.

Ok, that said, the producer may fill the ring between this test and
the rsp_cons update; the producer will then go to sleep, and the
consumer here will not notice it and thus will never notify it.

So we really need to make the test after the rsp_cons update, like Linux
does:

> > Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
> > the rsp_cons increase.

Samuel

Re: [PATCH] mini-os: xenbus: support large messages
Posted by Juergen Gross 2 years, 7 months ago
On 15.09.21 13:20, Samuel Thibault wrote:
> Juergen Gross, le mer. 15 sept. 2021 12:48:44 +0200, a ecrit:
>> On 15.09.21 01:17, Samuel Thibault wrote:
>>>> +        prod = xenstore_buf->rsp_prod;
>>>> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons, prod);
>>>> +        size = min(len - off, prod - xenstore_buf->rsp_cons);
>>>> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
>>>> +                         MASK_XENSTORE_IDX(xenstore_buf->rsp_cons), size);
>>>> +        off += size;
>>>> +        notify = (xenstore_buf->rsp_cons + XENSTORE_RING_SIZE ==
>>>> +                  xenstore_buf->rsp_prod);
>>>
>>> This looks odd to me?  We want to notify as soon as the ring is empty,
>>> which can happen at any place in the ring right?
>>
>> No, we want to notify if the ring was full and is about to gain some
>> space again, as the other side was probably not able to put all data
>> in and is now waiting for more space to become available.
> 
> Ok, that said, the producer may fill the ring between this test and
> the rsp_cons update, and thus the producer will go sleep and here the
> consumer will not notice it and thus never notify it.
> 
> So we really need to make the test after the rsp_cons update, like Linux
> does:
> 
>>> Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
>>> the rsp_cons increase.

Oh, you are right, of course. How could I overlook this?

Thanks,


Juergen
Re: [PATCH] mini-os: xenbus: support large messages
Posted by Samuel Thibault 2 years, 7 months ago
Juergen Gross, le mer. 15 sept. 2021 14:03:35 +0200, a ecrit:
> On 15.09.21 13:20, Samuel Thibault wrote:
> > So we really need to make the test after the rsp_cons update, like Linux
> > does:
> > 
> > > > Linux' code uses (intf->rsp_prod - cons >= XENSTORE_RING_SIZE), *after*
> > > > the rsp_cons increase.
> 
> Oh, you are right, of course. How could I overlook this?

One person alone will always overlook something; that's why we do
code review: to have several pairs of eyes check whether we may have
forgotten something :)

Samuel