In order to make it thread safe, implement a "fake rwlock",
where we allow reads under BQL *or* job_mutex held, but
writes only under BQL *and* job_mutex.
The only write we have is in child_job_set_aio_ctx, which always
happens under drain (so the job is paused).
For this reason, introduce job_set_aio_context and make sure that
the context is set under BQL, job_mutex and drain.
Also make sure all other places where the aiocontext is read
are protected.
Note: at this stage, job_{lock/unlock} and job lock guard macros
are *nop*.
Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
---
block/replication.c | 6 ++++--
blockjob.c | 3 ++-
include/qemu/job.h | 19 ++++++++++++++++++-
job.c | 12 ++++++++++++
4 files changed, 36 insertions(+), 4 deletions(-)
diff --git a/block/replication.c b/block/replication.c
index 55c8f894aa..2189863df1 100644
--- a/block/replication.c
+++ b/block/replication.c
@@ -148,8 +148,10 @@ static void replication_close(BlockDriverState *bs)
}
if (s->stage == BLOCK_REPLICATION_FAILOVER) {
commit_job = &s->commit_job->job;
- assert(commit_job->aio_context == qemu_get_current_aio_context());
- job_cancel_sync(commit_job, false);
+ WITH_JOB_LOCK_GUARD() {
+ assert(commit_job->aio_context == qemu_get_current_aio_context());
+ job_cancel_sync_locked(commit_job, false);
+ }
}
if (s->mode == REPLICATION_MODE_SECONDARY) {
diff --git a/blockjob.c b/blockjob.c
index 96fb9d9f73..9ff2727025 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -162,12 +162,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
}
- job->job.aio_context = ctx;
+ job_set_aio_context(&job->job, ctx);
}
static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
{
BlockJob *job = c->opaque;
+ assert(qemu_in_main_thread());
return job->job.aio_context;
}
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 5709e8d4a8..c144aabefc 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -77,7 +77,12 @@ typedef struct Job {
/** Protected by AioContext lock */
- /** AioContext to run the job coroutine in */
+ /**
+ * AioContext to run the job coroutine in.
+ * This field can be read when holding either the BQL (so we are in
+ * the main loop) or the job_mutex.
+ * It can be only written when we hold *both* BQL and job_mutex.
+ */
AioContext *aio_context;
/** Reference count of the block job */
@@ -741,4 +746,16 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
Error **errp);
+/**
+ * Sets the @job->aio_context.
+ * Called with job_mutex *not* held.
+ *
+ * This function must run in the main thread to protect against
+ * concurrent read in job_finish_sync_locked(),
+ * takes the job_mutex lock to protect against the read in
+ * job_do_yield_locked(), and must be called when the coroutine
+ * is quiescent.
+ */
+void job_set_aio_context(Job *job, AioContext *ctx);
+
#endif
diff --git a/job.c b/job.c
index ecec66b44e..0a857b1468 100644
--- a/job.c
+++ b/job.c
@@ -394,6 +394,17 @@ Job *job_get(const char *id)
return job_get_locked(id);
}
+void job_set_aio_context(Job *job, AioContext *ctx)
+{
+ /* protect against read in job_finish_sync_locked and job_start */
+ assert(qemu_in_main_thread());
+ /* protect against read in job_do_yield_locked */
+ JOB_LOCK_GUARD();
+ /* ensure the coroutine is quiescent while the AioContext is changed */
+ assert(job->pause_count > 0);
+ job->aio_context = ctx;
+}
+
/* Called with job_mutex *not* held. */
static void job_sleep_timer_cb(void *opaque)
{
@@ -1376,6 +1387,7 @@ int job_finish_sync_locked(Job *job,
{
Error *local_err = NULL;
int ret;
+ assert(qemu_in_main_thread());
job_ref_locked(job);
--
2.31.1
Am 25.07.2022 um 09:38 hat Emanuele Giuseppe Esposito geschrieben:
> In order to make it thread safe, implement a "fake rwlock",
> where we allow reads under BQL *or* job_mutex held, but
> writes only under BQL *and* job_mutex.
Oh, so the "or BQL" part is only for job.aio_context? Okay.
> The only write we have is in child_job_set_aio_ctx, which always
> happens under drain (so the job is paused).
> For this reason, introduce job_set_aio_context and make sure that
> the context is set under BQL, job_mutex and drain.
> Also make sure all other places where the aiocontext is read
> are protected.
>
> Note: at this stage, job_{lock/unlock} and job lock guard macros
> are *nop*.
>
> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
> block/replication.c | 6 ++++--
> blockjob.c | 3 ++-
> include/qemu/job.h | 19 ++++++++++++++++++-
> job.c | 12 ++++++++++++
> 4 files changed, 36 insertions(+), 4 deletions(-)
>
> diff --git a/block/replication.c b/block/replication.c
> index 55c8f894aa..2189863df1 100644
> --- a/block/replication.c
> +++ b/block/replication.c
> @@ -148,8 +148,10 @@ static void replication_close(BlockDriverState *bs)
> }
> if (s->stage == BLOCK_REPLICATION_FAILOVER) {
> commit_job = &s->commit_job->job;
> - assert(commit_job->aio_context == qemu_get_current_aio_context());
> - job_cancel_sync(commit_job, false);
> + WITH_JOB_LOCK_GUARD() {
> + assert(commit_job->aio_context == qemu_get_current_aio_context());
> + job_cancel_sync_locked(commit_job, false);
> + }
> }
.bdrv_close runs under the BQL, so why is this needed? Maybe a
GLOBAL_STATE_CODE() annotation would be helpful, though.
> if (s->mode == REPLICATION_MODE_SECONDARY) {
> diff --git a/blockjob.c b/blockjob.c
> index 96fb9d9f73..9ff2727025 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -162,12 +162,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
> bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
> }
>
> - job->job.aio_context = ctx;
> + job_set_aio_context(&job->job, ctx);
> }
>
> static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
> {
> BlockJob *job = c->opaque;
> + assert(qemu_in_main_thread());
Any reason not to use GLOBAL_STATE_CODE()?
> return job->job.aio_context;
> }
> diff --git a/include/qemu/job.h b/include/qemu/job.h
> index 5709e8d4a8..c144aabefc 100644
> --- a/include/qemu/job.h
> +++ b/include/qemu/job.h
> @@ -77,7 +77,12 @@ typedef struct Job {
>
> /** Protected by AioContext lock */
I think this section comment should move down below aio_context now.
> - /** AioContext to run the job coroutine in */
> + /**
> + * AioContext to run the job coroutine in.
> + * This field can be read when holding either the BQL (so we are in
> + * the main loop) or the job_mutex.
> + * It can be only written when we hold *both* BQL and job_mutex.
> + */
> AioContext *aio_context;
>
> /** Reference count of the block job */
> @@ -741,4 +746,16 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
> int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
> Error **errp);
>
> +/**
> + * Sets the @job->aio_context.
> + * Called with job_mutex *not* held.
> + *
> + * This function must run in the main thread to protect against
> + * concurrent read in job_finish_sync_locked(),
Odd line break here in the middle of a sentence.
> + * takes the job_mutex lock to protect against the read in
> + * job_do_yield_locked(), and must be called when the coroutine
> + * is quiescent.
> + */
> +void job_set_aio_context(Job *job, AioContext *ctx);
> +
> #endif
> diff --git a/job.c b/job.c
> index ecec66b44e..0a857b1468 100644
> --- a/job.c
> +++ b/job.c
> @@ -394,6 +394,17 @@ Job *job_get(const char *id)
> return job_get_locked(id);
> }
>
> +void job_set_aio_context(Job *job, AioContext *ctx)
> +{
> + /* protect against read in job_finish_sync_locked and job_start */
> + assert(qemu_in_main_thread());
Same question about GLOBAL_STATE_CODE().
> + /* protect against read in job_do_yield_locked */
> + JOB_LOCK_GUARD();
> + /* ensure the coroutine is quiescent while the AioContext is changed */
> + assert(job->pause_count > 0);
job->pause_count only shows that pausing was requested. The coroutine is
only really quiescent if job->busy == false, too.
Or maybe job->paused is actually the one you want here.
> + job->aio_context = ctx;
> +}
> +
> /* Called with job_mutex *not* held. */
> static void job_sleep_timer_cb(void *opaque)
> {
> @@ -1376,6 +1387,7 @@ int job_finish_sync_locked(Job *job,
> {
> Error *local_err = NULL;
> int ret;
> + assert(qemu_in_main_thread());
>
> job_ref_locked(job);
Another GLOBAL_STATE_CODE()?
Kevin
Am 05/08/2022 um 11:12 schrieb Kevin Wolf:
> Am 25.07.2022 um 09:38 hat Emanuele Giuseppe Esposito geschrieben:
>> In order to make it thread safe, implement a "fake rwlock",
>> where we allow reads under BQL *or* job_mutex held, but
>> writes only under BQL *and* job_mutex.
>
> Oh, so the "or BQL" part is only for job.aio_context? Okay.
>
>> The only write we have is in child_job_set_aio_ctx, which always
>> happens under drain (so the job is paused).
>> For this reason, introduce job_set_aio_context and make sure that
>> the context is set under BQL, job_mutex and drain.
>> Also make sure all other places where the aiocontext is read
>> are protected.
>>
>> Note: at this stage, job_{lock/unlock} and job lock guard macros
>> are *nop*.
>>
>> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
>> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
>> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
>> ---
>> block/replication.c | 6 ++++--
>> blockjob.c | 3 ++-
>> include/qemu/job.h | 19 ++++++++++++++++++-
>> job.c | 12 ++++++++++++
>> 4 files changed, 36 insertions(+), 4 deletions(-)
>>
>> diff --git a/block/replication.c b/block/replication.c
>> index 55c8f894aa..2189863df1 100644
>> --- a/block/replication.c
>> +++ b/block/replication.c
>> @@ -148,8 +148,10 @@ static void replication_close(BlockDriverState *bs)
>> }
>> if (s->stage == BLOCK_REPLICATION_FAILOVER) {
>> commit_job = &s->commit_job->job;
>> - assert(commit_job->aio_context == qemu_get_current_aio_context());
>> - job_cancel_sync(commit_job, false);
>> + WITH_JOB_LOCK_GUARD() {
>> + assert(commit_job->aio_context == qemu_get_current_aio_context());
>> + job_cancel_sync_locked(commit_job, false);
>> + }
>> }
>
> .bdrv_close runs under the BQL, so why is this needed? Maybe a
> GLOBAL_STATE_CODE() annotation would be helpful, though.
I think I left it because it would be confusing to leave a _locked
function without the job lock. I'll add the GLOBAL_STATE_CODE anyways.
>
>> if (s->mode == REPLICATION_MODE_SECONDARY) {
>> diff --git a/blockjob.c b/blockjob.c
>> index 96fb9d9f73..9ff2727025 100644
>> --- a/blockjob.c
>> +++ b/blockjob.c
>> @@ -162,12 +162,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
>> bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
>> }
>>
>> - job->job.aio_context = ctx;
>> + job_set_aio_context(&job->job, ctx);
>> }
>>
>> static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
>> {
>> BlockJob *job = c->opaque;
>> + assert(qemu_in_main_thread());
>
> Any reason not to use GLOBAL_STATE_CODE()?
4 months ago GLOBAL_STATE_CODE did not exist yet, and I didn't think
about updating it :)
>
>> return job->job.aio_context;
>> }
>> + /* protect against read in job_do_yield_locked */
>> + JOB_LOCK_GUARD();
>> + /* ensure the coroutine is quiescent while the AioContext is changed */
>> + assert(job->pause_count > 0);
>
> job->pause_count only shows that pausing was requested. The coroutine is
> only really quiescent if job->busy == false, too.
>
> Or maybe job->paused is actually the one you want here.
I think job->paused works too.
Emanuele
Am 17/08/2022 um 10:04 schrieb Emanuele Giuseppe Esposito: >>> + /* protect against read in job_do_yield_locked */ >>> + JOB_LOCK_GUARD(); >>> + /* ensure the coroutine is quiescent while the AioContext is changed */ >>> + assert(job->pause_count > 0); >> job->pause_count only shows that pausing was requested. The coroutine is >> only really quiescent if job->busy == false, too. >> >> Or maybe job->paused is actually the one you want here. > I think job->paused works too. > Actually it doesn't. At least test-block-iothread test_propagate_mirror() fails, for both job->paused and !job->busy. I think the reason is that if we wait for the flag to be set, we need to actually wait that the job gets to the next pause point, because job_pause() doesn't really pause the job, it just kind of "schedules" the pause on next pause point. So, either we leave pause_count > 0, or somehow wait (I was thinking AIO_WAIT_WHILE(job->paused) but that's probably a very bad idea). Do you have any suggestion for that? Maybe Paolo has a better idea on how to do it? Emanuele
Am 17/08/2022 um 15:10 schrieb Emanuele Giuseppe Esposito: > > > Am 17/08/2022 um 10:04 schrieb Emanuele Giuseppe Esposito: >>>> + /* protect against read in job_do_yield_locked */ >>>> + JOB_LOCK_GUARD(); >>>> + /* ensure the coroutine is quiescent while the AioContext is changed */ >>>> + assert(job->pause_count > 0); >>> job->pause_count only shows that pausing was requested. The coroutine is >>> only really quiescent if job->busy == false, too. >>> >>> Or maybe job->paused is actually the one you want here. >> I think job->paused works too. >> > > Actually it doesn't. At least test-block-iothread > test_propagate_mirror() fails, for both job->paused and !job->busy. I > think the reason is that if we wait for the flag to be set, we need to > actually wait that the job gets to the next pause point, because > job_pause() doesn't really pause the job, it just kind of "schedules" > the pause on next pause point. > > So, either we leave pause_count > 0, or somehow wait (I was thinking > AIO_WAIT_WHILE(job->paused) but that's probably a very bad idea). > > Do you have any suggestion for that? > Maybe Paolo has a better idea on how to do it? > Additional update: after debugging it a little bit, I figured that there are places where busy=false and paused=false too. This is clear in job_do_dismiss and not-so-clear in job_exit (at least for me). I don't know if this is expected or a bug, because it just means that if we are draining and the job is completing, there is no way to stop it. Is it a bug? Or am I missing something? Emanuele
On 7/25/22 10:38, Emanuele Giuseppe Esposito wrote:
> In order to make it thread safe, implement a "fake rwlock",
> where we allow reads under BQL *or* job_mutex held, but
> writes only under BQL *and* job_mutex.
>
> The only write we have is in child_job_set_aio_ctx, which always
> happens under drain (so the job is paused).
> For this reason, introduce job_set_aio_context and make sure that
> the context is set under BQL, job_mutex and drain.
actually, we only make sure that pause was scheduled
> Also make sure all other places where the aiocontext is read
> are protected.
>
> Note: at this stage, job_{lock/unlock} and job lock guard macros
> are *nop*.
>
> Suggested-by: Paolo Bonzini <pbonzini@redhat.com>
> Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
> Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
> ---
> block/replication.c | 6 ++++--
> blockjob.c | 3 ++-
> include/qemu/job.h | 19 ++++++++++++++++++-
> job.c | 12 ++++++++++++
> 4 files changed, 36 insertions(+), 4 deletions(-)
>
> diff --git a/block/replication.c b/block/replication.c
> index 55c8f894aa..2189863df1 100644
> --- a/block/replication.c
> +++ b/block/replication.c
> @@ -148,8 +148,10 @@ static void replication_close(BlockDriverState *bs)
> }
> if (s->stage == BLOCK_REPLICATION_FAILOVER) {
> commit_job = &s->commit_job->job;
> - assert(commit_job->aio_context == qemu_get_current_aio_context());
> - job_cancel_sync(commit_job, false);
> + WITH_JOB_LOCK_GUARD() {
> + assert(commit_job->aio_context == qemu_get_current_aio_context());
> + job_cancel_sync_locked(commit_job, false);
> + }
> }
>
> if (s->mode == REPLICATION_MODE_SECONDARY) {
> diff --git a/blockjob.c b/blockjob.c
> index 96fb9d9f73..9ff2727025 100644
> --- a/blockjob.c
> +++ b/blockjob.c
> @@ -162,12 +162,13 @@ static void child_job_set_aio_ctx(BdrvChild *c, AioContext *ctx,
> bdrv_set_aio_context_ignore(sibling->bs, ctx, ignore);
> }
>
> - job->job.aio_context = ctx;
> + job_set_aio_context(&job->job, ctx);
> }
>
> static AioContext *child_job_get_parent_aio_context(BdrvChild *c)
> {
> BlockJob *job = c->opaque;
> + assert(qemu_in_main_thread());
>
> return job->job.aio_context;
> }
> diff --git a/include/qemu/job.h b/include/qemu/job.h
> index 5709e8d4a8..c144aabefc 100644
> --- a/include/qemu/job.h
> +++ b/include/qemu/job.h
> @@ -77,7 +77,12 @@ typedef struct Job {
>
> /** Protected by AioContext lock */
>
> - /** AioContext to run the job coroutine in */
> + /**
> + * AioContext to run the job coroutine in.
> + * This field can be read when holding either the BQL (so we are in
> + * the main loop) or the job_mutex.
> + * It can be only written when we hold *both* BQL and job_mutex.
> + */
> AioContext *aio_context;
>
> /** Reference count of the block job */
> @@ -741,4 +746,16 @@ int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp),
> int job_finish_sync_locked(Job *job, void (*finish)(Job *, Error **errp),
> Error **errp);
>
> +/**
> + * Sets the @job->aio_context.
> + * Called with job_mutex *not* held.
> + *
> + * This function must run in the main thread to protect against
> + * concurrent read in job_finish_sync_locked(),
> + * takes the job_mutex lock to protect against the read in
> + * job_do_yield_locked(), and must be called when the coroutine
> + * is quiescent.
What means "coroutine is quescent"? Could we just swap it by "job is paused"?
> + */
> +void job_set_aio_context(Job *job, AioContext *ctx);
> +
> #endif
> diff --git a/job.c b/job.c
> index ecec66b44e..0a857b1468 100644
> --- a/job.c
> +++ b/job.c
> @@ -394,6 +394,17 @@ Job *job_get(const char *id)
> return job_get_locked(id);
> }
>
> +void job_set_aio_context(Job *job, AioContext *ctx)
> +{
> + /* protect against read in job_finish_sync_locked and job_start */
> + assert(qemu_in_main_thread());
> + /* protect against read in job_do_yield_locked */
> + JOB_LOCK_GUARD();
> + /* ensure the coroutine is quiescent while the AioContext is changed */
pause_count means pause was scheduled. Job may be paused already, or may be not. I'm not against the assertion, it helps. I just think that we don't have the guarantee that comment claims. (And I still don't understand what means coroutine is quiescent. And not that there are may be several job related coroutines: the main one and some worker coroutines).
> + assert(job->pause_count > 0);
> + job->aio_context = ctx;
> +}
> +
> /* Called with job_mutex *not* held. */
> static void job_sleep_timer_cb(void *opaque)
> {
> @@ -1376,6 +1387,7 @@ int job_finish_sync_locked(Job *job,
> {
> Error *local_err = NULL;
> int ret;
> + assert(qemu_in_main_thread());
>
> job_ref_locked(job);
>
without "quiescent coroutine" concept:
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@yandex-team.ru>
--
Best regards,
Vladimir
© 2016 - 2026 Red Hat, Inc.