Signed-off-by: Fam Zheng <famz@redhat.com>
---
 include/block/aio.h | 27 +++++++++++++++++---
 util/async.c        | 73 +++++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 97 insertions(+), 3 deletions(-)
diff --git a/include/block/aio.h b/include/block/aio.h
index e9aeeaec94..40c2f64544 100644
--- a/include/block/aio.h
+++ b/include/block/aio.h
@@ -47,6 +47,15 @@ typedef void QEMUBHFunc(void *opaque);
typedef bool AioPollFn(void *opaque);
typedef void IOHandler(void *opaque);
+typedef void AioDrainFn(void *opaque);
+typedef struct AioDrainOps {
+ AioDrainFn *drained_begin;
+ AioDrainFn *drained_end;
+ void *opaque;
+ bool is_new;
+ QTAILQ_ENTRY(AioDrainOps) next;
+} AioDrainOps;
+
struct Coroutine;
struct ThreadPool;
struct LinuxAioState;
@@ -147,6 +156,9 @@ struct AioContext {
int epollfd;
bool epoll_enabled;
bool epoll_available;
+
+ QTAILQ_HEAD(, AioDrainOps) drain_ops;
+ bool drain_ops_updated;
};
/**
@@ -441,9 +453,9 @@ int64_t aio_compute_timeout(AioContext *ctx);
*
* Disable the further processing of external clients.
*/
-static inline void aio_disable_external(AioContext *ctx)
+static inline bool aio_disable_external(AioContext *ctx)
{
- atomic_inc(&ctx->external_disable_cnt);
+ return atomic_fetch_inc(&ctx->external_disable_cnt) == 0;
}
/**
@@ -452,7 +464,7 @@ static inline void aio_disable_external(AioContext *ctx)
*
* Enable the processing of external clients.
*/
-static inline void aio_enable_external(AioContext *ctx)
+static inline bool aio_enable_external(AioContext *ctx)
{
int old;
@@ -462,6 +474,7 @@ static inline void aio_enable_external(AioContext *ctx)
/* Kick event loop so it re-arms file descriptors */
aio_notify(ctx);
}
+ return old == 1;
}
/**
@@ -564,4 +577,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
int64_t grow, int64_t shrink,
Error **errp);
+void aio_context_drained_begin(AioContext *ctx);
+void aio_context_drained_end(AioContext *ctx);
+
+void aio_context_add_drain_ops(AioContext *ctx,
+ AioDrainFn *begin, AioDrainFn *end, void *opaque);
+void aio_context_del_drain_ops(AioContext *ctx,
+ AioDrainFn *begin, AioDrainFn *end, void *opaque);
+
#endif
diff --git a/util/async.c b/util/async.c
index 4dd9d95a9e..cca0efd263 100644
--- a/util/async.c
+++ b/util/async.c
@@ -402,6 +402,7 @@ AioContext *aio_context_new(Error **errp)
AioContext *ctx;
ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
+ QTAILQ_INIT(&ctx->drain_ops);
aio_context_setup(ctx);
ret = event_notifier_init(&ctx->notifier, false);
@@ -506,3 +507,75 @@ void aio_context_release(AioContext *ctx)
{
qemu_rec_mutex_unlock(&ctx->lock);
}
+
+/* Called with ctx->lock */
+void aio_context_drained_begin(AioContext *ctx)
+{
+ AioDrainOps *ops;
+
+ /* TODO: When all external fds are handled in the following drain_ops
+ * callbacks, aio_disable_external can be dropped. */
+ aio_disable_external(ctx);
+restart:
+ ctx->drain_ops_updated = false;
+ QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
+ ops->drained_begin(ops->opaque);
+ if (ctx->drain_ops_updated) {
+ goto restart;
+ }
+ }
+}
+
+/* Called with ctx->lock */
+void aio_context_drained_end(AioContext *ctx)
+{
+ AioDrainOps *ops;
+
+restart:
+ ctx->drain_ops_updated = false;
+ QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
+ if (ops->is_new) {
+ continue;
+ }
+ ops->drained_end(ops->opaque);
+ if (ctx->drain_ops_updated) {
+ goto restart;
+ }
+ }
+ if (aio_enable_external(ctx)) {
+ QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
+ ops->is_new = false;
+ }
+ }
+}
+
+/* Called with ctx->lock */
+void aio_context_add_drain_ops(AioContext *ctx,
+ AioDrainFn *begin, AioDrainFn *end, void *opaque)
+{
+ AioDrainOps *ops = g_new0(AioDrainOps, 1);
+ ops->drained_begin = begin;
+ ops->drained_end = end;
+ ops->opaque = opaque;
+ ops->is_new = true;
+ QTAILQ_INSERT_TAIL(&ctx->drain_ops, ops, next);
+ ctx->drain_ops_updated = true;
+}
+
+/* Called with ctx->lock */
+void aio_context_del_drain_ops(AioContext *ctx,
+ AioDrainFn *begin, AioDrainFn *end, void *opaque)
+{
+ AioDrainOps *ops;
+
+ QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
+ if (ops->drained_begin == begin &&
+ ops->drained_end == end &&
+ ops->opaque == opaque) {
+ QTAILQ_REMOVE(&ctx->drain_ops, ops, next);
+ ctx->drain_ops_updated = true;
+ g_free(ops);
+ return;
+ }
+ }
+}
--
2.14.3
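
For readers following along, here is a minimal sketch of how a client of the
new hooks might look. It is not part of the patch: the MyDevice type, its
callbacks and the attach/detach helpers are hypothetical placeholders for any
code that owns external file descriptors; only aio_context_add_drain_ops()/
aio_context_del_drain_ops() from this patch and the existing
aio_context_acquire()/aio_context_release() are real interfaces.

/* Hypothetical example only -- MyDevice and the my_device_* functions are
 * illustrations, not part of the patch or the QEMU tree. */
#include "qemu/osdep.h"
#include "block/aio.h"

typedef struct MyDevice {
    AioContext *ctx;
    bool quiesced;
} MyDevice;

static void my_device_drained_begin(void *opaque)
{
    MyDevice *d = opaque;
    /* Stop submitting new external requests while the section is drained. */
    d->quiesced = true;
}

static void my_device_drained_end(void *opaque)
{
    MyDevice *d = opaque;
    d->quiesced = false;
}

static void my_device_attach(MyDevice *d, AioContext *ctx)
{
    d->ctx = ctx;
    /* add/del are documented as "called with ctx->lock", so take it here. */
    aio_context_acquire(ctx);
    aio_context_add_drain_ops(ctx, my_device_drained_begin,
                              my_device_drained_end, d);
    aio_context_release(ctx);
}

static void my_device_detach(MyDevice *d)
{
    aio_context_acquire(d->ctx);
    aio_context_del_drain_ops(d->ctx, my_device_drained_begin,
                              my_device_drained_end, d);
    aio_context_release(d->ctx);
}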
On Wed, Nov 29, 2017 at 10:49:49PM +0800, Fam Zheng wrote:
> diff --git a/util/async.c b/util/async.c
> index 4dd9d95a9e..cca0efd263 100644
> --- a/util/async.c
> +++ b/util/async.c
> @@ -402,6 +402,7 @@ AioContext *aio_context_new(Error **errp)
> AioContext *ctx;
>
> ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
> + QTAILQ_INIT(&ctx->drain_ops);
> aio_context_setup(ctx);
>
> ret = event_notifier_init(&ctx->notifier, false);
> @@ -506,3 +507,75 @@ void aio_context_release(AioContext *ctx)
> {
> qemu_rec_mutex_unlock(&ctx->lock);
> }
> +
> +/* Called with ctx->lock */
> +void aio_context_drained_begin(AioContext *ctx)
> +{
> + AioDrainOps *ops;
> +
> + /* TODO: When all external fds are handled in the following drain_ops
> + * callbacks, aio_disable_external can be dropped. */
> + aio_disable_external(ctx);
> +restart:
> + ctx->drain_ops_updated = false;
> + QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
> + ops->drained_begin(ops->opaque);
> + if (ctx->drain_ops_updated) {
> + goto restart;
drained_begin() can be called multiple times within a single drain
section, because the list is rescanned from the start whenever
drain_ops_updated is set. This needs to be clearly documented to avoid
surprises.
> + }
> + }
> +}
> +
> +/* Called with ctx->lock */
> +void aio_context_drained_end(AioContext *ctx)
> +{
> + AioDrainOps *ops;
> +
> +restart:
> + ctx->drain_ops_updated = false;
> + QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
> + if (ops->is_new) {
> + continue;
> + }
> + ops->drained_end(ops->opaque);
drained_end() can be called multiple times. This needs to be clearly
documented to avoid surprises.
> + if (ctx->drain_ops_updated) {
> + goto restart;
> + }
> + }
> + if (aio_enable_external(ctx)) {
> + QTAILQ_FOREACH(ops, &ctx->drain_ops, next) {
> + ops->is_new = false;
> + }
> + }
This is odd: aio_context_drained_end() has nesting support for
->is_new (it is only cleared once the outermost aio_enable_external()
re-enables external clients), but the ->drained_end() calls themselves
are not nested. I'm not sure where you're going with these semantics
yet.
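
Until these semantics are documented or changed, a registered callback would
have to tolerate the repeated invocations caused by the restart loops, for
example by guarding with a flag so drained_begin()/drained_end() are
idempotent within one drain section. A rough, hypothetical sketch
(GuardedDrain and the guarded_* functions are not from the patch):

/* Hypothetical: tolerate extra calls from the restart loops above. */
typedef struct GuardedDrain {
    bool drained;
} GuardedDrain;

static void guarded_drained_begin(void *opaque)
{
    GuardedDrain *g = opaque;
    if (g->drained) {
        return;             /* already quiesced by an earlier call */
    }
    g->drained = true;
    /* ...stop submitting external requests... */
}

static void guarded_drained_end(void *opaque)
{
    GuardedDrain *g = opaque;
    if (!g->drained) {
        return;             /* already resumed */
    }
    g->drained = false;
    /* ...resume... */
}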