In order to talk to the source BDS (and maybe in the future to the
target BDS as well) directly, we need to convert our existing AIO
requests into coroutine I/O requests.
Signed-off-by: Max Reitz <mreitz@redhat.com>
---
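[Editor's note, not part of the commit: as a quick orientation, the conversion
replaces the callback-chained AIO calls with sequential coroutine calls. The
fragments below are lifted from the hunks in this patch, with only the
comments added.]

    /* Before: the read is issued asynchronously and completion is chained
     * through callbacks (mirror_read_complete -> blk_aio_pwritev -> ...). */
    blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op);

    /* After: the same logic runs inside a coroutine; blk_co_preadv() suspends
     * the coroutine until the request completes, so the former callback
     * becomes an ordinary function call on the next line. */
    ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0);
    mirror_read_complete(op, ret);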
block/mirror.c | 134 +++++++++++++++++++++++++++++++++------------------------
1 file changed, 78 insertions(+), 56 deletions(-)
diff --git a/block/mirror.c b/block/mirror.c
index 4664b0516f..2b3297aa61 100644
--- a/block/mirror.c
+++ b/block/mirror.c
@@ -80,6 +80,9 @@ typedef struct MirrorOp {
QEMUIOVector qiov;
int64_t offset;
uint64_t bytes;
+
+ /* Set by mirror_co_read() before yielding for the first time */
+ uint64_t bytes_copied;
} MirrorOp;
typedef enum MirrorMethod {
@@ -101,7 +104,7 @@ static BlockErrorAction mirror_error_action(MirrorBlockJob *s, bool read,
}
}
-static void mirror_iteration_done(MirrorOp *op, int ret)
+static void coroutine_fn mirror_iteration_done(MirrorOp *op, int ret)
{
MirrorBlockJob *s = op->s;
struct iovec *iov;
@@ -138,9 +141,8 @@ static void mirror_iteration_done(MirrorOp *op, int ret)
}
}
-static void mirror_write_complete(void *opaque, int ret)
+static void coroutine_fn mirror_write_complete(MirrorOp *op, int ret)
{
- MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -158,9 +160,8 @@ static void mirror_write_complete(void *opaque, int ret)
aio_context_release(blk_get_aio_context(s->common.blk));
}
-static void mirror_read_complete(void *opaque, int ret)
+static void coroutine_fn mirror_read_complete(MirrorOp *op, int ret)
{
- MirrorOp *op = opaque;
MirrorBlockJob *s = op->s;
aio_context_acquire(blk_get_aio_context(s->common.blk));
@@ -176,8 +177,11 @@ static void mirror_read_complete(void *opaque, int ret)
mirror_iteration_done(op, ret);
} else {
- blk_aio_pwritev(s->target, op->offset, &op->qiov,
- 0, mirror_write_complete, op);
+ int ret;
+
+ ret = blk_co_pwritev(s->target, op->offset,
+ op->qiov.size, &op->qiov, 0);
+ mirror_write_complete(op, ret);
}
aio_context_release(blk_get_aio_context(s->common.blk));
}
@@ -242,53 +246,49 @@ static inline void mirror_wait_for_io(MirrorBlockJob *s)
* (new_end - offset) if tail is rounded up or down due to
* alignment or buffer limit.
*/
-static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
- uint64_t bytes)
+static void coroutine_fn mirror_co_read(void *opaque)
{
+ MirrorOp *op = opaque;
+ MirrorBlockJob *s = op->s;
BlockBackend *source = s->common.blk;
int nb_chunks;
uint64_t ret;
- MirrorOp *op;
uint64_t max_bytes;
max_bytes = s->granularity * s->max_iov;
/* We can only handle as much as buf_size at a time. */
- bytes = MIN(s->buf_size, MIN(max_bytes, bytes));
- assert(bytes);
- assert(bytes < BDRV_REQUEST_MAX_BYTES);
- ret = bytes;
+ op->bytes = MIN(s->buf_size, MIN(max_bytes, op->bytes));
+ assert(op->bytes);
+ assert(op->bytes < BDRV_REQUEST_MAX_BYTES);
+ op->bytes_copied = op->bytes;
if (s->cow_bitmap) {
- ret += mirror_cow_align(s, &offset, &bytes);
+ op->bytes_copied += mirror_cow_align(s, &op->offset, &op->bytes);
}
- assert(bytes <= s->buf_size);
+ /* Cannot exceed BDRV_REQUEST_MAX_BYTES + INT_MAX */
+ assert(op->bytes_copied <= UINT_MAX);
+ assert(op->bytes <= s->buf_size);
/* The offset is granularity-aligned because:
* 1) Caller passes in aligned values;
* 2) mirror_cow_align is used only when target cluster is larger. */
- assert(QEMU_IS_ALIGNED(offset, s->granularity));
+ assert(QEMU_IS_ALIGNED(op->offset, s->granularity));
/* The range is sector-aligned, since bdrv_getlength() rounds up. */
- assert(QEMU_IS_ALIGNED(bytes, BDRV_SECTOR_SIZE));
- nb_chunks = DIV_ROUND_UP(bytes, s->granularity);
+ assert(QEMU_IS_ALIGNED(op->bytes, BDRV_SECTOR_SIZE));
+ nb_chunks = DIV_ROUND_UP(op->bytes, s->granularity);
while (s->buf_free_count < nb_chunks) {
- trace_mirror_yield_in_flight(s, offset, s->in_flight);
+ trace_mirror_yield_in_flight(s, op->offset, s->in_flight);
mirror_wait_for_io(s);
}
- /* Allocate a MirrorOp that is used as an AIO callback. */
- op = g_new(MirrorOp, 1);
- op->s = s;
- op->offset = offset;
- op->bytes = bytes;
-
/* Now make a QEMUIOVector taking enough granularity-sized chunks
* from s->buf_free.
*/
qemu_iovec_init(&op->qiov, nb_chunks);
while (nb_chunks-- > 0) {
MirrorBuffer *buf = QSIMPLEQ_FIRST(&s->buf_free);
- size_t remaining = bytes - op->qiov.size;
+ size_t remaining = op->bytes - op->qiov.size;
QSIMPLEQ_REMOVE_HEAD(&s->buf_free, next);
s->buf_free_count--;
@@ -297,53 +297,75 @@ static uint64_t mirror_do_read(MirrorBlockJob *s, int64_t offset,
/* Copy the dirty cluster. */
s->in_flight++;
- s->bytes_in_flight += bytes;
- trace_mirror_one_iteration(s, offset, bytes);
+ s->bytes_in_flight += op->bytes;
+ trace_mirror_one_iteration(s, op->offset, op->bytes);
- blk_aio_preadv(source, offset, &op->qiov, 0, mirror_read_complete, op);
- return ret;
+ ret = blk_co_preadv(source, op->offset, op->bytes, &op->qiov, 0);
+ mirror_read_complete(op, ret);
}
-static void mirror_do_zero_or_discard(MirrorBlockJob *s,
- int64_t offset,
- uint64_t bytes,
- bool is_discard)
+static void coroutine_fn mirror_co_zero(void *opaque)
{
- MirrorOp *op;
+ MirrorOp *op = opaque;
+ int ret;
- /* Allocate a MirrorOp that is used as an AIO callback. The qiov is zeroed
- * so the freeing in mirror_iteration_done is nop. */
- op = g_new0(MirrorOp, 1);
- op->s = s;
- op->offset = offset;
- op->bytes = bytes;
+ op->s->in_flight++;
+ op->s->bytes_in_flight += op->bytes;
- s->in_flight++;
- s->bytes_in_flight += bytes;
- if (is_discard) {
- blk_aio_pdiscard(s->target, offset,
- op->bytes, mirror_write_complete, op);
- } else {
- blk_aio_pwrite_zeroes(s->target, offset,
- op->bytes, s->unmap ? BDRV_REQ_MAY_UNMAP : 0,
- mirror_write_complete, op);
- }
+ ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
+ op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
+ mirror_write_complete(op, ret);
+}
+
+static void coroutine_fn mirror_co_discard(void *opaque)
+{
+ MirrorOp *op = opaque;
+ int ret;
+
+ op->s->in_flight++;
+ op->s->bytes_in_flight += op->bytes;
+
+ ret = blk_co_pdiscard(op->s->target, op->offset, op->bytes);
+ mirror_write_complete(op, ret);
}
static unsigned mirror_perform(MirrorBlockJob *s, int64_t offset,
unsigned bytes, MirrorMethod mirror_method)
{
+ MirrorOp *op;
+ Coroutine *co;
+ unsigned ret = bytes;
+
+ op = g_new(MirrorOp, 1);
+ *op = (MirrorOp){
+ .s = s,
+ .offset = offset,
+ .bytes = bytes,
+ };
+
switch (mirror_method) {
case MIRROR_METHOD_COPY:
- return mirror_do_read(s, offset, bytes);
+ co = qemu_coroutine_create(mirror_co_read, op);
+ break;
case MIRROR_METHOD_ZERO:
+ co = qemu_coroutine_create(mirror_co_zero, op);
+ break;
case MIRROR_METHOD_DISCARD:
- mirror_do_zero_or_discard(s, offset, bytes,
- mirror_method == MIRROR_METHOD_DISCARD);
- return bytes;
+ co = qemu_coroutine_create(mirror_co_discard, op);
+ break;
default:
abort();
}
+
+ qemu_coroutine_enter(co);
+
+ if (mirror_method == MIRROR_METHOD_COPY) {
+ /* Same assertion as in mirror_co_read() */
+ assert(op->bytes_copied <= UINT_MAX);
+ ret = op->bytes_copied;
+ }
+
+ return ret;
}
static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
--
2.13.5
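[Editor's note on the ordering that the new MirrorOp.bytes_copied field relies
on, since it comes up in the review below: qemu_coroutine_enter() runs the new
coroutine synchronously until it yields for the first time (or terminates),
and mirror_co_read() assigns bytes_copied before it reaches any code that can
yield. A minimal annotated excerpt of the mirror_perform() hunk above, with
only the comments added:]

    qemu_coroutine_enter(co);   /* runs the coroutine up to its first yield */

    if (mirror_method == MIRROR_METHOD_COPY) {
        /* Safe to read here: mirror_co_read() has already set bytes_copied,
         * because it does so before its first possible yield point. */
        assert(op->bytes_copied <= UINT_MAX);
        ret = op->bytes_copied;
    }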
On Wed, 09/13 20:18, Max Reitz wrote:
> In order to talk to the source BDS (and maybe in the future to the
> target BDS as well) directly, we need to convert our existing AIO
> requests into coroutine I/O requests.
>
> Signed-off-by: Max Reitz <mreitz@redhat.com>
> [...]
> +
> + qemu_coroutine_enter(co);
> +
> + if (mirror_method == MIRROR_METHOD_COPY) {
> + /* Same assertion as in mirror_co_read() */
> + assert(op->bytes_copied <= UINT_MAX);
> + ret = op->bytes_copied;
> + }
This special casing is a bit ugly. Can you just make mirror_co_zero and
mirror_co_discard set op->bytes_copied too (and perhaps rename it to
op->bytes_handled)? If so, the comment in MirrorOp needs an update too.
And is it better to initialize it to -1 before entering the coroutine, then
assert it is != -1 afterwards?
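[Editor's sketch of what this suggestion could look like, using the
hypothetical name bytes_handled and the -1 convention proposed above; the
actual v2 of the patch may differ:]

    /* In MirrorOp, replacing bytes_copied: */
    /* Set to a non-negative value by the mirror_co_* coroutines before
     * yielding for the first time; -1 until then. */
    int64_t bytes_handled;

    static void coroutine_fn mirror_co_zero(void *opaque)
    {
        MirrorOp *op = opaque;
        int ret;

        op->s->in_flight++;
        op->s->bytes_in_flight += op->bytes;
        op->bytes_handled = op->bytes;   /* no COPY-only special casing */

        ret = blk_co_pwrite_zeroes(op->s->target, op->offset, op->bytes,
                                   op->s->unmap ? BDRV_REQ_MAY_UNMAP : 0);
        mirror_write_complete(op, ret);
    }

    /* ...and at the end of mirror_perform(): */
    op->bytes_handled = -1;
    qemu_coroutine_enter(co);
    /* Every mirror_co_* coroutine must set this before its first yield */
    assert(op->bytes_handled >= 0 && op->bytes_handled <= UINT_MAX);
    return op->bytes_handled;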
> +
> + return ret;
> }
>
> static uint64_t coroutine_fn mirror_iteration(MirrorBlockJob *s)
> --
> 2.13.5
>
Fam
On 2017-09-18 08:02, Fam Zheng wrote:
> On Wed, 09/13 20:18, Max Reitz wrote:
>> In order to talk to the source BDS (and maybe in the future to the
>> target BDS as well) directly, we need to convert our existing AIO
>> requests into coroutine I/O requests.
>>
>> Signed-off-by: Max Reitz <mreitz@redhat.com>
>> [...]
>> +
>> + qemu_coroutine_enter(co);
>> +
>> + if (mirror_method == MIRROR_METHOD_COPY) {
>> + /* Same assertion as in mirror_co_read() */
>> + assert(op->bytes_copied <= UINT_MAX);
>> + ret = op->bytes_copied;
>> + }
>
> This special casing is a bit ugly. Can you just make mirror_co_zero and
> mirror_co_discard set op->bytes_copied too (and perhaps rename it to
> op->bytes_handled)? If so, the comment in MirrorOp needs an update too.
Sure.
> And is it better to initialize it to -1 before entering the coroutine, then
> assert it is != -1 afterwards?
Sounds good, will do.
Max
On 13.09.2017 at 20:18, Max Reitz wrote:
> In order to talk to the source BDS (and maybe in the future to the
> target BDS as well) directly, we need to convert our existing AIO
> requests into coroutine I/O requests.
>
> Signed-off-by: Max Reitz <mreitz@redhat.com>

Please follow through with it and add a few patches that turn it into
natural coroutine code rather than just any coroutine code. I know I did
the same kind of half-assed conversion in qed, but mirror is code that
is actually used and that people look at for more than just a bad
example.

You'll probably notice more things when you do this, but the obvious
things would be changing mirror_co_read() into a mirror_co_copy() with
the former callbacks inlined; keeping op on the stack instead of
mallocing it in mirror_perform() and freeing it deep inside the nested
functions that used to be callbacks; and probably also cleaning up the
random calls to aio_context_acquire/release() that will now appear in
the middle of the function.

Anyway, that's for follow-up patches (though ideally in the same
series), so for this one you can have:

Reviewed-by: Kevin Wolf <kwolf@redhat.com>
On 2017-10-10 11:14, Kevin Wolf wrote:
> On 13.09.2017 at 20:18, Max Reitz wrote:
>> In order to talk to the source BDS (and maybe in the future to the
>> target BDS as well) directly, we need to convert our existing AIO
>> requests into coroutine I/O requests.
>>
>> Signed-off-by: Max Reitz <mreitz@redhat.com>
>
> Please follow through with it and add a few patches that turn it into
> natural coroutine code rather than just any coroutine code. I know I did
> the same kind of half-assed conversion in qed, but mirror is code that
> is actually used and that people look at for more than just a bad
> example.
>
> You'll probably notice more things when you do this, but the obvious
> things would be changing mirror_co_read() into a mirror_co_copy() with
> the former callbacks inlined; keeping op on the stack instead of
> mallocing it in mirror_perform() and freeing it deep inside the nested
> functions that used to be callbacks; and probably also cleaning up the
> random calls to aio_context_acquire/release() that will now appear in
> the middle of the function.
>
> Anyway, that's for follow-up patches (though ideally in the same
> series), so for this one you can have:
>
> Reviewed-by: Kevin Wolf <kwolf@redhat.com>

Phew. :-)

I think I'll write the patches (while working on v2), but I'll send them
as a follow-up.

Max
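[Editor's sketch to make the suggested follow-up concrete. mirror_co_copy() and
its exact shape here are hypothetical, not the actual follow-up series: the
idea is that the former read/write completion callbacks get inlined into one
sequential coroutine function. Kevin's other two points (keeping the MirrorOp
off the heap and cleaning up the aio_context_acquire/release() calls) are not
shown.]

    /* Hypothetical inlined copy path (sketch only): */
    static void coroutine_fn mirror_co_copy(void *opaque)
    {
        MirrorOp *op = opaque;
        MirrorBlockJob *s = op->s;
        int ret;

        /* ...alignment, buffer allocation and qiov setup as in
         * mirror_co_read() today... */

        ret = blk_co_preadv(s->common.blk, op->offset, op->bytes, &op->qiov, 0);
        if (ret < 0) {
            /* former mirror_read_complete() error handling, inlined */
            goto done;
        }

        ret = blk_co_pwritev(s->target, op->offset, op->qiov.size, &op->qiov, 0);
        /* former mirror_write_complete() error handling, inlined */
    done:
        mirror_iteration_done(op, ret);
    }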