From: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
Introduce main execution loop for the Remote Port protocol.
Creates a dedicated thread to manage the communication lifecycle.
This includes logic to read packets and manage RX queue.
Patch also implements handshake logic to verify protocol
versions and negotiate capabilities with remote peer.
Signed-off-by: Edgar E. Iglesias <edgar.iglesias@amd.com>
Signed-off-by: Takahiro Nakata <takahiro.nakata.wr@renesas.com>
Signed-off-by: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
---
hw/core/remote-port.c | 190 ++++++++++++++++++++++++++++++++++
include/hw/core/remote-port.h | 22 ++++
2 files changed, 212 insertions(+)
diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
index 5154c1bc2a..91b0682884 100644
--- a/hw/core/remote-port.c
+++ b/hw/core/remote-port.c
@@ -52,6 +52,88 @@
#define REMOTE_PORT_CLASS(klass) \
OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT)
+static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
+{
+ qemu_hexdump(stdout, prefix, buf, len);
+}
+
+static void rp_fatal_error(RemotePort *s, const char *reason)
+{
+ error_report("%s: %s", s->prefix, reason);
+ exit(EXIT_FAILURE);
+}
+
+static ssize_t rp_recv(RemotePort *s, void *buf, size_t count)
+{
+ ssize_t r;
+
+ r = qemu_chr_fe_read_all(&s->chr, buf, count);
+ if (r <= 0) {
+ return r;
+ }
+ if (r != count) {
+ error_report("%s: Bad read, expected %zd but got %zd",
+ s->prefix, count, r);
+ rp_fatal_error(s, "Bad read");
+ }
+
+ return r;
+}
+
+ssize_t rp_write(RemotePort *s, const void *buf, size_t count)
+{
+ ssize_t r;
+
+ qemu_mutex_lock(&s->write_mutex);
+ r = qemu_chr_fe_write_all(&s->chr, buf, count);
+ qemu_mutex_unlock(&s->write_mutex);
+ assert(r == count);
+ if (r <= 0) {
+ error_report("%s: Disconnected r=%zd buf=%p count=%zd",
+ s->prefix, r, buf, count);
+ rp_fatal_error(s, "Bad write");
+ }
+ return r;
+}
+
+static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
+{
+ s->peer.version = pkt->hello.version;
+ if (pkt->hello.version.major != RP_VERSION_MAJOR) {
+ error_report("remote-port version missmatch remote=%d.%d local=%d.%d",
+ pkt->hello.version.major, pkt->hello.version.minor,
+ RP_VERSION_MAJOR, RP_VERSION_MINOR);
+ rp_fatal_error(s, "Bad version");
+ }
+
+ if (pkt->hello.caps.len) {
+ void *caps = (char *) pkt + pkt->hello.caps.offset;
+
+ rp_process_caps(&s->peer, caps, pkt->hello.caps.len);
+ }
+}
+
+static void rp_say_hello(RemotePort *s)
+{
+ struct rp_pkt_hello pkt;
+ uint32_t caps[] = {
+ CAP_BUSACCESS_EXT_BASE,
+ CAP_BUSACCESS_EXT_BYTE_EN,
+ CAP_WIRE_POSTED_UPDATES,
+ CAP_ATS,
+ };
+ size_t len;
+
+ len = rp_encode_hello_caps(s->current_id++, 0, &pkt, RP_VERSION_MAJOR,
+ RP_VERSION_MINOR,
+ caps, caps, sizeof caps / sizeof caps[0]);
+ rp_write(s, (void *) &pkt, len);
+
+ if (sizeof caps) {
+ rp_write(s, caps, sizeof caps);
+ }
+}
+
static char *rp_sanitize_prefix(RemotePort *s)
{
char *sanitized_name;
@@ -105,6 +187,108 @@ static Chardev *rp_autocreate_chardev(RemotePort *s, char *name)
return chr;
}
+static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
+{
+ struct rp_pkt *pkt = dpkt->pkt;
+
+ D(qemu_log("%s: cmd=%x id=%d dev=%d\n", __func__, pkt->hdr.cmd,
+ pkt->hdr.id, pkt->hdr.dev));
+
+ if (pkt->hdr.dev >= ARRAY_SIZE(s->devs)) {
+ /* FIXME: Respond with an error. */
+ return true;
+ }
+
+ switch (pkt->hdr.cmd) {
+ case RP_CMD_hello:
+ rp_cmd_hello(s, pkt);
+ break;
+ case RP_CMD_read:
+ case RP_CMD_write:
+ case RP_CMD_interrupt:
+ case RP_CMD_ats_req:
+ case RP_CMD_ats_inv:
+ /* TBD */;
+ break;
+ default:
+ g_assert_not_reached();
+ break;
+ }
+ return false;
+}
+
+static int rp_read_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
+{
+ struct rp_pkt *pkt = dpkt->pkt;
+ int used;
+ int r;
+
+ r = rp_recv(s, pkt, sizeof pkt->hdr);
+ if (r <= 0) {
+ return r;
+ }
+ used = rp_decode_hdr((void *) &pkt->hdr);
+ assert(used == sizeof pkt->hdr);
+
+ if (pkt->hdr.len) {
+ rp_dpkt_alloc(dpkt, sizeof pkt->hdr + pkt->hdr.len);
+ /* pkt may move due to realloc. */
+ pkt = dpkt->pkt;
+ r = rp_recv(s, &pkt->hdr + 1, pkt->hdr.len);
+ if (r <= 0) {
+ return r;
+ }
+ rp_decode_payload(pkt);
+ }
+
+ return used + r;
+}
+
+static void *rp_protocol_thread(void *arg)
+{
+ RemotePort *s = REMOTE_PORT(arg);
+ unsigned int i;
+ int r;
+
+ /* Make sure we have a decent bufsize to start with. */
+ rp_dpkt_alloc(&s->rsp, sizeof s->rsp.pkt->busaccess + 1024);
+ for (i = 0; i < ARRAY_SIZE(s->rx_queue.pkt); i++) {
+ rp_dpkt_alloc(&s->rx_queue.pkt[i],
+ sizeof s->rx_queue.pkt[i].pkt->busaccess + 1024);
+ s->rx_queue.inuse[i] = false;
+ }
+
+ rp_say_hello(s);
+
+ while (1) {
+ RemotePortDynPkt *dpkt;
+ unsigned int wpos = s->rx_queue.wpos;
+ bool handled;
+
+ dpkt = &s->rx_queue.pkt[wpos];
+ s->rx_queue.inuse[wpos] = true;
+
+ r = rp_read_pkt(s, dpkt);
+ if (r <= 0) {
+ /* Disconnected. */
+ break;
+ }
+ if (0) {
+ rp_pkt_dump("rport-pkt", (void *) dpkt->pkt,
+ sizeof dpkt->pkt->hdr + dpkt->pkt->hdr.len);
+ }
+ handled = rp_pt_process_pkt(s, dpkt);
+ if (handled) {
+ s->rx_queue.inuse[wpos] = false;
+ }
+ }
+
+ if (!s->finalizing) {
+ rp_fatal_error(s, "Disconnected");
+ }
+ return NULL;
+}
+
static void rp_reset(DeviceState *dev)
{
RemotePort *s = REMOTE_PORT(dev);
@@ -113,6 +297,9 @@ static void rp_reset(DeviceState *dev)
return;
}
+ qemu_thread_create(&s->thread, "remote-port", rp_protocol_thread, s,
+ QEMU_THREAD_JOINABLE);
+
s->reset_done = true;
}
@@ -124,6 +311,8 @@ static void rp_realize(DeviceState *dev, Error **errp)
s->prefix = object_get_canonical_path(OBJECT(dev));
+ qemu_mutex_init(&s->write_mutex);
+
if (!qemu_chr_fe_get_driver(&s->chr)) {
char *name;
Chardev *chr = NULL;
@@ -249,6 +438,7 @@ static void rp_unrealize(DeviceState *dev)
info_report("%s: Wait for remote-port to disconnect", s->prefix);
qemu_chr_fe_disconnect(&s->chr);
+ qemu_thread_join(&s->thread);
close(s->event.pipe.read);
close(s->event.pipe.write);
diff --git a/include/hw/core/remote-port.h b/include/hw/core/remote-port.h
index 0f40018cdb..b88e523894 100644
--- a/include/hw/core/remote-port.h
+++ b/include/hw/core/remote-port.h
@@ -56,6 +56,7 @@ typedef struct RemotePortDeviceClass {
struct RemotePort {
DeviceState parent;
+ QemuThread thread;
union {
int pipes[2];
struct {
@@ -66,9 +67,28 @@ struct RemotePort {
Chardev *chrdev;
CharFrontend chr;
bool finalizing;
+ /* To serialize writes to fd. */
+ QemuMutex write_mutex;
char *chardesc;
char *chrdev_id;
+ struct rp_peer_state peer;
+
+#define RX_QUEUE_SIZE 1024
+ struct {
+ /* This array must be sized minimum 2 and always a power of 2. */
+ RemotePortDynPkt pkt[RX_QUEUE_SIZE];
+ bool inuse[RX_QUEUE_SIZE];
+ QemuSemaphore sem;
+ unsigned int wpos;
+ unsigned int rpos;
+ } rx_queue;
+
+ /*
+ * rsp holds responses for the remote side.
+ * Used by the slave.
+ */
+ RemotePortDynPkt rsp;
const char *prefix;
const char *remote_prefix;
@@ -80,4 +100,6 @@ struct RemotePort {
RemotePortDevice *devs[REMOTE_PORT_MAX_DEVS];
};
+ssize_t rp_write(RemotePort *s, const void *buf, size_t count);
+
#endif
--
2.43.0
On Thu, Feb 05, 2026 at 08:58:04PM +0100, Ruslan Ruslichenko wrote:
> From: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
>
> Introduce main execution loop for the Remote Port protocol.
> Creates a dedicated thread to manage the communication lifecycle.
> This includes logic to read packets and manage RX queue.
>
> Patch also implements handshake logic to verify protocol
> versions and negotiate capabilities with remote peer.
>
> Signed-off-by: Edgar E. Iglesias <edgar.iglesias@amd.com>
> Signed-off-by: Takahiro Nakata <takahiro.nakata.wr@renesas.com>
> Signed-off-by: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
> ---
> hw/core/remote-port.c | 190 ++++++++++++++++++++++++++++++++++
> include/hw/core/remote-port.h | 22 ++++
> 2 files changed, 212 insertions(+)
>
> diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
> index 5154c1bc2a..91b0682884 100644
> --- a/hw/core/remote-port.c
> +++ b/hw/core/remote-port.c
> @@ -52,6 +52,88 @@
> #define REMOTE_PORT_CLASS(klass) \
> OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT)
>
> +static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
> +{
> + qemu_hexdump(stdout, prefix, buf, len);
> +}
> +
> +static void rp_fatal_error(RemotePort *s, const char *reason)
> +{
> + error_report("%s: %s", s->prefix, reason);
> + exit(EXIT_FAILURE);
> +}
> +
> +static ssize_t rp_recv(RemotePort *s, void *buf, size_t count)
> +{
> + ssize_t r;
> +
> + r = qemu_chr_fe_read_all(&s->chr, buf, count);
> + if (r <= 0) {
> + return r;
> + }
> + if (r != count) {
> + error_report("%s: Bad read, expected %zd but got %zd",
> + s->prefix, count, r);
> + rp_fatal_error(s, "Bad read");
> + }
> +
> + return r;
> +}
> +
> +ssize_t rp_write(RemotePort *s, const void *buf, size_t count)
> +{
> + ssize_t r;
> +
> + qemu_mutex_lock(&s->write_mutex);
> + r = qemu_chr_fe_write_all(&s->chr, buf, count);
> + qemu_mutex_unlock(&s->write_mutex);
> + assert(r == count);
> + if (r <= 0) {
> + error_report("%s: Disconnected r=%zd buf=%p count=%zd",
> + s->prefix, r, buf, count);
> + rp_fatal_error(s, "Bad write");
> + }
> + return r;
> +}
> +
> +static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
> +{
> + s->peer.version = pkt->hello.version;
> + if (pkt->hello.version.major != RP_VERSION_MAJOR) {
> + error_report("remote-port version missmatch remote=%d.%d local=%d.%d",
> + pkt->hello.version.major, pkt->hello.version.minor,
> + RP_VERSION_MAJOR, RP_VERSION_MINOR);
> + rp_fatal_error(s, "Bad version");
> + }
> +
> + if (pkt->hello.caps.len) {
> + void *caps = (char *) pkt + pkt->hello.caps.offset;
> +
> + rp_process_caps(&s->peer, caps, pkt->hello.caps.len);
> + }
> +}
> +
> +static void rp_say_hello(RemotePort *s)
> +{
> + struct rp_pkt_hello pkt;
> + uint32_t caps[] = {
> + CAP_BUSACCESS_EXT_BASE,
> + CAP_BUSACCESS_EXT_BYTE_EN,
> + CAP_WIRE_POSTED_UPDATES,
> + CAP_ATS,
> + };
> + size_t len;
> +
> + len = rp_encode_hello_caps(s->current_id++, 0, &pkt, RP_VERSION_MAJOR,
> + RP_VERSION_MINOR,
> + caps, caps, sizeof caps / sizeof caps[0]);
> + rp_write(s, (void *) &pkt, len);
> +
> + if (sizeof caps) {
> + rp_write(s, caps, sizeof caps);
> + }
> +}
> +
> static char *rp_sanitize_prefix(RemotePort *s)
> {
> char *sanitized_name;
> @@ -105,6 +187,108 @@ static Chardev *rp_autocreate_chardev(RemotePort *s, char *name)
> return chr;
> }
>
> +static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
> +{
> + struct rp_pkt *pkt = dpkt->pkt;
> +
> + D(qemu_log("%s: cmd=%x id=%d dev=%d\n", __func__, pkt->hdr.cmd,
> + pkt->hdr.id, pkt->hdr.dev));
This kind of thing feels better suited to being implement with
QEMU tracepoints.
> +
> + if (pkt->hdr.dev >= ARRAY_SIZE(s->devs)) {
> + /* FIXME: Respond with an error. */
> + return true;
> + }
> +
> + switch (pkt->hdr.cmd) {
> + case RP_CMD_hello:
> + rp_cmd_hello(s, pkt);
> + break;
> + case RP_CMD_read:
> + case RP_CMD_write:
> + case RP_CMD_interrupt:
> + case RP_CMD_ats_req:
> + case RP_CMD_ats_inv:
> + /* TBD */;
> + break;
> + default:
> + g_assert_not_reached();
> + break;
> + }
> + return false;
> +}
With regards,
Daniel
--
|: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
|: https://libvirt.org -o- https://fstop138.berrange.com :|
|: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
On Thu, Feb 5, 2026 at 9:30 PM Daniel P. Berrangé <berrange@redhat.com> wrote:
>
> On Thu, Feb 05, 2026 at 08:58:04PM +0100, Ruslan Ruslichenko wrote:
> > From: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
> >
> > Introduce main execution loop for the Remote Port protocol.
> > Creates a dedicated thread to manage the communication lifecycle.
> > This includes logic to read packets and manage RX queue.
> >
> > Patch also implements handshake logic to verify protocol
> > versions and negotiate capabilities with remote peer.
> >
> > Signed-off-by: Edgar E. Iglesias <edgar.iglesias@amd.com>
> > Signed-off-by: Takahiro Nakata <takahiro.nakata.wr@renesas.com>
> > Signed-off-by: Ruslan Ruslichenko <Ruslan_Ruslichenko@epam.com>
> > ---
> > hw/core/remote-port.c | 190 ++++++++++++++++++++++++++++++++++
> > include/hw/core/remote-port.h | 22 ++++
> > 2 files changed, 212 insertions(+)
> >
> > diff --git a/hw/core/remote-port.c b/hw/core/remote-port.c
> > index 5154c1bc2a..91b0682884 100644
> > --- a/hw/core/remote-port.c
> > +++ b/hw/core/remote-port.c
> > @@ -52,6 +52,88 @@
> > #define REMOTE_PORT_CLASS(klass) \
> > OBJECT_CLASS_CHECK(RemotePortClass, (klass), TYPE_REMOTE_PORT)
> >
> > +static void rp_pkt_dump(const char *prefix, const char *buf, size_t len)
> > +{
> > + qemu_hexdump(stdout, prefix, buf, len);
> > +}
> > +
> > +static void rp_fatal_error(RemotePort *s, const char *reason)
> > +{
> > + error_report("%s: %s", s->prefix, reason);
> > + exit(EXIT_FAILURE);
> > +}
> > +
> > +static ssize_t rp_recv(RemotePort *s, void *buf, size_t count)
> > +{
> > + ssize_t r;
> > +
> > + r = qemu_chr_fe_read_all(&s->chr, buf, count);
> > + if (r <= 0) {
> > + return r;
> > + }
> > + if (r != count) {
> > + error_report("%s: Bad read, expected %zd but got %zd",
> > + s->prefix, count, r);
> > + rp_fatal_error(s, "Bad read");
> > + }
> > +
> > + return r;
> > +}
> > +
> > +ssize_t rp_write(RemotePort *s, const void *buf, size_t count)
> > +{
> > + ssize_t r;
> > +
> > + qemu_mutex_lock(&s->write_mutex);
> > + r = qemu_chr_fe_write_all(&s->chr, buf, count);
> > + qemu_mutex_unlock(&s->write_mutex);
> > + assert(r == count);
> > + if (r <= 0) {
> > + error_report("%s: Disconnected r=%zd buf=%p count=%zd",
> > + s->prefix, r, buf, count);
> > + rp_fatal_error(s, "Bad write");
> > + }
> > + return r;
> > +}
> > +
> > +static void rp_cmd_hello(RemotePort *s, struct rp_pkt *pkt)
> > +{
> > + s->peer.version = pkt->hello.version;
> > + if (pkt->hello.version.major != RP_VERSION_MAJOR) {
> > + error_report("remote-port version missmatch remote=%d.%d local=%d.%d",
> > + pkt->hello.version.major, pkt->hello.version.minor,
> > + RP_VERSION_MAJOR, RP_VERSION_MINOR);
> > + rp_fatal_error(s, "Bad version");
> > + }
> > +
> > + if (pkt->hello.caps.len) {
> > + void *caps = (char *) pkt + pkt->hello.caps.offset;
> > +
> > + rp_process_caps(&s->peer, caps, pkt->hello.caps.len);
> > + }
> > +}
> > +
> > +static void rp_say_hello(RemotePort *s)
> > +{
> > + struct rp_pkt_hello pkt;
> > + uint32_t caps[] = {
> > + CAP_BUSACCESS_EXT_BASE,
> > + CAP_BUSACCESS_EXT_BYTE_EN,
> > + CAP_WIRE_POSTED_UPDATES,
> > + CAP_ATS,
> > + };
> > + size_t len;
> > +
> > + len = rp_encode_hello_caps(s->current_id++, 0, &pkt, RP_VERSION_MAJOR,
> > + RP_VERSION_MINOR,
> > + caps, caps, sizeof caps / sizeof caps[0]);
> > + rp_write(s, (void *) &pkt, len);
> > +
> > + if (sizeof caps) {
> > + rp_write(s, caps, sizeof caps);
> > + }
> > +}
> > +
> > static char *rp_sanitize_prefix(RemotePort *s)
> > {
> > char *sanitized_name;
> > @@ -105,6 +187,108 @@ static Chardev *rp_autocreate_chardev(RemotePort *s, char *name)
> > return chr;
> > }
> >
> > +static bool rp_pt_process_pkt(RemotePort *s, RemotePortDynPkt *dpkt)
> > +{
> > + struct rp_pkt *pkt = dpkt->pkt;
> > +
> > + D(qemu_log("%s: cmd=%x id=%d dev=%d\n", __func__, pkt->hdr.cmd,
> > + pkt->hdr.id, pkt->hdr.dev));
>
> This kind of thing feels better suited to being implement with
> QEMU tracepoints.
>
Agreed. I will replace the manual debug logging with standard QEMU
tracepoints in v2.
> > +
> > + if (pkt->hdr.dev >= ARRAY_SIZE(s->devs)) {
> > + /* FIXME: Respond with an error. */
> > + return true;
> > + }
> > +
> > + switch (pkt->hdr.cmd) {
> > + case RP_CMD_hello:
> > + rp_cmd_hello(s, pkt);
> > + break;
> > + case RP_CMD_read:
> > + case RP_CMD_write:
> > + case RP_CMD_interrupt:
> > + case RP_CMD_ats_req:
> > + case RP_CMD_ats_inv:
> > + /* TBD */;
> > + break;
> > + default:
> > + g_assert_not_reached();
> > + break;
> > + }
> > + return false;
> > +}
>
> With regards,
> Daniel
> --
> |: https://berrange.com -o- https://www.flickr.com/photos/dberrange :|
> |: https://libvirt.org -o- https://fstop138.berrange.com :|
> |: https://entangle-photo.org -o- https://www.instagram.com/dberrange :|
>
--
BR,
Ruslan
© 2016 - 2026 Red Hat, Inc.