xenbus/xenbus.c | 209 ++++++++++++++++++++++++++++-------------------- 1 file changed, 121 insertions(+), 88 deletions(-)
Today the implementation of the xenbus protocol in Mini-OS only allows
transferring a complete message to or from the ring page buffer.
This limits the maximum message size to a lower value than the xenbus
protocol would normally allow.
Change that by allowing the xenbus message to be transferred in chunks
as soon as they are available.
Avoid crashing Mini-OS in case of illegal data read from the ring
buffer.
Signed-off-by: Juergen Gross <jgross@suse.com>
---
V2:
- drop redundant if (Samuel Thibault)
- move rmb() (Samuel Thibault)
V3:
- correct notification test (Samuel Thibault)
---
xenbus/xenbus.c | 209 ++++++++++++++++++++++++++++--------------------
1 file changed, 121 insertions(+), 88 deletions(-)
diff --git a/xenbus/xenbus.c b/xenbus/xenbus.c
index 23de61e..c8d358d 100644
--- a/xenbus/xenbus.c
+++ b/xenbus/xenbus.c
@@ -29,6 +29,7 @@
#include <xen/hvm/params.h>
#include <mini-os/spinlock.h>
#include <mini-os/xmalloc.h>
+#include <mini-os/semaphore.h>
#define min(x,y) ({ \
typeof(x) tmpx = (x); \
@@ -46,6 +47,7 @@
static struct xenstore_domain_interface *xenstore_buf;
static DECLARE_WAIT_QUEUE_HEAD(xb_waitq);
DECLARE_WAIT_QUEUE_HEAD(xenbus_watch_queue);
+static __DECLARE_SEMAPHORE_GENERIC(xb_write_sem, 1);
xenbus_event_queue xenbus_events;
static struct watch {
@@ -231,75 +233,102 @@ char *xenbus_wait_for_state_change(const char* path, XenbusState *state, xenbus_
}
+/*
+ * Read exactly len bytes from the xenstore response ring into buf.
+ *
+ * The data is copied in chunks: each iteration waits until the ring holds
+ * at least one unread byte, copies as much as is available (up to the
+ * amount still missing) and advances rsp_cons by that amount.  The function
+ * returns only once all len bytes have been copied.
+ *
+ * buf: destination buffer, at least len bytes large.
+ * len: number of bytes to read.
+ */
+static void xenbus_read_data(char *buf, unsigned int len)
+{
+    unsigned int off = 0;
+    unsigned int prod, cons;
+    unsigned int size;
+
+    while (off != len)
+    {
+        wait_event(xb_waitq, xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
+
+        prod = xenstore_buf->rsp_prod;
+        cons = xenstore_buf->rsp_cons;
+        DEBUG("Rsp_cons %d, rsp_prod %d.\n", cons, prod);
+        size = min(len - off, prod - cons);
+
+        rmb(); /* Make sure data read from ring is ordered with rsp_prod. */
+        memcpy_from_ring(xenstore_buf->rsp, buf + off,
+                         MASK_XENSTORE_IDX(cons), size);
+        off += size;
+
+        /*
+         * The other side must not see the space as free before the data
+         * has been copied out: the copy and the rsp_cons update must not
+         * be reordered.
+         */
+        mb();
+        xenstore_buf->rsp_cons += size;
+
+        /*
+         * The rsp_cons update must be visible before rsp_prod is tested.
+         * Otherwise the producer could fill the ring and go to sleep
+         * without either side noticing, and the notification would be lost.
+         */
+        mb();
+        if (xenstore_buf->rsp_prod - cons >= XENSTORE_RING_SIZE)
+            notify_remote_via_evtchn(xenbus_evtchn);
+    }
+}
+
static void xenbus_thread_func(void *ign)
{
struct xsd_sockmsg msg;
- unsigned prod = xenstore_buf->rsp_prod;
+ char *data;
for (;;) {
- wait_event(xb_waitq, prod != xenstore_buf->rsp_prod);
- while (1) {
- prod = xenstore_buf->rsp_prod;
- DEBUG("Rsp_cons %d, rsp_prod %d.\n", xenstore_buf->rsp_cons,
- xenstore_buf->rsp_prod);
- if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons < sizeof(msg))
- break;
- rmb();
- memcpy_from_ring(xenstore_buf->rsp, &msg,
- MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
- sizeof(msg));
- DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
- xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
-
- if (xenstore_buf->rsp_prod - xenstore_buf->rsp_cons <
- sizeof(msg) + msg.len)
- break;
-
- DEBUG("Message is good.\n");
-
- if (msg.type == XS_WATCH_EVENT) {
- struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
- xenbus_event_queue *events = NULL;
- char *data = (char*)event + sizeof(*event);
- struct watch *watch;
-
- memcpy_from_ring(xenstore_buf->rsp, data,
- MASK_XENSTORE_IDX(xenstore_buf->rsp_cons + sizeof(msg)),
- msg.len);
-
- event->path = data;
- event->token = event->path + strlen(event->path) + 1;
-
- mb();
- xenstore_buf->rsp_cons += msg.len + sizeof(msg);
-
- for (watch = watches; watch; watch = watch->next)
- if (!strcmp(watch->token, event->token)) {
- events = watch->events;
- break;
- }
-
- if (events) {
- event->next = *events;
- *events = event;
- wake_up(&xenbus_watch_queue);
- } else {
- printk("unexpected watch token %s\n", event->token);
- free(event);
+ xenbus_read_data((char *)&msg, sizeof(msg)); /* Fetch the fixed-size header; returns only once all of it has been read. */
+ DEBUG("Msg len %d, %d avail, id %d.\n", msg.len + sizeof(msg),
+ xenstore_buf->rsp_prod - xenstore_buf->rsp_cons, msg.req_id);
+
+ if (msg.len > XENSTORE_PAYLOAD_MAX) { /* Check the announced payload length before allocating for it. */
+ printk("Xenstore violates protocol, message longer than allowed.\n");
+ return; /* Leaves the receive loop and ends this function. */
+ }
+
+ if (msg.type == XS_WATCH_EVENT) {
+ struct xenbus_event *event = malloc(sizeof(*event) + msg.len);
+ xenbus_event_queue *events = NULL;
+ struct watch *watch;
+ char *c;
+ int zeroes = 0;
+
+ data = (char*)event + sizeof(*event);
+ xenbus_read_data(data, msg.len); /* Payload is stored directly behind the event struct. */
+
+ for (c = data; c < data + msg.len; c++)
+ if (!*c)
+ zeroes++;
+ if (zeroes != 2) { /* Need exactly two NUL bytes: one ending the path, one ending the token. */
+ printk("Xenstore: illegal watch event data\n");
+ free(event);
+ continue; /* Drop the malformed event and go on with the next message. */
+ }
+
+ event->path = data;
+ event->token = event->path + strlen(event->path) + 1;
+
+ for (watch = watches; watch; watch = watch->next)
+ if (!strcmp(watch->token, event->token)) {
+ events = watch->events;
+ break;
}
+
+ if (events) {
+ event->next = *events;
+ *events = event;
+ wake_up(&xenbus_watch_queue);
} else {
- req_info[msg.req_id].reply = malloc(sizeof(msg) + msg.len);
- memcpy_from_ring(xenstore_buf->rsp, req_info[msg.req_id].reply,
- MASK_XENSTORE_IDX(xenstore_buf->rsp_cons),
- msg.len + sizeof(msg));
- mb();
- xenstore_buf->rsp_cons += msg.len + sizeof(msg);
- wake_up(&req_info[msg.req_id].waitq);
+ printk("Xenstore: unexpected watch token %s\n", event->token);
+ free(event);
}
- wmb();
- notify_remote_via_evtchn(xenbus_evtchn);
+ continue; /* Watch event handled; the code below deals with request replies. */
}
+
+ data = malloc(sizeof(msg) + msg.len); /* Buffer holds the header followed by the payload. */
+ memcpy(data, &msg, sizeof(msg));
+ xenbus_read_data(data + sizeof(msg), msg.len); /* The payload is read before req_id is validated. */
+
+ if (msg.req_id >= NR_REQS || !req_info[msg.req_id].in_use) { /* Reject ids outside the table or not marked in use. */
+ printk("Xenstore: illegal request id %d\n", msg.req_id);
+ free(data);
+ continue;
+ }
+
+ DEBUG("Message is good.\n");
+
+ req_info[msg.req_id].reply = data; /* Hand the buffer to the request slot, then wake its waiter. */
+
+ wake_up(&req_info[msg.req_id].waitq);
}
}
@@ -451,36 +480,40 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
cur_req = &header_req;
- BUG_ON(len > XENSTORE_RING_SIZE);
- /* Wait for the ring to drain to the point where we can send the
- message. */
- prod = xenstore_buf->req_prod;
- if (prod + len - xenstore_buf->req_cons > XENSTORE_RING_SIZE)
- {
- /* Wait for there to be space on the ring */
- DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n",
- prod, len, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
- wait_event(xb_waitq,
- xenstore_buf->req_prod + len - xenstore_buf->req_cons <=
- XENSTORE_RING_SIZE);
- DEBUG("Back from wait.\n");
- prod = xenstore_buf->req_prod;
- }
+ BUG_ON(len > XENSTORE_PAYLOAD_MAX);
+
+ /* Make sure we are the only thread trying to write. */
+ down(&xb_write_sem);
- /* We're now guaranteed to be able to send the message without
- overflowing the ring. Do so. */
+ /* Send the message in chunks using free ring space when available. */
total_off = 0;
req_off = 0;
- while (total_off < len)
+ while (total_off < len)
{
+ prod = xenstore_buf->req_prod;
+ if (prod - xenstore_buf->req_cons >= XENSTORE_RING_SIZE)
+ {
+ /* Send evtchn to notify remote */
+ notify_remote_via_evtchn(xenbus_evtchn);
+
+ /* Wait for there to be space on the ring */
+ DEBUG("prod %d, len %d, cons %d, size %d; waiting.\n", prod,
+ len - total_off, xenstore_buf->req_cons, XENSTORE_RING_SIZE);
+ wait_event(xb_waitq,
+ prod - xenstore_buf->req_cons < XENSTORE_RING_SIZE);
+ DEBUG("Back from wait.\n");
+ }
+
this_chunk = min(cur_req->len - req_off,
- XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
+ XENSTORE_RING_SIZE - MASK_XENSTORE_IDX(prod));
+ this_chunk = min(this_chunk,
+ xenstore_buf->req_cons + XENSTORE_RING_SIZE - prod);
memcpy((char *)xenstore_buf->req + MASK_XENSTORE_IDX(prod),
- (char *)cur_req->data + req_off, this_chunk);
+ (char *)cur_req->data + req_off, this_chunk);
prod += this_chunk;
req_off += this_chunk;
total_off += this_chunk;
- if (req_off == cur_req->len)
+ if (req_off == cur_req->len)
{
req_off = 0;
if (cur_req == &header_req)
@@ -488,20 +521,20 @@ static void xb_write(int type, int req_id, xenbus_transaction_t trans_id,
else
cur_req++;
}
+
+ /* Remote must see entire message before updating indexes */
+ wmb();
+ xenstore_buf->req_prod = prod;
}
+ /* Send evtchn to notify remote */
+ notify_remote_via_evtchn(xenbus_evtchn);
+
DEBUG("Complete main loop of xb_write.\n");
BUG_ON(req_off != 0);
BUG_ON(total_off != len);
- BUG_ON(prod > xenstore_buf->req_cons + XENSTORE_RING_SIZE);
- /* Remote must see entire message before updating indexes */
- wmb();
-
- xenstore_buf->req_prod += len;
-
- /* Send evtchn to notify remote */
- notify_remote_via_evtchn(xenbus_evtchn);
+ up(&xb_write_sem);
}
/* Send a mesasge to xenbus, in the same fashion as xb_write, and
--
2.26.2
Re,

Juergen Gross, le mer. 15 sept. 2021 16:08:15 +0200, a ecrit:
> +    while (off != len)
> +    {
> +        wait_event(xb_waitq, xenstore_buf->rsp_prod != xenstore_buf->rsp_cons);
> +
> +        prod = xenstore_buf->rsp_prod;
> +        cons = xenstore_buf->rsp_cons;
> +        DEBUG("Rsp_cons %d, rsp_prod %d.\n", cons, prod);
> +        size = min(len - off, prod - cons);
> +
> +        rmb(); /* Make sure data read from ring is ordered with rsp_prod. */
> +        memcpy_from_ring(xenstore_buf->rsp, buf + off,
> +                         MASK_XENSTORE_IDX(cons), size);
> +        off += size;
> +        xenstore_buf->rsp_cons += size;
> +        wmb();
> +        if (xenstore_buf->rsp_prod - cons >= XENSTORE_RING_SIZE)
> +            notify_remote_via_evtchn(xenbus_evtchn);
> +    }

Reading it again, I'm still not convinced we covered all cases.

There is at least one thing: Linux does

    virt_rmb();

    memcpy(data, src, avail);

    /* Other side must not see free space until we've copied out */
    virt_mb();
    intf->rsp_cons += avail;

which makes sense to me: we don't want the compiler or anything to
reorder the write to rsp_cons with respect to the memcpy. So I believe
we also need a full barrier before xenstore_buf->rsp_cons += size; in
mini-os.

Then there is

    xenstore_buf->rsp_cons += size;
    wmb();
    if (xenstore_buf->rsp_prod - cons >= XENSTORE_RING_SIZE)
        notify_remote_via_evtchn(xenbus_evtchn);

The Linux code does

    intf->rsp_cons += avail;

    /* Implies mb(): other side will see the updated consumer. */
    if (intf->rsp_prod - cons >= XENSTORE_RING_SIZE)
        notify_remote_via_evtchn(xen_store_evtchn);

(Note that the "Implies mb()" comment is about the
notify_remote_via_evtchn call)

I believe the Linux code itself is missing a full barrier here: before
reading intf->rsp_prod, we want to make sure that our update of rsp_cons
is seen by the producer. Otherwise it may happen that the producer
doesn't see the rsp_cons update and updates its rsp_prod (filling the
ring and going to sleep), and the consumer does not see the rsp_prod
update either, and thus we miss a notification and the producer stays
stuck.

Extremely rare scenario etc. but I believe we do want a full barrier
between rsp_cons += and the if, both in mini-os and Linux.

Samuel
© 2016 - 2024 Red Hat, Inc.