[PATCH 11/29] hw/core: Implement Remote Port response handling

Ruslan Ruslichenko posted 29 patches 1 day, 11 hours ago
[PATCH 11/29] hw/core: Implement Remote Port response handling
Posted by Ruslan Ruslichenko 1 day, 11 hours ago
From: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>

Add infrastructure to handle transaction responses from
remote peers.

This patch enables QEMU to wait for replies when it initiates
a memory access (Read/Write). It implements a mechanism to track
outstanding transaction IDs and block the execution thread until
the matching response is received.

Signed-off-by: Edgar E. Iglesias <edgar.iglesias@amd.com>
Signed-off-by: Takahiro Nakata <takahiro.nakata.wr@renesas.com>
Signed-off-by: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
---
 hw/core/remote-port.c         | 136 +++++++++++++++++++++++++++++++++-
 include/hw/core/remote-port.h |  29 ++++++++
 2 files changed, 163 insertions(+), 2 deletions(-)

diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
index e44d9249c3..00c9529348 100644
--- a/hw/core/remote-port.c
+++ b/hw/core/remote-port.c
@@ -59,6 +59,16 @@ static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
     qemu_hexdump(stdout, prefix, buf, len);
 }
 
+void rp_rsp_mutex_lock(RemotePort *s)
+{
+    qemu_mutex_lock(&s->rsp_mutex); /* Taken around response-slot updates.  */
+}
+
+void rp_rsp_mutex_unlock(RemotePort *s)
+{
+    qemu_mutex_unlock(&s->rsp_mutex); /* Pairs with rp_rsp_mutex_lock().  */
+}
+
 static void rp_fatal_error(RemotePort *s, const char *reason)
 {
     error_report("%s: %s", s->prefix, reason);
@@ -104,6 +114,80 @@ static unsigned int rp_has_work(RemotePort *s)
     return work;
 }
 
+/* Response handling.  */
+RemotePortRespSlot *rp_dev_timed_wait_resp(RemotePort *s, uint32_t dev,
+                                            uint32_t id, int timems)
+{
+    int i;
+
+    assert(s->devs[dev]); /* NOTE(review): dev itself is not range-checked.  */
+
+    /* Find the first free slot for this device.  */
+    for (i = 0; i < ARRAY_SIZE(s->dev_state[dev].rsp_queue); i++) {
+        if (s->dev_state[dev].rsp_queue[i].used == false) {
+            break;
+        }
+    }
+
+    if (i == ARRAY_SIZE(s->dev_state[dev].rsp_queue) ||
+        s->dev_state[dev].rsp_queue[i].used == true) {
+        error_report("Number of outstanding transactions exceeded! %d",
+                      RP_MAX_OUTSTANDING_TRANSACTIONS);
+        rp_fatal_error(s, "Internal error"); /* All slots are in use.  */
+    }
+
+    /* Got a slot: remember the id and mark it claimed but not yet valid.  */
+    s->dev_state[dev].rsp_queue[i].id = id;
+    s->dev_state[dev].rsp_queue[i].valid = false;
+    s->dev_state[dev].rsp_queue[i].used = true;
+
+    while (!s->dev_state[dev].rsp_queue[i].valid) {
+        rp_rsp_mutex_unlock(s); /* Lock is released across the read.  */
+        rp_event_read(s);
+        rp_rsp_mutex_lock(s);
+        if (s->dev_state[dev].rsp_queue[i].valid) {
+            break; /* Response landed while unlocked.  */
+        }
+        if (!rp_has_work(s)) {
+            if (timems) {
+                if (!qemu_cond_timedwait(&s->progress_cond, &s->rsp_mutex,
+                                       timems)) {
+                    /*
+                     * Timed out: give up waiting; caller must check ->valid.
+                     */
+                    break;
+                }
+            } else { /* timems == 0: wait with no deadline.  */
+                qemu_cond_wait(&s->progress_cond, &s->rsp_mutex);
+            }
+        }
+    }
+    return &s->dev_state[dev].rsp_queue[i]; /* 'used' is left set.  */
+}
+
+RemotePortRespSlot *rp_dev_wait_resp(RemotePort *s, uint32_t dev, uint32_t id)
+{
+    return rp_dev_timed_wait_resp(s, dev, id, 0); /* 0: never time out.  */
+}
+
+RemotePortDynPkt rp_wait_resp(RemotePort *s)
+{
+    while (!rp_dpkt_is_valid(&s->rspqueue)) {
+        rp_rsp_mutex_unlock(s); /* Lock is released across the read.  */
+        rp_event_read(s);
+        rp_rsp_mutex_lock(s);
+        /* Need to recheck the condition with the rsp lock taken.  */
+        if (rp_dpkt_is_valid(&s->rspqueue)) {
+            break;
+        }
+        D(qemu_log("%s: wait for progress\n", __func__));
+        if (!rp_has_work(s)) {
+            qemu_cond_wait(&s->progress_cond, &s->rsp_mutex);
+        }
+    }
+    return s->rspqueue; /* Struct returned by value.  */
+}
+
 static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
 {
     s->peer.version = pkt->hello.version;
@@ -328,14 +412,52 @@ static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
 {
     struct rp_pkt *pkt = dpkt->pkt;
 
-    D(qemu_log("%s: cmd=%x id=%d dev=%d\n", __func__, pkt->hdr.cmd,
-             pkt->hdr.id, pkt->hdr.dev));
+    D(qemu_log("%s: cmd=%x id=%d dev=%d rsp=%d\n", __func__, pkt->hdr.cmd,
+             pkt->hdr.id, pkt->hdr.dev,
+             pkt->hdr.flags & RP_PKT_FLAGS_response));
 
     if (pkt->hdr.dev >= ARRAY_SIZE(s->devs)) {
         /* FIXME: Respond with an error.  */
         return true;
     }
 
+    if (pkt->hdr.flags & RP_PKT_FLAGS_response) {
+        uint32_t dev = pkt->hdr.dev;
+        uint32_t id = pkt->hdr.id;
+        int i;
+
+        if (pkt->hdr.flags & RP_PKT_FLAGS_posted) {
+            printf("Drop response for posted packets\n");
+            return true;
+        }
+
+        qemu_mutex_lock(&s->rsp_mutex);
+
+        /* Try to find a per-device slot first.  */
+        for (i = 0; i < ARRAY_SIZE(s->dev_state[dev].rsp_queue); i++) {
+            if (s->devs[dev] && s->dev_state[dev].rsp_queue[i].used == true
+                && s->dev_state[dev].rsp_queue[i].id == id) {
+                break;
+            }
+        }
+
+        if (i < ARRAY_SIZE(s->dev_state[dev].rsp_queue)) {
+            /* Found a per device one.  */
+            assert(s->dev_state[dev].rsp_queue[i].valid == false);
+
+            rp_dpkt_swap(&s->dev_state[dev].rsp_queue[i].rsp, dpkt);
+            s->dev_state[dev].rsp_queue[i].valid = true;
+
+            qemu_cond_signal(&s->progress_cond);
+        } else {
+            rp_dpkt_swap(&s->rspqueue, dpkt);
+            qemu_cond_signal(&s->progress_cond);
+        }
+
+        qemu_mutex_unlock(&s->rsp_mutex);
+        return true;
+    }
+
     switch (pkt->hdr.cmd) {
     case RP_CMD_hello:
         rp_cmd_hello(s, pkt);
@@ -389,6 +511,7 @@ static void *rp_protocol_thread(void *arg)
 
     /* Make sure we have a decent bufsize to start with.  */
     rp_dpkt_alloc(&s->rsp, sizeof s->rsp.pkt->busaccess + 1024);
+    rp_dpkt_alloc(&s->rspqueue, sizeof s->rspqueue.pkt->busaccess + 1024);
     for (i = 0; i < ARRAY_SIZE(s->rx_queue.pkt); i++) {
         rp_dpkt_alloc(&s->rx_queue.pkt[i],
                       sizeof s->rx_queue.pkt[i].pkt->busaccess + 1024);
@@ -614,6 +737,7 @@ static void rp_prop_allow_set_link(const Object *obj, const char *name,
 static void rp_init(Object *obj)
 {
     RemotePort *s = REMOTE_PORT(obj);
+    int t;
     int i;
 
     for (i = 0; i < REMOTE_PORT_MAX_DEVS; ++i) {
@@ -623,6 +747,14 @@ static void rp_init(Object *obj)
                              rp_prop_allow_set_link,
                              OBJ_PROP_LINK_STRONG);
         g_free(name);
+
+
+        for (t = 0; t < RP_MAX_OUTSTANDING_TRANSACTIONS; t++) {
+            s->dev_state[i].rsp_queue[t].used = false;
+            s->dev_state[i].rsp_queue[t].valid = false;
+            rp_dpkt_alloc(&s->dev_state[i].rsp_queue[t].rsp,
+               sizeof s->dev_state[i].rsp_queue[t].rsp.pkt->busaccess + 1024);
+        }
     }
 }
 
diff --git a/include/hw/core/remote-port.h b/include/hw/core/remote-port.h
index 21dfbe89cd..c1b21eb573 100644
--- a/include/hw/core/remote-port.h
+++ b/include/hw/core/remote-port.h
@@ -53,6 +53,13 @@ typedef struct RemotePortDeviceClass {
 #define TYPE_REMOTE_PORT "remote-port"
 #define REMOTE_PORT(obj) OBJECT_CHECK(RemotePort, (obj), TYPE_REMOTE_PORT)
 
+typedef struct RemotePortRespSlot {
+    RemotePortDynPkt rsp;   /* Response packet, swapped in on arrival.  */
+    uint32_t id;            /* Transaction id this slot is waiting for.  */
+    bool used;              /* Slot has been claimed by a waiter.  */
+    bool valid;             /* Set once rsp holds the matching response.  */
+} RemotePortRespSlot;
+
 struct RemotePort {
     DeviceState parent;
 
@@ -93,6 +100,13 @@ struct RemotePort {
      */
     RemotePortDynPkt rsp;
 
+    /*
+     * rspqueue holds received responses from the remote side.
+     * Only one for the moment but it might grow.
+     * Used by the master.
+     */
+    RemotePortDynPkt rspqueue;
+
     const char *prefix;
     const char *remote_prefix;
 
@@ -100,9 +114,24 @@ struct RemotePort {
     bool reset_done;
 
 #define REMOTE_PORT_MAX_DEVS 1024
+#define RP_MAX_OUTSTANDING_TRANSACTIONS 32
+    struct {
+        RemotePortRespSlot rsp_queue[RP_MAX_OUTSTANDING_TRANSACTIONS];
+    } dev_state[REMOTE_PORT_MAX_DEVS];
+
     RemotePortDevice *devs[REMOTE_PORT_MAX_DEVS];
 };
 
+/* Response-queue lock; the wait helpers below unlock/relock it internally.  */
+void rp_rsp_mutex_lock(RemotePort *s);
+void rp_rsp_mutex_unlock(RemotePort *s);
+
+RemotePortDynPkt rp_wait_resp(RemotePort *s);  /* returns rspqueue by value */
+
+RemotePortRespSlot *rp_dev_wait_resp(RemotePort *s, uint32_t dev, uint32_t id);
+RemotePortRespSlot *rp_dev_timed_wait_resp(RemotePort *s, uint32_t dev,
+                                           uint32_t id, int timems);
+
 void rp_process(RemotePort *s);
 
 ssize_t rp_write(RemotePort *s, const void *buf, size_t count);
-- 
2.43.0