[PATCH v2 04/11] rust/qemu-api: Add wrappers to run futures in QEMU

Kevin Wolf posted 11 patches 1 month, 2 weeks ago
[PATCH v2 04/11] rust/qemu-api: Add wrappers to run futures in QEMU
Posted by Kevin Wolf 1 month, 2 weeks ago
This adds helper functions that allow running Rust futures to completion
using QEMU's event loops.

Signed-off-by: Kevin Wolf <kwolf@redhat.com>
---
 include/qemu/coroutine-rust.h | 24 +++++++++++
 rust/wrapper.h                |  1 +
 util/qemu-co-rust-async.c     | 55 +++++++++++++++++++++++++
 rust/qemu-api/meson.build     |  1 +
 rust/qemu-api/src/futures.rs  | 77 +++++++++++++++++++++++++++++++++++
 rust/qemu-api/src/lib.rs      |  1 +
 util/meson.build              |  3 ++
 7 files changed, 162 insertions(+)
 create mode 100644 include/qemu/coroutine-rust.h
 create mode 100644 util/qemu-co-rust-async.c
 create mode 100644 rust/qemu-api/src/futures.rs

diff --git a/include/qemu/coroutine-rust.h b/include/qemu/coroutine-rust.h
new file mode 100644
index 0000000000..0c5cf42a6b
--- /dev/null
+++ b/include/qemu/coroutine-rust.h
@@ -0,0 +1,24 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Helpers to run Rust futures using QEMU coroutines
+ *
+ * Copyright Red Hat
+ *
+ * Author:
+ *   Kevin Wolf <kwolf@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#ifndef QEMU_COROUTINE_RUST_H
+#define QEMU_COROUTINE_RUST_H
+
+typedef struct RustBoxedFuture RustBoxedFuture;
+typedef void coroutine_fn RunFuture(RustBoxedFuture *future, void *opaque);
+
+void no_coroutine_fn rust_run_future(RustBoxedFuture *future,
+                                     RunFuture *entry,
+                                     void *opaque);
+
+#endif
diff --git a/rust/wrapper.h b/rust/wrapper.h
index 303d7bba7f..3dc385e256 100644
--- a/rust/wrapper.h
+++ b/rust/wrapper.h
@@ -58,3 +58,4 @@ typedef enum memory_order {
 #include "block/block_int.h"
 #include "block/qdict.h"
 #include "qapi/qapi-visit-block-core.h"
+#include "qemu/coroutine-rust.h"
diff --git a/util/qemu-co-rust-async.c b/util/qemu-co-rust-async.c
new file mode 100644
index 0000000000..d893dfb7bd
--- /dev/null
+++ b/util/qemu-co-rust-async.c
@@ -0,0 +1,55 @@
+/* SPDX-License-Identifier: LGPL-2.1-or-later */
+/*
+ * Helpers to run Rust futures using QEMU coroutines
+ *
+ * Copyright Red Hat
+ *
+ * Author:
+ *   Kevin Wolf <kwolf@redhat.com>
+ *
+ * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
+ * See the COPYING.LIB file in the top-level directory.
+ */
+
+#include "qemu/osdep.h"
+
+#include "block/aio-wait.h"
+#include "qemu/coroutine.h"
+#include "qemu/coroutine-rust.h"
+#include "qemu/main-loop.h"
+
+typedef struct FutureCo {
+    RustBoxedFuture *future;
+    RunFuture *entry;
+    void *opaque;
+    bool done;
+} FutureCo;
+
+static void coroutine_fn rust_co_run_future_entry(void *opaque)
+{
+    FutureCo *data = opaque;
+
+    data->entry(data->future, data->opaque);
+    data->done = true;
+    aio_wait_kick();
+}
+
+void no_coroutine_fn rust_run_future(RustBoxedFuture *future,
+                                     RunFuture *entry,
+                                     void *opaque)
+{
+    AioContext *ctx = qemu_get_current_aio_context();
+    Coroutine *co;
+    FutureCo data = {
+        .future = future,
+        .entry = entry,
+        .opaque = opaque,
+        .done = false,
+    };
+
+    GLOBAL_STATE_CODE();
+
+    co = qemu_coroutine_create(rust_co_run_future_entry, &data);
+    aio_co_enter(ctx, co);
+    AIO_WAIT_WHILE(ctx, !data.done);
+}
diff --git a/rust/qemu-api/meson.build b/rust/qemu-api/meson.build
index e0a3052c79..44fd34e193 100644
--- a/rust/qemu-api/meson.build
+++ b/rust/qemu-api/meson.build
@@ -22,6 +22,7 @@ sources_core = [
   'src/chardev.rs',
   'src/c_str.rs',
   'src/errno.rs',
+  'src/futures.rs',
   'src/module.rs',
   'src/offset_of.rs',
   'src/prelude.rs',
diff --git a/rust/qemu-api/src/futures.rs b/rust/qemu-api/src/futures.rs
new file mode 100644
index 0000000000..cd307a1d62
--- /dev/null
+++ b/rust/qemu-api/src/futures.rs
@@ -0,0 +1,77 @@
+use crate::bindings;
+use std::ffi::c_void;
+use std::future::Future;
+use std::mem::MaybeUninit;
+use std::sync::Arc;
+use std::task::{Context, Poll, Wake, Waker};
+
+struct RunFutureWaker {
+    co: *mut bindings::Coroutine,
+}
+unsafe impl Send for RunFutureWaker {}
+unsafe impl Sync for RunFutureWaker {}
+
+impl Wake for RunFutureWaker {
+    fn wake(self: Arc<Self>) {
+        unsafe {
+            bindings::aio_co_wake(self.co);
+        }
+    }
+}
+
+/// Use QEMU's event loops to run a Rust [`Future`] to completion and return its result.
+///
+/// This function must be called in coroutine context. If the future isn't ready yet, it yields.
+pub fn qemu_co_run_future<F: Future>(future: F) -> F::Output {
+    let waker = Waker::from(Arc::new(RunFutureWaker {
+        co: unsafe { bindings::qemu_coroutine_self() },
+    }));
+    let mut cx = Context::from_waker(&waker);
+
+    let mut pinned_future = std::pin::pin!(future);
+    loop {
+        match pinned_future.as_mut().poll(&mut cx) {
+            Poll::Ready(res) => return res,
+            Poll::Pending => unsafe {
+                bindings::qemu_coroutine_yield();
+            },
+        }
+    }
+}
+
+/// Wrapper around [`qemu_co_run_future`] that can be called from C.
+///
+/// # Safety
+///
+/// `future` must be a valid pointer to an owned `F` (it will be freed in this function).  `output`
+/// must be a valid pointer representing a mutable reference to an `F::Output` where the result can
+/// be stored.
+unsafe extern "C" fn rust_co_run_future<F: Future>(
+    future: *mut bindings::RustBoxedFuture,
+    output: *mut c_void,
+) {
+    let future = unsafe { Box::from_raw(future.cast::<F>()) };
+    let output = output.cast::<F::Output>();
+    let ret = qemu_co_run_future(*future);
+    unsafe {
+        output.write(ret);
+    }
+}
+
+/// Use QEMU's event loops to run a Rust [`Future`] to completion and return its result.
+///
+/// This function must be called outside of coroutine context to avoid deadlocks. It blocks and
+/// runs a nested even loop until the future is ready and returns a result.
+pub fn qemu_run_future<F: Future>(future: F) -> F::Output {
+    let future_ptr = Box::into_raw(Box::new(future));
+    let mut output = MaybeUninit::<F::Output>::uninit();
+    unsafe {
+        bindings::rust_run_future(
+            future_ptr.cast::<bindings::RustBoxedFuture>(),
+            #[allow(clippy::as_underscore)]
+            Some(rust_co_run_future::<F> as _),
+            output.as_mut_ptr().cast::<c_void>(),
+        );
+        output.assume_init()
+    }
+}
diff --git a/rust/qemu-api/src/lib.rs b/rust/qemu-api/src/lib.rs
index 825443abde..84928905f1 100644
--- a/rust/qemu-api/src/lib.rs
+++ b/rust/qemu-api/src/lib.rs
@@ -20,6 +20,7 @@
 pub mod cell;
 pub mod chardev;
 pub mod errno;
+pub mod futures;
 #[cfg(feature = "system")]
 pub mod irq;
 #[cfg(feature = "system")]
diff --git a/util/meson.build b/util/meson.build
index 780b5977a8..14a2ae17fd 100644
--- a/util/meson.build
+++ b/util/meson.build
@@ -101,6 +101,9 @@ if have_block
   util_ss.add(files('qemu-coroutine-sleep.c'))
   util_ss.add(files('qemu-co-shared-resource.c'))
   util_ss.add(files('qemu-co-timeout.c'))
+  if have_rust
+    util_ss.add(files('qemu-co-rust-async.c'))
+  endif
   util_ss.add(files('readline.c'))
   util_ss.add(files('throttle.c'))
   util_ss.add(files('timed-average.c'))
-- 
2.48.1
Re: [PATCH v2 04/11] rust/qemu-api: Add wrappers to run futures in QEMU
Posted by Stefan Hajnoczi 1 month ago
On Tue, Feb 18, 2025 at 07:20:12PM +0100, Kevin Wolf wrote:
> This adds helper functions that allow running Rust futures to completion
> using QEMU's event loops.

This commit is a cliff-hanger. I'm intrigued to find out how timer, fd,
etc event loop integration will work :).

> 
> Signed-off-by: Kevin Wolf <kwolf@redhat.com>
> ---
>  include/qemu/coroutine-rust.h | 24 +++++++++++
>  rust/wrapper.h                |  1 +
>  util/qemu-co-rust-async.c     | 55 +++++++++++++++++++++++++
>  rust/qemu-api/meson.build     |  1 +
>  rust/qemu-api/src/futures.rs  | 77 +++++++++++++++++++++++++++++++++++
>  rust/qemu-api/src/lib.rs      |  1 +
>  util/meson.build              |  3 ++
>  7 files changed, 162 insertions(+)
>  create mode 100644 include/qemu/coroutine-rust.h
>  create mode 100644 util/qemu-co-rust-async.c
>  create mode 100644 rust/qemu-api/src/futures.rs
> 
> diff --git a/include/qemu/coroutine-rust.h b/include/qemu/coroutine-rust.h
> new file mode 100644
> index 0000000000..0c5cf42a6b
> --- /dev/null
> +++ b/include/qemu/coroutine-rust.h
> @@ -0,0 +1,24 @@
> +/* SPDX-License-Identifier: LGPL-2.1-or-later */
> +/*
> + * Helpers to run Rust futures using QEMU coroutines
> + *
> + * Copyright Red Hat
> + *
> + * Author:
> + *   Kevin Wolf <kwolf@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#ifndef QEMU_COROUTINE_RUST_H
> +#define QEMU_COROUTINE_RUST_H
> +
> +typedef struct RustBoxedFuture RustBoxedFuture;
> +typedef void coroutine_fn RunFuture(RustBoxedFuture *future, void *opaque);
> +
> +void no_coroutine_fn rust_run_future(RustBoxedFuture *future,
> +                                     RunFuture *entry,
> +                                     void *opaque);

This adds a blocking (aio_poll()-style) API. The more blocking APIs we
add, the more points are created where QEMU hangs when the async
operation doesn't complete in a reasonable amount of time. It would be
best to avoid introducing new blocking APIs, but sometimes it is
unavoidable.

rust_run_future() is very generic and I think the downsides should be
pointed out to discourage people from using it when not absolutely
necessary. Can you document when it's appropriate to use this API?

> +
> +#endif
> diff --git a/rust/wrapper.h b/rust/wrapper.h
> index 303d7bba7f..3dc385e256 100644
> --- a/rust/wrapper.h
> +++ b/rust/wrapper.h
> @@ -58,3 +58,4 @@ typedef enum memory_order {
>  #include "block/block_int.h"
>  #include "block/qdict.h"
>  #include "qapi/qapi-visit-block-core.h"
> +#include "qemu/coroutine-rust.h"
> diff --git a/util/qemu-co-rust-async.c b/util/qemu-co-rust-async.c
> new file mode 100644
> index 0000000000..d893dfb7bd
> --- /dev/null
> +++ b/util/qemu-co-rust-async.c
> @@ -0,0 +1,55 @@
> +/* SPDX-License-Identifier: LGPL-2.1-or-later */
> +/*
> + * Helpers to run Rust futures using QEMU coroutines
> + *
> + * Copyright Red Hat
> + *
> + * Author:
> + *   Kevin Wolf <kwolf@redhat.com>
> + *
> + * This work is licensed under the terms of the GNU LGPL, version 2.1 or later.
> + * See the COPYING.LIB file in the top-level directory.
> + */
> +
> +#include "qemu/osdep.h"
> +
> +#include "block/aio-wait.h"
> +#include "qemu/coroutine.h"
> +#include "qemu/coroutine-rust.h"
> +#include "qemu/main-loop.h"
> +
> +typedef struct FutureCo {
> +    RustBoxedFuture *future;
> +    RunFuture *entry;
> +    void *opaque;
> +    bool done;
> +} FutureCo;
> +
> +static void coroutine_fn rust_co_run_future_entry(void *opaque)
> +{
> +    FutureCo *data = opaque;
> +
> +    data->entry(data->future, data->opaque);
> +    data->done = true;
> +    aio_wait_kick();
> +}
> +
> +void no_coroutine_fn rust_run_future(RustBoxedFuture *future,
> +                                     RunFuture *entry,
> +                                     void *opaque)
> +{
> +    AioContext *ctx = qemu_get_current_aio_context();
> +    Coroutine *co;
> +    FutureCo data = {
> +        .future = future,
> +        .entry = entry,
> +        .opaque = opaque,
> +        .done = false,
> +    };
> +
> +    GLOBAL_STATE_CODE();
> +
> +    co = qemu_coroutine_create(rust_co_run_future_entry, &data);
> +    aio_co_enter(ctx, co);
> +    AIO_WAIT_WHILE(ctx, !data.done);
> +}
> diff --git a/rust/qemu-api/meson.build b/rust/qemu-api/meson.build
> index e0a3052c79..44fd34e193 100644
> --- a/rust/qemu-api/meson.build
> +++ b/rust/qemu-api/meson.build
> @@ -22,6 +22,7 @@ sources_core = [
>    'src/chardev.rs',
>    'src/c_str.rs',
>    'src/errno.rs',
> +  'src/futures.rs',
>    'src/module.rs',
>    'src/offset_of.rs',
>    'src/prelude.rs',
> diff --git a/rust/qemu-api/src/futures.rs b/rust/qemu-api/src/futures.rs
> new file mode 100644
> index 0000000000..cd307a1d62
> --- /dev/null
> +++ b/rust/qemu-api/src/futures.rs
> @@ -0,0 +1,77 @@
> +use crate::bindings;
> +use std::ffi::c_void;
> +use std::future::Future;
> +use std::mem::MaybeUninit;
> +use std::sync::Arc;
> +use std::task::{Context, Poll, Wake, Waker};
> +
> +struct RunFutureWaker {
> +    co: *mut bindings::Coroutine,
> +}
> +unsafe impl Send for RunFutureWaker {}
> +unsafe impl Sync for RunFutureWaker {}
> +
> +impl Wake for RunFutureWaker {
> +    fn wake(self: Arc<Self>) {
> +        unsafe {
> +            bindings::aio_co_wake(self.co);
> +        }
> +    }
> +}
> +
> +/// Use QEMU's event loops to run a Rust [`Future`] to completion and return its result.
> +///
> +/// This function must be called in coroutine context. If the future isn't ready yet, it yields.
> +pub fn qemu_co_run_future<F: Future>(future: F) -> F::Output {
> +    let waker = Waker::from(Arc::new(RunFutureWaker {
> +        co: unsafe { bindings::qemu_coroutine_self() },
> +    }));
> +    let mut cx = Context::from_waker(&waker);
> +
> +    let mut pinned_future = std::pin::pin!(future);
> +    loop {
> +        match pinned_future.as_mut().poll(&mut cx) {
> +            Poll::Ready(res) => return res,
> +            Poll::Pending => unsafe {
> +                bindings::qemu_coroutine_yield();
> +            },
> +        }
> +    }
> +}
> +
> +/// Wrapper around [`qemu_co_run_future`] that can be called from C.
> +///
> +/// # Safety
> +///
> +/// `future` must be a valid pointer to an owned `F` (it will be freed in this function).  `output`
> +/// must be a valid pointer representing a mutable reference to an `F::Output` where the result can
> +/// be stored.
> +unsafe extern "C" fn rust_co_run_future<F: Future>(
> +    future: *mut bindings::RustBoxedFuture,
> +    output: *mut c_void,
> +) {
> +    let future = unsafe { Box::from_raw(future.cast::<F>()) };
> +    let output = output.cast::<F::Output>();
> +    let ret = qemu_co_run_future(*future);
> +    unsafe {
> +        output.write(ret);
> +    }
> +}
> +
> +/// Use QEMU's event loops to run a Rust [`Future`] to completion and return its result.
> +///
> +/// This function must be called outside of coroutine context to avoid deadlocks. It blocks and

rust_run_future() has GLOBAL_STATE_CODE() so qemu_run_future() needs to
run not just outside coroutine context, but also under the BQL. Should
this be mentioned?

> +/// runs a nested even loop until the future is ready and returns a result.
> +pub fn qemu_run_future<F: Future>(future: F) -> F::Output {
> +    let future_ptr = Box::into_raw(Box::new(future));
> +    let mut output = MaybeUninit::<F::Output>::uninit();
> +    unsafe {
> +        bindings::rust_run_future(
> +            future_ptr.cast::<bindings::RustBoxedFuture>(),
> +            #[allow(clippy::as_underscore)]
> +            Some(rust_co_run_future::<F> as _),

This line is hard to follow. I think it's casting to the C equivalent
type:

 void coroutine_fn (*)(RustBoxedFuture *future, void *opaque)

I wonder if there's a clearer way of writing this. Maybe being explicit
rather than implicit here would be helpful.

If not, it's not a big deal, but I spent some time trying to figure out
what this does and others might too.

> +            output.as_mut_ptr().cast::<c_void>(),
> +        );
> +        output.assume_init()
> +    }
> +}
> diff --git a/rust/qemu-api/src/lib.rs b/rust/qemu-api/src/lib.rs
> index 825443abde..84928905f1 100644
> --- a/rust/qemu-api/src/lib.rs
> +++ b/rust/qemu-api/src/lib.rs
> @@ -20,6 +20,7 @@
>  pub mod cell;
>  pub mod chardev;
>  pub mod errno;
> +pub mod futures;
>  #[cfg(feature = "system")]
>  pub mod irq;
>  #[cfg(feature = "system")]
> diff --git a/util/meson.build b/util/meson.build
> index 780b5977a8..14a2ae17fd 100644
> --- a/util/meson.build
> +++ b/util/meson.build
> @@ -101,6 +101,9 @@ if have_block
>    util_ss.add(files('qemu-coroutine-sleep.c'))
>    util_ss.add(files('qemu-co-shared-resource.c'))
>    util_ss.add(files('qemu-co-timeout.c'))
> +  if have_rust
> +    util_ss.add(files('qemu-co-rust-async.c'))
> +  endif
>    util_ss.add(files('readline.c'))
>    util_ss.add(files('throttle.c'))
>    util_ss.add(files('timed-average.c'))
> -- 
> 2.48.1
> 
> 
Re: [PATCH v2 04/11] rust/qemu-api: Add wrappers to run futures in QEMU
Posted by Zhao Liu 1 month, 1 week ago
> +/// Use QEMU's event loops to run a Rust [`Future`] to completion and return its result.
> +///
> +/// This function must be called in coroutine context. If the future isn't ready yet, it yields.
> +pub fn qemu_co_run_future<F: Future>(future: F) -> F::Output {
> +    let waker = Waker::from(Arc::new(RunFutureWaker {
> +        co: unsafe { bindings::qemu_coroutine_self() },
> +    }));
> +    let mut cx = Context::from_waker(&waker);
> +
> +    let mut pinned_future = std::pin::pin!(future);

pin macro stabilized in v1.68.0, but currently the minimum rustc
supported by QEMU is v1.63.

I found there's a workaround [*], so we can add a temporary pin.rs in
qemu_api until QEMU bumps up rustc to >= v1.68?

[*]: https://github.com/rust-lang/rust/issues/93178#issuecomment-1386177439

> +    loop {
> +        match pinned_future.as_mut().poll(&mut cx) {
> +            Poll::Ready(res) => return res,
> +            Poll::Pending => unsafe {
> +                bindings::qemu_coroutine_yield();
> +            },
> +        }
> +    }
> +}
> +
Re: [PATCH v2 04/11] rust/qemu-api: Add wrappers to run futures in QEMU
Posted by Kevin Wolf 1 month, 1 week ago
Am 20.02.2025 um 07:35 hat Zhao Liu geschrieben:
> > +/// Use QEMU's event loops to run a Rust [`Future`] to completion and return its result.
> > +///
> > +/// This function must be called in coroutine context. If the future isn't ready yet, it yields.
> > +pub fn qemu_co_run_future<F: Future>(future: F) -> F::Output {
> > +    let waker = Waker::from(Arc::new(RunFutureWaker {
> > +        co: unsafe { bindings::qemu_coroutine_self() },
> > +    }));
> > +    let mut cx = Context::from_waker(&waker);
> > +
> > +    let mut pinned_future = std::pin::pin!(future);
> 
> pin macro stabilized in v1.68.0, but currently the minimum rustc
> supported by QEMU is v1.63.

Can we check this automatically somehow? I actually seem to remember
that I got errors for too new things before. Is the problem here that
it's a macro?

> I found there's a workaround [*], so we can add a temporary pin.rs in
> qemu_api until QEMU bumps up rustc to >= v1.68?
> 
> [*]: https://github.com/rust-lang/rust/issues/93178#issuecomment-1386177439

I don't think we'll need this anywhere else, so I can just open-code
what the macro does:

    // TODO Use std::pin::pin! when MSRV is updated to at least 1.68.0
    // SAFETY: `future` is not used any more after this and dropped at the end of the function.
    let mut pinned_future = unsafe { std::pin::Pin::new_unchecked(&mut future)};

Kevin