[PATCH 10/29] hw/core: Implement Remote Port packet dispatch logic

Ruslan Ruslichenko posted 29 patches 1 day, 11 hours ago
[PATCH 10/29] hw/core: Implement Remote Port packet dispatch logic
Posted by Ruslan Ruslichenko 1 day, 11 hours ago
From: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>

Implement the mechanism to transfer packets from the dedicated
protocol thread to the main QEMU execution loop for processing.

The patch adds the following features:
- signaling logic using internal pipe to wake up the main loop
- the rp_process handler, which retrieves packets from queue and
dispatches them to the target Remote Port device.

This enables QEMU device models to handle remote events.

Signed-off-by: Edgar E. Iglesias <edgar.iglesias@amd.com>
Signed-off-by: Takahiro Nakata <takahiro.nakata.wr@renesas.com>
Signed-off-by: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
---
 hw/core/remote-port.c         | 148 +++++++++++++++++++++++++++++++++-
 include/hw/core/remote-port.h |   5 ++
 2 files changed, 152 insertions(+), 1 deletion(-)

diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
index 91b0682884..e44d9249c3 100644
--- a/hw/core/remote-port.c
+++ b/hw/core/remote-port.c
@@ -52,6 +52,8 @@
 #define REMOTE_PORT_CLASS(klass)    \
      OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT)
 
+static void rp_event_read(void *opaque);
+
 static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
 {
     qemu_hexdump(stdout, prefix, buf, len);
@@ -96,6 +98,12 @@ ssize_t rp_write(RemotePort *s, const void *buf, size_t count)
     return r;
 }
 
+static unsigned int rp_has_work(RemotePort *s)
+{
+    unsigned int work = s->rx_queue.wpos - s->rx_queue.rpos;
+    return work;
+}
+
 static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
 {
     s->peer.version = pkt->hello.version;
@@ -187,6 +195,135 @@ static Chardev *rp_autocreate_chardev(RemotePort *s, char *name)
     return chr;
 }
 
+void rp_process(RemotePort *s)
+{
+    while (true) {
+        struct rp_pkt *pkt;
+        unsigned int rpos;
+        bool actioned = false;
+        RemotePortDevice *dev;
+        RemotePortDeviceClass *rpdc;
+
+        qemu_mutex_lock(&s->rsp_mutex);
+        if (!rp_has_work(s)) {
+            qemu_mutex_unlock(&s->rsp_mutex);
+            break;
+        }
+        rpos = s->rx_queue.rpos;
+
+        pkt = s->rx_queue.pkt[rpos].pkt;
+        D(qemu_log("%s: io-thread rpos=%d wpos=%d cmd=%d dev=%d\n",
+                 s->prefix, s->rx_queue.rpos, s->rx_queue.wpos,
+                 pkt->hdr.cmd, pkt->hdr.dev));
+
+        /*
+         * To handle recursiveness, we need to advance the index
+         * index before processing the packet.
+         */
+        s->rx_queue.rpos++;
+        s->rx_queue.rpos %= ARRAY_SIZE(s->rx_queue.pkt);
+        qemu_mutex_unlock(&s->rsp_mutex);
+
+        dev = s->devs[pkt->hdr.dev];
+        if (dev) {
+            rpdc = REMOTE_PORT_DEVICE_GET_CLASS(dev);
+            if (rpdc->ops[pkt->hdr.cmd]) {
+                rpdc->ops[pkt->hdr.cmd](dev, pkt);
+                actioned = true;
+            }
+        }
+
+        switch (pkt->hdr.cmd) {
+        /* TBD */
+        default:
+            assert(actioned);
+        }
+
+        s->rx_queue.inuse[rpos] = false;
+        qemu_sem_post(&s->rx_queue.sem);
+    }
+}
+
+static void rp_event_read(void *opaque)
+{
+    RemotePort *s = REMOTE_PORT(opaque);
+    unsigned char buf[32];
+    ssize_t r;
+
+    /* We don't care about the data. Just read it out to clear the event.  */
+    do {
+#ifdef _WIN32
+        r = qemu_recv_wrap(s->event.pipe.read, buf, sizeof buf, 0);
+#else
+        r = read(s->event.pipe.read, buf, sizeof buf);
+#endif
+        if (r == 0) {
+            return;
+        }
+    } while (r == sizeof buf || (r < 0 && errno == EINTR));
+
+    rp_process(s);
+}
+
+static void rp_event_notify(RemotePort *s)
+{
+    unsigned char d = 0;
+    ssize_t r;
+
+#ifdef _WIN32
+    /* Mingw is sensitive about doing write's to socket descriptors.  */
+    r = qemu_send_wrap(s->event.pipe.write, &d, sizeof d, 0);
+#else
+    r = qemu_write_full(s->event.pipe.write, &d, sizeof d);
+#endif
+    if (r == 0) {
+        hw_error("%s: pipe closed\n", s->prefix);
+    }
+}
+
+/* Handover a pkt to CPU or IO-thread context.  */
+static void rp_pt_handover_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
+{
+    bool full;
+
+    /*
+     * Take the rsp lock around the wpos update, otherwise
+     * rp_wait_resp will race with us.
+     */
+    qemu_mutex_lock(&s->rsp_mutex);
+    s->rx_queue.wpos++;
+    s->rx_queue.wpos %= ARRAY_SIZE(s->rx_queue.pkt);
+    /*
+     * Ensure rx_queue index update is visible to consumer
+     * before signaling event, to prevent lost wakeup
+     */
+    smp_mb();
+    rp_event_notify(s);
+    qemu_cond_signal(&s->progress_cond);
+    qemu_mutex_unlock(&s->rsp_mutex);
+
+    do {
+        full = s->rx_queue.inuse[s->rx_queue.wpos];
+        if (full) {
+            qemu_log("%s: FULL rx queue %d\n", __func__, s->rx_queue.wpos);
+        if (qemu_sem_timedwait(&s->rx_queue.sem, 2 * 1000) != 0) {
+#ifndef _WIN32
+                int sval;
+
+#ifndef CONFIG_SEM_TIMEDWAIT
+                sval = s->rx_queue.sem.count;
+#else
+                sem_getvalue(&s->rx_queue.sem.sem, &sval);
+#endif
+                qemu_log("semwait: %d rpos=%u wpos=%u\n", sval,
+                         s->rx_queue.rpos, s->rx_queue.wpos);
+#endif
+                qemu_log("Deadlock?\n");
+        }
+        }
+    } while (full);
+}
+
 static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
 {
     struct rp_pkt *pkt = dpkt->pkt;
@@ -208,7 +345,7 @@ static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
     case RP_CMD_interrupt:
     case RP_CMD_ats_req:
     case RP_CMD_ats_inv:
-        /* TBD */;
+        rp_pt_handover_pkt(s, dpkt);
         break;
     default:
         g_assert_not_reached();
@@ -312,6 +449,8 @@ static void rp_realize(DeviceState *dev, Error **errp)
     s->prefix = object_get_canonical_path(OBJECT(dev));
 
     qemu_mutex_init(&s->write_mutex);
+    qemu_mutex_init(&s->rsp_mutex);
+    qemu_cond_init(&s->progress_cond);
 
     if (!qemu_chr_fe_get_driver(&s->chr)) {
         char *name;
@@ -413,6 +552,7 @@ static void rp_realize(DeviceState *dev, Error **errp)
                         s->prefix);
             exit(EXIT_FAILURE);
         }
+        qemu_set_fd_handler(s->event.pipe.read, rp_event_read, NULL, s);
     }
 #else
     if (!g_unix_open_pipe(s->event.pipes, FD_CLOEXEC, NULL)) {
@@ -427,7 +567,10 @@ static void rp_realize(DeviceState *dev, Error **errp)
         exit(EXIT_FAILURE);
     }
 
+    qemu_set_fd_handler(s->event.pipe.read, rp_event_read, NULL, s);
 #endif
+
+    qemu_sem_init(&s->rx_queue.sem, ARRAY_SIZE(s->rx_queue.pkt) - 1);
 }
 
 static void rp_unrealize(DeviceState *dev)
@@ -436,6 +579,9 @@ static void rp_unrealize(DeviceState *dev)
 
     s->finalizing = true;
 
+    /* Unregister handler.  */
+    qemu_set_fd_handler(s->event.pipe.read, NULL, NULL, s);
+
     info_report("%s: Wait for remote-port to disconnect", s->prefix);
     qemu_chr_fe_disconnect(&s->chr);
     qemu_thread_join(&s->thread);
diff --git a/include/hw/core/remote-port.h b/include/hw/core/remote-port.h
index b88e523894..21dfbe89cd 100644
--- a/include/hw/core/remote-port.h
+++ b/include/hw/core/remote-port.h
@@ -74,6 +74,9 @@ struct RemotePort {
     char *chrdev_id;
     struct rp_peer_state peer;
 
+    QemuMutex rsp_mutex;
+    QemuCond progress_cond;
+
 #define RX_QUEUE_SIZE 1024
     struct {
         /* This array must be sized minimum 2 and always a power of 2.  */
@@ -100,6 +103,8 @@ struct RemotePort {
     RemotePortDevice *devs[REMOTE_PORT_MAX_DEVS];
 };
 
+void rp_process(RemotePort *s);
+
 ssize_t rp_write(RemotePort *s, const void *buf, size_t count);
 
 #endif
-- 
2.43.0
Re: [PATCH 10/29] hw/core: Implement Remote Port packet dispatch logic
Posted by Daniel P. Berrangé 1 day, 10 hours ago
On Thu, Feb 05, 2026 at 08:58:05PM +0100, Ruslan Ruslichenko wrote:
> From: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
> 
> Implement the mechanism to transfer packets from the dedicated
> protocol thread to the main QEMU execution loop for processing.
> 
> The patch adds the following features:
> - signaling logic using internal pipe to wake up the main loop
> - the rp_process handler, which retrieves packets from queue and
> dispatches them to the target Remote Port device.
> 
> This enables QEMU device models to handle remote events.
> 
> Signed-off-by: Edgar E. Iglesias <edgar.iglesias@amd.com>
> Signed-off-by: Takahiro Nakata <takahiro.nakata.wr@renesas.com>
> Signed-off-by: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
> ---
>  hw/core/remote-port.c         | 148 +++++++++++++++++++++++++++++++++-
>  include/hw/core/remote-port.h |   5 ++
>  2 files changed, 152 insertions(+), 1 deletion(-)
> 
> diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
> index 91b0682884..e44d9249c3 100644
> --- a/hw/core/remote-port.c
> +++ b/hw/core/remote-port.c
> @@ -52,6 +52,8 @@
>  #define REMOTE_PORT_CLASS(klass)    \
>       OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT)
>  
> +static void rp_event_read(void *opaque);
> +
>  static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
>  {
>      qemu_hexdump(stdout, prefix, buf, len);
> @@ -96,6 +98,12 @@ ssize_t rp_write(RemotePort *s, const void *buf, size_t count)
>      return r;
>  }
>  
> +static unsigned int rp_has_work(RemotePort *s)
> +{
> +    unsigned int work = s->rx_queue.wpos - s->rx_queue.rpos;
> +    return work;
> +}
> +
>  static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
>  {
>      s->peer.version = pkt->hello.version;
> @@ -187,6 +195,135 @@ static Chardev *rp_autocreate_chardev(RemotePort *s, char *name)
>      return chr;
>  }
>  
> +void rp_process(RemotePort *s)
> +{
> +    while (true) {
> +        struct rp_pkt *pkt;
> +        unsigned int rpos;
> +        bool actioned = false;
> +        RemotePortDevice *dev;
> +        RemotePortDeviceClass *rpdc;
> +
> +        qemu_mutex_lock(&s->rsp_mutex);
> +        if (!rp_has_work(s)) {
> +            qemu_mutex_unlock(&s->rsp_mutex);
> +            break;
> +        }
> +        rpos = s->rx_queue.rpos;
> +
> +        pkt = s->rx_queue.pkt[rpos].pkt;
> +        D(qemu_log("%s: io-thread rpos=%d wpos=%d cmd=%d dev=%d\n",
> +                 s->prefix, s->rx_queue.rpos, s->rx_queue.wpos,
> +                 pkt->hdr.cmd, pkt->hdr.dev));

Same point as last patch, that emitted structured data in
tracepoints is preferrable to merely logging. Consider this
to apply to any other qemu_log call, so I won't repeat it
every time.

> +
> +        /*
> +         * To handle recursiveness, we need to advance the index
> +         * index before processing the packet.
> +         */
> +        s->rx_queue.rpos++;
> +        s->rx_queue.rpos %= ARRAY_SIZE(s->rx_queue.pkt);
> +        qemu_mutex_unlock(&s->rsp_mutex);
> +
> +        dev = s->devs[pkt->hdr.dev];
> +        if (dev) {
> +            rpdc = REMOTE_PORT_DEVICE_GET_CLASS(dev);
> +            if (rpdc->ops[pkt->hdr.cmd]) {
> +                rpdc->ops[pkt->hdr.cmd](dev, pkt);
> +                actioned = true;
> +            }
> +        }
> +
> +        switch (pkt->hdr.cmd) {
> +        /* TBD */
> +        default:
> +            assert(actioned);
> +        }
> +
> +        s->rx_queue.inuse[rpos] = false;
> +        qemu_sem_post(&s->rx_queue.sem);
> +    }
> +}
> +
> +static void rp_event_read(void *opaque)
> +{
> +    RemotePort *s = REMOTE_PORT(opaque);
> +    unsigned char buf[32];
> +    ssize_t r;
> +
> +    /* We don't care about the data. Just read it out to clear the event.  */
> +    do {
> +#ifdef _WIN32
> +        r = qemu_recv_wrap(s->event.pipe.read, buf, sizeof buf, 0);
> +#else
> +        r = read(s->event.pipe.read, buf, sizeof buf);
> +#endif
> +        if (r == 0) {
> +            return;
> +        }
> +    } while (r == sizeof buf || (r < 0 && errno == EINTR));
> +
> +    rp_process(s);
> +}
> +
> +static void rp_event_notify(RemotePort *s)
> +{
> +    unsigned char d = 0;
> +    ssize_t r;
> +
> +#ifdef _WIN32
> +    /* Mingw is sensitive about doing write's to socket descriptors.  */
> +    r = qemu_send_wrap(s->event.pipe.write, &d, sizeof d, 0);
> +#else
> +    r = qemu_write_full(s->event.pipe.write, &d, sizeof d);
> +#endif
> +    if (r == 0) {
> +        hw_error("%s: pipe closed\n", s->prefix);
> +    }
> +}
> +
> +/* Handover a pkt to CPU or IO-thread context.  */
> +static void rp_pt_handover_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
> +{
> +    bool full;
> +
> +    /*
> +     * Take the rsp lock around the wpos update, otherwise
> +     * rp_wait_resp will race with us.
> +     */
> +    qemu_mutex_lock(&s->rsp_mutex);
> +    s->rx_queue.wpos++;
> +    s->rx_queue.wpos %= ARRAY_SIZE(s->rx_queue.pkt);
> +    /*
> +     * Ensure rx_queue index update is visible to consumer
> +     * before signaling event, to prevent lost wakeup
> +     */
> +    smp_mb();
> +    rp_event_notify(s);
> +    qemu_cond_signal(&s->progress_cond);
> +    qemu_mutex_unlock(&s->rsp_mutex);
> +
> +    do {
> +        full = s->rx_queue.inuse[s->rx_queue.wpos];
> +        if (full) {
> +            qemu_log("%s: FULL rx queue %d\n", __func__, s->rx_queue.wpos);
> +        if (qemu_sem_timedwait(&s->rx_queue.sem, 2 * 1000) != 0) {
> +#ifndef _WIN32
> +                int sval;
> +
> +#ifndef CONFIG_SEM_TIMEDWAIT
> +                sval = s->rx_queue.sem.count;
> +#else
> +                sem_getvalue(&s->rx_queue.sem.sem, &sval);
> +#endif
> +                qemu_log("semwait: %d rpos=%u wpos=%u\n", sval,
> +                         s->rx_queue.rpos, s->rx_queue.wpos);
> +#endif
> +                qemu_log("Deadlock?\n");
> +        }
> +        }
> +    } while (full);
> +}

With regards,
Daniel
-- 
|: https://berrange.com      -o-    https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org         -o-            https://fstop138.berrange.com :|
|: https://entangle-photo.org    -o-    https://www.instagram.com/dberrange :|