1
The following changes since commit 9cf289af47bcfae5c75de37d8e5d6fd23705322c:
1
The following changes since commit 19b599f7664b2ebfd0f405fb79c14dd241557452:
2
2
3
Merge tag 'qga-pull-request' of gitlab.com:marcandre.lureau/qemu into staging (2022-05-04 03:42:49 -0700)
3
Merge remote-tracking branch 'remotes/armbru/tags/pull-error-2018-08-27-v2' into staging (2018-08-27 16:44:20 +0100)
4
4
5
are available in the Git repository at:
5
are available in the Git repository at:
6
6
7
https://gitlab.com/stefanha/qemu.git tags/block-pull-request
7
https://git.xanclic.moe/XanClic/qemu.git tags/pull-block-2018-08-31-v2
8
8
9
for you to fetch changes up to bef2e050d6a7feb865854c65570c496ac5a8cf53:
9
for you to fetch changes up to e21a1c9831fc80ae3f3c1affdfa43350035d8588:
10
10
11
util/event-loop-base: Introduce options to set the thread pool size (2022-05-04 17:02:19 +0100)
11
jobs: remove job_defer_to_main_loop (2018-08-31 16:28:33 +0200)
12
12
13
----------------------------------------------------------------
13
----------------------------------------------------------------
14
Pull request
14
Block patches:
15
15
- (Block) job exit refactoring, part 1
16
Add new thread-pool-min/thread-pool-max parameters to control the thread pool
16
(removing job_defer_to_main_loop())
17
used for async I/O.
17
- test-bdrv-drain leak fix
18
18
19
----------------------------------------------------------------
19
----------------------------------------------------------------
20
John Snow (9):
21
jobs: change start callback to run callback
22
jobs: canonize Error object
23
jobs: add exit shim
24
block/commit: utilize job_exit shim
25
block/mirror: utilize job_exit shim
26
jobs: utilize job_exit shim
27
block/backup: make function variables consistently named
28
jobs: remove ret argument to job_completed; privatize it
29
jobs: remove job_defer_to_main_loop
20
30
21
Nicolas Saenz Julienne (3):
31
Marc-André Lureau (1):
22
Introduce event-loop-base abstract class
32
tests: fix bdrv-drain leak
23
util/main-loop: Introduce the main loop into QOM
24
util/event-loop-base: Introduce options to set the thread pool size
25
33
26
qapi/qom.json | 43 ++++++++--
34
include/qemu/job.h | 70 ++++++++++++++++-----------------
27
meson.build | 26 +++---
35
block/backup.c | 81 ++++++++++++++++-----------------------
28
include/block/aio.h | 10 +++
36
block/commit.c | 29 +++++---------
29
include/block/thread-pool.h | 3 +
37
block/create.c | 19 +++------
30
include/qemu/main-loop.h | 10 +++
38
block/mirror.c | 39 ++++++++-----------
31
include/sysemu/event-loop-base.h | 41 +++++++++
39
block/stream.c | 29 ++++++--------
32
include/sysemu/iothread.h | 6 +-
40
job-qmp.c | 5 ++-
33
event-loop-base.c | 140 +++++++++++++++++++++++++++++++
41
job.c | 73 ++++++++++++-----------------------
34
iothread.c | 68 +++++----------
42
tests/test-bdrv-drain.c | 14 +++----
35
util/aio-posix.c | 1 +
43
tests/test-blockjob-txn.c | 25 +++++-------
36
util/async.c | 20 +++++
44
tests/test-blockjob.c | 17 ++++----
37
util/main-loop.c | 65 ++++++++++++++
45
trace-events | 2 +-
38
util/thread-pool.c | 55 +++++++++++-
46
12 files changed, 161 insertions(+), 242 deletions(-)
39
13 files changed, 419 insertions(+), 69 deletions(-)
40
create mode 100644 include/sysemu/event-loop-base.h
41
create mode 100644 event-loop-base.c
42
47
43
--
48
--
44
2.35.1
49
2.17.1
50
51
diff view generated by jsdifflib
New patch
1
From: Marc-André Lureau <marcandre.lureau@redhat.com>
1
2
3
Spotted by ASAN:
4
5
=================================================================
6
==5378==ERROR: LeakSanitizer: detected memory leaks
7
8
Direct leak of 65536 byte(s) in 1 object(s) allocated from:
9
#0 0x7f788f83bc48 in malloc (/lib64/libasan.so.5+0xeec48)
10
#1 0x7f788c9923c5 in g_malloc (/lib64/libglib-2.0.so.0+0x523c5)
11
#2 0x5622a1fe37bc in coroutine_trampoline /home/elmarco/src/qq/util/coroutine-ucontext.c:116
12
#3 0x7f788a15d75f in __correctly_grouped_prefixwc (/lib64/libc.so.6+0x4c75f)
13
14
(Broken in commit 4c8158e359d.)
15
16
Signed-off-by: Marc-André Lureau <marcandre.lureau@redhat.com>
17
Message-id: 20180809114417.28718-3-marcandre.lureau@redhat.com
18
Signed-off-by: Max Reitz <mreitz@redhat.com>
19
---
20
tests/test-bdrv-drain.c | 1 +
21
1 file changed, 1 insertion(+)
22
23
diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c
24
index XXXXXXX..XXXXXXX 100644
25
--- a/tests/test-bdrv-drain.c
26
+++ b/tests/test-bdrv-drain.c
27
@@ -XXX,XX +XXX,XX @@ static void coroutine_fn test_co_delete_by_drain(void *opaque)
28
}
29
30
dbdd->done = true;
31
+ g_free(buffer);
32
}
33
34
/**
35
--
36
2.17.1
37
38
diff view generated by jsdifflib
New patch
1
1
From: John Snow <jsnow@redhat.com>
2
3
Presently we codify the entry point for a job as the "start" callback,
4
but a more apt name would be "run" to clarify the idea that when this
5
function returns we consider the job to have "finished," except for
6
any cleanup which occurs in separate callbacks later.
7
8
As part of this clarification, change the signature to include an error
9
object and a return code. The error ptr is not yet used, and the return
10
code while captured, will be overwritten by actions in the job_completed
11
function.
12
13
Signed-off-by: John Snow <jsnow@redhat.com>
14
Reviewed-by: Max Reitz <mreitz@redhat.com>
15
Message-id: 20180830015734.19765-2-jsnow@redhat.com
16
Reviewed-by: Jeff Cody <jcody@redhat.com>
17
Signed-off-by: Max Reitz <mreitz@redhat.com>
18
---
19
include/qemu/job.h | 2 +-
20
block/backup.c | 7 ++++---
21
block/commit.c | 7 ++++---
22
block/create.c | 8 +++++---
23
block/mirror.c | 10 ++++++----
24
block/stream.c | 7 ++++---
25
job.c | 6 +++---
26
tests/test-bdrv-drain.c | 7 ++++---
27
tests/test-blockjob-txn.c | 16 ++++++++--------
28
tests/test-blockjob.c | 7 ++++---
29
10 files changed, 43 insertions(+), 34 deletions(-)
30
31
diff --git a/include/qemu/job.h b/include/qemu/job.h
32
index XXXXXXX..XXXXXXX 100644
33
--- a/include/qemu/job.h
34
+++ b/include/qemu/job.h
35
@@ -XXX,XX +XXX,XX @@ struct JobDriver {
36
JobType job_type;
37
38
/** Mandatory: Entrypoint for the Coroutine. */
39
- CoroutineEntry *start;
40
+ int coroutine_fn (*run)(Job *job, Error **errp);
41
42
/**
43
* If the callback is not NULL, it will be invoked when the job transitions
44
diff --git a/block/backup.c b/block/backup.c
45
index XXXXXXX..XXXXXXX 100644
46
--- a/block/backup.c
47
+++ b/block/backup.c
48
@@ -XXX,XX +XXX,XX @@ static void backup_incremental_init_copy_bitmap(BackupBlockJob *job)
49
bdrv_dirty_iter_free(dbi);
50
}
51
52
-static void coroutine_fn backup_run(void *opaque)
53
+static int coroutine_fn backup_run(Job *opaque_job, Error **errp)
54
{
55
- BackupBlockJob *job = opaque;
56
+ BackupBlockJob *job = container_of(opaque_job, BackupBlockJob, common.job);
57
BackupCompleteData *data;
58
BlockDriverState *bs = blk_bs(job->common.blk);
59
int64_t offset, nb_clusters;
60
@@ -XXX,XX +XXX,XX @@ static void coroutine_fn backup_run(void *opaque)
61
data = g_malloc(sizeof(*data));
62
data->ret = ret;
63
job_defer_to_main_loop(&job->common.job, backup_complete, data);
64
+ return ret;
65
}
66
67
static const BlockJobDriver backup_job_driver = {
68
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver backup_job_driver = {
69
.free = block_job_free,
70
.user_resume = block_job_user_resume,
71
.drain = block_job_drain,
72
- .start = backup_run,
73
+ .run = backup_run,
74
.commit = backup_commit,
75
.abort = backup_abort,
76
.clean = backup_clean,
77
diff --git a/block/commit.c b/block/commit.c
78
index XXXXXXX..XXXXXXX 100644
79
--- a/block/commit.c
80
+++ b/block/commit.c
81
@@ -XXX,XX +XXX,XX @@ static void commit_complete(Job *job, void *opaque)
82
bdrv_unref(top);
83
}
84
85
-static void coroutine_fn commit_run(void *opaque)
86
+static int coroutine_fn commit_run(Job *job, Error **errp)
87
{
88
- CommitBlockJob *s = opaque;
89
+ CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
90
CommitCompleteData *data;
91
int64_t offset;
92
uint64_t delay_ns = 0;
93
@@ -XXX,XX +XXX,XX @@ out:
94
data = g_malloc(sizeof(*data));
95
data->ret = ret;
96
job_defer_to_main_loop(&s->common.job, commit_complete, data);
97
+ return ret;
98
}
99
100
static const BlockJobDriver commit_job_driver = {
101
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver commit_job_driver = {
102
.free = block_job_free,
103
.user_resume = block_job_user_resume,
104
.drain = block_job_drain,
105
- .start = commit_run,
106
+ .run = commit_run,
107
},
108
};
109
110
diff --git a/block/create.c b/block/create.c
111
index XXXXXXX..XXXXXXX 100644
112
--- a/block/create.c
113
+++ b/block/create.c
114
@@ -XXX,XX +XXX,XX @@ static void blockdev_create_complete(Job *job, void *opaque)
115
job_completed(job, s->ret, s->err);
116
}
117
118
-static void coroutine_fn blockdev_create_run(void *opaque)
119
+static int coroutine_fn blockdev_create_run(Job *job, Error **errp)
120
{
121
- BlockdevCreateJob *s = opaque;
122
+ BlockdevCreateJob *s = container_of(job, BlockdevCreateJob, common);
123
124
job_progress_set_remaining(&s->common, 1);
125
s->ret = s->drv->bdrv_co_create(s->opts, &s->err);
126
@@ -XXX,XX +XXX,XX @@ static void coroutine_fn blockdev_create_run(void *opaque)
127
128
qapi_free_BlockdevCreateOptions(s->opts);
129
job_defer_to_main_loop(&s->common, blockdev_create_complete, NULL);
130
+
131
+ return s->ret;
132
}
133
134
static const JobDriver blockdev_create_job_driver = {
135
.instance_size = sizeof(BlockdevCreateJob),
136
.job_type = JOB_TYPE_CREATE,
137
- .start = blockdev_create_run,
138
+ .run = blockdev_create_run,
139
};
140
141
void qmp_blockdev_create(const char *job_id, BlockdevCreateOptions *options,
142
diff --git a/block/mirror.c b/block/mirror.c
143
index XXXXXXX..XXXXXXX 100644
144
--- a/block/mirror.c
145
+++ b/block/mirror.c
146
@@ -XXX,XX +XXX,XX @@ static int mirror_flush(MirrorBlockJob *s)
147
return ret;
148
}
149
150
-static void coroutine_fn mirror_run(void *opaque)
151
+static int coroutine_fn mirror_run(Job *job, Error **errp)
152
{
153
- MirrorBlockJob *s = opaque;
154
+ MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
155
MirrorExitData *data;
156
BlockDriverState *bs = s->mirror_top_bs->backing->bs;
157
BlockDriverState *target_bs = blk_bs(s->target);
158
@@ -XXX,XX +XXX,XX @@ immediate_exit:
159
if (need_drain) {
160
bdrv_drained_begin(bs);
161
}
162
+
163
job_defer_to_main_loop(&s->common.job, mirror_exit, data);
164
+ return ret;
165
}
166
167
static void mirror_complete(Job *job, Error **errp)
168
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver mirror_job_driver = {
169
.free = block_job_free,
170
.user_resume = block_job_user_resume,
171
.drain = block_job_drain,
172
- .start = mirror_run,
173
+ .run = mirror_run,
174
.pause = mirror_pause,
175
.complete = mirror_complete,
176
},
177
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver commit_active_job_driver = {
178
.free = block_job_free,
179
.user_resume = block_job_user_resume,
180
.drain = block_job_drain,
181
- .start = mirror_run,
182
+ .run = mirror_run,
183
.pause = mirror_pause,
184
.complete = mirror_complete,
185
},
186
diff --git a/block/stream.c b/block/stream.c
187
index XXXXXXX..XXXXXXX 100644
188
--- a/block/stream.c
189
+++ b/block/stream.c
190
@@ -XXX,XX +XXX,XX @@ out:
191
g_free(data);
192
}
193
194
-static void coroutine_fn stream_run(void *opaque)
195
+static int coroutine_fn stream_run(Job *job, Error **errp)
196
{
197
- StreamBlockJob *s = opaque;
198
+ StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
199
StreamCompleteData *data;
200
BlockBackend *blk = s->common.blk;
201
BlockDriverState *bs = blk_bs(blk);
202
@@ -XXX,XX +XXX,XX @@ out:
203
data = g_malloc(sizeof(*data));
204
data->ret = ret;
205
job_defer_to_main_loop(&s->common.job, stream_complete, data);
206
+ return ret;
207
}
208
209
static const BlockJobDriver stream_job_driver = {
210
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver stream_job_driver = {
211
.instance_size = sizeof(StreamBlockJob),
212
.job_type = JOB_TYPE_STREAM,
213
.free = block_job_free,
214
- .start = stream_run,
215
+ .run = stream_run,
216
.user_resume = block_job_user_resume,
217
.drain = block_job_drain,
218
},
219
diff --git a/job.c b/job.c
220
index XXXXXXX..XXXXXXX 100644
221
--- a/job.c
222
+++ b/job.c
223
@@ -XXX,XX +XXX,XX @@ static void coroutine_fn job_co_entry(void *opaque)
224
{
225
Job *job = opaque;
226
227
- assert(job && job->driver && job->driver->start);
228
+ assert(job && job->driver && job->driver->run);
229
job_pause_point(job);
230
- job->driver->start(job);
231
+ job->ret = job->driver->run(job, NULL);
232
}
233
234
235
void job_start(Job *job)
236
{
237
assert(job && !job_started(job) && job->paused &&
238
- job->driver && job->driver->start);
239
+ job->driver && job->driver->run);
240
job->co = qemu_coroutine_create(job_co_entry, job);
241
job->pause_count--;
242
job->busy = true;
243
diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c
244
index XXXXXXX..XXXXXXX 100644
245
--- a/tests/test-bdrv-drain.c
246
+++ b/tests/test-bdrv-drain.c
247
@@ -XXX,XX +XXX,XX @@ static void test_job_completed(Job *job, void *opaque)
248
job_completed(job, 0, NULL);
249
}
250
251
-static void coroutine_fn test_job_start(void *opaque)
252
+static int coroutine_fn test_job_run(Job *job, Error **errp)
253
{
254
- TestBlockJob *s = opaque;
255
+ TestBlockJob *s = container_of(job, TestBlockJob, common.job);
256
257
job_transition_to_ready(&s->common.job);
258
while (!s->should_complete) {
259
@@ -XXX,XX +XXX,XX @@ static void coroutine_fn test_job_start(void *opaque)
260
}
261
262
job_defer_to_main_loop(&s->common.job, test_job_completed, NULL);
263
+ return 0;
264
}
265
266
static void test_job_complete(Job *job, Error **errp)
267
@@ -XXX,XX +XXX,XX @@ BlockJobDriver test_job_driver = {
268
.free = block_job_free,
269
.user_resume = block_job_user_resume,
270
.drain = block_job_drain,
271
- .start = test_job_start,
272
+ .run = test_job_run,
273
.complete = test_job_complete,
274
},
275
};
276
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
277
index XXXXXXX..XXXXXXX 100644
278
--- a/tests/test-blockjob-txn.c
279
+++ b/tests/test-blockjob-txn.c
280
@@ -XXX,XX +XXX,XX @@ static void test_block_job_complete(Job *job, void *opaque)
281
bdrv_unref(bs);
282
}
283
284
-static void coroutine_fn test_block_job_run(void *opaque)
285
+static int coroutine_fn test_block_job_run(Job *job, Error **errp)
286
{
287
- TestBlockJob *s = opaque;
288
- BlockJob *job = &s->common;
289
+ TestBlockJob *s = container_of(job, TestBlockJob, common.job);
290
291
while (s->iterations--) {
292
if (s->use_timer) {
293
- job_sleep_ns(&job->job, 0);
294
+ job_sleep_ns(job, 0);
295
} else {
296
- job_yield(&job->job);
297
+ job_yield(job);
298
}
299
300
- if (job_is_cancelled(&job->job)) {
301
+ if (job_is_cancelled(job)) {
302
break;
303
}
304
}
305
306
- job_defer_to_main_loop(&job->job, test_block_job_complete,
307
+ job_defer_to_main_loop(job, test_block_job_complete,
308
(void *)(intptr_t)s->rc);
309
+ return s->rc;
310
}
311
312
typedef struct {
313
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver test_block_job_driver = {
314
.free = block_job_free,
315
.user_resume = block_job_user_resume,
316
.drain = block_job_drain,
317
- .start = test_block_job_run,
318
+ .run = test_block_job_run,
319
},
320
};
321
322
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
323
index XXXXXXX..XXXXXXX 100644
324
--- a/tests/test-blockjob.c
325
+++ b/tests/test-blockjob.c
326
@@ -XXX,XX +XXX,XX @@ static void cancel_job_complete(Job *job, Error **errp)
327
s->should_complete = true;
328
}
329
330
-static void coroutine_fn cancel_job_start(void *opaque)
331
+static int coroutine_fn cancel_job_run(Job *job, Error **errp)
332
{
333
- CancelJob *s = opaque;
334
+ CancelJob *s = container_of(job, CancelJob, common.job);
335
336
while (!s->should_complete) {
337
if (job_is_cancelled(&s->common.job)) {
338
@@ -XXX,XX +XXX,XX @@ static void coroutine_fn cancel_job_start(void *opaque)
339
340
defer:
341
job_defer_to_main_loop(&s->common.job, cancel_job_completed, s);
342
+ return 0;
343
}
344
345
static const BlockJobDriver test_cancel_driver = {
346
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver test_cancel_driver = {
347
.free = block_job_free,
348
.user_resume = block_job_user_resume,
349
.drain = block_job_drain,
350
- .start = cancel_job_start,
351
+ .run = cancel_job_run,
352
.complete = cancel_job_complete,
353
},
354
};
355
--
356
2.17.1
357
358
diff view generated by jsdifflib
New patch
1
1
From: John Snow <jsnow@redhat.com>
2
3
Jobs presently use both an Error object in the case of the create job,
4
and char strings in the case of generic errors elsewhere.
5
6
Unify the two paths as just j->err, and remove the extra argument from
7
job_completed. The integer error code for job_completed is kept for now,
8
to be removed shortly in a separate patch.
9
10
Signed-off-by: John Snow <jsnow@redhat.com>
11
Message-id: 20180830015734.19765-3-jsnow@redhat.com
12
[mreitz: Dropped a superfluous g_strdup()]
13
Reviewed-by: Eric Blake <eblake@redhat.com>
14
Signed-off-by: Max Reitz <mreitz@redhat.com>
15
---
16
include/qemu/job.h | 14 ++++++++------
17
block/backup.c | 2 +-
18
block/commit.c | 2 +-
19
block/create.c | 5 ++---
20
block/mirror.c | 2 +-
21
block/stream.c | 2 +-
22
job-qmp.c | 5 +++--
23
job.c | 18 ++++++------------
24
tests/test-bdrv-drain.c | 2 +-
25
tests/test-blockjob-txn.c | 2 +-
26
tests/test-blockjob.c | 2 +-
27
11 files changed, 26 insertions(+), 30 deletions(-)
28
29
diff --git a/include/qemu/job.h b/include/qemu/job.h
30
index XXXXXXX..XXXXXXX 100644
31
--- a/include/qemu/job.h
32
+++ b/include/qemu/job.h
33
@@ -XXX,XX +XXX,XX @@ typedef struct Job {
34
/** Estimated progress_current value at the completion of the job */
35
int64_t progress_total;
36
37
- /** Error string for a failed job (NULL if, and only if, job->ret == 0) */
38
- char *error;
39
-
40
/** ret code passed to job_completed. */
41
int ret;
42
43
+ /**
44
+ * Error object for a failed job.
45
+ * If job->ret is nonzero and an error object was not set, it will be set
46
+ * to strerror(-job->ret) during job_completed.
47
+ */
48
+ Error *err;
49
+
50
/** The completion function that will be called when the job completes. */
51
BlockCompletionFunc *cb;
52
53
@@ -XXX,XX +XXX,XX @@ void job_transition_to_ready(Job *job);
54
/**
55
* @job: The job being completed.
56
* @ret: The status code.
57
- * @error: The error message for a failing job (only with @ret < 0). If @ret is
58
- * negative, but NULL is given for @error, strerror() is used.
59
*
60
* Marks @job as completed. If @ret is non-zero, the job transaction it is part
61
* of is aborted. If @ret is zero, the job moves into the WAITING state. If it
62
* is the last job to complete in its transaction, all jobs in the transaction
63
* move from WAITING to PENDING.
64
*/
65
-void job_completed(Job *job, int ret, Error *error);
66
+void job_completed(Job *job, int ret);
67
68
/** Asynchronously complete the specified @job. */
69
void job_complete(Job *job, Error **errp);
70
diff --git a/block/backup.c b/block/backup.c
71
index XXXXXXX..XXXXXXX 100644
72
--- a/block/backup.c
73
+++ b/block/backup.c
74
@@ -XXX,XX +XXX,XX @@ static void backup_complete(Job *job, void *opaque)
75
{
76
BackupCompleteData *data = opaque;
77
78
- job_completed(job, data->ret, NULL);
79
+ job_completed(job, data->ret);
80
g_free(data);
81
}
82
83
diff --git a/block/commit.c b/block/commit.c
84
index XXXXXXX..XXXXXXX 100644
85
--- a/block/commit.c
86
+++ b/block/commit.c
87
@@ -XXX,XX +XXX,XX @@ static void commit_complete(Job *job, void *opaque)
88
* bdrv_set_backing_hd() to fail. */
89
block_job_remove_all_bdrv(bjob);
90
91
- job_completed(job, ret, NULL);
92
+ job_completed(job, ret);
93
g_free(data);
94
95
/* If bdrv_drop_intermediate() didn't already do that, remove the commit
96
diff --git a/block/create.c b/block/create.c
97
index XXXXXXX..XXXXXXX 100644
98
--- a/block/create.c
99
+++ b/block/create.c
100
@@ -XXX,XX +XXX,XX @@ typedef struct BlockdevCreateJob {
101
BlockDriver *drv;
102
BlockdevCreateOptions *opts;
103
int ret;
104
- Error *err;
105
} BlockdevCreateJob;
106
107
static void blockdev_create_complete(Job *job, void *opaque)
108
{
109
BlockdevCreateJob *s = container_of(job, BlockdevCreateJob, common);
110
111
- job_completed(job, s->ret, s->err);
112
+ job_completed(job, s->ret);
113
}
114
115
static int coroutine_fn blockdev_create_run(Job *job, Error **errp)
116
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn blockdev_create_run(Job *job, Error **errp)
117
BlockdevCreateJob *s = container_of(job, BlockdevCreateJob, common);
118
119
job_progress_set_remaining(&s->common, 1);
120
- s->ret = s->drv->bdrv_co_create(s->opts, &s->err);
121
+ s->ret = s->drv->bdrv_co_create(s->opts, errp);
122
job_progress_update(&s->common, 1);
123
124
qapi_free_BlockdevCreateOptions(s->opts);
125
diff --git a/block/mirror.c b/block/mirror.c
126
index XXXXXXX..XXXXXXX 100644
127
--- a/block/mirror.c
128
+++ b/block/mirror.c
129
@@ -XXX,XX +XXX,XX @@ static void mirror_exit(Job *job, void *opaque)
130
blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort);
131
132
bs_opaque->job = NULL;
133
- job_completed(job, data->ret, NULL);
134
+ job_completed(job, data->ret);
135
136
g_free(data);
137
bdrv_drained_end(src);
138
diff --git a/block/stream.c b/block/stream.c
139
index XXXXXXX..XXXXXXX 100644
140
--- a/block/stream.c
141
+++ b/block/stream.c
142
@@ -XXX,XX +XXX,XX @@ out:
143
}
144
145
g_free(s->backing_file_str);
146
- job_completed(job, data->ret, NULL);
147
+ job_completed(job, data->ret);
148
g_free(data);
149
}
150
151
diff --git a/job-qmp.c b/job-qmp.c
152
index XXXXXXX..XXXXXXX 100644
153
--- a/job-qmp.c
154
+++ b/job-qmp.c
155
@@ -XXX,XX +XXX,XX @@ static JobInfo *job_query_single(Job *job, Error **errp)
156
.status = job->status,
157
.current_progress = job->progress_current,
158
.total_progress = job->progress_total,
159
- .has_error = !!job->error,
160
- .error = g_strdup(job->error),
161
+ .has_error = !!job->err,
162
+ .error = job->err ? \
163
+ g_strdup(error_get_pretty(job->err)) : NULL,
164
};
165
166
return info;
167
diff --git a/job.c b/job.c
168
index XXXXXXX..XXXXXXX 100644
169
--- a/job.c
170
+++ b/job.c
171
@@ -XXX,XX +XXX,XX @@ void job_unref(Job *job)
172
173
QLIST_REMOVE(job, job_list);
174
175
- g_free(job->error);
176
+ error_free(job->err);
177
g_free(job->id);
178
g_free(job);
179
}
180
@@ -XXX,XX +XXX,XX @@ static void coroutine_fn job_co_entry(void *opaque)
181
182
assert(job && job->driver && job->driver->run);
183
job_pause_point(job);
184
- job->ret = job->driver->run(job, NULL);
185
+ job->ret = job->driver->run(job, &job->err);
186
}
187
188
189
@@ -XXX,XX +XXX,XX @@ static void job_update_rc(Job *job)
190
job->ret = -ECANCELED;
191
}
192
if (job->ret) {
193
- if (!job->error) {
194
- job->error = g_strdup(strerror(-job->ret));
195
+ if (!job->err) {
196
+ error_setg(&job->err, "%s", strerror(-job->ret));
197
}
198
job_state_transition(job, JOB_STATUS_ABORTING);
199
}
200
@@ -XXX,XX +XXX,XX @@ static void job_completed_txn_success(Job *job)
201
}
202
}
203
204
-void job_completed(Job *job, int ret, Error *error)
205
+void job_completed(Job *job, int ret)
206
{
207
assert(job && job->txn && !job_is_completed(job));
208
209
job->ret = ret;
210
- if (error) {
211
- assert(job->ret < 0);
212
- job->error = g_strdup(error_get_pretty(error));
213
- error_free(error);
214
- }
215
-
216
job_update_rc(job);
217
trace_job_completed(job, ret, job->ret);
218
if (job->ret) {
219
@@ -XXX,XX +XXX,XX @@ void job_cancel(Job *job, bool force)
220
}
221
job_cancel_async(job, force);
222
if (!job_started(job)) {
223
- job_completed(job, -ECANCELED, NULL);
224
+ job_completed(job, -ECANCELED);
225
} else if (job->deferred_to_main_loop) {
226
job_completed_txn_abort(job);
227
} else {
228
diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c
229
index XXXXXXX..XXXXXXX 100644
230
--- a/tests/test-bdrv-drain.c
231
+++ b/tests/test-bdrv-drain.c
232
@@ -XXX,XX +XXX,XX @@ typedef struct TestBlockJob {
233
234
static void test_job_completed(Job *job, void *opaque)
235
{
236
- job_completed(job, 0, NULL);
237
+ job_completed(job, 0);
238
}
239
240
static int coroutine_fn test_job_run(Job *job, Error **errp)
241
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
242
index XXXXXXX..XXXXXXX 100644
243
--- a/tests/test-blockjob-txn.c
244
+++ b/tests/test-blockjob-txn.c
245
@@ -XXX,XX +XXX,XX @@ static void test_block_job_complete(Job *job, void *opaque)
246
rc = -ECANCELED;
247
}
248
249
- job_completed(job, rc, NULL);
250
+ job_completed(job, rc);
251
bdrv_unref(bs);
252
}
253
254
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
255
index XXXXXXX..XXXXXXX 100644
256
--- a/tests/test-blockjob.c
257
+++ b/tests/test-blockjob.c
258
@@ -XXX,XX +XXX,XX @@ static void cancel_job_completed(Job *job, void *opaque)
259
{
260
CancelJob *s = opaque;
261
s->completed = true;
262
- job_completed(job, 0, NULL);
263
+ job_completed(job, 0);
264
}
265
266
static void cancel_job_complete(Job *job, Error **errp)
267
--
268
2.17.1
269
270
diff view generated by jsdifflib
New patch
1
From: John Snow <jsnow@redhat.com>
1
2
3
All jobs do the same thing when they leave their running loop:
4
- Store the return code in a structure
5
- wait to receive this structure in the main thread
6
- signal job completion via job_completed
7
8
Few jobs do anything beyond exactly this. Consolidate this exit
9
logic for a net reduction in SLOC.
10
11
More seriously, when we utilize job_defer_to_main_loop_bh to call
12
a function that calls job_completed, job_finalize_single will run
13
in a context where it has recursively taken the aio_context lock,
14
which can cause hangs if it puts down a reference that causes a flush.
15
16
You can observe this in practice by looking at mirror_exit's careful
17
placement of job_completed and bdrv_unref calls.
18
19
If we centralize job exiting, we can signal job completion from outside
20
of the aio_context, which should allow for job cleanup code to run with
21
only one lock, which makes cleanup callbacks less tricky to write.
22
23
Signed-off-by: John Snow <jsnow@redhat.com>
24
Reviewed-by: Max Reitz <mreitz@redhat.com>
25
Message-id: 20180830015734.19765-4-jsnow@redhat.com
26
Reviewed-by: Jeff Cody <jcody@redhat.com>
27
Signed-off-by: Max Reitz <mreitz@redhat.com>
28
---
29
include/qemu/job.h | 11 +++++++++++
30
job.c | 18 ++++++++++++++++++
31
2 files changed, 29 insertions(+)
32
33
diff --git a/include/qemu/job.h b/include/qemu/job.h
34
index XXXXXXX..XXXXXXX 100644
35
--- a/include/qemu/job.h
36
+++ b/include/qemu/job.h
37
@@ -XXX,XX +XXX,XX @@ struct JobDriver {
38
*/
39
void (*drain)(Job *job);
40
41
+ /**
42
+ * If the callback is not NULL, exit will be invoked from the main thread
43
+ * when the job's coroutine has finished, but before transactional
44
+ * convergence; before @prepare or @abort.
45
+ *
46
+ * FIXME TODO: This callback is only temporary to transition remaining jobs
47
+ * to prepare/commit/abort/clean callbacks and will be removed before 3.1.
48
+ * is released.
49
+ */
50
+ void (*exit)(Job *job);
51
+
52
/**
53
* If the callback is not NULL, prepare will be invoked when all the jobs
54
* belonging to the same transaction complete; or upon this job's completion
55
diff --git a/job.c b/job.c
56
index XXXXXXX..XXXXXXX 100644
57
--- a/job.c
58
+++ b/job.c
59
@@ -XXX,XX +XXX,XX @@ void job_drain(Job *job)
60
}
61
}
62
63
+static void job_exit(void *opaque)
64
+{
65
+ Job *job = (Job *)opaque;
66
+ AioContext *aio_context = job->aio_context;
67
+
68
+ if (job->driver->exit) {
69
+ aio_context_acquire(aio_context);
70
+ job->driver->exit(job);
71
+ aio_context_release(aio_context);
72
+ }
73
+ job_completed(job, job->ret);
74
+}
75
76
/**
77
* All jobs must allow a pause point before entering their job proper. This
78
@@ -XXX,XX +XXX,XX @@ static void coroutine_fn job_co_entry(void *opaque)
79
assert(job && job->driver && job->driver->run);
80
job_pause_point(job);
81
job->ret = job->driver->run(job, &job->err);
82
+ if (!job->deferred_to_main_loop) {
83
+ job->deferred_to_main_loop = true;
84
+ aio_bh_schedule_oneshot(qemu_get_aio_context(),
85
+ job_exit,
86
+ job);
87
+ }
88
}
89
90
91
--
92
2.17.1
93
94
diff view generated by jsdifflib
New patch
1
From: John Snow <jsnow@redhat.com>
1
2
3
Change the manual deferment to commit_complete into the implicit
4
callback to job_exit, renaming commit_complete to commit_exit.
5
6
This conversion does change the timing of when job_completed is
7
called to after the bdrv_replace_node and bdrv_unref calls, which
8
could have implications for bjob->blk which will now be put down
9
after this cleanup.
10
11
Kevin highlights that we did not take any permissions for that backend
12
at job creation time, so it is safe to reorder these operations.
13
14
Signed-off-by: John Snow <jsnow@redhat.com>
15
Reviewed-by: Max Reitz <mreitz@redhat.com>
16
Message-id: 20180830015734.19765-5-jsnow@redhat.com
17
Reviewed-by: Jeff Cody <jcody@redhat.com>
18
Signed-off-by: Max Reitz <mreitz@redhat.com>
19
---
20
block/commit.c | 22 +++++-----------------
21
1 file changed, 5 insertions(+), 17 deletions(-)
22
23
diff --git a/block/commit.c b/block/commit.c
24
index XXXXXXX..XXXXXXX 100644
25
--- a/block/commit.c
26
+++ b/block/commit.c
27
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn commit_populate(BlockBackend *bs, BlockBackend *base,
28
return 0;
29
}
30
31
-typedef struct {
32
- int ret;
33
-} CommitCompleteData;
34
-
35
-static void commit_complete(Job *job, void *opaque)
36
+static void commit_exit(Job *job)
37
{
38
CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
39
BlockJob *bjob = &s->common;
40
- CommitCompleteData *data = opaque;
41
BlockDriverState *top = blk_bs(s->top);
42
BlockDriverState *base = blk_bs(s->base);
43
BlockDriverState *commit_top_bs = s->commit_top_bs;
44
- int ret = data->ret;
45
bool remove_commit_top_bs = false;
46
47
/* Make sure commit_top_bs and top stay around until bdrv_replace_node() */
48
@@ -XXX,XX +XXX,XX @@ static void commit_complete(Job *job, void *opaque)
49
* the normal backing chain can be restored. */
50
blk_unref(s->base);
51
52
- if (!job_is_cancelled(job) && ret == 0) {
53
+ if (!job_is_cancelled(job) && job->ret == 0) {
54
/* success */
55
- ret = bdrv_drop_intermediate(s->commit_top_bs, base,
56
- s->backing_file_str);
57
+ job->ret = bdrv_drop_intermediate(s->commit_top_bs, base,
58
+ s->backing_file_str);
59
} else {
60
/* XXX Can (or should) we somehow keep 'consistent read' blocked even
61
* after the failed/cancelled commit job is gone? If we already wrote
62
@@ -XXX,XX +XXX,XX @@ static void commit_complete(Job *job, void *opaque)
63
* bdrv_set_backing_hd() to fail. */
64
block_job_remove_all_bdrv(bjob);
65
66
- job_completed(job, ret);
67
- g_free(data);
68
-
69
/* If bdrv_drop_intermediate() didn't already do that, remove the commit
70
* filter driver from the backing chain. Do this as the final step so that
71
* the 'consistent read' permission can be granted. */
72
@@ -XXX,XX +XXX,XX @@ static void commit_complete(Job *job, void *opaque)
73
static int coroutine_fn commit_run(Job *job, Error **errp)
74
{
75
CommitBlockJob *s = container_of(job, CommitBlockJob, common.job);
76
- CommitCompleteData *data;
77
int64_t offset;
78
uint64_t delay_ns = 0;
79
int ret = 0;
80
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn commit_run(Job *job, Error **errp)
81
out:
82
qemu_vfree(buf);
83
84
- data = g_malloc(sizeof(*data));
85
- data->ret = ret;
86
- job_defer_to_main_loop(&s->common.job, commit_complete, data);
87
return ret;
88
}
89
90
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver commit_job_driver = {
91
.user_resume = block_job_user_resume,
92
.drain = block_job_drain,
93
.run = commit_run,
94
+ .exit = commit_exit,
95
},
96
};
97
98
--
99
2.17.1
100
101
diff view generated by jsdifflib
1
From: Nicolas Saenz Julienne <nsaenzju@redhat.com>
1
From: John Snow <jsnow@redhat.com>
2
2
3
The thread pool regulates itself: when idle, it kills threads until
3
Change the manual deferment to mirror_exit into the implicit
4
empty, when in demand, it creates new threads until full. This behaviour
4
callback to job_exit and the mirror_exit callback.
5
doesn't play well with latency sensitive workloads where the price of
6
creating a new thread is too high. For example, when paired with qemu's
7
'-mlock', or using safety features like SafeStack, creating a new thread
8
has been measured take multiple milliseconds.
9
5
10
In order to mitigate this let's introduce a new 'EventLoopBase'
6
This does change the order of some bdrv_unref calls and job_completed,
11
property to set the thread pool size. The threads will be created during
7
but thanks to the new context in which we call .exit, this is safe to
12
the pool's initialization or upon updating the property's value, remain
8
defer the possible flushing of any nodes to the job_finalize_single
13
available during its lifetime regardless of demand, and destroyed upon
9
cleanup stage.
14
freeing it. A properly characterized workload will then be able to
15
configure the pool to avoid any latency spikes.
16
10
17
Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
11
Signed-off-by: John Snow <jsnow@redhat.com>
18
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
12
Message-id: 20180830015734.19765-6-jsnow@redhat.com
19
Acked-by: Markus Armbruster <armbru@redhat.com>
13
Reviewed-by: Max Reitz <mreitz@redhat.com>
20
Message-id: 20220425075723.20019-4-nsaenzju@redhat.com
14
Reviewed-by: Jeff Cody <jcody@redhat.com>
21
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
15
Signed-off-by: Max Reitz <mreitz@redhat.com>
22
---
16
---
23
qapi/qom.json | 10 +++++-
17
block/mirror.c | 29 +++++++++++------------------
24
include/block/aio.h | 10 ++++++
18
1 file changed, 11 insertions(+), 18 deletions(-)
25
include/block/thread-pool.h | 3 ++
26
include/sysemu/event-loop-base.h | 4 +++
27
event-loop-base.c | 23 +++++++++++++
28
iothread.c | 3 ++
29
util/aio-posix.c | 1 +
30
util/async.c | 20 ++++++++++++
31
util/main-loop.c | 9 ++++++
32
util/thread-pool.c | 55 +++++++++++++++++++++++++++++---
33
10 files changed, 133 insertions(+), 5 deletions(-)
34
19
35
diff --git a/qapi/qom.json b/qapi/qom.json
20
diff --git a/block/mirror.c b/block/mirror.c
36
index XXXXXXX..XXXXXXX 100644
21
index XXXXXXX..XXXXXXX 100644
37
--- a/qapi/qom.json
22
--- a/block/mirror.c
38
+++ b/qapi/qom.json
23
+++ b/block/mirror.c
39
@@ -XXX,XX +XXX,XX @@
24
@@ -XXX,XX +XXX,XX @@ static void mirror_wait_for_all_io(MirrorBlockJob *s)
40
# 0 means that the engine will use its default.
25
}
41
# (default: 0)
26
}
42
#
27
43
+# @thread-pool-min: minimum number of threads reserved in the thread pool
28
-typedef struct {
44
+# (default:0)
29
- int ret;
45
+#
30
-} MirrorExitData;
46
+# @thread-pool-max: maximum number of threads the thread pool can contain
31
-
47
+# (default:64)
32
-static void mirror_exit(Job *job, void *opaque)
48
+#
33
+static void mirror_exit(Job *job)
49
# Since: 7.1
34
{
50
##
35
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
51
{ 'struct': 'EventLoopBaseProperties',
36
BlockJob *bjob = &s->common;
52
- 'data': { '*aio-max-batch': 'int' } }
37
- MirrorExitData *data = opaque;
53
+ 'data': { '*aio-max-batch': 'int',
38
MirrorBDSOpaque *bs_opaque = s->mirror_top_bs->opaque;
54
+ '*thread-pool-min': 'int',
39
AioContext *replace_aio_context = NULL;
55
+ '*thread-pool-max': 'int' } }
40
BlockDriverState *src = s->mirror_top_bs->backing->bs;
56
41
BlockDriverState *target_bs = blk_bs(s->target);
57
##
42
BlockDriverState *mirror_top_bs = s->mirror_top_bs;
58
# @IothreadProperties:
43
Error *local_err = NULL;
59
diff --git a/include/block/aio.h b/include/block/aio.h
44
+ int ret = job->ret;
60
index XXXXXXX..XXXXXXX 100644
45
61
--- a/include/block/aio.h
46
bdrv_release_dirty_bitmap(src, s->dirty_bitmap);
62
+++ b/include/block/aio.h
47
63
@@ -XXX,XX +XXX,XX @@ struct AioContext {
48
- /* Make sure that the source BDS doesn't go away before we called
64
QSLIST_HEAD(, Coroutine) scheduled_coroutines;
49
- * job_completed(). */
65
QEMUBH *co_schedule_bh;
50
+ /* Make sure that the source BDS doesn't go away during bdrv_replace_node,
66
51
+ * before we can call bdrv_drained_end */
67
+ int thread_pool_min;
52
bdrv_ref(src);
68
+ int thread_pool_max;
53
bdrv_ref(mirror_top_bs);
69
/* Thread pool for performing work and receiving completion callbacks.
54
bdrv_ref(target_bs);
70
* Has its own locking.
55
@@ -XXX,XX +XXX,XX @@ static void mirror_exit(Job *job, void *opaque)
71
*/
56
bdrv_set_backing_hd(target_bs, backing, &local_err);
72
@@ -XXX,XX +XXX,XX @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
57
if (local_err) {
73
void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
58
error_report_err(local_err);
74
Error **errp);
59
- data->ret = -EPERM;
75
60
+ ret = -EPERM;
76
+/**
61
}
77
+ * aio_context_set_thread_pool_params:
62
}
78
+ * @ctx: the aio context
63
}
79
+ * @min: min number of threads to have readily available in the thread pool
64
@@ -XXX,XX +XXX,XX @@ static void mirror_exit(Job *job, void *opaque)
80
+ * @min: max number of threads the thread pool can contain
65
aio_context_acquire(replace_aio_context);
81
+ */
66
}
82
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
67
83
+ int64_t max, Error **errp);
68
- if (s->should_complete && data->ret == 0) {
84
#endif
69
+ if (s->should_complete && ret == 0) {
85
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
70
BlockDriverState *to_replace = src;
86
index XXXXXXX..XXXXXXX 100644
71
if (s->to_replace) {
87
--- a/include/block/thread-pool.h
72
to_replace = s->to_replace;
88
+++ b/include/block/thread-pool.h
73
@@ -XXX,XX +XXX,XX @@ static void mirror_exit(Job *job, void *opaque)
89
@@ -XXX,XX +XXX,XX @@
74
bdrv_drained_end(target_bs);
90
75
if (local_err) {
91
#include "block/block.h"
76
error_report_err(local_err);
92
77
- data->ret = -EPERM;
93
+#define THREAD_POOL_MAX_THREADS_DEFAULT 64
78
+ ret = -EPERM;
79
}
80
}
81
if (s->to_replace) {
82
@@ -XXX,XX +XXX,XX @@ static void mirror_exit(Job *job, void *opaque)
83
blk_insert_bs(bjob->blk, mirror_top_bs, &error_abort);
84
85
bs_opaque->job = NULL;
86
- job_completed(job, data->ret);
87
88
- g_free(data);
89
bdrv_drained_end(src);
90
bdrv_unref(mirror_top_bs);
91
bdrv_unref(src);
94
+
92
+
95
typedef int ThreadPoolFunc(void *opaque);
93
+ job->ret = ret;
96
97
typedef struct ThreadPool ThreadPool;
98
@@ -XXX,XX +XXX,XX @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
99
int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
100
ThreadPoolFunc *func, void *arg);
101
void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
102
+void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
103
104
#endif
105
diff --git a/include/sysemu/event-loop-base.h b/include/sysemu/event-loop-base.h
106
index XXXXXXX..XXXXXXX 100644
107
--- a/include/sysemu/event-loop-base.h
108
+++ b/include/sysemu/event-loop-base.h
109
@@ -XXX,XX +XXX,XX @@ struct EventLoopBase {
110
111
/* AioContext AIO engine parameters */
112
int64_t aio_max_batch;
113
+
114
+ /* AioContext thread pool parameters */
115
+ int64_t thread_pool_min;
116
+ int64_t thread_pool_max;
117
};
118
#endif
119
diff --git a/event-loop-base.c b/event-loop-base.c
120
index XXXXXXX..XXXXXXX 100644
121
--- a/event-loop-base.c
122
+++ b/event-loop-base.c
123
@@ -XXX,XX +XXX,XX @@
124
#include "qemu/osdep.h"
125
#include "qom/object_interfaces.h"
126
#include "qapi/error.h"
127
+#include "block/thread-pool.h"
128
#include "sysemu/event-loop-base.h"
129
130
typedef struct {
131
@@ -XXX,XX +XXX,XX @@ typedef struct {
132
ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
133
} EventLoopBaseParamInfo;
134
135
+static void event_loop_base_instance_init(Object *obj)
136
+{
137
+ EventLoopBase *base = EVENT_LOOP_BASE(obj);
138
+
139
+ base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
140
+}
141
+
142
static EventLoopBaseParamInfo aio_max_batch_info = {
143
"aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
144
};
145
+static EventLoopBaseParamInfo thread_pool_min_info = {
146
+ "thread-pool-min", offsetof(EventLoopBase, thread_pool_min),
147
+};
148
+static EventLoopBaseParamInfo thread_pool_max_info = {
149
+ "thread-pool-max", offsetof(EventLoopBase, thread_pool_max),
150
+};
151
152
static void event_loop_base_get_param(Object *obj, Visitor *v,
153
const char *name, void *opaque, Error **errp)
154
@@ -XXX,XX +XXX,XX @@ static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
155
event_loop_base_get_param,
156
event_loop_base_set_param,
157
NULL, &aio_max_batch_info);
158
+ object_class_property_add(klass, "thread-pool-min", "int",
159
+ event_loop_base_get_param,
160
+ event_loop_base_set_param,
161
+ NULL, &thread_pool_min_info);
162
+ object_class_property_add(klass, "thread-pool-max", "int",
163
+ event_loop_base_get_param,
164
+ event_loop_base_set_param,
165
+ NULL, &thread_pool_max_info);
166
}
94
}
167
95
168
static const TypeInfo event_loop_base_info = {
96
static void mirror_throttle(MirrorBlockJob *s)
169
.name = TYPE_EVENT_LOOP_BASE,
97
@@ -XXX,XX +XXX,XX @@ static int mirror_flush(MirrorBlockJob *s)
170
.parent = TYPE_OBJECT,
98
static int coroutine_fn mirror_run(Job *job, Error **errp)
171
.instance_size = sizeof(EventLoopBase),
99
{
172
+ .instance_init = event_loop_base_instance_init,
100
MirrorBlockJob *s = container_of(job, MirrorBlockJob, common.job);
173
.class_size = sizeof(EventLoopBaseClass),
101
- MirrorExitData *data;
174
.class_init = event_loop_base_class_init,
102
BlockDriverState *bs = s->mirror_top_bs->backing->bs;
175
.abstract = true,
103
BlockDriverState *target_bs = blk_bs(s->target);
176
diff --git a/iothread.c b/iothread.c
104
bool need_drain = true;
177
index XXXXXXX..XXXXXXX 100644
105
@@ -XXX,XX +XXX,XX @@ immediate_exit:
178
--- a/iothread.c
106
g_free(s->in_flight_bitmap);
179
+++ b/iothread.c
107
bdrv_dirty_iter_free(s->dbi);
180
@@ -XXX,XX +XXX,XX @@ static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
108
181
aio_context_set_aio_params(iothread->ctx,
109
- data = g_malloc(sizeof(*data));
182
iothread->parent_obj.aio_max_batch,
110
- data->ret = ret;
183
errp);
111
-
184
+
112
if (need_drain) {
185
+ aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
113
bdrv_drained_begin(bs);
186
+ base->thread_pool_max, errp);
114
}
115
116
- job_defer_to_main_loop(&s->common.job, mirror_exit, data);
117
return ret;
187
}
118
}
188
119
189
120
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver mirror_job_driver = {
190
diff --git a/util/aio-posix.c b/util/aio-posix.c
121
.user_resume = block_job_user_resume,
191
index XXXXXXX..XXXXXXX 100644
122
.drain = block_job_drain,
192
--- a/util/aio-posix.c
123
.run = mirror_run,
193
+++ b/util/aio-posix.c
124
+ .exit = mirror_exit,
194
@@ -XXX,XX +XXX,XX @@
125
.pause = mirror_pause,
195
126
.complete = mirror_complete,
196
#include "qemu/osdep.h"
127
},
197
#include "block/block.h"
128
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver commit_active_job_driver = {
198
+#include "block/thread-pool.h"
129
.user_resume = block_job_user_resume,
199
#include "qemu/main-loop.h"
130
.drain = block_job_drain,
200
#include "qemu/rcu.h"
131
.run = mirror_run,
201
#include "qemu/rcu_queue.h"
132
+ .exit = mirror_exit,
202
diff --git a/util/async.c b/util/async.c
133
.pause = mirror_pause,
203
index XXXXXXX..XXXXXXX 100644
134
.complete = mirror_complete,
204
--- a/util/async.c
135
},
205
+++ b/util/async.c
206
@@ -XXX,XX +XXX,XX @@ AioContext *aio_context_new(Error **errp)
207
208
ctx->aio_max_batch = 0;
209
210
+ ctx->thread_pool_min = 0;
211
+ ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
212
+
213
return ctx;
214
fail:
215
g_source_destroy(&ctx->source);
216
@@ -XXX,XX +XXX,XX @@ void qemu_set_current_aio_context(AioContext *ctx)
217
assert(!get_my_aiocontext());
218
set_my_aiocontext(ctx);
219
}
220
+
221
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
222
+ int64_t max, Error **errp)
223
+{
224
+
225
+ if (min > max || !max || min > INT_MAX || max > INT_MAX) {
226
+ error_setg(errp, "bad thread-pool-min/thread-pool-max values");
227
+ return;
228
+ }
229
+
230
+ ctx->thread_pool_min = min;
231
+ ctx->thread_pool_max = max;
232
+
233
+ if (ctx->thread_pool) {
234
+ thread_pool_update_params(ctx->thread_pool, ctx);
235
+ }
236
+}
237
diff --git a/util/main-loop.c b/util/main-loop.c
238
index XXXXXXX..XXXXXXX 100644
239
--- a/util/main-loop.c
240
+++ b/util/main-loop.c
241
@@ -XXX,XX +XXX,XX @@
242
#include "sysemu/replay.h"
243
#include "qemu/main-loop.h"
244
#include "block/aio.h"
245
+#include "block/thread-pool.h"
246
#include "qemu/error-report.h"
247
#include "qemu/queue.h"
248
#include "qemu/compiler.h"
249
@@ -XXX,XX +XXX,XX @@ int qemu_init_main_loop(Error **errp)
250
251
static void main_loop_update_params(EventLoopBase *base, Error **errp)
252
{
253
+ ERRP_GUARD();
254
+
255
if (!qemu_aio_context) {
256
error_setg(errp, "qemu aio context not ready");
257
return;
258
}
259
260
aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
261
+ if (*errp) {
262
+ return;
263
+ }
264
+
265
+ aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min,
266
+ base->thread_pool_max, errp);
267
}
268
269
MainLoop *mloop;
270
diff --git a/util/thread-pool.c b/util/thread-pool.c
271
index XXXXXXX..XXXXXXX 100644
272
--- a/util/thread-pool.c
273
+++ b/util/thread-pool.c
274
@@ -XXX,XX +XXX,XX @@ struct ThreadPool {
275
QemuMutex lock;
276
QemuCond worker_stopped;
277
QemuSemaphore sem;
278
- int max_threads;
279
QEMUBH *new_thread_bh;
280
281
/* The following variables are only accessed from one AioContext. */
282
@@ -XXX,XX +XXX,XX @@ struct ThreadPool {
283
int new_threads; /* backlog of threads we need to create */
284
int pending_threads; /* threads created but not running yet */
285
bool stopping;
286
+ int min_threads;
287
+ int max_threads;
288
};
289
290
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
291
+{
292
+ /*
293
+ * The semaphore timed out, we should exit the loop except when:
294
+ * - There is work to do, we raced with the signal.
295
+ * - The max threads threshold just changed, we raced with the signal.
296
+ * - The thread pool forces a minimum number of readily available threads.
297
+ */
298
+ if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
299
+ pool->cur_threads > pool->max_threads ||
300
+ pool->cur_threads <= pool->min_threads)) {
301
+ return true;
302
+ }
303
+
304
+ return false;
305
+}
306
+
307
static void *worker_thread(void *opaque)
308
{
309
ThreadPool *pool = opaque;
310
@@ -XXX,XX +XXX,XX @@ static void *worker_thread(void *opaque)
311
ret = qemu_sem_timedwait(&pool->sem, 10000);
312
qemu_mutex_lock(&pool->lock);
313
pool->idle_threads--;
314
- } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
315
- if (ret == -1 || pool->stopping) {
316
+ } while (back_to_sleep(pool, ret));
317
+ if (ret == -1 || pool->stopping ||
318
+ pool->cur_threads > pool->max_threads) {
319
break;
320
}
321
322
@@ -XXX,XX +XXX,XX @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
323
thread_pool_submit_aio(pool, func, arg, NULL, NULL);
324
}
325
326
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
327
+{
328
+ qemu_mutex_lock(&pool->lock);
329
+
330
+ pool->min_threads = ctx->thread_pool_min;
331
+ pool->max_threads = ctx->thread_pool_max;
332
+
333
+ /*
334
+ * We either have to:
335
+ * - Increase the number available of threads until over the min_threads
336
+ * threshold.
337
+ * - Decrease the number of available threads until under the max_threads
338
+ * threshold.
339
+ * - Do nothing. The current number of threads fall in between the min and
340
+ * max thresholds. We'll let the pool manage itself.
341
+ */
342
+ for (int i = pool->cur_threads; i < pool->min_threads; i++) {
343
+ spawn_thread(pool);
344
+ }
345
+
346
+ for (int i = pool->cur_threads; i > pool->max_threads; i--) {
347
+ qemu_sem_post(&pool->sem);
348
+ }
349
+
350
+ qemu_mutex_unlock(&pool->lock);
351
+}
352
+
353
static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
354
{
355
if (!ctx) {
356
@@ -XXX,XX +XXX,XX @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
357
qemu_mutex_init(&pool->lock);
358
qemu_cond_init(&pool->worker_stopped);
359
qemu_sem_init(&pool->sem, 0);
360
- pool->max_threads = 64;
361
pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
362
363
QLIST_INIT(&pool->head);
364
QTAILQ_INIT(&pool->request_list);
365
+
366
+ thread_pool_update_params(pool, ctx);
367
}
368
369
ThreadPool *thread_pool_new(AioContext *ctx)
370
--
136
--
371
2.35.1
137
2.17.1
138
139
diff view generated by jsdifflib
1
From: Nicolas Saenz Julienne <nsaenzju@redhat.com>
1
From: John Snow <jsnow@redhat.com>
2
2
3
'event-loop-base' provides basic property handling for all 'AioContext'
3
Utilize the job_exit shim by not calling job_defer_to_main_loop, and
4
based event loops. So let's define a new 'MainLoopClass' that inherits
4
where applicable, converting the deferred callback into the job_exit
5
from it. This will permit tweaking the main loop's properties through
5
callback.
6
qapi as well as through the command line using the '-object' keyword[1].
6
7
Only one instance of 'MainLoopClass' might be created at any time.
7
This converts backup, stream, create, and the unit tests all at once.
8
8
Most of these jobs do not see any changes to the order in which they
9
'EventLoopBaseClass' learns a new callback, 'can_be_deleted()' so as to
9
clean up their resources, except the test-blockjob-txn test, which
10
mark 'MainLoop' as non-deletable.
10
now puts down its bs before job_completed is called.
11
11
12
[1] For example:
12
This is safe for the same reason the reordering in the mirror job is
13
-object main-loop,id=main-loop,aio-max-batch=<value>
13
safe, because job_completed no longer runs under two locks, making
14
14
the unref safe even if it causes a flush.
15
Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
15
16
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
16
Signed-off-by: John Snow <jsnow@redhat.com>
17
Acked-by: Markus Armbruster <armbru@redhat.com>
17
Reviewed-by: Max Reitz <mreitz@redhat.com>
18
Message-id: 20220425075723.20019-3-nsaenzju@redhat.com
18
Message-id: 20180830015734.19765-7-jsnow@redhat.com
19
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
19
Signed-off-by: Max Reitz <mreitz@redhat.com>
20
---
20
---
21
qapi/qom.json | 13 ++++++++
21
block/backup.c | 16 ----------------
22
meson.build | 3 +-
22
block/create.c | 14 +++-----------
23
include/qemu/main-loop.h | 10 ++++++
23
block/stream.c | 22 +++++++---------------
24
include/sysemu/event-loop-base.h | 1 +
24
tests/test-bdrv-drain.c | 6 ------
25
event-loop-base.c | 13 ++++++++
25
tests/test-blockjob-txn.c | 11 ++---------
26
util/main-loop.c | 56 ++++++++++++++++++++++++++++++++
26
tests/test-blockjob.c | 10 ++++------
27
6 files changed, 95 insertions(+), 1 deletion(-)
27
6 files changed, 16 insertions(+), 63 deletions(-)
28
28
29
diff --git a/qapi/qom.json b/qapi/qom.json
29
diff --git a/block/backup.c b/block/backup.c
30
index XXXXXXX..XXXXXXX 100644
30
index XXXXXXX..XXXXXXX 100644
31
--- a/qapi/qom.json
31
--- a/block/backup.c
32
+++ b/qapi/qom.json
32
+++ b/block/backup.c
33
@@ -XXX,XX +XXX,XX @@
33
@@ -XXX,XX +XXX,XX @@ static BlockErrorAction backup_error_action(BackupBlockJob *job,
34
'*poll-grow': 'int',
34
}
35
'*poll-shrink': 'int' } }
35
}
36
36
37
+##
37
-typedef struct {
38
+# @MainLoopProperties:
38
- int ret;
39
+#
39
-} BackupCompleteData;
40
+# Properties for the main-loop object.
40
-
41
+#
41
-static void backup_complete(Job *job, void *opaque)
42
+# Since: 7.1
42
-{
43
+##
43
- BackupCompleteData *data = opaque;
44
+{ 'struct': 'MainLoopProperties',
44
-
45
+ 'base': 'EventLoopBaseProperties',
45
- job_completed(job, data->ret);
46
+ 'data': {} }
46
- g_free(data);
47
+
47
-}
48
##
48
-
49
# @MemoryBackendProperties:
49
static bool coroutine_fn yield_and_check(BackupBlockJob *job)
50
#
50
{
51
@@ -XXX,XX +XXX,XX @@
51
uint64_t delay_ns;
52
{ 'name': 'input-linux',
52
@@ -XXX,XX +XXX,XX @@ static void backup_incremental_init_copy_bitmap(BackupBlockJob *job)
53
'if': 'CONFIG_LINUX' },
53
static int coroutine_fn backup_run(Job *opaque_job, Error **errp)
54
'iothread',
54
{
55
+ 'main-loop',
55
BackupBlockJob *job = container_of(opaque_job, BackupBlockJob, common.job);
56
{ 'name': 'memory-backend-epc',
56
- BackupCompleteData *data;
57
'if': 'CONFIG_LINUX' },
57
BlockDriverState *bs = blk_bs(job->common.blk);
58
'memory-backend-file',
58
int64_t offset, nb_clusters;
59
@@ -XXX,XX +XXX,XX @@
59
int ret = 0;
60
'input-linux': { 'type': 'InputLinuxProperties',
60
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn backup_run(Job *opaque_job, Error **errp)
61
'if': 'CONFIG_LINUX' },
61
qemu_co_rwlock_unlock(&job->flush_rwlock);
62
'iothread': 'IothreadProperties',
62
hbitmap_free(job->copy_bitmap);
63
+ 'main-loop': 'MainLoopProperties',
63
64
'memory-backend-epc': { 'type': 'MemoryBackendEpcProperties',
64
- data = g_malloc(sizeof(*data));
65
'if': 'CONFIG_LINUX' },
65
- data->ret = ret;
66
'memory-backend-file': 'MemoryBackendFileProperties',
66
- job_defer_to_main_loop(&job->common.job, backup_complete, data);
67
diff --git a/meson.build b/meson.build
67
return ret;
68
index XXXXXXX..XXXXXXX 100644
68
}
69
--- a/meson.build
69
70
+++ b/meson.build
70
diff --git a/block/create.c b/block/create.c
71
@@ -XXX,XX +XXX,XX @@ libqemuutil = static_library('qemuutil',
71
index XXXXXXX..XXXXXXX 100644
72
sources: util_ss.sources() + stub_ss.sources() + genh,
72
--- a/block/create.c
73
dependencies: [util_ss.dependencies(), libm, threads, glib, socket, malloc, pixman])
73
+++ b/block/create.c
74
qemuutil = declare_dependency(link_with: libqemuutil,
74
@@ -XXX,XX +XXX,XX @@ typedef struct BlockdevCreateJob {
75
- sources: genh + version_res)
75
Job common;
76
+ sources: genh + version_res,
76
BlockDriver *drv;
77
+ dependencies: [event_loop_base])
77
BlockdevCreateOptions *opts;
78
78
- int ret;
79
if have_system or have_user
79
} BlockdevCreateJob;
80
decodetree = generator(find_program('scripts/decodetree.py'),
80
81
diff --git a/include/qemu/main-loop.h b/include/qemu/main-loop.h
81
-static void blockdev_create_complete(Job *job, void *opaque)
82
index XXXXXXX..XXXXXXX 100644
82
-{
83
--- a/include/qemu/main-loop.h
83
- BlockdevCreateJob *s = container_of(job, BlockdevCreateJob, common);
84
+++ b/include/qemu/main-loop.h
84
-
85
@@ -XXX,XX +XXX,XX @@
85
- job_completed(job, s->ret);
86
#define QEMU_MAIN_LOOP_H
86
-}
87
87
-
88
#include "block/aio.h"
88
static int coroutine_fn blockdev_create_run(Job *job, Error **errp)
89
+#include "qom/object.h"
89
{
90
+#include "sysemu/event-loop-base.h"
90
BlockdevCreateJob *s = container_of(job, BlockdevCreateJob, common);
91
91
+ int ret;
92
#define SIG_IPI SIGUSR1
92
93
93
job_progress_set_remaining(&s->common, 1);
94
+#define TYPE_MAIN_LOOP "main-loop"
94
- s->ret = s->drv->bdrv_co_create(s->opts, errp);
95
+OBJECT_DECLARE_TYPE(MainLoop, MainLoopClass, MAIN_LOOP)
95
+ ret = s->drv->bdrv_co_create(s->opts, errp);
96
+
96
job_progress_update(&s->common, 1);
97
+struct MainLoop {
97
98
+ EventLoopBase parent_obj;
98
qapi_free_BlockdevCreateOptions(s->opts);
99
+};
99
- job_defer_to_main_loop(&s->common, blockdev_create_complete, NULL);
100
+typedef struct MainLoop MainLoop;
100
101
+
101
- return s->ret;
102
/**
102
+ return ret;
103
* qemu_init_main_loop: Set up the process so that it can run the main loop.
103
}
104
*
104
105
diff --git a/include/sysemu/event-loop-base.h b/include/sysemu/event-loop-base.h
105
static const JobDriver blockdev_create_job_driver = {
106
index XXXXXXX..XXXXXXX 100644
106
diff --git a/block/stream.c b/block/stream.c
107
--- a/include/sysemu/event-loop-base.h
107
index XXXXXXX..XXXXXXX 100644
108
+++ b/include/sysemu/event-loop-base.h
108
--- a/block/stream.c
109
@@ -XXX,XX +XXX,XX @@ struct EventLoopBaseClass {
109
+++ b/block/stream.c
110
110
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn stream_populate(BlockBackend *blk,
111
void (*init)(EventLoopBase *base, Error **errp);
111
return blk_co_preadv(blk, offset, qiov.size, &qiov, BDRV_REQ_COPY_ON_READ);
112
void (*update_params)(EventLoopBase *base, Error **errp);
112
}
113
+ bool (*can_be_deleted)(EventLoopBase *base);
113
114
-typedef struct {
115
- int ret;
116
-} StreamCompleteData;
117
-
118
-static void stream_complete(Job *job, void *opaque)
119
+static void stream_exit(Job *job)
120
{
121
StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
122
BlockJob *bjob = &s->common;
123
- StreamCompleteData *data = opaque;
124
BlockDriverState *bs = blk_bs(bjob->blk);
125
BlockDriverState *base = s->base;
126
Error *local_err = NULL;
127
+ int ret = job->ret;
128
129
- if (!job_is_cancelled(job) && bs->backing && data->ret == 0) {
130
+ if (!job_is_cancelled(job) && bs->backing && ret == 0) {
131
const char *base_id = NULL, *base_fmt = NULL;
132
if (base) {
133
base_id = s->backing_file_str;
134
@@ -XXX,XX +XXX,XX @@ static void stream_complete(Job *job, void *opaque)
135
base_fmt = base->drv->format_name;
136
}
137
}
138
- data->ret = bdrv_change_backing_file(bs, base_id, base_fmt);
139
+ ret = bdrv_change_backing_file(bs, base_id, base_fmt);
140
bdrv_set_backing_hd(bs, base, &local_err);
141
if (local_err) {
142
error_report_err(local_err);
143
- data->ret = -EPERM;
144
+ ret = -EPERM;
145
goto out;
146
}
147
}
148
@@ -XXX,XX +XXX,XX @@ out:
149
}
150
151
g_free(s->backing_file_str);
152
- job_completed(job, data->ret);
153
- g_free(data);
154
+ job->ret = ret;
155
}
156
157
static int coroutine_fn stream_run(Job *job, Error **errp)
158
{
159
StreamBlockJob *s = container_of(job, StreamBlockJob, common.job);
160
- StreamCompleteData *data;
161
BlockBackend *blk = s->common.blk;
162
BlockDriverState *bs = blk_bs(blk);
163
BlockDriverState *base = s->base;
164
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn stream_run(Job *job, Error **errp)
165
166
out:
167
/* Modify backing chain and close BDSes in main loop */
168
- data = g_malloc(sizeof(*data));
169
- data->ret = ret;
170
- job_defer_to_main_loop(&s->common.job, stream_complete, data);
171
return ret;
172
}
173
174
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver stream_job_driver = {
175
.job_type = JOB_TYPE_STREAM,
176
.free = block_job_free,
177
.run = stream_run,
178
+ .exit = stream_exit,
179
.user_resume = block_job_user_resume,
180
.drain = block_job_drain,
181
},
182
diff --git a/tests/test-bdrv-drain.c b/tests/test-bdrv-drain.c
183
index XXXXXXX..XXXXXXX 100644
184
--- a/tests/test-bdrv-drain.c
185
+++ b/tests/test-bdrv-drain.c
186
@@ -XXX,XX +XXX,XX @@ typedef struct TestBlockJob {
187
bool should_complete;
188
} TestBlockJob;
189
190
-static void test_job_completed(Job *job, void *opaque)
191
-{
192
- job_completed(job, 0);
193
-}
194
-
195
static int coroutine_fn test_job_run(Job *job, Error **errp)
196
{
197
TestBlockJob *s = container_of(job, TestBlockJob, common.job);
198
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn test_job_run(Job *job, Error **errp)
199
job_pause_point(&s->common.job);
200
}
201
202
- job_defer_to_main_loop(&s->common.job, test_job_completed, NULL);
203
return 0;
204
}
205
206
diff --git a/tests/test-blockjob-txn.c b/tests/test-blockjob-txn.c
207
index XXXXXXX..XXXXXXX 100644
208
--- a/tests/test-blockjob-txn.c
209
+++ b/tests/test-blockjob-txn.c
210
@@ -XXX,XX +XXX,XX @@ typedef struct {
211
int *result;
212
} TestBlockJob;
213
214
-static void test_block_job_complete(Job *job, void *opaque)
215
+static void test_block_job_exit(Job *job)
216
{
217
BlockJob *bjob = container_of(job, BlockJob, job);
218
BlockDriverState *bs = blk_bs(bjob->blk);
219
- int rc = (intptr_t)opaque;
220
221
- if (job_is_cancelled(job)) {
222
- rc = -ECANCELED;
223
- }
224
-
225
- job_completed(job, rc);
226
bdrv_unref(bs);
227
}
228
229
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn test_block_job_run(Job *job, Error **errp)
230
}
231
}
232
233
- job_defer_to_main_loop(job, test_block_job_complete,
234
- (void *)(intptr_t)s->rc);
235
return s->rc;
236
}
237
238
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver test_block_job_driver = {
239
.user_resume = block_job_user_resume,
240
.drain = block_job_drain,
241
.run = test_block_job_run,
242
+ .exit = test_block_job_exit,
243
},
114
};
244
};
115
245
116
struct EventLoopBase {
246
diff --git a/tests/test-blockjob.c b/tests/test-blockjob.c
117
diff --git a/event-loop-base.c b/event-loop-base.c
247
index XXXXXXX..XXXXXXX 100644
118
index XXXXXXX..XXXXXXX 100644
248
--- a/tests/test-blockjob.c
119
--- a/event-loop-base.c
249
+++ b/tests/test-blockjob.c
120
+++ b/event-loop-base.c
250
@@ -XXX,XX +XXX,XX @@ typedef struct CancelJob {
121
@@ -XXX,XX +XXX,XX @@ static void event_loop_base_complete(UserCreatable *uc, Error **errp)
251
bool completed;
122
}
252
} CancelJob;
123
}
253
124
254
-static void cancel_job_completed(Job *job, void *opaque)
125
+static bool event_loop_base_can_be_deleted(UserCreatable *uc)
255
+static void cancel_job_exit(Job *job)
126
+{
256
{
127
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(uc);
257
- CancelJob *s = opaque;
128
+ EventLoopBase *backend = EVENT_LOOP_BASE(uc);
258
+ CancelJob *s = container_of(job, CancelJob, common.job);
129
+
259
s->completed = true;
130
+ if (bc->can_be_deleted) {
260
- job_completed(job, 0);
131
+ return bc->can_be_deleted(backend);
261
}
132
+ }
262
133
+
263
static void cancel_job_complete(Job *job, Error **errp)
134
+ return true;
264
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn cancel_job_run(Job *job, Error **errp)
135
+}
265
136
+
266
while (!s->should_complete) {
137
static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
267
if (job_is_cancelled(&s->common.job)) {
138
{
268
- goto defer;
139
UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
269
+ return 0;
140
ucc->complete = event_loop_base_complete;
270
}
141
+ ucc->can_be_deleted = event_loop_base_can_be_deleted;
271
142
272
if (!job_is_ready(&s->common.job) && s->should_converge) {
143
object_class_property_add(klass, "aio-max-batch", "int",
273
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn cancel_job_run(Job *job, Error **errp)
144
event_loop_base_get_param,
274
job_sleep_ns(&s->common.job, 100000);
145
diff --git a/util/main-loop.c b/util/main-loop.c
275
}
146
index XXXXXXX..XXXXXXX 100644
276
147
--- a/util/main-loop.c
277
- defer:
148
+++ b/util/main-loop.c
278
- job_defer_to_main_loop(&s->common.job, cancel_job_completed, s);
149
@@ -XXX,XX +XXX,XX @@
150
#include "qemu/error-report.h"
151
#include "qemu/queue.h"
152
#include "qemu/compiler.h"
153
+#include "qom/object.h"
154
155
#ifndef _WIN32
156
#include <sys/wait.h>
157
@@ -XXX,XX +XXX,XX @@ int qemu_init_main_loop(Error **errp)
158
return 0;
279
return 0;
159
}
280
}
160
281
161
+static void main_loop_update_params(EventLoopBase *base, Error **errp)
282
@@ -XXX,XX +XXX,XX @@ static const BlockJobDriver test_cancel_driver = {
162
+{
283
.user_resume = block_job_user_resume,
163
+ if (!qemu_aio_context) {
284
.drain = block_job_drain,
164
+ error_setg(errp, "qemu aio context not ready");
285
.run = cancel_job_run,
165
+ return;
286
+ .exit = cancel_job_exit,
166
+ }
287
.complete = cancel_job_complete,
167
+
288
},
168
+ aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
289
};
169
+}
170
+
171
+MainLoop *mloop;
172
+
173
+static void main_loop_init(EventLoopBase *base, Error **errp)
174
+{
175
+ MainLoop *m = MAIN_LOOP(base);
176
+
177
+ if (mloop) {
178
+ error_setg(errp, "only one main-loop instance allowed");
179
+ return;
180
+ }
181
+
182
+ main_loop_update_params(base, errp);
183
+
184
+ mloop = m;
185
+ return;
186
+}
187
+
188
+static bool main_loop_can_be_deleted(EventLoopBase *base)
189
+{
190
+ return false;
191
+}
192
+
193
+static void main_loop_class_init(ObjectClass *oc, void *class_data)
194
+{
195
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(oc);
196
+
197
+ bc->init = main_loop_init;
198
+ bc->update_params = main_loop_update_params;
199
+ bc->can_be_deleted = main_loop_can_be_deleted;
200
+}
201
+
202
+static const TypeInfo main_loop_info = {
203
+ .name = TYPE_MAIN_LOOP,
204
+ .parent = TYPE_EVENT_LOOP_BASE,
205
+ .class_init = main_loop_class_init,
206
+ .instance_size = sizeof(MainLoop),
207
+};
208
+
209
+static void main_loop_register_types(void)
210
+{
211
+ type_register_static(&main_loop_info);
212
+}
213
+
214
+type_init(main_loop_register_types)
215
+
216
static int max_priority;
217
218
#ifndef _WIN32
219
--
290
--
220
2.35.1
291
2.17.1
292
293
diff view generated by jsdifflib
New patch
1
From: John Snow <jsnow@redhat.com>
1
2
3
Rename opaque_job to job to be consistent with other job implementations.
4
Rename 'job', the BackupBlockJob object, to 's' to also be consistent.
5
6
Suggested-by: Eric Blake <eblake@redhat.com>
7
Signed-off-by: John Snow <jsnow@redhat.com>
8
Reviewed-by: Max Reitz <mreitz@redhat.com>
9
Message-id: 20180830015734.19765-8-jsnow@redhat.com
10
Signed-off-by: Max Reitz <mreitz@redhat.com>
11
---
12
block/backup.c | 62 +++++++++++++++++++++++++-------------------------
13
1 file changed, 31 insertions(+), 31 deletions(-)
14
15
diff --git a/block/backup.c b/block/backup.c
16
index XXXXXXX..XXXXXXX 100644
17
--- a/block/backup.c
18
+++ b/block/backup.c
19
@@ -XXX,XX +XXX,XX @@ static void backup_incremental_init_copy_bitmap(BackupBlockJob *job)
20
bdrv_dirty_iter_free(dbi);
21
}
22
23
-static int coroutine_fn backup_run(Job *opaque_job, Error **errp)
24
+static int coroutine_fn backup_run(Job *job, Error **errp)
25
{
26
- BackupBlockJob *job = container_of(opaque_job, BackupBlockJob, common.job);
27
- BlockDriverState *bs = blk_bs(job->common.blk);
28
+ BackupBlockJob *s = container_of(job, BackupBlockJob, common.job);
29
+ BlockDriverState *bs = blk_bs(s->common.blk);
30
int64_t offset, nb_clusters;
31
int ret = 0;
32
33
- QLIST_INIT(&job->inflight_reqs);
34
- qemu_co_rwlock_init(&job->flush_rwlock);
35
+ QLIST_INIT(&s->inflight_reqs);
36
+ qemu_co_rwlock_init(&s->flush_rwlock);
37
38
- nb_clusters = DIV_ROUND_UP(job->len, job->cluster_size);
39
- job_progress_set_remaining(&job->common.job, job->len);
40
+ nb_clusters = DIV_ROUND_UP(s->len, s->cluster_size);
41
+ job_progress_set_remaining(job, s->len);
42
43
- job->copy_bitmap = hbitmap_alloc(nb_clusters, 0);
44
- if (job->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
45
- backup_incremental_init_copy_bitmap(job);
46
+ s->copy_bitmap = hbitmap_alloc(nb_clusters, 0);
47
+ if (s->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
48
+ backup_incremental_init_copy_bitmap(s);
49
} else {
50
- hbitmap_set(job->copy_bitmap, 0, nb_clusters);
51
+ hbitmap_set(s->copy_bitmap, 0, nb_clusters);
52
}
53
54
55
- job->before_write.notify = backup_before_write_notify;
56
- bdrv_add_before_write_notifier(bs, &job->before_write);
57
+ s->before_write.notify = backup_before_write_notify;
58
+ bdrv_add_before_write_notifier(bs, &s->before_write);
59
60
- if (job->sync_mode == MIRROR_SYNC_MODE_NONE) {
61
+ if (s->sync_mode == MIRROR_SYNC_MODE_NONE) {
62
/* All bits are set in copy_bitmap to allow any cluster to be copied.
63
* This does not actually require them to be copied. */
64
- while (!job_is_cancelled(&job->common.job)) {
65
+ while (!job_is_cancelled(job)) {
66
/* Yield until the job is cancelled. We just let our before_write
67
* notify callback service CoW requests. */
68
- job_yield(&job->common.job);
69
+ job_yield(job);
70
}
71
- } else if (job->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
72
- ret = backup_run_incremental(job);
73
+ } else if (s->sync_mode == MIRROR_SYNC_MODE_INCREMENTAL) {
74
+ ret = backup_run_incremental(s);
75
} else {
76
/* Both FULL and TOP SYNC_MODE's require copying.. */
77
- for (offset = 0; offset < job->len;
78
- offset += job->cluster_size) {
79
+ for (offset = 0; offset < s->len;
80
+ offset += s->cluster_size) {
81
bool error_is_read;
82
int alloced = 0;
83
84
- if (yield_and_check(job)) {
85
+ if (yield_and_check(s)) {
86
break;
87
}
88
89
- if (job->sync_mode == MIRROR_SYNC_MODE_TOP) {
90
+ if (s->sync_mode == MIRROR_SYNC_MODE_TOP) {
91
int i;
92
int64_t n;
93
94
/* Check to see if these blocks are already in the
95
* backing file. */
96
97
- for (i = 0; i < job->cluster_size;) {
98
+ for (i = 0; i < s->cluster_size;) {
99
/* bdrv_is_allocated() only returns true/false based
100
* on the first set of sectors it comes across that
101
* are are all in the same state.
102
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn backup_run(Job *opaque_job, Error **errp)
103
* needed but at some point that is always the case. */
104
alloced =
105
bdrv_is_allocated(bs, offset + i,
106
- job->cluster_size - i, &n);
107
+ s->cluster_size - i, &n);
108
i += n;
109
110
if (alloced || n == 0) {
111
@@ -XXX,XX +XXX,XX @@ static int coroutine_fn backup_run(Job *opaque_job, Error **errp)
112
if (alloced < 0) {
113
ret = alloced;
114
} else {
115
- ret = backup_do_cow(job, offset, job->cluster_size,
116
+ ret = backup_do_cow(s, offset, s->cluster_size,
117
&error_is_read, false);
118
}
119
if (ret < 0) {
120
/* Depending on error action, fail now or retry cluster */
121
BlockErrorAction action =
122
- backup_error_action(job, error_is_read, -ret);
123
+ backup_error_action(s, error_is_read, -ret);
124
if (action == BLOCK_ERROR_ACTION_REPORT) {
125
break;
126
} else {
127
- offset -= job->cluster_size;
128
+ offset -= s->cluster_size;
129
continue;
130
}
131
}
132
}
133
}
134
135
- notifier_with_return_remove(&job->before_write);
136
+ notifier_with_return_remove(&s->before_write);
137
138
/* wait until pending backup_do_cow() calls have completed */
139
- qemu_co_rwlock_wrlock(&job->flush_rwlock);
140
- qemu_co_rwlock_unlock(&job->flush_rwlock);
141
- hbitmap_free(job->copy_bitmap);
142
+ qemu_co_rwlock_wrlock(&s->flush_rwlock);
143
+ qemu_co_rwlock_unlock(&s->flush_rwlock);
144
+ hbitmap_free(s->copy_bitmap);
145
146
return ret;
147
}
148
--
149
2.17.1
150
151
diff view generated by jsdifflib
1
From: Nicolas Saenz Julienne <nsaenzju@redhat.com>
1
From: John Snow <jsnow@redhat.com>
2
2
3
Introduce the 'event-loop-base' abstract class, it'll hold the
3
Jobs are now expected to return their retcode on the stack, from the
4
properties common to all event loops and provide the necessary hooks for
4
.run callback, so we can remove that argument.
5
their creation and maintenance. Then have iothread inherit from it.
6
5
7
EventLoopBaseClass is defined as user creatable and provides a hook for
6
job_cancel does not need to set -ECANCELED because job_completed will
8
its children to attach themselves to the user creatable class 'complete'
7
update the return code itself if the job was canceled.
9
function. It also provides an update_params() callback to propagate
10
property changes onto its children.
11
8
12
The new 'event-loop-base' class will live in the root directory. It is
9
While we're here, make job_completed static to job.c and remove it from
13
built on its own using the 'link_whole' option (there are no direct
10
job.h; move the documentation of return code to the .run() callback and
14
function dependencies between the class and its children, it all happens
11
to the job->ret property, accordingly.
15
trough 'constructor' magic). And also imposes new compilation
16
dependencies:
17
12
18
qom <- event-loop-base <- blockdev (iothread.c)
13
Signed-off-by: John Snow <jsnow@redhat.com>
14
Message-id: 20180830015734.19765-9-jsnow@redhat.com
15
Reviewed-by: Max Reitz <mreitz@redhat.com>
16
Signed-off-by: Max Reitz <mreitz@redhat.com>
17
---
18
include/qemu/job.h | 28 +++++++++++++++-------------
19
job.c | 11 ++++++-----
20
trace-events | 2 +-
21
3 files changed, 22 insertions(+), 19 deletions(-)
19
22
20
And in subsequent patches:
23
diff --git a/include/qemu/job.h b/include/qemu/job.h
21
22
qom <- event-loop-base <- qemuutil (util/main-loop.c)
23
24
All this forced some amount of reordering in meson.build:
25
26
- Moved qom build definition before qemuutil. Doing it the other way
27
around (i.e. moving qemuutil after qom) isn't possible as a lot of
28
core libraries that live in between the two depend on it.
29
30
- Process the 'hw' subdir earlier, as it introduces files into the
31
'qom' source set.
32
33
No functional changes intended.
34
35
Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
36
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
37
Acked-by: Markus Armbruster <armbru@redhat.com>
38
Message-id: 20220425075723.20019-2-nsaenzju@redhat.com
39
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
40
---
41
qapi/qom.json | 22 +++++--
42
meson.build | 23 ++++---
43
include/sysemu/event-loop-base.h | 36 +++++++++++
44
include/sysemu/iothread.h | 6 +-
45
event-loop-base.c | 104 +++++++++++++++++++++++++++++++
46
iothread.c | 65 ++++++-------------
47
6 files changed, 192 insertions(+), 64 deletions(-)
48
create mode 100644 include/sysemu/event-loop-base.h
49
create mode 100644 event-loop-base.c
50
51
diff --git a/qapi/qom.json b/qapi/qom.json
52
index XXXXXXX..XXXXXXX 100644
24
index XXXXXXX..XXXXXXX 100644
53
--- a/qapi/qom.json
25
--- a/include/qemu/job.h
54
+++ b/qapi/qom.json
26
+++ b/include/qemu/job.h
55
@@ -XXX,XX +XXX,XX @@
27
@@ -XXX,XX +XXX,XX @@ typedef struct Job {
56
'*repeat': 'bool',
28
/** Estimated progress_current value at the completion of the job */
57
'*grab-toggle': 'GrabToggleKeys' } }
29
int64_t progress_total;
58
30
59
+##
31
- /** ret code passed to job_completed. */
60
+# @EventLoopBaseProperties:
32
+ /**
61
+#
33
+ * Return code from @run and/or @prepare callback(s).
62
+# Common properties for event loops
34
+ * Not final until the job has reached the CONCLUDED status.
63
+#
35
+ * 0 on success, -errno on failure.
64
+# @aio-max-batch: maximum number of requests in a batch for the AIO engine,
36
+ */
65
+# 0 means that the engine will use its default.
37
int ret;
66
+# (default: 0)
38
67
+#
39
/**
68
+# Since: 7.1
40
@@ -XXX,XX +XXX,XX @@ struct JobDriver {
69
+##
41
/** Enum describing the operation */
70
+{ 'struct': 'EventLoopBaseProperties',
42
JobType job_type;
71
+ 'data': { '*aio-max-batch': 'int' } }
43
72
+
44
- /** Mandatory: Entrypoint for the Coroutine. */
73
##
45
+ /**
74
# @IothreadProperties:
46
+ * Mandatory: Entrypoint for the Coroutine.
75
#
47
+ *
76
@@ -XXX,XX +XXX,XX @@
48
+ * This callback will be invoked when moving from CREATED to RUNNING.
77
# algorithm detects it is spending too long polling without
49
+ *
78
# encountering events. 0 selects a default behaviour (default: 0)
50
+ * If this callback returns nonzero, the job transaction it is part of is
79
#
51
+ * aborted. If it returns zero, the job moves into the WAITING state. If it
80
-# @aio-max-batch: maximum number of requests in a batch for the AIO engine,
52
+ * is the last job to complete in its transaction, all jobs in the
81
-# 0 means that the engine will use its default
53
+ * transaction move from WAITING to PENDING.
82
-# (default:0, since 6.1)
54
+ */
83
+# The @aio-max-batch option is available since 6.1.
55
int coroutine_fn (*run)(Job *job, Error **errp);
84
#
56
85
# Since: 2.0
57
/**
86
##
58
@@ -XXX,XX +XXX,XX @@ void job_early_fail(Job *job);
87
{ 'struct': 'IothreadProperties',
59
/** Moves the @job from RUNNING to READY */
88
+ 'base': 'EventLoopBaseProperties',
60
void job_transition_to_ready(Job *job);
89
'data': { '*poll-max-ns': 'int',
61
90
'*poll-grow': 'int',
62
-/**
91
- '*poll-shrink': 'int',
63
- * @job: The job being completed.
92
- '*aio-max-batch': 'int' } }
64
- * @ret: The status code.
93
+ '*poll-shrink': 'int' } }
65
- *
94
66
- * Marks @job as completed. If @ret is non-zero, the job transaction it is part
95
##
67
- * of is aborted. If @ret is zero, the job moves into the WAITING state. If it
96
# @MemoryBackendProperties:
68
- * is the last job to complete in its transaction, all jobs in the transaction
97
diff --git a/meson.build b/meson.build
69
- * move from WAITING to PENDING.
70
- */
71
-void job_completed(Job *job, int ret);
72
-
73
/** Asynchronously complete the specified @job. */
74
void job_complete(Job *job, Error **errp);
75
76
diff --git a/job.c b/job.c
98
index XXXXXXX..XXXXXXX 100644
77
index XXXXXXX..XXXXXXX 100644
99
--- a/meson.build
78
--- a/job.c
100
+++ b/meson.build
79
+++ b/job.c
101
@@ -XXX,XX +XXX,XX @@ subdir('qom')
80
@@ -XXX,XX +XXX,XX @@ void job_drain(Job *job)
102
subdir('authz')
103
subdir('crypto')
104
subdir('ui')
105
+subdir('hw')
106
107
108
if enable_modules
109
@@ -XXX,XX +XXX,XX @@ if enable_modules
110
modulecommon = declare_dependency(link_whole: libmodulecommon, compile_args: '-DBUILD_DSO')
111
endif
112
113
+qom_ss = qom_ss.apply(config_host, strict: false)
114
+libqom = static_library('qom', qom_ss.sources() + genh,
115
+ dependencies: [qom_ss.dependencies()],
116
+ name_suffix: 'fa')
117
+qom = declare_dependency(link_whole: libqom)
118
+
119
+event_loop_base = files('event-loop-base.c')
120
+event_loop_base = static_library('event-loop-base', sources: event_loop_base + genh,
121
+ build_by_default: true)
122
+event_loop_base = declare_dependency(link_whole: event_loop_base,
123
+ dependencies: [qom])
124
+
125
stub_ss = stub_ss.apply(config_all, strict: false)
126
127
util_ss.add_all(trace_ss)
128
@@ -XXX,XX +XXX,XX @@ subdir('monitor')
129
subdir('net')
130
subdir('replay')
131
subdir('semihosting')
132
-subdir('hw')
133
subdir('tcg')
134
subdir('fpu')
135
subdir('accel')
136
@@ -XXX,XX +XXX,XX @@ qemu_syms = custom_target('qemu.syms', output: 'qemu.syms',
137
capture: true,
138
command: [undefsym, nm, '@INPUT@'])
139
140
-qom_ss = qom_ss.apply(config_host, strict: false)
141
-libqom = static_library('qom', qom_ss.sources() + genh,
142
- dependencies: [qom_ss.dependencies()],
143
- name_suffix: 'fa')
144
-
145
-qom = declare_dependency(link_whole: libqom)
146
-
147
authz_ss = authz_ss.apply(config_host, strict: false)
148
libauthz = static_library('authz', authz_ss.sources() + genh,
149
dependencies: [authz_ss.dependencies()],
150
@@ -XXX,XX +XXX,XX @@ libblockdev = static_library('blockdev', blockdev_ss.sources() + genh,
151
build_by_default: false)
152
153
blockdev = declare_dependency(link_whole: [libblockdev],
154
- dependencies: [block])
155
+ dependencies: [block, event_loop_base])
156
157
qmp_ss = qmp_ss.apply(config_host, strict: false)
158
libqmp = static_library('qmp', qmp_ss.sources() + genh,
159
diff --git a/include/sysemu/event-loop-base.h b/include/sysemu/event-loop-base.h
160
new file mode 100644
161
index XXXXXXX..XXXXXXX
162
--- /dev/null
163
+++ b/include/sysemu/event-loop-base.h
164
@@ -XXX,XX +XXX,XX @@
165
+/*
166
+ * QEMU event-loop backend
167
+ *
168
+ * Copyright (C) 2022 Red Hat Inc
169
+ *
170
+ * Authors:
171
+ * Nicolas Saenz Julienne <nsaenzju@redhat.com>
172
+ *
173
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
174
+ * See the COPYING file in the top-level directory.
175
+ */
176
+#ifndef QEMU_EVENT_LOOP_BASE_H
177
+#define QEMU_EVENT_LOOP_BASE_H
178
+
179
+#include "qom/object.h"
180
+#include "block/aio.h"
181
+#include "qemu/typedefs.h"
182
+
183
+#define TYPE_EVENT_LOOP_BASE "event-loop-base"
184
+OBJECT_DECLARE_TYPE(EventLoopBase, EventLoopBaseClass,
185
+ EVENT_LOOP_BASE)
186
+
187
+struct EventLoopBaseClass {
188
+ ObjectClass parent_class;
189
+
190
+ void (*init)(EventLoopBase *base, Error **errp);
191
+ void (*update_params)(EventLoopBase *base, Error **errp);
192
+};
193
+
194
+struct EventLoopBase {
195
+ Object parent;
196
+
197
+ /* AioContext AIO engine parameters */
198
+ int64_t aio_max_batch;
199
+};
200
+#endif
201
diff --git a/include/sysemu/iothread.h b/include/sysemu/iothread.h
202
index XXXXXXX..XXXXXXX 100644
203
--- a/include/sysemu/iothread.h
204
+++ b/include/sysemu/iothread.h
205
@@ -XXX,XX +XXX,XX @@
206
#include "block/aio.h"
207
#include "qemu/thread.h"
208
#include "qom/object.h"
209
+#include "sysemu/event-loop-base.h"
210
211
#define TYPE_IOTHREAD "iothread"
212
213
struct IOThread {
214
- Object parent_obj;
215
+ EventLoopBase parent_obj;
216
217
QemuThread thread;
218
AioContext *ctx;
219
@@ -XXX,XX +XXX,XX @@ struct IOThread {
220
int64_t poll_max_ns;
221
int64_t poll_grow;
222
int64_t poll_shrink;
223
-
224
- /* AioContext AIO engine parameters */
225
- int64_t aio_max_batch;
226
};
227
typedef struct IOThread IOThread;
228
229
diff --git a/event-loop-base.c b/event-loop-base.c
230
new file mode 100644
231
index XXXXXXX..XXXXXXX
232
--- /dev/null
233
+++ b/event-loop-base.c
234
@@ -XXX,XX +XXX,XX @@
235
+/*
236
+ * QEMU event-loop base
237
+ *
238
+ * Copyright (C) 2022 Red Hat Inc
239
+ *
240
+ * Authors:
241
+ * Stefan Hajnoczi <stefanha@redhat.com>
242
+ * Nicolas Saenz Julienne <nsaenzju@redhat.com>
243
+ *
244
+ * This work is licensed under the terms of the GNU GPL, version 2 or later.
245
+ * See the COPYING file in the top-level directory.
246
+ */
247
+
248
+#include "qemu/osdep.h"
249
+#include "qom/object_interfaces.h"
250
+#include "qapi/error.h"
251
+#include "sysemu/event-loop-base.h"
252
+
253
+typedef struct {
254
+ const char *name;
255
+ ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
256
+} EventLoopBaseParamInfo;
257
+
258
+static EventLoopBaseParamInfo aio_max_batch_info = {
259
+ "aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
260
+};
261
+
262
+static void event_loop_base_get_param(Object *obj, Visitor *v,
263
+ const char *name, void *opaque, Error **errp)
264
+{
265
+ EventLoopBase *event_loop_base = EVENT_LOOP_BASE(obj);
266
+ EventLoopBaseParamInfo *info = opaque;
267
+ int64_t *field = (void *)event_loop_base + info->offset;
268
+
269
+ visit_type_int64(v, name, field, errp);
270
+}
271
+
272
+static void event_loop_base_set_param(Object *obj, Visitor *v,
273
+ const char *name, void *opaque, Error **errp)
274
+{
275
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(obj);
276
+ EventLoopBase *base = EVENT_LOOP_BASE(obj);
277
+ EventLoopBaseParamInfo *info = opaque;
278
+ int64_t *field = (void *)base + info->offset;
279
+ int64_t value;
280
+
281
+ if (!visit_type_int64(v, name, &value, errp)) {
282
+ return;
283
+ }
284
+
285
+ if (value < 0) {
286
+ error_setg(errp, "%s value must be in range [0, %" PRId64 "]",
287
+ info->name, INT64_MAX);
288
+ return;
289
+ }
290
+
291
+ *field = value;
292
+
293
+ if (bc->update_params) {
294
+ bc->update_params(base, errp);
295
+ }
296
+
297
+ return;
298
+}
299
+
300
+static void event_loop_base_complete(UserCreatable *uc, Error **errp)
301
+{
302
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_GET_CLASS(uc);
303
+ EventLoopBase *base = EVENT_LOOP_BASE(uc);
304
+
305
+ if (bc->init) {
306
+ bc->init(base, errp);
307
+ }
308
+}
309
+
310
+static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
311
+{
312
+ UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
313
+ ucc->complete = event_loop_base_complete;
314
+
315
+ object_class_property_add(klass, "aio-max-batch", "int",
316
+ event_loop_base_get_param,
317
+ event_loop_base_set_param,
318
+ NULL, &aio_max_batch_info);
319
+}
320
+
321
+static const TypeInfo event_loop_base_info = {
322
+ .name = TYPE_EVENT_LOOP_BASE,
323
+ .parent = TYPE_OBJECT,
324
+ .instance_size = sizeof(EventLoopBase),
325
+ .class_size = sizeof(EventLoopBaseClass),
326
+ .class_init = event_loop_base_class_init,
327
+ .abstract = true,
328
+ .interfaces = (InterfaceInfo[]) {
329
+ { TYPE_USER_CREATABLE },
330
+ { }
331
+ }
332
+};
333
+
334
+static void register_types(void)
335
+{
336
+ type_register_static(&event_loop_base_info);
337
+}
338
+type_init(register_types);
339
diff --git a/iothread.c b/iothread.c
340
index XXXXXXX..XXXXXXX 100644
341
--- a/iothread.c
342
+++ b/iothread.c
343
@@ -XXX,XX +XXX,XX @@
344
#include "qemu/module.h"
345
#include "block/aio.h"
346
#include "block/block.h"
347
+#include "sysemu/event-loop-base.h"
348
#include "sysemu/iothread.h"
349
#include "qapi/error.h"
350
#include "qapi/qapi-commands-misc.h"
351
@@ -XXX,XX +XXX,XX @@ static void iothread_init_gcontext(IOThread *iothread)
352
iothread->main_loop = g_main_loop_new(iothread->worker_context, TRUE);
353
}
354
355
-static void iothread_set_aio_context_params(IOThread *iothread, Error **errp)
356
+static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
357
{
358
+ IOThread *iothread = IOTHREAD(base);
359
ERRP_GUARD();
360
361
+ if (!iothread->ctx) {
362
+ return;
363
+ }
364
+
365
aio_context_set_poll_params(iothread->ctx,
366
iothread->poll_max_ns,
367
iothread->poll_grow,
368
@@ -XXX,XX +XXX,XX @@ static void iothread_set_aio_context_params(IOThread *iothread, Error **errp)
369
}
370
371
aio_context_set_aio_params(iothread->ctx,
372
- iothread->aio_max_batch,
373
+ iothread->parent_obj.aio_max_batch,
374
errp);
375
}
376
377
-static void iothread_complete(UserCreatable *obj, Error **errp)
378
+
379
+static void iothread_init(EventLoopBase *base, Error **errp)
380
{
381
Error *local_error = NULL;
382
- IOThread *iothread = IOTHREAD(obj);
383
+ IOThread *iothread = IOTHREAD(base);
384
char *thread_name;
385
386
iothread->stopping = false;
387
@@ -XXX,XX +XXX,XX @@ static void iothread_complete(UserCreatable *obj, Error **errp)
388
*/
389
iothread_init_gcontext(iothread);
390
391
- iothread_set_aio_context_params(iothread, &local_error);
392
+ iothread_set_aio_context_params(base, &local_error);
393
if (local_error) {
394
error_propagate(errp, local_error);
395
aio_context_unref(iothread->ctx);
396
@@ -XXX,XX +XXX,XX @@ static void iothread_complete(UserCreatable *obj, Error **errp)
397
* to inherit.
398
*/
399
thread_name = g_strdup_printf("IO %s",
400
- object_get_canonical_path_component(OBJECT(obj)));
401
+ object_get_canonical_path_component(OBJECT(base)));
402
qemu_thread_create(&iothread->thread, thread_name, iothread_run,
403
iothread, QEMU_THREAD_JOINABLE);
404
g_free(thread_name);
405
@@ -XXX,XX +XXX,XX @@ static IOThreadParamInfo poll_grow_info = {
406
static IOThreadParamInfo poll_shrink_info = {
407
"poll-shrink", offsetof(IOThread, poll_shrink),
408
};
409
-static IOThreadParamInfo aio_max_batch_info = {
410
- "aio-max-batch", offsetof(IOThread, aio_max_batch),
411
-};
412
413
static void iothread_get_param(Object *obj, Visitor *v,
414
const char *name, IOThreadParamInfo *info, Error **errp)
415
@@ -XXX,XX +XXX,XX @@ static void iothread_set_poll_param(Object *obj, Visitor *v,
416
}
81
}
417
}
82
}
418
83
419
-static void iothread_get_aio_param(Object *obj, Visitor *v,
84
+static void job_completed(Job *job);
420
- const char *name, void *opaque, Error **errp)
85
+
421
-{
86
static void job_exit(void *opaque)
422
- IOThreadParamInfo *info = opaque;
423
-
424
- iothread_get_param(obj, v, name, info, errp);
425
-}
426
-
427
-static void iothread_set_aio_param(Object *obj, Visitor *v,
428
- const char *name, void *opaque, Error **errp)
429
-{
430
- IOThread *iothread = IOTHREAD(obj);
431
- IOThreadParamInfo *info = opaque;
432
-
433
- if (!iothread_set_param(obj, v, name, info, errp)) {
434
- return;
435
- }
436
-
437
- if (iothread->ctx) {
438
- aio_context_set_aio_params(iothread->ctx,
439
- iothread->aio_max_batch,
440
- errp);
441
- }
442
-}
443
-
444
static void iothread_class_init(ObjectClass *klass, void *class_data)
445
{
87
{
446
- UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
88
Job *job = (Job *)opaque;
447
- ucc->complete = iothread_complete;
89
@@ -XXX,XX +XXX,XX @@ static void job_exit(void *opaque)
448
+ EventLoopBaseClass *bc = EVENT_LOOP_BASE_CLASS(klass);
90
job->driver->exit(job);
449
+
91
aio_context_release(aio_context);
450
+ bc->init = iothread_init;
92
}
451
+ bc->update_params = iothread_set_aio_context_params;
93
- job_completed(job, job->ret);
452
94
+ job_completed(job);
453
object_class_property_add(klass, "poll-max-ns", "int",
454
iothread_get_poll_param,
455
@@ -XXX,XX +XXX,XX @@ static void iothread_class_init(ObjectClass *klass, void *class_data)
456
iothread_get_poll_param,
457
iothread_set_poll_param,
458
NULL, &poll_shrink_info);
459
- object_class_property_add(klass, "aio-max-batch", "int",
460
- iothread_get_aio_param,
461
- iothread_set_aio_param,
462
- NULL, &aio_max_batch_info);
463
}
95
}
464
96
465
static const TypeInfo iothread_info = {
97
/**
466
.name = TYPE_IOTHREAD,
98
@@ -XXX,XX +XXX,XX @@ static void job_completed_txn_success(Job *job)
467
- .parent = TYPE_OBJECT,
99
}
468
+ .parent = TYPE_EVENT_LOOP_BASE,
100
}
469
.class_init = iothread_class_init,
101
470
.instance_size = sizeof(IOThread),
102
-void job_completed(Job *job, int ret)
471
.instance_init = iothread_instance_init,
103
+static void job_completed(Job *job)
472
.instance_finalize = iothread_instance_finalize,
104
{
473
- .interfaces = (InterfaceInfo[]) {
105
assert(job && job->txn && !job_is_completed(job));
474
- {TYPE_USER_CREATABLE},
106
475
- {}
107
- job->ret = ret;
476
- },
108
job_update_rc(job);
477
};
109
- trace_job_completed(job, ret, job->ret);
478
110
+ trace_job_completed(job, job->ret);
479
static void iothread_register_types(void)
111
if (job->ret) {
480
@@ -XXX,XX +XXX,XX @@ static int query_one_iothread(Object *object, void *opaque)
112
job_completed_txn_abort(job);
481
info->poll_max_ns = iothread->poll_max_ns;
113
} else {
482
info->poll_grow = iothread->poll_grow;
114
@@ -XXX,XX +XXX,XX @@ void job_cancel(Job *job, bool force)
483
info->poll_shrink = iothread->poll_shrink;
115
}
484
- info->aio_max_batch = iothread->aio_max_batch;
116
job_cancel_async(job, force);
485
+ info->aio_max_batch = iothread->parent_obj.aio_max_batch;
117
if (!job_started(job)) {
486
118
- job_completed(job, -ECANCELED);
487
QAPI_LIST_APPEND(*tail, info);
119
+ job_completed(job);
488
return 0;
120
} else if (job->deferred_to_main_loop) {
121
job_completed_txn_abort(job);
122
} else {
123
diff --git a/trace-events b/trace-events
124
index XXXXXXX..XXXXXXX 100644
125
--- a/trace-events
126
+++ b/trace-events
127
@@ -XXX,XX +XXX,XX @@ gdbstub_err_checksum_incorrect(uint8_t expected, uint8_t got) "got command packe
128
# job.c
129
job_state_transition(void *job, int ret, const char *legal, const char *s0, const char *s1) "job %p (ret: %d) attempting %s transition (%s-->%s)"
130
job_apply_verb(void *job, const char *state, const char *verb, const char *legal) "job %p in state %s; applying verb %s (%s)"
131
-job_completed(void *job, int ret, int jret) "job %p ret %d corrected ret %d"
132
+job_completed(void *job, int ret) "job %p ret %d"
133
134
# job-qmp.c
135
qmp_job_cancel(void *job) "job %p"
489
--
136
--
490
2.35.1
137
2.17.1
138
139
diff view generated by jsdifflib
New patch
1
From: John Snow <jsnow@redhat.com>
1
2
3
Now that the job infrastructure is handling the job_completed call for
4
all implemented jobs, we can remove the interface that allowed jobs to
5
schedule their own completion.
6
7
Signed-off-by: John Snow <jsnow@redhat.com>
8
Reviewed-by: Max Reitz <mreitz@redhat.com>
9
Message-id: 20180830015734.19765-10-jsnow@redhat.com
10
Signed-off-by: Max Reitz <mreitz@redhat.com>
11
---
12
include/qemu/job.h | 17 -----------------
13
job.c | 40 ++--------------------------------------
14
2 files changed, 2 insertions(+), 55 deletions(-)
15
16
diff --git a/include/qemu/job.h b/include/qemu/job.h
17
index XXXXXXX..XXXXXXX 100644
18
--- a/include/qemu/job.h
19
+++ b/include/qemu/job.h
20
@@ -XXX,XX +XXX,XX @@ void job_finalize(Job *job, Error **errp);
21
*/
22
void job_dismiss(Job **job, Error **errp);
23
24
-typedef void JobDeferToMainLoopFn(Job *job, void *opaque);
25
-
26
-/**
27
- * @job: The job
28
- * @fn: The function to run in the main loop
29
- * @opaque: The opaque value that is passed to @fn
30
- *
31
- * This function must be called by the main job coroutine just before it
32
- * returns. @fn is executed in the main loop with the job AioContext acquired.
33
- *
34
- * Block jobs must call bdrv_unref(), bdrv_close(), and anything that uses
35
- * bdrv_drain_all() in the main loop.
36
- *
37
- * The @job AioContext is held while @fn executes.
38
- */
39
-void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque);
40
-
41
/**
42
* Synchronously finishes the given @job. If @finish is given, it is called to
43
* trigger completion or cancellation of the job.
44
diff --git a/job.c b/job.c
45
index XXXXXXX..XXXXXXX 100644
46
--- a/job.c
47
+++ b/job.c
48
@@ -XXX,XX +XXX,XX @@ static void coroutine_fn job_co_entry(void *opaque)
49
assert(job && job->driver && job->driver->run);
50
job_pause_point(job);
51
job->ret = job->driver->run(job, &job->err);
52
- if (!job->deferred_to_main_loop) {
53
- job->deferred_to_main_loop = true;
54
- aio_bh_schedule_oneshot(qemu_get_aio_context(),
55
- job_exit,
56
- job);
57
- }
58
+ job->deferred_to_main_loop = true;
59
+ aio_bh_schedule_oneshot(qemu_get_aio_context(), job_exit, job);
60
}
61
62
63
@@ -XXX,XX +XXX,XX @@ void job_complete(Job *job, Error **errp)
64
job->driver->complete(job, errp);
65
}
66
67
-
68
-typedef struct {
69
- Job *job;
70
- JobDeferToMainLoopFn *fn;
71
- void *opaque;
72
-} JobDeferToMainLoopData;
73
-
74
-static void job_defer_to_main_loop_bh(void *opaque)
75
-{
76
- JobDeferToMainLoopData *data = opaque;
77
- Job *job = data->job;
78
- AioContext *aio_context = job->aio_context;
79
-
80
- aio_context_acquire(aio_context);
81
- data->fn(data->job, data->opaque);
82
- aio_context_release(aio_context);
83
-
84
- g_free(data);
85
-}
86
-
87
-void job_defer_to_main_loop(Job *job, JobDeferToMainLoopFn *fn, void *opaque)
88
-{
89
- JobDeferToMainLoopData *data = g_malloc(sizeof(*data));
90
- data->job = job;
91
- data->fn = fn;
92
- data->opaque = opaque;
93
- job->deferred_to_main_loop = true;
94
-
95
- aio_bh_schedule_oneshot(qemu_get_aio_context(),
96
- job_defer_to_main_loop_bh, data);
97
-}
98
-
99
int job_finish_sync(Job *job, void (*finish)(Job *, Error **errp), Error **errp)
100
{
101
Error *local_err = NULL;
102
--
103
2.17.1
104
105
diff view generated by jsdifflib