In order to talk to the source BDS (and maybe in the future to the
target BDS as well) directly, we need to convert our existing AIO
requests into coroutine I/O requests.
Signed-off-by: Max Reitz <mreitz@redhat.com>
---
block/mirror.c | 154 ++++++++++++++++++++++++++++++++++-----------------------
1 file changed, 92 insertions(+), 62 deletions(-)
diff --git a/block/mirror.c b/block/mirror.c
index 4066788ee2..71a8e66850 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -80,6 +80,10 @@ typedef struct MirrorOp {
QEMUIOVector qiov;
int64_t offset;
uint64_t bytes;
+
+ /* The pointee is set by mirror_co_read(), mirror_co_zero(), and
+ * mirror_co_discard() before yielding for the first time */
+ int64_t *bytes_handled;
} MirrorOp;
typedef enum MirrorMethod {
@@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
}
}
-static void mirror_iteration_done(MirrorOp *op, int ret)
+static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
{
MirrorBlockJob *s = op->s;
struct iovec *iov;
@@ -138,9 +142,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
}
}
-static void mirror_write_complete(void *opaque, int ret)
+static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
{
- MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -157,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret)
aio_context_release(blk_get_aio_context(s->common.blk));
}
-static void mirror_read_complete(void *opaque, int ret)
+static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
{
- MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -174,8 +176,11 @@ static void mirror_read_complete(void *opaque, int ret)
mirror_iteration_done(op, ret);
} else {
- blk_aio_pwritev(s->target, op->offset, &op->qiov,
- 0, mirror_write_complete, op);
+ int ret;
+
+ ret = blk_co_pwritev(s->target, op->offset,
+ op->qiov.size, &op->qiov, 0);
+ mirror_write_complete(op, ret);
}
aio_context_release(blk_get_aio_context(s->common.blk));
}
@@ -232,60 +237,57 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s)
s->waiting_for_io = false;
}
-/* Submit async read while handling COW.
- * Returns: The number of bytes copied after and including offset,
- * excluding any bytes copied prior to offset due to alignment.
- * This will be @bytes if no alignment is necessary, or
- * (new_end - offset) if tail is rounded up or down due to
- * alignment or buffer limit.
+/* Perform a mirror copy operation.
+ *
+ * *op->bytes_handled is set to the number of bytes copied after and
+ * including offset, excluding any bytes copied prior to offset due
+ * to alignment. This will be op->bytes if no alignment is necessary,
+ * or (new_end - op->offset) if the tail is rounded up or down due to
+ * alignment or buffer limit.
*/
-static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
- uint64_t bytes)
+static void coroutine_fn mirror_co_read(void *opaque)
{
+ MirrorOp *op = opaque;
+ MirrorBlockJob *s = op->s;
BlockBackend *source = s->common.blk;
int nb_chunks;
uint64_t ret;
- MirrorOp *op;
uint64_t max_bytes;
max_bytes = s->granularity * s->max_iov;
/* We can only handle as much as buf_size at a time. */
- bytes = MIN(s->buf_size, MIN(max_bytes, bytes));
- assert(bytes);
- assert(bytes < BDRV_REQUEST_MAX_BYTES);
- ret = bytes;
+ op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
+ assert(op->bytes);
+ assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
+ *op->bytes_handled = op->bytes;
if (s->cow_bitmap) {
- ret += mirror_cow_align(s, &offset, &bytes);
+ *op->bytes_handled += mirror_cow_align(s, &op->offset, &op->bytes);
}
- assert(bytes <= s->buf_size);
+ /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
+ assert(*op->bytes_handled <= UINT_MAX);
+ assert(op->bytes <= s->buf_size);
/* The offset is granularity-aligned because:
* 1) Caller passes in aligned values;
* 2) mirror_cow_align is used only when target cluster is larger. */
- assert(QEMU_IS_ALIGNED(offset, s->granularity));
+ assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
/* The range is sector-aligned, since bdrv_getlength() rounds up. */
- assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE));
- nb_chunks = DIV_ROUND_UP(bytes, s->granularity);
+ assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
+ nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
while (s->buf_free_count < nb_chunks) {
- trace_mirror_yield_in_flight(s, offset, s->in_flight);
+ trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
mirror_wait_for_io(s);
}
- /* Allocate a MirrorOp that is used as an AIO callback. */
- op = g_new(MirrorOp, 1);
- op->s = s;
- op->offset = offset;
- op->bytes = bytes;
-
/* Now make a QEMUIOVector taking enough granularity-sized chunks
* from s->buf_free.
*/
qemu_iovec_init(&op->qiov, nb_chunks);
while (nb_chunks-- > 0) {
MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
- size_t remaining = bytes - op->qiov.size;
+ size_t remaining = op->bytes - op->qiov.size;
QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
s->buf_free_count--;
@@ -294,53 +296,81 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
/* Copy the dirty cluster. */
s->in_flight++;
- s->bytes_in_flight += bytes;
- trace_mirror_one_iteration(s, offset, bytes);
+ s->bytes_in_flight += op->bytes;
+ trace_mirror_one_iteration(s, op->offset, op->bytes);
- blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op);
- return ret;
+ ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0);
+ mirror_read_complete(op, ret);
}
-static void mirror_do_zero_or_discard(MirrorBlockJob *s,
- int64_t offset,
- uint64_t bytes,
- bool is_discard)
+static void coroutine_fn mirror_co_zero(void *opaque)
{
- MirrorOp *op;
+ MirrorOp *op = opaque;
+ int ret;
- /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
- * so the freeing in mirror_iteration_done is nop. */
- op = g_new0(MirrorOp, 1);
- op->s = s;
- op->offset = offset;
- op->bytes = bytes;
+ op->s->in_flight++;
+ op->s->bytes_in_flight += op->bytes;
+ *op->bytes_handled = op->bytes;
- s->in_flight++;
- s->bytes_in_flight += bytes;
- if (is_discard) {
- blk_aio_pdiscard(s->target, offset,
- op->bytes, mirror_write_complete, op);
- } else {
- blk_aio_pwrite_zeroes(s->target, offset,
- op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
- mirror_write_complete, op);
- }
+ ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
+ op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
+ mirror_write_complete(op, ret);
+}
+
+static void coroutine_fn mirror_co_discard(void *opaque)
+{
+ MirrorOp *op = opaque;
+ int ret;
+
+ op->s->in_flight++;
+ op->s->bytes_in_flight += op->bytes;
+ *op->bytes_handled = op->bytes;
+
+ ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
+ mirror_write_complete(op, ret);
}
static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
unsigned bytes, MirrorMethod mirror_method)
{
+ MirrorOp *op;
+ Coroutine *co;
+ int64_t bytes_handled = -1;
+
+ op = g_new(MirrorOp, 1);
+ *op = (MirrorOp){
+ .s = s,
+ .offset = offset,
+ .bytes = bytes,
+ .bytes_handled = &bytes_handled,
+ };
+
switch (mirror_method) {
case MIRROR_METHOD_COPY:
- return mirror_do_read(s, offset, bytes);
+ co = qemu_coroutine_create(mirror_co_read, op);
+ break;
case MIRROR_METHOD_ZERO:
+ co = qemu_coroutine_create(mirror_co_zero, op);
+ break;
case MIRROR_METHOD_DISCARD:
- mirror_do_zero_or_discard(s, offset, bytes,
- mirror_method == MIRROR_METHOD_DISCARD);
- return bytes;
+ co = qemu_coroutine_create(mirror_co_discard, op);
+ break;
default:
abort();
}
+
+ qemu_coroutine_enter(co);
+ /* At this point, ownership of op has been moved to the coroutine
+ * and the object may already be freed */
+
+ /* Assert that this value has been set */
+ assert(bytes_handled >= 0);
+
+ /* Same assertion as in mirror_co_read() (and for mirror_co_zero()
+ * and mirror_co_discard(), bytes_handled == op->bytes, which
+ * is the @bytes parameter given to this function) */
+ assert(bytes_handled <= UINT_MAX);
+ return bytes_handled;
}
static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
--
2.14.3
On Mon, 01/22 23:07, Max Reitz wrote:
> @@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
> }
> }
>
> -static void mirror_iteration_done(MirrorOp *op, int ret)
> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
> {
> MirrorBlockJob *s = op->s;
> struct iovec *iov;
I think we want s/qemu_coroutine_enter/aio_co_wake/ in mirror_iteration_done().
As an AIO callback before, this didn't matter, but now we are in a terminating
coroutine, so it is pointless to defer the termination, or even risky in that we
are in an aio_context_acquire/release section, but have already decremented
s->in_flight, which is fishy.
> @@ -138,9 +142,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
> }
> }
>
> -static void mirror_write_complete(void *opaque, int ret)
> +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
> {
> - MirrorOp *op = opaque;
> MirrorBlockJob *s = op->s;
>
> aio_context_acquire(blk_get_aio_context(s->common.blk));
> @@ -157,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret)
> aio_context_release(blk_get_aio_context(s->common.blk));
> }
>
> -static void mirror_read_complete(void *opaque, int ret)
> +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
> {
> - MirrorOp *op = opaque;
> MirrorBlockJob *s = op->s;
>
> aio_context_acquire(blk_get_aio_context(s->common.blk));
> @@ -174,8 +176,11 @@ static void mirror_read_complete(void *opaque, int ret)
>
> mirror_iteration_done(op, ret);
> } else {
> - blk_aio_pwritev(s->target, op->offset, &op->qiov,
> - 0, mirror_write_complete, op);
> + int ret;
s/ret/ret2/ or drop the definition?
because ret is already a parameter of the function.
> +
> + ret = blk_co_pwritev(s->target, op->offset,
> + op->qiov.size, &op->qiov, 0);
> + mirror_write_complete(op, ret);
> }
> aio_context_release(blk_get_aio_context(s->common.blk));
> }
<snip>
> +static void coroutine_fn mirror_co_discard(void *opaque)
> +{
> + MirrorOp *op = opaque;
> + int ret;
> +
> + op->s->in_flight++;
> + op->s->bytes_in_flight += op->bytes;
> + *op->bytes_handled = op->bytes;
> +
> + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
> + mirror_write_complete(op, ret);
> }
>
> static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
> unsigned bytes, MirrorMethod mirror_method)
Doesn't mirror_perform need coroutine_fn annotation too?
> {
Fam
On 2018-02-27 08:44, Fam Zheng wrote:
> On Mon, 01/22 23:07, Max Reitz wrote:
>> @@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>> }
>> }
>>
>> -static void mirror_iteration_done(MirrorOp *op, int ret)
>> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
>> {
>> MirrorBlockJob *s = op->s;
>> struct iovec *iov;
>
> I think we want s/qemu_coroutine_enter/aio_co_wake/ in mirror_iteration_done().
> As an AIO callback before, this didn't matter, but now we are in an terminating
> coroutine, so it is pointless to defer the termination, or even risky in that we
> are in a aio_context_acquire/release section, but have already decremented
> s->in_flight, which is fishy.
I guess I'll still do the replacement, regardless of whether the next
patch overwrites it again...
>> @@ -138,9 +142,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
>> }
>> }
>>
>> -static void mirror_write_complete(void *opaque, int ret)
>> +static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
>> {
>> - MirrorOp *op = opaque;
>> MirrorBlockJob *s = op->s;
>>
>> aio_context_acquire(blk_get_aio_context(s->common.blk));
>> @@ -157,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret)
>> aio_context_release(blk_get_aio_context(s->common.blk));
>> }
>>
>> -static void mirror_read_complete(void *opaque, int ret)
>> +static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
>> {
>> - MirrorOp *op = opaque;
>> MirrorBlockJob *s = op->s;
>>
>> aio_context_acquire(blk_get_aio_context(s->common.blk));
>> @@ -174,8 +176,11 @@ static void mirror_read_complete(void *opaque, int ret)
>>
>> mirror_iteration_done(op, ret);
>> } else {
>> - blk_aio_pwritev(s->target, op->offset, &op->qiov,
>> - 0, mirror_write_complete, op);
>> + int ret;
>
> s/ret/ret2/ or drop the definition?
> because ret is already the paramter of the function.
Oh, right, yes, will do.
>> +
>> + ret = blk_co_pwritev(s->target, op->offset,
>> + op->qiov.size, &op->qiov, 0);
>> + mirror_write_complete(op, ret);
>> }
>> aio_context_release(blk_get_aio_context(s->common.blk));
>> }
>
> <snip>
>
>> +static void coroutine_fn mirror_co_discard(void *opaque)
>> +{
>> + MirrorOp *op = opaque;
>> + int ret;
>> +
>> + op->s->in_flight++;
>> + op->s->bytes_in_flight += op->bytes;
>> + *op->bytes_handled = op->bytes;
>> +
>> + ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
>> + mirror_write_complete(op, ret);
>> }
>>
>> static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
>> unsigned bytes, MirrorMethod mirror_method)
>
> Doesn't mirror_perform need coroutine_fn annotation too?
I don't think it needs one. We could give it one, but as far as I've
understood (which may be wrong), all functions that need to be run from
a coroutine need the tag -- but functions that may be called from either
coroutines or just normal code don't need it.
(And I think this function should be fine either way, so I don't think
it needs a tag.)
Also, thanks for reviewing! :-)
Max
On 2018-02-28 15:13, Max Reitz wrote:
> On 2018-02-27 08:44, Fam Zheng wrote:
>> On Mon, 01/22 23:07, Max Reitz wrote:
>>> @@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
>>> }
>>> }
>>>
>>> -static void mirror_iteration_done(MirrorOp *op, int ret)
>>> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
>>> {
>>> MirrorBlockJob *s = op->s;
>>> struct iovec *iov;
>>
>> I think we want s/qemu_coroutine_enter/aio_co_wake/ in mirror_iteration_done().
>> As an AIO callback before, this didn't matter, but now we are in an terminating
>> coroutine, so it is pointless to defer the termination, or even risky in that we
>> are in a aio_context_acquire/release section, but have already decremented
>> s->in_flight, which is fishy.
>
> I guess I'll still do the replacement, regardless of whether the next
> patch overwrites it again...
Maybe I don't. Doing this breaks iotest 041 because the
assert(data.done) in bdrv_co_yield_to_drain() fails.
Not sure why that is, but under the circumstances I guess it's best to
just pretend this never happened, continue to use qemu_coroutine_enter()
and just replace it in the next patch.
As for in_flight: What is the issue there? We mostly need that to know
how many I/O requests are actually running, that is, how much buffer
space is used, how much I/O is done concurrently, etc. (and later we
need the in-flight information so that we don't access the target in
overlapping areas concurrently). But it doesn't seem to be about how
many coroutines there are.
So as long as the s->in_flight decrement is done in the same critical
section as the op is deleted, we should be good...?
Max
On Wed, 02/28 18:07, Max Reitz wrote:
> On 2018-02-28 15:13, Max Reitz wrote:
> > On 2018-02-27 08:44, Fam Zheng wrote:
> >> On Mon, 01/22 23:07, Max Reitz wrote:
> >>> @@ -101,7 +105,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
> >>> }
> >>> }
> >>>
> >>> -static void mirror_iteration_done(MirrorOp *op, int ret)
> >>> +static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
> >>> {
> >>> MirrorBlockJob *s = op->s;
> >>> struct iovec *iov;
> >>
> >> I think we want s/qemu_coroutine_enter/aio_co_wake/ in mirror_iteration_done().
> >> As an AIO callback before, this didn't matter, but now we are in an terminating
> >> coroutine, so it is pointless to defer the termination, or even risky in that we
> >> are in a aio_context_acquire/release section, but have already decremented
> >> s->in_flight, which is fishy.
> >
> > I guess I'll still do the replacement, regardless of whether the next
> > patch overwrites it again...
>
> Maybe I don't. Doing this breaks iotest 041 because the
> assert(data.done) in bdrv_co_yield_to_drain() fails.
>
> Not sure why that is, but under the circumstance I guess it's best to
> just pretend this never happened, continue to use qemu_coroutine_enter()
> and just replace it in the next patch.
>
> As for in_flight: What is the issue there? We mostly need that to know
> how many I/O requests are actually running, that is, how much buffer
> space is used, how many I/O is done concurrently, etc. (and later we
> need the in-flight information so that we don't access the target in
> overlapping areas concurrently). But it doesn't seem to be about how
> many coroutines there are.
>
> So as long as the s->in_flight decrement is done in the same critical
> section as the op is deleted, we should be good...?
I don't have a specific problem in mind, but am just generally concerned about
the "if (s->in_flight == 0)" checks around mirror_exit.
Fam
© 2016 - 2026 Red Hat, Inc.