From: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
Implement the mechanism to transfer packets from the dedicated
protocol thread to the main QEMU execution loop for processing.
The patch adds the following features:
- signaling logic using an internal pipe to wake up the main loop
- the rp_process handler, which retrieves packets from the rx queue and
  dispatches them to the target Remote Port device.
This enables QEMU device models to handle remote events.
Signed-off-by: Edgar E. Iglesias <edgar.iglesias@amd.com>
Signed-off-by: Takahiro Nakata <takahiro.nakata.wr@renesas.com>
Signed-off-by: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
---
hw/core/remote-port.c | 148 +++++++++++++++++++++++++++++++++-
include/hw/core/remote-port.h | 5 ++
2 files changed, 152 insertions(+), 1 deletion(-)
diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
index 91b0682884..e44d9249c3 100644
--- a/hw/core/remote-port.c
+++ b/hw/core/remote-port.c
@@ -52,6 +52,8 @@
#define REMOTE_PORT_CLASS(klass) \
OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT)
+static void rp_event_read(void *opaque);
+
static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
{
qemu_hexdump(stdout, prefix, buf, len);
@@ -96,6 +98,12 @@ ssize_t rp_write(RemotePort *s, const void *buf, size_t count)
return r;
}
+/*
+ * Return the difference between the rx queue write and read indices;
+ * non-zero means there are packets waiting to be processed.
+ */
+static unsigned int rp_has_work(RemotePort *s)
+{
+    return s->rx_queue.wpos - s->rx_queue.rpos;
+}
+
static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
{
s->peer.version = pkt->hello.version;
@@ -187,6 +195,135 @@ static Chardev *rp_autocreate_chardev(RemotePort *s, char *name)
return chr;
}
+/*
+ * Drain the rx queue: take each pending packet in turn and dispatch it
+ * to the per-command handler of the Remote Port device it addresses.
+ *
+ * The queue indices are read and advanced with rsp_mutex held; the
+ * device handler itself is called after the mutex has been released.
+ * Returns once rp_has_work() reports an empty queue.
+ */
+void rp_process(RemotePort *s)
+{
+    while (true) {
+        struct rp_pkt *pkt;
+        unsigned int rpos;
+        bool actioned = false;
+        RemotePortDevice *dev = NULL;
+        RemotePortDeviceClass *rpdc;
+
+        qemu_mutex_lock(&s->rsp_mutex);
+        if (!rp_has_work(s)) {
+            qemu_mutex_unlock(&s->rsp_mutex);
+            break;
+        }
+        rpos = s->rx_queue.rpos;
+
+        pkt = s->rx_queue.pkt[rpos].pkt;
+        D(qemu_log("%s: io-thread rpos=%d wpos=%d cmd=%d dev=%d\n",
+                   s->prefix, s->rx_queue.rpos, s->rx_queue.wpos,
+                   pkt->hdr.cmd, pkt->hdr.dev));
+
+        /*
+         * To handle recursiveness, we need to advance the read
+         * index before processing the packet.
+         */
+        s->rx_queue.rpos++;
+        s->rx_queue.rpos %= ARRAY_SIZE(s->rx_queue.pkt);
+        qemu_mutex_unlock(&s->rsp_mutex);
+
+        /*
+         * The device id is taken from the packet header, so bound it
+         * before indexing devs[]. An out-of-range id is handled like an
+         * unregistered device: nothing is dispatched and the assert
+         * below fires instead of reading past the array.
+         */
+        if (pkt->hdr.dev < ARRAY_SIZE(s->devs)) {
+            dev = s->devs[pkt->hdr.dev];
+        }
+        if (dev) {
+            rpdc = REMOTE_PORT_DEVICE_GET_CLASS(dev);
+            if (rpdc->ops[pkt->hdr.cmd]) {
+                rpdc->ops[pkt->hdr.cmd](dev, pkt);
+                actioned = true;
+            }
+        }
+
+        switch (pkt->hdr.cmd) {
+        /* TBD */
+        default:
+            assert(actioned);
+        }
+
+        /* Release the slot and post the rx queue semaphore. */
+        s->rx_queue.inuse[rpos] = false;
+        qemu_sem_post(&s->rx_queue.sem);
+    }
+}
+
+/*
+ * Read handler for the event pipe: drain the pending wake-up bytes and
+ * then process the queued packets. Returns without processing when the
+ * read reports 0 bytes.
+ */
+static void rp_event_read(void *opaque)
+{
+    RemotePort *s = REMOTE_PORT(opaque);
+    unsigned char scratch[32];
+
+    /* The bytes carry no information; reading them just clears the event. */
+    for (;;) {
+        ssize_t nread;
+
+#ifdef _WIN32
+        nread = qemu_recv_wrap(s->event.pipe.read, scratch, sizeof scratch, 0);
+#else
+        nread = read(s->event.pipe.read, scratch, sizeof scratch);
+#endif
+        if (nread == 0) {
+            return;
+        }
+        if (nread == sizeof scratch) {
+            /* The buffer was filled completely, so try reading again. */
+            continue;
+        }
+        if (nread < 0 && errno == EINTR) {
+            /* Interrupted before any data was read; retry. */
+            continue;
+        }
+        break;
+    }
+
+    rp_process(s);
+}
+
+/*
+ * Signal the event pipe by writing a single byte to its write end.
+ * A write result of 0 is reported through hw_error().
+ *
+ * NOTE(review): only a result of exactly 0 is treated as a failure; a
+ * negative result is not reported -- confirm this is intended.
+ */
+static void rp_event_notify(RemotePort *s)
+{
+    unsigned char token = 0;
+    ssize_t written;
+
+#ifdef _WIN32
+    /* Mingw is sensitive about doing write's to socket descriptors. */
+    written = qemu_send_wrap(s->event.pipe.write, &token, sizeof token, 0);
+#else
+    written = qemu_write_full(s->event.pipe.write, &token, sizeof token);
+#endif
+    if (written != 0) {
+        return;
+    }
+
+    hw_error("%s: pipe closed\n", s->prefix);
+}
+
+/*
+ * Handover a pkt to CPU or IO-thread context.
+ *
+ * Advances the rx queue write index, signals the event pipe and the
+ * progress condition, then waits until the slot at the new write index
+ * is no longer marked in use.
+ */
+static void rp_pt_handover_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
+{
+    /*
+     * The wpos update is done with the rsp lock held; without it
+     * rp_wait_resp would race with this update.
+     */
+    qemu_mutex_lock(&s->rsp_mutex);
+    s->rx_queue.wpos++;
+    s->rx_queue.wpos %= ARRAY_SIZE(s->rx_queue.pkt);
+    /*
+     * Make the rx_queue index update visible to the consumer before
+     * the event is signaled, so that the wakeup cannot be lost.
+     */
+    smp_mb();
+    rp_event_notify(s);
+    qemu_cond_signal(&s->progress_cond);
+    qemu_mutex_unlock(&s->rsp_mutex);
+
+    /* Stall for as long as the slot at the new write index is in use. */
+    while (s->rx_queue.inuse[s->rx_queue.wpos]) {
+        qemu_log("%s: FULL rx queue %d\n", __func__, s->rx_queue.wpos);
+        if (qemu_sem_timedwait(&s->rx_queue.sem, 2 * 1000) == 0) {
+            /* Semaphore was posted in time; re-check the slot. */
+            continue;
+        }
+
+        /* Timed out: log the queue state and keep waiting. */
+#ifndef _WIN32
+        {
+            int sem_count;
+
+#ifndef CONFIG_SEM_TIMEDWAIT
+            sem_count = s->rx_queue.sem.count;
+#else
+            sem_getvalue(&s->rx_queue.sem.sem, &sem_count);
+#endif
+            qemu_log("semwait: %d rpos=%u wpos=%u\n", sem_count,
+                     s->rx_queue.rpos, s->rx_queue.wpos);
+        }
+#endif
+        qemu_log("Deadlock?\n");
+    }
+}
+
static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
{
struct rp_pkt *pkt = dpkt->pkt;
@@ -208,7 +345,7 @@ static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
case RP_CMD_interrupt:
case RP_CMD_ats_req:
case RP_CMD_ats_inv:
- /* TBD */;
+ rp_pt_handover_pkt(s, dpkt);
break;
default:
g_assert_not_reached();
@@ -312,6 +449,8 @@ static void rp_realize(DeviceState *dev, Error **errp)
s->prefix = object_get_canonical_path(OBJECT(dev));
qemu_mutex_init(&s->write_mutex);
+ qemu_mutex_init(&s->rsp_mutex);
+ qemu_cond_init(&s->progress_cond);
if (!qemu_chr_fe_get_driver(&s->chr)) {
char *name;
@@ -413,6 +552,7 @@ static void rp_realize(DeviceState *dev, Error **errp)
s->prefix);
exit(EXIT_FAILURE);
}
+ qemu_set_fd_handler(s->event.pipe.read, rp_event_read, NULL, s);
}
#else
if (!g_unix_open_pipe(s->event.pipes, FD_CLOEXEC, NULL)) {
@@ -427,7 +567,10 @@ static void rp_realize(DeviceState *dev, Error **errp)
exit(EXIT_FAILURE);
}
+ qemu_set_fd_handler(s->event.pipe.read, rp_event_read, NULL, s);
#endif
+
+ qemu_sem_init(&s->rx_queue.sem, ARRAY_SIZE(s->rx_queue.pkt) - 1);
}
static void rp_unrealize(DeviceState *dev)
@@ -436,6 +579,9 @@ static void rp_unrealize(DeviceState *dev)
s->finalizing = true;
+ /* Unregister handler. */
+ qemu_set_fd_handler(s->event.pipe.read, NULL, NULL, s);
+
info_report("%s: Wait for remote-port to disconnect", s->prefix);
qemu_chr_fe_disconnect(&s->chr);
qemu_thread_join(&s->thread);
diff --git a/include/hw/core/remote-port.h b/include/hw/core/remote-port.h
index b88e523894..21dfbe89cd 100644
--- a/include/hw/core/remote-port.h
+++ b/include/hw/core/remote-port.h
@@ -74,6 +74,9 @@ struct RemotePort {
char *chrdev_id;
struct rp_peer_state peer;
+ QemuMutex rsp_mutex;
+ QemuCond progress_cond;
+
#define RX_QUEUE_SIZE 1024
struct {
/* This array must be sized minimum 2 and always a power of 2. */
@@ -100,6 +103,8 @@ struct RemotePort {
RemotePortDevice *devs[REMOTE_PORT_MAX_DEVS];
};
+void rp_process(RemotePort *s);
+
ssize_t rp_write(RemotePort *s, const void *buf, size_t count);
#endif
--
2.43.0