From: Kevin Wolf
To: qemu-block@nongnu.org
Cc: kwolf@redhat.com, jcody@redhat.com, jsnow@redhat.com, qemu-devel@nongnu.org, mreitz@redhat.com
Date: Tue, 24 Apr 2018 17:25:12 +0200
Message-Id: <20180424152515.25664-31-kwolf@redhat.com>
In-Reply-To: <20180424152515.25664-1-kwolf@redhat.com>
References: <20180424152515.25664-1-kwolf@redhat.com>
Subject: [Qemu-devel] [RFC PATCH 30/33] job: Move transactions to Job

This moves the logic that implements job transactions from BlockJob to Job.

Signed-off-by: Kevin Wolf
---
 include/block/blockjob.h     |  54 ----------
 include/block/blockjob_int.h |  10 --
 include/qemu/job.h           |  70 +++++++++++--
 blockdev.c                   |   6 +-
 blockjob.c                   | 238 +------------------------------------------
 job.c                        | 233 +++++++++++++++++++++++++++++++++++++++++--
 tests/test-blockjob-txn.c    |  12 +--
 tests/test-blockjob.c        |   2 +-
 8 files changed, 303 insertions(+), 322 deletions(-)

diff --git a/include/block/blockjob.h b/include/block/blockjob.h
index fbb8f54dc6..e8e9e5f370 100644
--- a/include/block/blockjob.h
+++ b/include/block/blockjob.h
@@ -33,7 +33,6 @@
 #define SLICE_TIME 100000000ULL /* ns */
 
 typedef struct BlockJobDriver BlockJobDriver;
-typedef struct JobTxn JobTxn;
 
 /**
  * BlockJob:
@@ -84,8 +83,6 @@ typedef struct BlockJob {
 
     /** BlockDriverStates that are involved in this block job */
     GSList *nodes;
-
-    JobTxn *txn;
 } BlockJob;
 
 /**
@@ -153,22 +150,6 @@ void block_job_set_speed(BlockJob *job, int64_t speed, Error **errp);
 void block_job_cancel(BlockJob *job, bool force);
 
 /**
- * block_job_finalize:
- * @job: The job to fully commit and finish.
- * @errp: Error object.
- *
- * For jobs that have finished their work and are pending
- * awaiting explicit acknowledgement to commit their work,
- * This will commit that work.
- *
- * FIXME: Make the below statement universally true:
- * For jobs that support the manual workflow mode, all graph
- * changes that occur as a result will occur after this command
- * and before a successful reply.
- */
-void block_job_finalize(BlockJob *job, Error **errp);
-
-/**
  * block_job_dismiss:
  * @job: The job to be dismissed.
  * @errp: Error object.
@@ -260,41 +241,6 @@ int block_job_complete_sync(BlockJob *job, Error **errp);
 void block_job_iostatus_reset(BlockJob *job);
 
 /**
- * block_job_txn_new:
- *
- * Allocate and return a new block job transaction. Jobs can be added to the
- * transaction using block_job_txn_add_job().
- *
- * The transaction is automatically freed when the last job completes or is
- * cancelled.
- *
- * All jobs in the transaction either complete successfully or fail/cancel as a
- * group. Jobs wait for each other before completing. Cancelling one job
- * cancels all jobs in the transaction.
- */
-JobTxn *block_job_txn_new(void);
-
-/**
- * block_job_txn_unref:
- *
- * Release a reference that was previously acquired with block_job_txn_add_job
- * or block_job_txn_new. If it's the last reference to the object, it will be
- * freed.
- */
-void block_job_txn_unref(JobTxn *txn);
-
-/**
- * block_job_txn_add_job:
- * @txn: The transaction (may be NULL)
- * @job: Job to add to the transaction
- *
- * Add @job to the transaction. The @job must not already be in a transaction.
- * The caller must call either block_job_txn_unref() or block_job_completed()
- * to release the reference that is automatically grabbed here.
- */
-void block_job_txn_add_job(JobTxn *txn, BlockJob *job);
-
-/**
  * block_job_is_internal:
  * @job: The job to determine if it is user-visible or not.
  *
diff --git a/include/block/blockjob_int.h b/include/block/blockjob_int.h
index 13c9924b9c..2519b1f879 100644
--- a/include/block/blockjob_int.h
+++ b/include/block/blockjob_int.h
@@ -38,16 +38,6 @@ struct BlockJobDriver {
     /** Generic JobDriver callbacks and settings */
     JobDriver job_driver;
 
-    /**
-     * If the callback is not NULL, prepare will be invoked when all the jobs
-     * belonging to the same transaction complete; or upon this job's completion
-     * if it is not in a transaction.
-     *
-     * This callback will not be invoked if the job has already failed.
-     * If it fails, abort and then clean will be called.
-     */
-    int (*prepare)(BlockJob *job);
-
     /*
      * If the callback is not NULL, it will be invoked before the job is
      * resumed in a new AioContext. This is the place to move any resources
diff --git a/include/qemu/job.h b/include/qemu/job.h
index 0c8e7e85d0..fceb18c6c7 100644
--- a/include/qemu/job.h
+++ b/include/qemu/job.h
@@ -32,6 +32,8 @@
 #include "block/aio.h"
 
 typedef struct JobDriver JobDriver;
+typedef struct JobTxn JobTxn;
+
 
 /**
  * Long-running operation.
@@ -133,6 +135,9 @@ typedef struct Job {
     /** Element of the list of jobs */
     QLIST_ENTRY(Job) job_list;
 
+    /** Transaction this job is part of */
+    JobTxn *txn;
+
     /** Element of the list of jobs in a job transaction */
     QLIST_ENTRY(Job) txn_list;
 } Job;
@@ -184,6 +189,16 @@ struct JobDriver {
     void (*drain)(Job *job);
 
     /**
+     * If the callback is not NULL, prepare will be invoked when all the jobs
+     * belonging to the same transaction complete; or upon this job's completion
+     * if it is not in a transaction.
+     *
+     * This callback will not be invoked if the job has already failed.
+     * If it fails, abort and then clean will be called.
+     */
+    int (*prepare)(Job *job);
+
+    /**
      * If the callback is not NULL, it will be invoked when all the jobs
      * belonging to the same transaction complete; or upon this job's
      * completion if it is not in a transaction. Skipped if NULL.
@@ -227,20 +242,52 @@ typedef enum JobCreateFlags {
     JOB_MANUAL_DISMISS = 0x04,
 } JobCreateFlags;
 
+/**
+ * Allocate and return a new job transaction. Jobs can be added to the
+ * transaction using job_txn_add_job().
+ *
+ * The transaction is automatically freed when the last job completes or is
+ * cancelled.
+ *
+ * All jobs in the transaction either complete successfully or fail/cancel as a
+ * group. Jobs wait for each other before completing. Cancelling one job
+ * cancels all jobs in the transaction.
+ */
+JobTxn *job_txn_new(void);
+
+/**
+ * Release a reference that was previously acquired with job_txn_add_job or
+ * job_txn_new. If it's the last reference to the object, it will be freed.
+ */
+void job_txn_unref(JobTxn *txn);
+
+/**
+ * @txn: The transaction (may be NULL)
+ * @job: Job to add to the transaction
+ *
+ * Add @job to the transaction. The @job must not already be in a transaction.
+ * The caller must call either job_txn_unref() or block_job_completed()
+ * to release the reference that is automatically grabbed here.
+ *
+ * If @txn is NULL, the function does nothing.
+ */
+void job_txn_add_job(JobTxn *txn, Job *job);
 
 /**
  * Create a new long-running job and return it.
  *
  * @job_id: The id of the newly-created job, or %NULL for internal jobs
  * @driver: The class object for the newly-created job.
+ * @txn: The transaction this job belongs to, if any. %NULL otherwise.
  * @ctx: The AioContext to run the job coroutine in.
  * @flags: Creation flags for the job. See @JobCreateFlags.
  * @cb: Completion function for the job.
  * @opaque: Opaque pointer value passed to @cb.
  * @errp: Error object.
  */
-void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
-                 int flags, BlockCompletionFunc *cb, void *opaque, Error **errp);
+void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
+                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
+                 void *opaque, Error **errp);
 
 /**
  * Add a reference to Job refcnt, it will be decreased with job_unref, and then
@@ -260,9 +307,6 @@ void job_event_cancelled(Job *job);
 /** To be called when a successfully completed job is finalised. */
 void job_event_completed(Job *job);
 
-/** To be called when the job transitions to PENDING */
-void job_event_pending(Job *job);
-
 /**
  * @job: A job that has not yet been started.
  *
@@ -351,6 +395,16 @@ void job_early_fail(Job *job);
 /** Asynchronously complete the specified @job. */
 void job_complete(Job *job, Error **errp);;
 
+/**
+ * For a @job that has finished its work and is pending awaiting explicit
+ * acknowledgement to commit its work, this will commit that work.
+ *
+ * FIXME: Make the below statement universally true:
+ * For jobs that support the manual workflow mode, all graph changes that occur
+ * as a result will occur after this command and before a successful reply.
+ */
+void job_finalize(Job *job, Error **errp);
+
 typedef void JobDeferToMainLoopFn(Job *job, void *opaque);
 
 /**
@@ -389,8 +443,8 @@ void job_resume(Job *job);
 void job_do_dismiss(Job *job);
 int job_finalize_single(Job *job);
 void job_update_rc(Job *job);
-
-typedef struct BlockJob BlockJob;
-void block_job_txn_del_job(BlockJob *job);
+void job_cancel_async(Job *job, bool force);
+void job_completed_txn_abort(Job *job);
+void job_completed_txn_success(Job *job);
 
 #endif
diff --git a/blockdev.c b/blockdev.c
index 69471c62b3..ec05ad89ae 100644
--- a/blockdev.c
+++ b/blockdev.c
@@ -2255,7 +2255,7 @@ void qmp_transaction(TransactionActionList *dev_list,
      */
     props = get_transaction_properties(props);
     if (props->completion_mode != ACTION_COMPLETION_MODE_INDIVIDUAL) {
-        block_job_txn = block_job_txn_new();
+        block_job_txn = job_txn_new();
     }
 
     /* drain all i/o before any operations */
@@ -2314,7 +2314,7 @@ exit:
     if (!has_props) {
         qapi_free_TransactionProperties(props);
     }
-    block_job_txn_unref(block_job_txn);
+    job_txn_unref(block_job_txn);
 }
 
 void qmp_eject(bool has_device, const char *device,
@@ -3908,7 +3908,7 @@ void qmp_block_job_finalize(const char *id, Error **errp)
     }
 
     trace_qmp_block_job_finalize(job);
-    block_job_finalize(job, errp);
+    job_finalize(&job->job, errp);
     aio_context_release(aio_context);
 }
 
diff --git a/blockjob.c b/blockjob.c
index 3afa0dbdca..b3d4c34e12 100644
--- a/blockjob.c
+++ b/blockjob.c
@@ -36,19 +36,6 @@
 #include "qemu/coroutine.h"
 #include "qemu/timer.h"
 
-/* Transactional group of block jobs */
-struct JobTxn {
-
-    /* Is this txn being cancelled? */
-    bool aborting;
-
-    /* List of jobs */
-    QLIST_HEAD(, Job) jobs;
-
-    /* Reference count */
-    int refcnt;
-};
-
 /*
  * The block job API is composed of two categories of functions.
  *
@@ -95,48 +82,6 @@ BlockJob *block_job_get(const char *id)
     }
 }
 
-JobTxn *block_job_txn_new(void)
-{
-    JobTxn *txn = g_new0(JobTxn, 1);
-    QLIST_INIT(&txn->jobs);
-    txn->refcnt = 1;
-    return txn;
-}
-
-static void block_job_txn_ref(JobTxn *txn)
-{
-    txn->refcnt++;
-}
-
-void block_job_txn_unref(JobTxn *txn)
-{
-    if (txn && --txn->refcnt == 0) {
-        g_free(txn);
-    }
-}
-
-void block_job_txn_add_job(JobTxn *txn, BlockJob *job)
-{
-    if (!txn) {
-        return;
-    }
-
-    assert(!job->txn);
-    job->txn = txn;
-
-    QLIST_INSERT_HEAD(&txn->jobs, &job->job, txn_list);
-    block_job_txn_ref(txn);
-}
-
-void block_job_txn_del_job(BlockJob *job)
-{
-    if (job->txn) {
-        QLIST_REMOVE(&job->job, txn_list);
-        block_job_txn_unref(job->txn);
-        job->txn = NULL;
-    }
-}
-
 static void block_job_attached_aio_context(AioContext *new_context,
                                            void *opaque);
 static void block_job_detach_aio_context(void *opaque);
@@ -146,8 +91,6 @@ void block_job_free(Job *job)
     BlockJob *bjob = container_of(job, BlockJob, job);
     BlockDriverState *bs = blk_bs(bjob->blk);
 
-    assert(!bjob->txn);
-
     bs->job = NULL;
     block_job_remove_all_bdrv(bjob);
     blk_remove_aio_context_notifier(bjob->blk,
@@ -262,158 +205,6 @@ const BlockJobDriver *block_job_driver(BlockJob *job)
     return job->driver;
 }
 
-static int block_job_prepare(BlockJob *job)
-{
-    if (job->job.ret == 0 && job->driver->prepare) {
-        job->job.ret = job->driver->prepare(job);
-    }
-    return job->job.ret;
-}
-
-static void job_cancel_async(Job *job, bool force)
-{
-    if (job->user_paused) {
-        /* Do not call job_enter here, the caller will handle it. */
-        job->user_paused = false;
-        if (job->driver->user_resume) {
-            job->driver->user_resume(job);
-        }
-        assert(job->pause_count > 0);
-        job->pause_count--;
-    }
-    job->cancelled = true;
-    /* To prevent 'force == false' overriding a previous 'force == true' */
-    job->force_cancel |= force;
-}
-
-static int block_job_txn_apply(JobTxn *txn, int fn(BlockJob *), bool lock)
-{
-    AioContext *ctx;
-    Job *job, *next;
-    BlockJob *bjob;
-    int rc = 0;
-
-    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
-        assert(is_block_job(job));
-        bjob = container_of(job, BlockJob, job);
-
-        if (lock) {
-            ctx = job->aio_context;
-            aio_context_acquire(ctx);
-        }
-        rc = fn(bjob);
-        if (lock) {
-            aio_context_release(ctx);
-        }
-        if (rc) {
-            break;
-        }
-    }
-    return rc;
-}
-
-static void block_job_completed_txn_abort(BlockJob *job)
-{
-    AioContext *ctx;
-    JobTxn *txn = job->txn;
-    Job *other_job;
-
-    if (txn->aborting) {
-        /*
-         * We are cancelled by another job, which will handle everything.
-         */
-        return;
-    }
-    txn->aborting = true;
-    block_job_txn_ref(txn);
-
-    /* We are the first failed job. Cancel other jobs. */
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        ctx = other_job->aio_context;
-        aio_context_acquire(ctx);
-    }
-
-    /* Other jobs are effectively cancelled by us, set the status for
-     * them; this job, however, may or may not be cancelled, depending
-     * on the caller, so leave it. */
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        if (other_job != &job->job) {
-            job_cancel_async(other_job, false);
-        }
-    }
-    while (!QLIST_EMPTY(&txn->jobs)) {
-        other_job = QLIST_FIRST(&txn->jobs);
-        ctx = other_job->aio_context;
-        if (!job_is_completed(other_job)) {
-            assert(job_is_cancelled(other_job));
-            job_finish_sync(other_job, NULL, NULL);
-        }
-        job_finalize_single(other_job);
-        aio_context_release(ctx);
-    }
-
-    block_job_txn_unref(txn);
-}
-
-static int block_job_needs_finalize(BlockJob *job)
-{
-    return !job->job.auto_finalize;
-}
-
-static int block_job_finalize_single(BlockJob *job)
-{
-    return job_finalize_single(&job->job);
-}
-
-static void block_job_do_finalize(BlockJob *job)
-{
-    int rc;
-    assert(job && job->txn);
-
-    /* prepare the transaction to complete */
-    rc = block_job_txn_apply(job->txn, block_job_prepare, true);
-    if (rc) {
-        block_job_completed_txn_abort(job);
-    } else {
-        block_job_txn_apply(job->txn, block_job_finalize_single, true);
-    }
-}
-
-static int block_job_transition_to_pending(BlockJob *job)
-{
-    job_state_transition(&job->job, JOB_STATUS_PENDING);
-    if (!job->job.auto_finalize) {
-        job_event_pending(&job->job);
-    }
-    return 0;
-}
-
-static void block_job_completed_txn_success(BlockJob *job)
-{
-    JobTxn *txn = job->txn;
-    Job *other_job;
-
-    job_state_transition(&job->job, JOB_STATUS_WAITING);
-
-    /*
-     * Successful completion, see if there are other running jobs in this
-     * txn.
-     */
-    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
-        if (!job_is_completed(other_job)) {
-            return;
-        }
-        assert(other_job->ret == 0);
-    }
-
-    block_job_txn_apply(txn, block_job_transition_to_pending, false);
-
-    /* If no jobs need manual finalization, automatically do so */
-    if (block_job_txn_apply(txn, block_job_needs_finalize, false) == 0) {
-        block_job_do_finalize(job);
-    }
-}
-
 /* Assumes the block_job_mutex is held */
 static bool job_timer_pending(Job *job)
 {
@@ -452,15 +243,6 @@ int64_t block_job_ratelimit_get_delay(BlockJob *job, uint64_t n)
     return ratelimit_calculate_delay(&job->limit, n);
 }
 
-void block_job_finalize(BlockJob *job, Error **errp)
-{
-    assert(job && job->job.id && job->txn);
-    if (job_apply_verb(&job->job, JOB_VERB_FINALIZE, errp)) {
-        return;
-    }
-    block_job_do_finalize(job);
-}
-
 void block_job_dismiss(BlockJob **jobptr, Error **errp)
 {
     BlockJob *job = *jobptr;
@@ -484,7 +266,7 @@ void block_job_cancel(BlockJob *job, bool force)
     if (!job_started(&job->job)) {
         block_job_completed(job, -ECANCELED);
     } else if (job->job.deferred_to_main_loop) {
-        block_job_completed_txn_abort(job);
+        job_completed_txn_abort(&job->job);
     } else {
         block_job_enter(job);
     }
@@ -655,7 +437,7 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
         return NULL;
     }
 
-    job = job_create(job_id, &driver->job_driver, blk_get_aio_context(blk),
+    job = job_create(job_id, &driver->job_driver, txn, blk_get_aio_context(blk),
                      flags, cb, opaque, errp);
     if (job == NULL) {
         blk_unref(blk);
@@ -703,30 +485,20 @@ void *block_job_create(const char *job_id, const BlockJobDriver *driver,
         }
     }
 
-    /* Single jobs are modeled as single-job transactions for sake of
-     * consolidating the job management logic */
-    if (!txn) {
-        txn = block_job_txn_new();
-        block_job_txn_add_job(txn, job);
-        block_job_txn_unref(txn);
-    } else {
-        block_job_txn_add_job(txn, job);
-    }
-
     return job;
 }
 
 void block_job_completed(BlockJob *job, int ret)
 {
-    assert(job && job->txn && !job_is_completed(&job->job));
+    assert(job && job->job.txn && !job_is_completed(&job->job));
     assert(blk_bs(job->blk)->job == job);
     job->job.ret = ret;
     job_update_rc(&job->job);
     trace_block_job_completed(job, ret, job->job.ret);
     if (job->job.ret) {
-        block_job_completed_txn_abort(job);
+        job_completed_txn_abort(&job->job);
     } else {
-        block_job_completed_txn_success(job);
+        job_completed_txn_success(&job->job);
     }
 }
 
diff --git a/job.c b/job.c
index db3d8536f7..f2381b559c 100644
--- a/job.c
+++ b/job.c
@@ -63,6 +63,19 @@ bool JobVerbTable[JOB_VERB__MAX][JOB_STATUS__MAX] = {
     [JOB_VERB_DISMISS] = {0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0},
 };
 
+/* Transactional group of jobs */
+struct JobTxn {
+
+    /* Is this txn being cancelled? */
+    bool aborting;
+
+    /* List of jobs */
+    QLIST_HEAD(, Job) jobs;
+
+    /* Reference count */
+    int refcnt;
+};
+
 /* Right now, this mutex is only needed to synchronize accesses to job->busy
  * and job->sleep_timer, such as concurrent calls to job_do_yield and
  * job_enter. */
@@ -83,6 +96,71 @@ static void __attribute__((__constructor__)) job_init(void)
     qemu_mutex_init(&job_mutex);
 }
 
+JobTxn *job_txn_new(void)
+{
+    JobTxn *txn = g_new0(JobTxn, 1);
+    QLIST_INIT(&txn->jobs);
+    txn->refcnt = 1;
+    return txn;
+}
+
+static void job_txn_ref(JobTxn *txn)
+{
+    txn->refcnt++;
+}
+
+void job_txn_unref(JobTxn *txn)
+{
+    if (txn && --txn->refcnt == 0) {
+        g_free(txn);
+    }
+}
+
+void job_txn_add_job(JobTxn *txn, Job *job)
+{
+    if (!txn) {
+        return;
+    }
+
+    assert(!job->txn);
+    job->txn = txn;
+
+    QLIST_INSERT_HEAD(&txn->jobs, job, txn_list);
+    job_txn_ref(txn);
+}
+
+static void job_txn_del_job(Job *job)
+{
+    if (job->txn) {
+        QLIST_REMOVE(job, txn_list);
+        job_txn_unref(job->txn);
+        job->txn = NULL;
+    }
+}
+
+static int job_txn_apply(JobTxn *txn, int fn(Job *), bool lock)
+{
+    AioContext *ctx;
+    Job *job, *next;
+    int rc = 0;
+
+    QLIST_FOREACH_SAFE(job, &txn->jobs, txn_list, next) {
+        if (lock) {
+            ctx = job->aio_context;
+            aio_context_acquire(ctx);
+        }
+        rc = fn(job);
+        if (lock) {
+            aio_context_release(ctx);
+        }
+        if (rc) {
+            break;
+        }
+    }
+    return rc;
+}
+
+
 /* TODO Make static once the whole state machine is in job.c */
 void job_state_transition(Job *job, JobStatus s1)
 {
@@ -185,8 +263,9 @@ static void job_sleep_timer_cb(void *opaque)
     job_enter(job);
 }
 
-void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
-                 int flags, BlockCompletionFunc *cb, void *opaque, Error **errp)
+void *job_create(const char *job_id, const JobDriver *driver, JobTxn *txn,
+                 AioContext *ctx, int flags, BlockCompletionFunc *cb,
+                 void *opaque, Error **errp)
 {
     Job *job;
 
@@ -232,6 +311,16 @@ void *job_create(const char *job_id, const JobDriver *driver, AioContext *ctx,
 
     QLIST_INSERT_HEAD(&jobs, job, job_list);
 
+    /* Single jobs are modeled as single-job transactions for sake of
+     * consolidating the job management logic */
+    if (!txn) {
+        txn = job_txn_new();
+        job_txn_add_job(txn, job);
+        job_txn_unref(txn);
+    } else {
+        job_txn_add_job(txn, job);
+    }
+
     return job;
 }
 
@@ -245,6 +334,7 @@ void job_unref(Job *job)
     if (--job->refcnt == 0) {
         assert(job->status == JOB_STATUS_NULL);
         assert(!timer_pending(&job->sleep_timer));
+        assert(!job->txn);
 
         if (job->driver->free) {
             job->driver->free(job);
@@ -267,9 +357,10 @@ void job_event_completed(Job *job)
     notifier_list_notify(&job->on_finalize_completed, job);
 }
 
-void job_event_pending(Job *job)
+static int job_event_pending(Job *job)
 {
     notifier_list_notify(&job->on_pending, job);
+    return 0;
 }
 
 /*
@@ -469,8 +560,7 @@ void job_do_dismiss(Job *job)
     job->paused = false;
     job->deferred_to_main_loop = true;
 
-    /* TODO Don't assume it's a BlockJob */
-    block_job_txn_del_job((BlockJob*) job);
+    job_txn_del_job(job);
 
     job_state_transition(job, JOB_STATUS_NULL);
     job_unref(job);
@@ -550,12 +640,141 @@ int job_finalize_single(Job *job)
         }
     }
 
-    /* TODO Don't assume it's a BlockJob */
-    block_job_txn_del_job((BlockJob*) job);
+    job_txn_del_job(job);
     job_conclude(job);
     return 0;
 }
 
+void job_cancel_async(Job *job, bool force)
+{
+    if (job->user_paused) {
+        /* Do not call job_enter here, the caller will handle it. */
+        job->user_paused = false;
+        if (job->driver->user_resume) {
+            job->driver->user_resume(job);
+        }
+        assert(job->pause_count > 0);
+        job->pause_count--;
+    }
+    job->cancelled = true;
+    /* To prevent 'force == false' overriding a previous 'force == true' */
+    job->force_cancel |= force;
+}
+
+void job_completed_txn_abort(Job *job)
+{
+    AioContext *ctx;
+    JobTxn *txn = job->txn;
+    Job *other_job;
+
+    if (txn->aborting) {
+        /*
+         * We are cancelled by another job, which will handle everything.
+         */
+        return;
+    }
+    txn->aborting = true;
+    job_txn_ref(txn);
+
+    /* We are the first failed job. Cancel other jobs. */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        ctx = other_job->aio_context;
+        aio_context_acquire(ctx);
+    }
+
+    /* Other jobs are effectively cancelled by us, set the status for
+     * them; this job, however, may or may not be cancelled, depending
+     * on the caller, so leave it. */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (other_job != job) {
+            job_cancel_async(other_job, false);
+        }
+    }
+    while (!QLIST_EMPTY(&txn->jobs)) {
+        other_job = QLIST_FIRST(&txn->jobs);
+        ctx = other_job->aio_context;
+        if (!job_is_completed(other_job)) {
+            assert(job_is_cancelled(other_job));
+            job_finish_sync(other_job, NULL, NULL);
+        }
+        job_finalize_single(other_job);
+        aio_context_release(ctx);
+    }
+
+    job_txn_unref(txn);
+}
+
+static int job_prepare(Job *job)
+{
+    if (job->ret == 0 && job->driver->prepare) {
+        job->ret = job->driver->prepare(job);
+    }
+    return job->ret;
+}
+
+static int job_needs_finalize(Job *job)
+{
+    return !job->auto_finalize;
+}
+
+static void job_do_finalize(Job *job)
+{
+    int rc;
+    assert(job && job->txn);
+
+    /* prepare the transaction to complete */
+    rc = job_txn_apply(job->txn, job_prepare, true);
+    if (rc) {
+        job_completed_txn_abort(job);
+    } else {
+        job_txn_apply(job->txn, job_finalize_single, true);
+    }
+}
+
+void job_finalize(Job *job, Error **errp)
+{
+    assert(job && job->id && job->txn);
+    if (job_apply_verb(job, JOB_VERB_FINALIZE, errp)) {
+        return;
+    }
+    job_do_finalize(job);
+}
+
+static int job_transition_to_pending(Job *job)
+{
+    job_state_transition(job, JOB_STATUS_PENDING);
+    if (!job->auto_finalize) {
+        job_event_pending(job);
+    }
+    return 0;
+}
+
+void job_completed_txn_success(Job *job)
+{
+    JobTxn *txn = job->txn;
+    Job *other_job;
+
+    job_state_transition(job, JOB_STATUS_WAITING);
+
+    /*
+     * Successful completion, see if there are other running jobs in this
+     * txn.
+     */
+    QLIST_FOREACH(other_job, &txn->jobs, txn_list) {
+        if (!job_is_completed(other_job)) {
+            return;
+        }
+        assert(other_job->ret == 0);
+    }
+
+    job_txn_apply(txn, job_transition_to_pending, false);
+
+    /* If no jobs need manual finalization, automatically do so */
+    if (job_txn_apply(txn, job_needs_finalize, false) == 0) {
+        job_do_finalize(job);
+    }
+}
+
 void job_complete(Job *job, Error **errp)
 {
     /* Should not be reachable via external interface for internal jobs */
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
index ec5d592b68..6ee31d59ad 100644
--- a/tests/test-blockjob-txn.c
+++ b/tests/test-blockjob-txn.c
@@ -125,7 +125,7 @@ static void test_single_job(int expected)
     JobTxn *txn;
     int result = -EINPROGRESS;
 
-    txn = block_job_txn_new();
+    txn = job_txn_new();
     job = test_block_job_start(1, true, expected, &result, txn);
     job_start(&job->job);
 
@@ -138,7 +138,7 @@ static void test_single_job(int expected)
     }
     g_assert_cmpint(result, ==, expected);
 
-    block_job_txn_unref(txn);
+    job_txn_unref(txn);
 }
 
 static void test_single_job_success(void)
@@ -164,7 +164,7 @@ static void test_pair_jobs(int expected1, int expected2)
     int result1 = -EINPROGRESS;
     int result2 = -EINPROGRESS;
 
-    txn = block_job_txn_new();
+    txn = job_txn_new();
     job1 = test_block_job_start(1, true, expected1, &result1, txn);
     job2 = test_block_job_start(2, true, expected2, &result2, txn);
     job_start(&job1->job);
@@ -173,7 +173,7 @@ static void test_pair_jobs(int expected1, int expected2)
     /* Release our reference now to trigger as many nice
      * use-after-free bugs as possible.
      */
-    block_job_txn_unref(txn);
+    job_txn_unref(txn);
 
     if (expected1 == -ECANCELED) {
         block_job_cancel(job1, false);
@@ -226,7 +226,7 @@ static void test_pair_jobs_fail_cancel_race(void)
     int result1 = -EINPROGRESS;
     int result2 = -EINPROGRESS;
 
-    txn = block_job_txn_new();
+    txn = job_txn_new();
     job1 = test_block_job_start(1, true, -ECANCELED, &result1, txn);
     job2 = test_block_job_start(2, false, 0, &result2, txn);
     job_start(&job1->job);
@@ -247,7 +247,7 @@ static void test_pair_jobs_fail_cancel_race(void)
     g_assert_cmpint(result1, ==, -ECANCELED);
     g_assert_cmpint(result2, ==, -ECANCELED);
 
-    block_job_txn_unref(txn);
+    job_txn_unref(txn);
 }
 
 int main(int argc, char **argv)
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
index e44c608327..1e052c2e9c 100644
--- a/tests/test-blockjob.c
+++ b/tests/test-blockjob.c
@@ -364,7 +364,7 @@ static void test_cancel_concluded(void)
     }
     assert(job->job.status == JOB_STATUS_PENDING);
 
-    block_job_finalize(job, &error_abort);
+    job_finalize(&job->job, &error_abort);
     assert(job->job.status == JOB_STATUS_CONCLUDED);
 
     cancel_common(s);
-- 
2.13.6
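
Illustrative note, not part of the patch: the reference-counting and all-or-nothing behaviour described in the job_txn_new() comment can be sketched outside QEMU roughly as follows. This is a simplified model under stated assumptions, not the job.c code: `Txn`, `MiniJob`, `txn_new`, `txn_add_job`, `mini_job_completed`, `txn_demo` and `live_txns` are hypothetical names, the job list is a plain singly linked list instead of QLIST, and there is no AioContext locking, prepare step or manual finalization.

```c
#include <assert.h>
#include <errno.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical stand-ins for JobTxn and Job; not the QEMU definitions. */
typedef struct Txn Txn;

typedef struct MiniJob {
    Txn *txn;              /* transaction this job belongs to, or NULL */
    struct MiniJob *next;  /* next job in the same transaction */
    bool done;             /* job has reported a result */
    bool finalized;        /* job has been committed or aborted */
    int ret;               /* 0 on success, negative errno otherwise */
} MiniJob;

struct Txn {
    MiniJob *jobs;         /* member jobs */
    int refcnt;            /* one per member job plus one per other holder */
};

static int live_txns;      /* demo-only counter to observe the final free */

Txn *txn_new(void)
{
    Txn *txn = calloc(1, sizeof(*txn));
    assert(txn);
    txn->refcnt = 1;
    live_txns++;
    return txn;
}

void txn_unref(Txn *txn)
{
    if (txn && --txn->refcnt == 0) {
        live_txns--;
        free(txn);
    }
}

void txn_add_job(Txn *txn, MiniJob *job)
{
    if (!txn) {
        return;
    }
    assert(!job->txn);
    job->txn = txn;
    job->next = txn->jobs;
    txn->jobs = job;
    txn->refcnt++;         /* the job now holds its own reference */
}

static void txn_del_job(MiniJob *job)
{
    Txn *txn = job->txn;
    if (!txn) {
        return;
    }
    for (MiniJob **p = &txn->jobs; *p; p = &(*p)->next) {
        if (*p == job) {
            *p = job->next;
            break;
        }
    }
    job->txn = NULL;
    job->next = NULL;
    txn_unref(txn);
}

static void txn_finalize_all(Txn *txn)
{
    txn->refcnt++;         /* keep txn alive while its jobs drop out */
    while (txn->jobs) {
        MiniJob *j = txn->jobs;
        j->finalized = true;
        txn_del_job(j);
    }
    txn_unref(txn);
}

void mini_job_completed(MiniJob *job, int ret)
{
    Txn *txn = job->txn;
    job->done = true;
    job->ret = ret;
    if (ret != 0) {
        /* A failure cancels every other member of the transaction. */
        for (MiniJob *j = txn->jobs; j; j = j->next) {
            if (j != job) {
                j->done = true;
                j->ret = -ECANCELED;
            }
        }
    } else {
        /* On success, wait until every member has reported. */
        for (MiniJob *j = txn->jobs; j; j = j->next) {
            if (!j->done) {
                return;
            }
        }
    }
    txn_finalize_all(txn);
}

/* Two jobs in one transaction: the first succeeds, the second fails. */
int txn_demo(void)
{
    MiniJob a = {0}, b = {0};
    Txn *txn = txn_new();
    txn_add_job(txn, &a);
    txn_add_job(txn, &b);
    txn_unref(txn);                /* the jobs keep the txn alive */

    mini_job_completed(&a, 0);     /* a must wait for b */
    if (a.finalized) {
        return 1;
    }
    mini_job_completed(&b, -EIO);  /* b fails, so both are aborted */
    if (!a.finalized || !b.finalized) {
        return 2;
    }
    if (a.ret != -ECANCELED || b.ret != -EIO) {
        return 3;
    }
    return live_txns;              /* 0 once the txn has been freed */
}
```

In this model `txn_demo()` returns 0: the creator's early `txn_unref()` does not free the transaction because each member job holds a reference, which mirrors the pattern test_pair_jobs() exercises when it releases its reference right after starting the jobs.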