:p
atchew
Login
The existing blk_io_plug() API is not block layer multi-queue friendly because the plug state is per-BlockDriverState. Change blk_io_plug()'s implementation so it is thread-local. This is done by introducing the blk_io_plug_call() function that block drivers use to batch calls while plugged. It is relatively easy to convert block drivers from .bdrv_co_io_plug() to blk_io_plug_call(). Random read 4KB performance with virtio-blk on a host NVMe block device: iodepth iops change vs today 1 45612 -4% 2 87967 +2% 4 129872 +0% 8 171096 -3% 16 194508 -4% 32 208947 -1% 64 217647 +0% 128 229629 +0% The results are within the noise for these benchmarks. This is to be expected because the plugging behavior for a single thread hasn't changed in this patch series, only that the state is thread-local now. The following graph compares several approaches: https://vmsplice.net/~stefan/blk_io_plug-thread-local.png - v7.2.0: before most of the multi-queue block layer changes landed. - with-blk_io_plug: today's post-8.0.0 QEMU. - blk_io_plug-thread-local: this patch series. - no-blk_io_plug: what happens when we simply remove plugging? - call-after-dispatch: what if we integrate plugging into the event loop? I decided against this approach in the end because it's more likely to introduce performance regressions since I/O submission is deferred until the end of the event loop iteration. Aside from the no-blk_io_plug case, which bottlenecks much earlier than the others, we see that all plugging approaches are more or less equivalent in this benchmark. It is also clear that QEMU 8.0.0 has lower performance than 7.2.0. The Ansible playbook, fio results, and a Jupyter notebook are available here: https://github.com/stefanha/qemu-perf/tree/remove-blk_io_plug Stefan Hajnoczi (6): block: add blk_io_plug_call() API block/nvme: convert to blk_io_plug_call() API block/blkio: convert to blk_io_plug_call() API block/io_uring: convert to blk_io_plug_call() API block/linux-aio: convert to blk_io_plug_call() API block: remove bdrv_co_io_plug() API MAINTAINERS | 1 + include/block/block-io.h | 3 - include/block/block_int-common.h | 11 --- include/block/raw-aio.h | 14 --- include/sysemu/block-backend-io.h | 13 +-- block/blkio.c | 40 ++++---- block/block-backend.c | 22 ----- block/file-posix.c | 38 ------- block/io.c | 37 ------- block/io_uring.c | 45 ++++----- block/linux-aio.c | 41 +++----- block/nvme.c | 44 +++------ block/plug.c | 159 ++++++++++++++++++++++++++++++ hw/block/dataplane/xen-block.c | 8 +- hw/block/virtio-blk.c | 4 +- hw/scsi/virtio-scsi.c | 6 +- block/meson.build | 1 + block/trace-events | 5 +- 18 files changed, 236 insertions(+), 256 deletions(-) create mode 100644 block/plug.c -- 2.40.1
Introduce a new API for thread-local blk_io_plug() that does not traverse the block graph. The goal is to make blk_io_plug() multi-queue friendly. Instead of having block drivers track whether or not we're in a plugged section, provide an API that allows them to defer a function call until we're unplugged: blk_io_plug_call(fn, opaque). If blk_io_plug_call() is called multiple times with the same fn/opaque pair, then fn() is only called once at the end of the function - resulting in batching. This patch introduces the API and changes blk_io_plug()/blk_io_unplug(). blk_io_plug()/blk_io_unplug() no longer require a BlockBackend argument because the plug state is now thread-local. Later patches convert block drivers to blk_io_plug_call() and then we can finally remove .bdrv_co_io_plug() once all block drivers have been converted. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> --- MAINTAINERS | 1 + include/sysemu/block-backend-io.h | 13 +-- block/block-backend.c | 22 ----- block/plug.c | 159 ++++++++++++++++++++++++++++++ hw/block/dataplane/xen-block.c | 8 +- hw/block/virtio-blk.c | 4 +- hw/scsi/virtio-scsi.c | 6 +- block/meson.build | 1 + 8 files changed, 173 insertions(+), 41 deletions(-) create mode 100644 block/plug.c diff --git a/MAINTAINERS b/MAINTAINERS index XXXXXXX..XXXXXXX 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -XXX,XX +XXX,XX @@ F: util/aio-*.c F: util/aio-*.h F: util/fdmon-*.c F: block/io.c +F: block/plug.c F: migration/block* F: include/block/aio.h F: include/block/aio-wait.h diff --git a/include/sysemu/block-backend-io.h b/include/sysemu/block-backend-io.h index XXXXXXX..XXXXXXX 100644 --- a/include/sysemu/block-backend-io.h +++ b/include/sysemu/block-backend-io.h @@ -XXX,XX +XXX,XX @@ void blk_iostatus_set_err(BlockBackend *blk, int error); int blk_get_max_iov(BlockBackend *blk); int blk_get_max_hw_iov(BlockBackend *blk); -/* - * blk_io_plug/unplug are thread-local operations. This means that multiple - * IOThreads can simultaneously call plug/unplug, but the caller must ensure - * that each unplug() is called in the same IOThread of the matching plug(). - */ -void coroutine_fn blk_co_io_plug(BlockBackend *blk); -void co_wrapper blk_io_plug(BlockBackend *blk); - -void coroutine_fn blk_co_io_unplug(BlockBackend *blk); -void co_wrapper blk_io_unplug(BlockBackend *blk); +void blk_io_plug(void); +void blk_io_unplug(void); +void blk_io_plug_call(void (*fn)(void *), void *opaque); AioContext *blk_get_aio_context(BlockBackend *blk); BlockAcctStats *blk_get_stats(BlockBackend *blk); diff --git a/block/block-backend.c b/block/block-backend.c index XXXXXXX..XXXXXXX 100644 --- a/block/block-backend.c +++ b/block/block-backend.c @@ -XXX,XX +XXX,XX @@ void blk_add_insert_bs_notifier(BlockBackend *blk, Notifier *notify) notifier_list_add(&blk->insert_bs_notifiers, notify); } -void coroutine_fn blk_co_io_plug(BlockBackend *blk) -{ - BlockDriverState *bs = blk_bs(blk); - IO_CODE(); - GRAPH_RDLOCK_GUARD(); - - if (bs) { - bdrv_co_io_plug(bs); - } -} - -void coroutine_fn blk_co_io_unplug(BlockBackend *blk) -{ - BlockDriverState *bs = blk_bs(blk); - IO_CODE(); - GRAPH_RDLOCK_GUARD(); - - if (bs) { - bdrv_co_io_unplug(bs); - } -} - BlockAcctStats *blk_get_stats(BlockBackend *blk) { IO_CODE(); diff --git a/block/plug.c b/block/plug.c new file mode 100644 index XXXXXXX..XXXXXXX --- /dev/null +++ b/block/plug.c @@ -XXX,XX +XXX,XX @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ +/* + * Block I/O plugging + * + * Copyright Red Hat. + * + * This API defers a function call within a blk_io_plug()/blk_io_unplug() + * section, allowing multiple calls to batch up. This is a performance + * optimization that is used in the block layer to submit several I/O requests + * at once instead of individually: + * + * blk_io_plug(); <-- start of plugged region + * ... + * blk_io_plug_call(my_func, my_obj); <-- deferred my_func(my_obj) call + * blk_io_plug_call(my_func, my_obj); <-- another + * blk_io_plug_call(my_func, my_obj); <-- another + * ... + * blk_io_unplug(); <-- end of plugged region, my_func(my_obj) is called once + * + * This code is actually generic and not tied to the block layer. If another + * subsystem needs this functionality, it could be renamed. + */ + +#include "qemu/osdep.h" +#include "qemu/coroutine-tls.h" +#include "qemu/notify.h" +#include "qemu/thread.h" +#include "sysemu/block-backend.h" + +/* A function call that has been deferred until unplug() */ +typedef struct { + void (*fn)(void *); + void *opaque; +} UnplugFn; + +/* Per-thread state */ +typedef struct { + unsigned count; /* how many times has plug() been called? */ + GArray *unplug_fns; /* functions to call at unplug time */ +} Plug; + +/* Use get_ptr_plug() to fetch this thread-local value */ +QEMU_DEFINE_STATIC_CO_TLS(Plug, plug); + +/* Called at thread cleanup time */ +static void blk_io_plug_atexit(Notifier *n, void *value) +{ + Plug *plug = get_ptr_plug(); + g_array_free(plug->unplug_fns, TRUE); +} + +/* This won't involve coroutines, so use __thread */ +static __thread Notifier blk_io_plug_atexit_notifier; + +/** + * blk_io_plug_call: + * @fn: a function pointer to be invoked + * @opaque: a user-defined argument to @fn() + * + * Call @fn(@opaque) immediately if not within a blk_io_plug()/blk_io_unplug() + * section. + * + * Otherwise defer the call until the end of the outermost + * blk_io_plug()/blk_io_unplug() section in this thread. If the same + * @fn/@opaque pair has already been deferred, it will only be called once upon + * blk_io_unplug() so that accumulated calls are batched into a single call. + * + * The caller must ensure that @opaque is not be freed before @fn() is invoked. + */ +void blk_io_plug_call(void (*fn)(void *), void *opaque) +{ + Plug *plug = get_ptr_plug(); + + /* Call immediately if we're not plugged */ + if (plug->count == 0) { + fn(opaque); + return; + } + + GArray *array = plug->unplug_fns; + if (!array) { + array = g_array_new(FALSE, FALSE, sizeof(UnplugFn)); + plug->unplug_fns = array; + blk_io_plug_atexit_notifier.notify = blk_io_plug_atexit; + qemu_thread_atexit_add(&blk_io_plug_atexit_notifier); + } + + UnplugFn *fns = (UnplugFn *)array->data; + UnplugFn new_fn = { + .fn = fn, + .opaque = opaque, + }; + + /* + * There won't be many, so do a linear search. If this becomes a bottleneck + * then a binary search (glib 2.62+) or different data structure could be + * used. + */ + for (guint i = 0; i < array->len; i++) { + if (memcmp(&fns[i], &new_fn, sizeof(new_fn)) == 0) { + return; /* already exists */ + } + } + + g_array_append_val(array, new_fn); +} + +/** + * blk_io_plug: Defer blk_io_plug_call() functions until blk_io_unplug() + * + * blk_io_plug/unplug are thread-local operations. This means that multiple + * threads can simultaneously call plug/unplug, but the caller must ensure that + * each unplug() is called in the same thread of the matching plug(). + * + * Nesting is supported. blk_io_plug_call() functions are only called at the + * outermost blk_io_unplug(). + */ +void blk_io_plug(void) +{ + Plug *plug = get_ptr_plug(); + + assert(plug->count < UINT32_MAX); + + plug->count++; +} + +/** + * blk_io_unplug: Run any pending blk_io_plug_call() functions + * + * There must have been a matching blk_io_plug() call in the same thread prior + * to this blk_io_unplug() call. + */ +void blk_io_unplug(void) +{ + Plug *plug = get_ptr_plug(); + + assert(plug->count > 0); + + if (--plug->count > 0) { + return; + } + + GArray *array = plug->unplug_fns; + if (!array) { + return; + } + + UnplugFn *fns = (UnplugFn *)array->data; + + for (guint i = 0; i < array->len; i++) { + fns[i].fn(fns[i].opaque); + } + + /* + * This resets the array without freeing memory so that appending is cheap + * in the future. + */ + g_array_set_size(array, 0); +} diff --git a/hw/block/dataplane/xen-block.c b/hw/block/dataplane/xen-block.c index XXXXXXX..XXXXXXX 100644 --- a/hw/block/dataplane/xen-block.c +++ b/hw/block/dataplane/xen-block.c @@ -XXX,XX +XXX,XX @@ static bool xen_block_handle_requests(XenBlockDataPlane *dataplane) * is below us. */ if (inflight_atstart > IO_PLUG_THRESHOLD) { - blk_io_plug(dataplane->blk); + blk_io_plug(); } while (rc != rp) { /* pull request from ring */ @@ -XXX,XX +XXX,XX @@ static bool xen_block_handle_requests(XenBlockDataPlane *dataplane) if (inflight_atstart > IO_PLUG_THRESHOLD && batched >= inflight_atstart) { - blk_io_unplug(dataplane->blk); + blk_io_unplug(); } xen_block_do_aio(request); if (inflight_atstart > IO_PLUG_THRESHOLD) { if (batched >= inflight_atstart) { - blk_io_plug(dataplane->blk); + blk_io_plug(); batched = 0; } else { batched++; @@ -XXX,XX +XXX,XX @@ static bool xen_block_handle_requests(XenBlockDataPlane *dataplane) } } if (inflight_atstart > IO_PLUG_THRESHOLD) { - blk_io_unplug(dataplane->blk); + blk_io_unplug(); } return done_something; diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c index XXXXXXX..XXXXXXX 100644 --- a/hw/block/virtio-blk.c +++ b/hw/block/virtio-blk.c @@ -XXX,XX +XXX,XX @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq) bool suppress_notifications = virtio_queue_get_notification(vq); aio_context_acquire(blk_get_aio_context(s->blk)); - blk_io_plug(s->blk); + blk_io_plug(); do { if (suppress_notifications) { @@ -XXX,XX +XXX,XX @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq) virtio_blk_submit_multireq(s, &mrb); } - blk_io_unplug(s->blk); + blk_io_unplug(); aio_context_release(blk_get_aio_context(s->blk)); } diff --git a/hw/scsi/virtio-scsi.c b/hw/scsi/virtio-scsi.c index XXXXXXX..XXXXXXX 100644 --- a/hw/scsi/virtio-scsi.c +++ b/hw/scsi/virtio-scsi.c @@ -XXX,XX +XXX,XX @@ static int virtio_scsi_handle_cmd_req_prepare(VirtIOSCSI *s, VirtIOSCSIReq *req) return -ENOBUFS; } scsi_req_ref(req->sreq); - blk_io_plug(d->conf.blk); + blk_io_plug(); object_unref(OBJECT(d)); return 0; } @@ -XXX,XX +XXX,XX @@ static void virtio_scsi_handle_cmd_req_submit(VirtIOSCSI *s, VirtIOSCSIReq *req) if (scsi_req_enqueue(sreq)) { scsi_req_continue(sreq); } - blk_io_unplug(sreq->dev->conf.blk); + blk_io_unplug(); scsi_req_unref(sreq); } @@ -XXX,XX +XXX,XX @@ static void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) while (!QTAILQ_EMPTY(&reqs)) { req = QTAILQ_FIRST(&reqs); QTAILQ_REMOVE(&reqs, req, next); - blk_io_unplug(req->sreq->dev->conf.blk); + blk_io_unplug(); scsi_req_unref(req->sreq); virtqueue_detach_element(req->vq, &req->elem, 0); virtio_scsi_free_req(req); diff --git a/block/meson.build b/block/meson.build index XXXXXXX..XXXXXXX 100644 --- a/block/meson.build +++ b/block/meson.build @@ -XXX,XX +XXX,XX @@ block_ss.add(files( 'mirror.c', 'nbd.c', 'null.c', + 'plug.c', 'qapi.c', 'qcow2-bitmap.c', 'qcow2-cache.c', -- 2.40.1
Stop using the .bdrv_co_io_plug() API because it is not multi-queue block layer friendly. Use the new blk_io_plug_call() API to batch I/O submission instead. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> --- block/nvme.c | 44 ++++++++++++-------------------------------- 1 file changed, 12 insertions(+), 32 deletions(-) diff --git a/block/nvme.c b/block/nvme.c index XXXXXXX..XXXXXXX 100644 --- a/block/nvme.c +++ b/block/nvme.c @@ -XXX,XX +XXX,XX @@ #include "qemu/vfio-helpers.h" #include "block/block-io.h" #include "block/block_int.h" +#include "sysemu/block-backend.h" #include "sysemu/replay.h" #include "trace.h" @@ -XXX,XX +XXX,XX @@ struct BDRVNVMeState { int blkshift; uint64_t max_transfer; - bool plugged; bool supports_write_zeroes; bool supports_discard; @@ -XXX,XX +XXX,XX @@ static void nvme_kick(NVMeQueuePair *q) { BDRVNVMeState *s = q->s; - if (s->plugged || !q->need_kick) { + if (!q->need_kick) { return; } trace_nvme_kick(s, q->index); @@ -XXX,XX +XXX,XX @@ static bool nvme_process_completion(NVMeQueuePair *q) NvmeCqe *c; trace_nvme_process_completion(s, q->index, q->inflight); - if (s->plugged) { - trace_nvme_process_completion_queue_plugged(s, q->index); - return false; - } /* * Support re-entrancy when a request cb() function invokes aio_poll(). @@ -XXX,XX +XXX,XX @@ static void nvme_trace_command(const NvmeCmd *cmd) } } +static void nvme_unplug_fn(void *opaque) +{ + NVMeQueuePair *q = opaque; + + QEMU_LOCK_GUARD(&q->lock); + nvme_kick(q); + nvme_process_completion(q); +} + static void nvme_submit_command(NVMeQueuePair *q, NVMeRequest *req, NvmeCmd *cmd, BlockCompletionFunc cb, void *opaque) @@ -XXX,XX +XXX,XX @@ static void nvme_submit_command(NVMeQueuePair *q, NVMeRequest *req, q->sq.tail * NVME_SQ_ENTRY_BYTES, cmd, sizeof(*cmd)); q->sq.tail = (q->sq.tail + 1) % NVME_QUEUE_SIZE; q->need_kick++; - nvme_kick(q); - nvme_process_completion(q); + blk_io_plug_call(nvme_unplug_fn, q); qemu_mutex_unlock(&q->lock); } @@ -XXX,XX +XXX,XX @@ static void nvme_attach_aio_context(BlockDriverState *bs, } } -static void coroutine_fn nvme_co_io_plug(BlockDriverState *bs) -{ - BDRVNVMeState *s = bs->opaque; - assert(!s->plugged); - s->plugged = true; -} - -static void coroutine_fn nvme_co_io_unplug(BlockDriverState *bs) -{ - BDRVNVMeState *s = bs->opaque; - assert(s->plugged); - s->plugged = false; - for (unsigned i = INDEX_IO(0); i < s->queue_count; i++) { - NVMeQueuePair *q = s->queues[i]; - qemu_mutex_lock(&q->lock); - nvme_kick(q); - nvme_process_completion(q); - qemu_mutex_unlock(&q->lock); - } -} - static bool nvme_register_buf(BlockDriverState *bs, void *host, size_t size, Error **errp) { @@ -XXX,XX +XXX,XX @@ static BlockDriver bdrv_nvme = { .bdrv_detach_aio_context = nvme_detach_aio_context, .bdrv_attach_aio_context = nvme_attach_aio_context, - .bdrv_co_io_plug = nvme_co_io_plug, - .bdrv_co_io_unplug = nvme_co_io_unplug, - .bdrv_register_buf = nvme_register_buf, .bdrv_unregister_buf = nvme_unregister_buf, }; -- 2.40.1
Stop using the .bdrv_co_io_plug() API because it is not multi-queue block layer friendly. Use the new blk_io_plug_call() API to batch I/O submission instead. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> --- block/blkio.c | 40 +++++++++++++++++++++------------------- 1 file changed, 21 insertions(+), 19 deletions(-) diff --git a/block/blkio.c b/block/blkio.c index XXXXXXX..XXXXXXX 100644 --- a/block/blkio.c +++ b/block/blkio.c @@ -XXX,XX +XXX,XX @@ static void blkio_detach_aio_context(BlockDriverState *bs) false, NULL, NULL, NULL, NULL, NULL); } -/* Call with s->blkio_lock held to submit I/O after enqueuing a new request */ -static void blkio_submit_io(BlockDriverState *bs) +/* + * Called by blk_io_unplug() or immediately if not plugged. Called without + * blkio_lock. + */ +static void blkio_unplug_fn(BlockDriverState *bs) { - if (qatomic_read(&bs->io_plugged) == 0) { - BDRVBlkioState *s = bs->opaque; + BDRVBlkioState *s = bs->opaque; + WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_do_io(s->blkioq, NULL, 0, 0, NULL); } } +/* + * Schedule I/O submission after enqueuing a new request. Called without + * blkio_lock. + */ +static void blkio_submit_io(BlockDriverState *bs) +{ + blk_io_plug_call(blkio_unplug_fn, bs); +} + static int coroutine_fn blkio_co_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes) { @@ -XXX,XX +XXX,XX @@ blkio_co_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes) WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_discard(s->blkioq, offset, bytes, &cod, 0); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); return cod.ret; } @@ -XXX,XX +XXX,XX @@ blkio_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes, WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_readv(s->blkioq, offset, iov, iovcnt, &cod, 0); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); if (use_bounce_buffer) { @@ -XXX,XX +XXX,XX @@ static int coroutine_fn blkio_co_pwritev(BlockDriverState *bs, int64_t offset, WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_writev(s->blkioq, offset, iov, iovcnt, &cod, blkio_flags); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); if (use_bounce_buffer) { @@ -XXX,XX +XXX,XX @@ static int coroutine_fn blkio_co_flush(BlockDriverState *bs) WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_flush(s->blkioq, &cod, 0); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); return cod.ret; } @@ -XXX,XX +XXX,XX @@ static int coroutine_fn blkio_co_pwrite_zeroes(BlockDriverState *bs, WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_write_zeroes(s->blkioq, offset, bytes, &cod, blkio_flags); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); return cod.ret; } -static void coroutine_fn blkio_co_io_unplug(BlockDriverState *bs) -{ - BDRVBlkioState *s = bs->opaque; - - WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { - blkio_submit_io(bs); - } -} - typedef enum { BMRR_OK, BMRR_SKIP, @@ -XXX,XX +XXX,XX @@ static void blkio_refresh_limits(BlockDriverState *bs, Error **errp) .bdrv_co_pwritev = blkio_co_pwritev, \ .bdrv_co_flush_to_disk = blkio_co_flush, \ .bdrv_co_pwrite_zeroes = blkio_co_pwrite_zeroes, \ - .bdrv_co_io_unplug = blkio_co_io_unplug, \ .bdrv_refresh_limits = blkio_refresh_limits, \ .bdrv_register_buf = blkio_register_buf, \ .bdrv_unregister_buf = blkio_unregister_buf, \ -- 2.40.1
Stop using the .bdrv_co_io_plug() API because it is not multi-queue block layer friendly. Use the new blk_io_plug_call() API to batch I/O submission instead. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> --- include/block/raw-aio.h | 7 ------- block/file-posix.c | 10 --------- block/io_uring.c | 45 ++++++++++++++++------------------------- block/trace-events | 5 ++--- 4 files changed, 19 insertions(+), 48 deletions(-) diff --git a/include/block/raw-aio.h b/include/block/raw-aio.h index XXXXXXX..XXXXXXX 100644 --- a/include/block/raw-aio.h +++ b/include/block/raw-aio.h @@ -XXX,XX +XXX,XX @@ int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset, QEMUIOVector *qiov, int type); void luring_detach_aio_context(LuringState *s, AioContext *old_context); void luring_attach_aio_context(LuringState *s, AioContext *new_context); - -/* - * luring_io_plug/unplug work in the thread's current AioContext, therefore the - * caller must ensure that they are paired in the same IOThread. - */ -void luring_io_plug(void); -void luring_io_unplug(void); #endif #ifdef _WIN32 diff --git a/block/file-posix.c b/block/file-posix.c index XXXXXXX..XXXXXXX 100644 --- a/block/file-posix.c +++ b/block/file-posix.c @@ -XXX,XX +XXX,XX @@ static void coroutine_fn raw_co_io_plug(BlockDriverState *bs) laio_io_plug(); } #endif -#ifdef CONFIG_LINUX_IO_URING - if (s->use_linux_io_uring) { - luring_io_plug(); - } -#endif } static void coroutine_fn raw_co_io_unplug(BlockDriverState *bs) @@ -XXX,XX +XXX,XX @@ static void coroutine_fn raw_co_io_unplug(BlockDriverState *bs) laio_io_unplug(s->aio_max_batch); } #endif -#ifdef CONFIG_LINUX_IO_URING - if (s->use_linux_io_uring) { - luring_io_unplug(); - } -#endif } static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs) diff --git a/block/io_uring.c b/block/io_uring.c index XXXXXXX..XXXXXXX 100644 --- a/block/io_uring.c +++ b/block/io_uring.c @@ -XXX,XX +XXX,XX @@ #include "block/raw-aio.h" #include "qemu/coroutine.h" #include "qapi/error.h" +#include "sysemu/block-backend.h" #include "trace.h" /* Only used for assertions. */ @@ -XXX,XX +XXX,XX @@ typedef struct LuringAIOCB { } LuringAIOCB; typedef struct LuringQueue { - int plugged; unsigned int in_queue; unsigned int in_flight; bool blocked; @@ -XXX,XX +XXX,XX @@ static void luring_process_completions_and_submit(LuringState *s) { luring_process_completions(s); - if (!s->io_q.plugged && s->io_q.in_queue > 0) { + if (s->io_q.in_queue > 0) { ioq_submit(s); } } @@ -XXX,XX +XXX,XX @@ static void qemu_luring_poll_ready(void *opaque) static void ioq_init(LuringQueue *io_q) { QSIMPLEQ_INIT(&io_q->submit_queue); - io_q->plugged = 0; io_q->in_queue = 0; io_q->in_flight = 0; io_q->blocked = false; } -void luring_io_plug(void) +static void luring_unplug_fn(void *opaque) { - AioContext *ctx = qemu_get_current_aio_context(); - LuringState *s = aio_get_linux_io_uring(ctx); - trace_luring_io_plug(s); - s->io_q.plugged++; -} - -void luring_io_unplug(void) -{ - AioContext *ctx = qemu_get_current_aio_context(); - LuringState *s = aio_get_linux_io_uring(ctx); - assert(s->io_q.plugged); - trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged, - s->io_q.in_queue, s->io_q.in_flight); - if (--s->io_q.plugged == 0 && - !s->io_q.blocked && s->io_q.in_queue > 0) { + LuringState *s = opaque; + trace_luring_unplug_fn(s, s->io_q.blocked, s->io_q.in_queue, + s->io_q.in_flight); + if (!s->io_q.blocked && s->io_q.in_queue > 0) { ioq_submit(s); } } @@ -XXX,XX +XXX,XX @@ void luring_io_unplug(void) * @type: type of request * * Fetches sqes from ring, adds to pending queue and preps them - * */ static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s, uint64_t offset, int type) @@ -XXX,XX +XXX,XX @@ static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s, QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next); s->io_q.in_queue++; - trace_luring_do_submit(s, s->io_q.blocked, s->io_q.plugged, - s->io_q.in_queue, s->io_q.in_flight); - if (!s->io_q.blocked && - (!s->io_q.plugged || - s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES)) { - ret = ioq_submit(s); - trace_luring_do_submit_done(s, ret); - return ret; + trace_luring_do_submit(s, s->io_q.blocked, s->io_q.in_queue, + s->io_q.in_flight); + if (!s->io_q.blocked) { + if (s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES) { + ret = ioq_submit(s); + trace_luring_do_submit_done(s, ret); + return ret; + } + + blk_io_plug_call(luring_unplug_fn, s); } return 0; } diff --git a/block/trace-events b/block/trace-events index XXXXXXX..XXXXXXX 100644 --- a/block/trace-events +++ b/block/trace-events @@ -XXX,XX +XXX,XX @@ file_paio_submit(void *acb, void *opaque, int64_t offset, int count, int type) " # io_uring.c luring_init_state(void *s, size_t size) "s %p size %zu" luring_cleanup_state(void *s) "%p freed" -luring_io_plug(void *s) "LuringState %p plug" -luring_io_unplug(void *s, int blocked, int plugged, int queued, int inflight) "LuringState %p blocked %d plugged %d queued %d inflight %d" -luring_do_submit(void *s, int blocked, int plugged, int queued, int inflight) "LuringState %p blocked %d plugged %d queued %d inflight %d" +luring_unplug_fn(void *s, int blocked, int queued, int inflight) "LuringState %p blocked %d queued %d inflight %d" +luring_do_submit(void *s, int blocked, int queued, int inflight) "LuringState %p blocked %d queued %d inflight %d" luring_do_submit_done(void *s, int ret) "LuringState %p submitted to kernel %d" luring_co_submit(void *bs, void *s, void *luringcb, int fd, uint64_t offset, size_t nbytes, int type) "bs %p s %p luringcb %p fd %d offset %" PRId64 " nbytes %zd type %d" luring_process_completion(void *s, void *aiocb, int ret) "LuringState %p luringcb %p ret %d" -- 2.40.1
Stop using the .bdrv_co_io_plug() API because it is not multi-queue block layer friendly. Use the new blk_io_plug_call() API to batch I/O submission instead. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> --- include/block/raw-aio.h | 7 ------- block/file-posix.c | 28 ---------------------------- block/linux-aio.c | 41 +++++++++++------------------------------ 3 files changed, 11 insertions(+), 65 deletions(-) diff --git a/include/block/raw-aio.h b/include/block/raw-aio.h index XXXXXXX..XXXXXXX 100644 --- a/include/block/raw-aio.h +++ b/include/block/raw-aio.h @@ -XXX,XX +XXX,XX @@ int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov, void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context); void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context); - -/* - * laio_io_plug/unplug work in the thread's current AioContext, therefore the - * caller must ensure that they are paired in the same IOThread. - */ -void laio_io_plug(void); -void laio_io_unplug(uint64_t dev_max_batch); #endif /* io_uring.c - Linux io_uring implementation */ #ifdef CONFIG_LINUX_IO_URING diff --git a/block/file-posix.c b/block/file-posix.c index XXXXXXX..XXXXXXX 100644 --- a/block/file-posix.c +++ b/block/file-posix.c @@ -XXX,XX +XXX,XX @@ static int coroutine_fn raw_co_pwritev(BlockDriverState *bs, int64_t offset, return raw_co_prw(bs, offset, bytes, qiov, QEMU_AIO_WRITE); } -static void coroutine_fn raw_co_io_plug(BlockDriverState *bs) -{ - BDRVRawState __attribute__((unused)) *s = bs->opaque; -#ifdef CONFIG_LINUX_AIO - if (s->use_linux_aio) { - laio_io_plug(); - } -#endif -} - -static void coroutine_fn raw_co_io_unplug(BlockDriverState *bs) -{ - BDRVRawState __attribute__((unused)) *s = bs->opaque; -#ifdef CONFIG_LINUX_AIO - if (s->use_linux_aio) { - laio_io_unplug(s->aio_max_batch); - } -#endif -} - static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs) { BDRVRawState *s = bs->opaque; @@ -XXX,XX +XXX,XX @@ BlockDriver bdrv_file = { .bdrv_co_copy_range_from = raw_co_copy_range_from, .bdrv_co_copy_range_to = raw_co_copy_range_to, .bdrv_refresh_limits = raw_refresh_limits, - .bdrv_co_io_plug = raw_co_io_plug, - .bdrv_co_io_unplug = raw_co_io_unplug, .bdrv_attach_aio_context = raw_aio_attach_aio_context, .bdrv_co_truncate = raw_co_truncate, @@ -XXX,XX +XXX,XX @@ static BlockDriver bdrv_host_device = { .bdrv_co_copy_range_from = raw_co_copy_range_from, .bdrv_co_copy_range_to = raw_co_copy_range_to, .bdrv_refresh_limits = raw_refresh_limits, - .bdrv_co_io_plug = raw_co_io_plug, - .bdrv_co_io_unplug = raw_co_io_unplug, .bdrv_attach_aio_context = raw_aio_attach_aio_context, .bdrv_co_truncate = raw_co_truncate, @@ -XXX,XX +XXX,XX @@ static BlockDriver bdrv_host_cdrom = { .bdrv_co_pwritev = raw_co_pwritev, .bdrv_co_flush_to_disk = raw_co_flush_to_disk, .bdrv_refresh_limits = cdrom_refresh_limits, - .bdrv_co_io_plug = raw_co_io_plug, - .bdrv_co_io_unplug = raw_co_io_unplug, .bdrv_attach_aio_context = raw_aio_attach_aio_context, .bdrv_co_truncate = raw_co_truncate, @@ -XXX,XX +XXX,XX @@ static BlockDriver bdrv_host_cdrom = { .bdrv_co_pwritev = raw_co_pwritev, .bdrv_co_flush_to_disk = raw_co_flush_to_disk, .bdrv_refresh_limits = cdrom_refresh_limits, - .bdrv_co_io_plug = raw_co_io_plug, - .bdrv_co_io_unplug = raw_co_io_unplug, .bdrv_attach_aio_context = raw_aio_attach_aio_context, .bdrv_co_truncate = raw_co_truncate, diff --git a/block/linux-aio.c b/block/linux-aio.c index XXXXXXX..XXXXXXX 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -XXX,XX +XXX,XX @@ #include "qemu/event_notifier.h" #include "qemu/coroutine.h" #include "qapi/error.h" +#include "sysemu/block-backend.h" /* Only used for assertions. */ #include "qemu/coroutine_int.h" @@ -XXX,XX +XXX,XX @@ struct qemu_laiocb { }; typedef struct { - int plugged; unsigned int in_queue; unsigned int in_flight; bool blocked; @@ -XXX,XX +XXX,XX @@ static void qemu_laio_process_completions_and_submit(LinuxAioState *s) { qemu_laio_process_completions(s); - if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { + if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) { ioq_submit(s); } } @@ -XXX,XX +XXX,XX @@ static void qemu_laio_poll_ready(EventNotifier *opaque) static void ioq_init(LaioQueue *io_q) { QSIMPLEQ_INIT(&io_q->pending); - io_q->plugged = 0; io_q->in_queue = 0; io_q->in_flight = 0; io_q->blocked = false; @@ -XXX,XX +XXX,XX @@ static uint64_t laio_max_batch(LinuxAioState *s, uint64_t dev_max_batch) return max_batch; } -void laio_io_plug(void) +static void laio_unplug_fn(void *opaque) { - AioContext *ctx = qemu_get_current_aio_context(); - LinuxAioState *s = aio_get_linux_aio(ctx); + LinuxAioState *s = opaque; - s->io_q.plugged++; -} - -void laio_io_unplug(uint64_t dev_max_batch) -{ - AioContext *ctx = qemu_get_current_aio_context(); - LinuxAioState *s = aio_get_linux_aio(ctx); - - assert(s->io_q.plugged); - s->io_q.plugged--; - - /* - * Why max batch checking is performed here: - * Another BDS may have queued requests with a higher dev_max_batch and - * therefore in_queue could now exceed our dev_max_batch. Re-check the max - * batch so we can honor our device's dev_max_batch. - */ - if (s->io_q.in_queue >= laio_max_batch(s, dev_max_batch) || - (!s->io_q.plugged && - !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending))) { + if (!s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { ioq_submit(s); } } @@ -XXX,XX +XXX,XX @@ static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset, QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next); s->io_q.in_queue++; - if (!s->io_q.blocked && - (!s->io_q.plugged || - s->io_q.in_queue >= laio_max_batch(s, dev_max_batch))) { - ioq_submit(s); + if (!s->io_q.blocked) { + if (s->io_q.in_queue >= laio_max_batch(s, dev_max_batch)) { + ioq_submit(s); + } else { + blk_io_plug_call(laio_unplug_fn, s); + } } return 0; -- 2.40.1
No block driver implements .bdrv_co_io_plug() anymore. Get rid of the function pointers. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> --- include/block/block-io.h | 3 --- include/block/block_int-common.h | 11 ---------- block/io.c | 37 -------------------------------- 3 files changed, 51 deletions(-) diff --git a/include/block/block-io.h b/include/block/block-io.h index XXXXXXX..XXXXXXX 100644 --- a/include/block/block-io.h +++ b/include/block/block-io.h @@ -XXX,XX +XXX,XX @@ void coroutine_fn bdrv_co_leave(BlockDriverState *bs, AioContext *old_ctx); AioContext *child_of_bds_get_parent_aio_context(BdrvChild *c); -void coroutine_fn GRAPH_RDLOCK bdrv_co_io_plug(BlockDriverState *bs); -void coroutine_fn GRAPH_RDLOCK bdrv_co_io_unplug(BlockDriverState *bs); - bool coroutine_fn GRAPH_RDLOCK bdrv_co_can_store_new_dirty_bitmap(BlockDriverState *bs, const char *name, uint32_t granularity, Error **errp); diff --git a/include/block/block_int-common.h b/include/block/block_int-common.h index XXXXXXX..XXXXXXX 100644 --- a/include/block/block_int-common.h +++ b/include/block/block_int-common.h @@ -XXX,XX +XXX,XX @@ struct BlockDriver { void coroutine_fn GRAPH_RDLOCK_PTR (*bdrv_co_debug_event)( BlockDriverState *bs, BlkdebugEvent event); - /* io queue for linux-aio */ - void coroutine_fn GRAPH_RDLOCK_PTR (*bdrv_co_io_plug)(BlockDriverState *bs); - void coroutine_fn GRAPH_RDLOCK_PTR (*bdrv_co_io_unplug)( - BlockDriverState *bs); - /** * bdrv_drain_begin is called if implemented in the beginning of a * drain operation to drain and stop any internal sources of requests in @@ -XXX,XX +XXX,XX @@ struct BlockDriverState { unsigned int in_flight; unsigned int serialising_in_flight; - /* - * counter for nested bdrv_io_plug. - * Accessed with atomic ops. - */ - unsigned io_plugged; - /* do we need to tell the quest if we have a volatile write cache? */ int enable_write_cache; diff --git a/block/io.c b/block/io.c index XXXXXXX..XXXXXXX 100644 --- a/block/io.c +++ b/block/io.c @@ -XXX,XX +XXX,XX @@ void *qemu_try_blockalign0(BlockDriverState *bs, size_t size) return mem; } -void coroutine_fn bdrv_co_io_plug(BlockDriverState *bs) -{ - BdrvChild *child; - IO_CODE(); - assert_bdrv_graph_readable(); - - QLIST_FOREACH(child, &bs->children, next) { - bdrv_co_io_plug(child->bs); - } - - if (qatomic_fetch_inc(&bs->io_plugged) == 0) { - BlockDriver *drv = bs->drv; - if (drv && drv->bdrv_co_io_plug) { - drv->bdrv_co_io_plug(bs); - } - } -} - -void coroutine_fn bdrv_co_io_unplug(BlockDriverState *bs) -{ - BdrvChild *child; - IO_CODE(); - assert_bdrv_graph_readable(); - - assert(bs->io_plugged); - if (qatomic_fetch_dec(&bs->io_plugged) == 1) { - BlockDriver *drv = bs->drv; - if (drv && drv->bdrv_co_io_unplug) { - drv->bdrv_co_io_unplug(bs); - } - } - - QLIST_FOREACH(child, &bs->children, next) { - bdrv_co_io_unplug(child->bs); - } -} - /* Helper that undoes bdrv_register_buf() when it fails partway through */ static void GRAPH_RDLOCK bdrv_register_buf_rollback(BlockDriverState *bs, void *host, size_t size, -- 2.40.1
v2 - Patch 1: "is not be freed" -> "is not freed" [Eric] - Patch 2: Remove unused nvme_process_completion_queue_plugged trace event [Stefano] - Patch 3: Add missing #include and fix blkio_unplug_fn() prototype [Stefano] - Patch 4: Removed whitespace hunk [Eric] The existing blk_io_plug() API is not block layer multi-queue friendly because the plug state is per-BlockDriverState. Change blk_io_plug()'s implementation so it is thread-local. This is done by introducing the blk_io_plug_call() function that block drivers use to batch calls while plugged. It is relatively easy to convert block drivers from .bdrv_co_io_plug() to blk_io_plug_call(). Random read 4KB performance with virtio-blk on a host NVMe block device: iodepth iops change vs today 1 45612 -4% 2 87967 +2% 4 129872 +0% 8 171096 -3% 16 194508 -4% 32 208947 -1% 64 217647 +0% 128 229629 +0% The results are within the noise for these benchmarks. This is to be expected because the plugging behavior for a single thread hasn't changed in this patch series, only that the state is thread-local now. The following graph compares several approaches: https://vmsplice.net/~stefan/blk_io_plug-thread-local.png - v7.2.0: before most of the multi-queue block layer changes landed. - with-blk_io_plug: today's post-8.0.0 QEMU. - blk_io_plug-thread-local: this patch series. - no-blk_io_plug: what happens when we simply remove plugging? - call-after-dispatch: what if we integrate plugging into the event loop? I decided against this approach in the end because it's more likely to introduce performance regressions since I/O submission is deferred until the end of the event loop iteration. Aside from the no-blk_io_plug case, which bottlenecks much earlier than the others, we see that all plugging approaches are more or less equivalent in this benchmark. It is also clear that QEMU 8.0.0 has lower performance than 7.2.0. The Ansible playbook, fio results, and a Jupyter notebook are available here: https://github.com/stefanha/qemu-perf/tree/remove-blk_io_plug Stefan Hajnoczi (6): block: add blk_io_plug_call() API block/nvme: convert to blk_io_plug_call() API block/blkio: convert to blk_io_plug_call() API block/io_uring: convert to blk_io_plug_call() API block/linux-aio: convert to blk_io_plug_call() API block: remove bdrv_co_io_plug() API MAINTAINERS | 1 + include/block/block-io.h | 3 - include/block/block_int-common.h | 11 --- include/block/raw-aio.h | 14 --- include/sysemu/block-backend-io.h | 13 +-- block/blkio.c | 43 ++++---- block/block-backend.c | 22 ----- block/file-posix.c | 38 ------- block/io.c | 37 ------- block/io_uring.c | 44 ++++----- block/linux-aio.c | 41 +++----- block/nvme.c | 44 +++------ block/plug.c | 159 ++++++++++++++++++++++++++++++ hw/block/dataplane/xen-block.c | 8 +- hw/block/virtio-blk.c | 4 +- hw/scsi/virtio-scsi.c | 6 +- block/meson.build | 1 + block/trace-events | 6 +- 18 files changed, 239 insertions(+), 256 deletions(-) create mode 100644 block/plug.c -- 2.40.1
Introduce a new API for thread-local blk_io_plug() that does not traverse the block graph. The goal is to make blk_io_plug() multi-queue friendly. Instead of having block drivers track whether or not we're in a plugged section, provide an API that allows them to defer a function call until we're unplugged: blk_io_plug_call(fn, opaque). If blk_io_plug_call() is called multiple times with the same fn/opaque pair, then fn() is only called once at the end of the function - resulting in batching. This patch introduces the API and changes blk_io_plug()/blk_io_unplug(). blk_io_plug()/blk_io_unplug() no longer require a BlockBackend argument because the plug state is now thread-local. Later patches convert block drivers to blk_io_plug_call() and then we can finally remove .bdrv_co_io_plug() once all block drivers have been converted. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> --- v2 - "is not be freed" -> "is not freed" [Eric] --- MAINTAINERS | 1 + include/sysemu/block-backend-io.h | 13 +-- block/block-backend.c | 22 ----- block/plug.c | 159 ++++++++++++++++++++++++++++++ hw/block/dataplane/xen-block.c | 8 +- hw/block/virtio-blk.c | 4 +- hw/scsi/virtio-scsi.c | 6 +- block/meson.build | 1 + 8 files changed, 173 insertions(+), 41 deletions(-) create mode 100644 block/plug.c diff --git a/MAINTAINERS b/MAINTAINERS index XXXXXXX..XXXXXXX 100644 --- a/MAINTAINERS +++ b/MAINTAINERS @@ -XXX,XX +XXX,XX @@ F: util/aio-*.c F: util/aio-*.h F: util/fdmon-*.c F: block/io.c +F: block/plug.c F: migration/block* F: include/block/aio.h F: include/block/aio-wait.h diff --git a/include/sysemu/block-backend-io.h b/include/sysemu/block-backend-io.h index XXXXXXX..XXXXXXX 100644 --- a/include/sysemu/block-backend-io.h +++ b/include/sysemu/block-backend-io.h @@ -XXX,XX +XXX,XX @@ void blk_iostatus_set_err(BlockBackend *blk, int error); int blk_get_max_iov(BlockBackend *blk); int blk_get_max_hw_iov(BlockBackend *blk); -/* - * blk_io_plug/unplug are thread-local operations. This means that multiple - * IOThreads can simultaneously call plug/unplug, but the caller must ensure - * that each unplug() is called in the same IOThread of the matching plug(). - */ -void coroutine_fn blk_co_io_plug(BlockBackend *blk); -void co_wrapper blk_io_plug(BlockBackend *blk); - -void coroutine_fn blk_co_io_unplug(BlockBackend *blk); -void co_wrapper blk_io_unplug(BlockBackend *blk); +void blk_io_plug(void); +void blk_io_unplug(void); +void blk_io_plug_call(void (*fn)(void *), void *opaque); AioContext *blk_get_aio_context(BlockBackend *blk); BlockAcctStats *blk_get_stats(BlockBackend *blk); diff --git a/block/block-backend.c b/block/block-backend.c index XXXXXXX..XXXXXXX 100644 --- a/block/block-backend.c +++ b/block/block-backend.c @@ -XXX,XX +XXX,XX @@ void blk_add_insert_bs_notifier(BlockBackend *blk, Notifier *notify) notifier_list_add(&blk->insert_bs_notifiers, notify); } -void coroutine_fn blk_co_io_plug(BlockBackend *blk) -{ - BlockDriverState *bs = blk_bs(blk); - IO_CODE(); - GRAPH_RDLOCK_GUARD(); - - if (bs) { - bdrv_co_io_plug(bs); - } -} - -void coroutine_fn blk_co_io_unplug(BlockBackend *blk) -{ - BlockDriverState *bs = blk_bs(blk); - IO_CODE(); - GRAPH_RDLOCK_GUARD(); - - if (bs) { - bdrv_co_io_unplug(bs); - } -} - BlockAcctStats *blk_get_stats(BlockBackend *blk) { IO_CODE(); diff --git a/block/plug.c b/block/plug.c new file mode 100644 index XXXXXXX..XXXXXXX --- /dev/null +++ b/block/plug.c @@ -XXX,XX +XXX,XX @@ +/* SPDX-License-Identifier: GPL-2.0-or-later */ +/* + * Block I/O plugging + * + * Copyright Red Hat. + * + * This API defers a function call within a blk_io_plug()/blk_io_unplug() + * section, allowing multiple calls to batch up. This is a performance + * optimization that is used in the block layer to submit several I/O requests + * at once instead of individually: + * + * blk_io_plug(); <-- start of plugged region + * ... + * blk_io_plug_call(my_func, my_obj); <-- deferred my_func(my_obj) call + * blk_io_plug_call(my_func, my_obj); <-- another + * blk_io_plug_call(my_func, my_obj); <-- another + * ... + * blk_io_unplug(); <-- end of plugged region, my_func(my_obj) is called once + * + * This code is actually generic and not tied to the block layer. If another + * subsystem needs this functionality, it could be renamed. + */ + +#include "qemu/osdep.h" +#include "qemu/coroutine-tls.h" +#include "qemu/notify.h" +#include "qemu/thread.h" +#include "sysemu/block-backend.h" + +/* A function call that has been deferred until unplug() */ +typedef struct { + void (*fn)(void *); + void *opaque; +} UnplugFn; + +/* Per-thread state */ +typedef struct { + unsigned count; /* how many times has plug() been called? */ + GArray *unplug_fns; /* functions to call at unplug time */ +} Plug; + +/* Use get_ptr_plug() to fetch this thread-local value */ +QEMU_DEFINE_STATIC_CO_TLS(Plug, plug); + +/* Called at thread cleanup time */ +static void blk_io_plug_atexit(Notifier *n, void *value) +{ + Plug *plug = get_ptr_plug(); + g_array_free(plug->unplug_fns, TRUE); +} + +/* This won't involve coroutines, so use __thread */ +static __thread Notifier blk_io_plug_atexit_notifier; + +/** + * blk_io_plug_call: + * @fn: a function pointer to be invoked + * @opaque: a user-defined argument to @fn() + * + * Call @fn(@opaque) immediately if not within a blk_io_plug()/blk_io_unplug() + * section. + * + * Otherwise defer the call until the end of the outermost + * blk_io_plug()/blk_io_unplug() section in this thread. If the same + * @fn/@opaque pair has already been deferred, it will only be called once upon + * blk_io_unplug() so that accumulated calls are batched into a single call. + * + * The caller must ensure that @opaque is not freed before @fn() is invoked. + */ +void blk_io_plug_call(void (*fn)(void *), void *opaque) +{ + Plug *plug = get_ptr_plug(); + + /* Call immediately if we're not plugged */ + if (plug->count == 0) { + fn(opaque); + return; + } + + GArray *array = plug->unplug_fns; + if (!array) { + array = g_array_new(FALSE, FALSE, sizeof(UnplugFn)); + plug->unplug_fns = array; + blk_io_plug_atexit_notifier.notify = blk_io_plug_atexit; + qemu_thread_atexit_add(&blk_io_plug_atexit_notifier); + } + + UnplugFn *fns = (UnplugFn *)array->data; + UnplugFn new_fn = { + .fn = fn, + .opaque = opaque, + }; + + /* + * There won't be many, so do a linear search. If this becomes a bottleneck + * then a binary search (glib 2.62+) or different data structure could be + * used. + */ + for (guint i = 0; i < array->len; i++) { + if (memcmp(&fns[i], &new_fn, sizeof(new_fn)) == 0) { + return; /* already exists */ + } + } + + g_array_append_val(array, new_fn); +} + +/** + * blk_io_plug: Defer blk_io_plug_call() functions until blk_io_unplug() + * + * blk_io_plug/unplug are thread-local operations. This means that multiple + * threads can simultaneously call plug/unplug, but the caller must ensure that + * each unplug() is called in the same thread of the matching plug(). + * + * Nesting is supported. blk_io_plug_call() functions are only called at the + * outermost blk_io_unplug(). + */ +void blk_io_plug(void) +{ + Plug *plug = get_ptr_plug(); + + assert(plug->count < UINT32_MAX); + + plug->count++; +} + +/** + * blk_io_unplug: Run any pending blk_io_plug_call() functions + * + * There must have been a matching blk_io_plug() call in the same thread prior + * to this blk_io_unplug() call. + */ +void blk_io_unplug(void) +{ + Plug *plug = get_ptr_plug(); + + assert(plug->count > 0); + + if (--plug->count > 0) { + return; + } + + GArray *array = plug->unplug_fns; + if (!array) { + return; + } + + UnplugFn *fns = (UnplugFn *)array->data; + + for (guint i = 0; i < array->len; i++) { + fns[i].fn(fns[i].opaque); + } + + /* + * This resets the array without freeing memory so that appending is cheap + * in the future. + */ + g_array_set_size(array, 0); +} diff --git a/hw/block/dataplane/xen-block.c b/hw/block/dataplane/xen-block.c index XXXXXXX..XXXXXXX 100644 --- a/hw/block/dataplane/xen-block.c +++ b/hw/block/dataplane/xen-block.c @@ -XXX,XX +XXX,XX @@ static bool xen_block_handle_requests(XenBlockDataPlane *dataplane) * is below us. */ if (inflight_atstart > IO_PLUG_THRESHOLD) { - blk_io_plug(dataplane->blk); + blk_io_plug(); } while (rc != rp) { /* pull request from ring */ @@ -XXX,XX +XXX,XX @@ static bool xen_block_handle_requests(XenBlockDataPlane *dataplane) if (inflight_atstart > IO_PLUG_THRESHOLD && batched >= inflight_atstart) { - blk_io_unplug(dataplane->blk); + blk_io_unplug(); } xen_block_do_aio(request); if (inflight_atstart > IO_PLUG_THRESHOLD) { if (batched >= inflight_atstart) { - blk_io_plug(dataplane->blk); + blk_io_plug(); batched = 0; } else { batched++; @@ -XXX,XX +XXX,XX @@ static bool xen_block_handle_requests(XenBlockDataPlane *dataplane) } } if (inflight_atstart > IO_PLUG_THRESHOLD) { - blk_io_unplug(dataplane->blk); + blk_io_unplug(); } return done_something; diff --git a/hw/block/virtio-blk.c b/hw/block/virtio-blk.c index XXXXXXX..XXXXXXX 100644 --- a/hw/block/virtio-blk.c +++ b/hw/block/virtio-blk.c @@ -XXX,XX +XXX,XX @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq) bool suppress_notifications = virtio_queue_get_notification(vq); aio_context_acquire(blk_get_aio_context(s->blk)); - blk_io_plug(s->blk); + blk_io_plug(); do { if (suppress_notifications) { @@ -XXX,XX +XXX,XX @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq) virtio_blk_submit_multireq(s, &mrb); } - blk_io_unplug(s->blk); + blk_io_unplug(); aio_context_release(blk_get_aio_context(s->blk)); } diff --git a/hw/scsi/virtio-scsi.c b/hw/scsi/virtio-scsi.c index XXXXXXX..XXXXXXX 100644 --- a/hw/scsi/virtio-scsi.c +++ b/hw/scsi/virtio-scsi.c @@ -XXX,XX +XXX,XX @@ static int virtio_scsi_handle_cmd_req_prepare(VirtIOSCSI *s, VirtIOSCSIReq *req) return -ENOBUFS; } scsi_req_ref(req->sreq); - blk_io_plug(d->conf.blk); + blk_io_plug(); object_unref(OBJECT(d)); return 0; } @@ -XXX,XX +XXX,XX @@ static void virtio_scsi_handle_cmd_req_submit(VirtIOSCSI *s, VirtIOSCSIReq *req) if (scsi_req_enqueue(sreq)) { scsi_req_continue(sreq); } - blk_io_unplug(sreq->dev->conf.blk); + blk_io_unplug(); scsi_req_unref(sreq); } @@ -XXX,XX +XXX,XX @@ static void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) while (!QTAILQ_EMPTY(&reqs)) { req = QTAILQ_FIRST(&reqs); QTAILQ_REMOVE(&reqs, req, next); - blk_io_unplug(req->sreq->dev->conf.blk); + blk_io_unplug(); scsi_req_unref(req->sreq); virtqueue_detach_element(req->vq, &req->elem, 0); virtio_scsi_free_req(req); diff --git a/block/meson.build b/block/meson.build index XXXXXXX..XXXXXXX 100644 --- a/block/meson.build +++ b/block/meson.build @@ -XXX,XX +XXX,XX @@ block_ss.add(files( 'mirror.c', 'nbd.c', 'null.c', + 'plug.c', 'qapi.c', 'qcow2-bitmap.c', 'qcow2-cache.c', -- 2.40.1
Stop using the .bdrv_co_io_plug() API because it is not multi-queue block layer friendly. Use the new blk_io_plug_call() API to batch I/O submission instead. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> --- v2 - Remove unused nvme_process_completion_queue_plugged trace event [Stefano] --- block/nvme.c | 44 ++++++++++++-------------------------------- block/trace-events | 1 - 2 files changed, 12 insertions(+), 33 deletions(-) diff --git a/block/nvme.c b/block/nvme.c index XXXXXXX..XXXXXXX 100644 --- a/block/nvme.c +++ b/block/nvme.c @@ -XXX,XX +XXX,XX @@ #include "qemu/vfio-helpers.h" #include "block/block-io.h" #include "block/block_int.h" +#include "sysemu/block-backend.h" #include "sysemu/replay.h" #include "trace.h" @@ -XXX,XX +XXX,XX @@ struct BDRVNVMeState { int blkshift; uint64_t max_transfer; - bool plugged; bool supports_write_zeroes; bool supports_discard; @@ -XXX,XX +XXX,XX @@ static void nvme_kick(NVMeQueuePair *q) { BDRVNVMeState *s = q->s; - if (s->plugged || !q->need_kick) { + if (!q->need_kick) { return; } trace_nvme_kick(s, q->index); @@ -XXX,XX +XXX,XX @@ static bool nvme_process_completion(NVMeQueuePair *q) NvmeCqe *c; trace_nvme_process_completion(s, q->index, q->inflight); - if (s->plugged) { - trace_nvme_process_completion_queue_plugged(s, q->index); - return false; - } /* * Support re-entrancy when a request cb() function invokes aio_poll(). @@ -XXX,XX +XXX,XX @@ static void nvme_trace_command(const NvmeCmd *cmd) } } +static void nvme_unplug_fn(void *opaque) +{ + NVMeQueuePair *q = opaque; + + QEMU_LOCK_GUARD(&q->lock); + nvme_kick(q); + nvme_process_completion(q); +} + static void nvme_submit_command(NVMeQueuePair *q, NVMeRequest *req, NvmeCmd *cmd, BlockCompletionFunc cb, void *opaque) @@ -XXX,XX +XXX,XX @@ static void nvme_submit_command(NVMeQueuePair *q, NVMeRequest *req, q->sq.tail * NVME_SQ_ENTRY_BYTES, cmd, sizeof(*cmd)); q->sq.tail = (q->sq.tail + 1) % NVME_QUEUE_SIZE; q->need_kick++; - nvme_kick(q); - nvme_process_completion(q); + blk_io_plug_call(nvme_unplug_fn, q); qemu_mutex_unlock(&q->lock); } @@ -XXX,XX +XXX,XX @@ static void nvme_attach_aio_context(BlockDriverState *bs, } } -static void coroutine_fn nvme_co_io_plug(BlockDriverState *bs) -{ - BDRVNVMeState *s = bs->opaque; - assert(!s->plugged); - s->plugged = true; -} - -static void coroutine_fn nvme_co_io_unplug(BlockDriverState *bs) -{ - BDRVNVMeState *s = bs->opaque; - assert(s->plugged); - s->plugged = false; - for (unsigned i = INDEX_IO(0); i < s->queue_count; i++) { - NVMeQueuePair *q = s->queues[i]; - qemu_mutex_lock(&q->lock); - nvme_kick(q); - nvme_process_completion(q); - qemu_mutex_unlock(&q->lock); - } -} - static bool nvme_register_buf(BlockDriverState *bs, void *host, size_t size, Error **errp) { @@ -XXX,XX +XXX,XX @@ static BlockDriver bdrv_nvme = { .bdrv_detach_aio_context = nvme_detach_aio_context, .bdrv_attach_aio_context = nvme_attach_aio_context, - .bdrv_co_io_plug = nvme_co_io_plug, - .bdrv_co_io_unplug = nvme_co_io_unplug, - .bdrv_register_buf = nvme_register_buf, .bdrv_unregister_buf = nvme_unregister_buf, }; diff --git a/block/trace-events b/block/trace-events index XXXXXXX..XXXXXXX 100644 --- a/block/trace-events +++ b/block/trace-events @@ -XXX,XX +XXX,XX @@ nvme_kick(void *s, unsigned q_index) "s %p q #%u" nvme_dma_flush_queue_wait(void *s) "s %p" nvme_error(int cmd_specific, int sq_head, int sqid, int cid, int status) "cmd_specific %d sq_head %d sqid %d cid %d status 0x%x" nvme_process_completion(void *s, unsigned q_index, int inflight) "s %p q #%u inflight %d" -nvme_process_completion_queue_plugged(void *s, unsigned q_index) "s %p q #%u" nvme_complete_command(void *s, unsigned q_index, int cid) "s %p q #%u cid %d" nvme_submit_command(void *s, unsigned q_index, int cid) "s %p q #%u cid %d" nvme_submit_command_raw(int c0, int c1, int c2, int c3, int c4, int c5, int c6, int c7) "%02x %02x %02x %02x %02x %02x %02x %02x" -- 2.40.1
Stop using the .bdrv_co_io_plug() API because it is not multi-queue block layer friendly. Use the new blk_io_plug_call() API to batch I/O submission instead. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> --- v2 - Add missing #include and fix blkio_unplug_fn() prototype [Stefano] --- block/blkio.c | 43 ++++++++++++++++++++++++------------------- 1 file changed, 24 insertions(+), 19 deletions(-) diff --git a/block/blkio.c b/block/blkio.c index XXXXXXX..XXXXXXX 100644 --- a/block/blkio.c +++ b/block/blkio.c @@ -XXX,XX +XXX,XX @@ #include "qemu/error-report.h" #include "qapi/qmp/qdict.h" #include "qemu/module.h" +#include "sysemu/block-backend.h" #include "exec/memory.h" /* for ram_block_discard_disable() */ #include "block/block-io.h" @@ -XXX,XX +XXX,XX @@ static void blkio_detach_aio_context(BlockDriverState *bs) false, NULL, NULL, NULL, NULL, NULL); } -/* Call with s->blkio_lock held to submit I/O after enqueuing a new request */ -static void blkio_submit_io(BlockDriverState *bs) +/* + * Called by blk_io_unplug() or immediately if not plugged. Called without + * blkio_lock. + */ +static void blkio_unplug_fn(void *opaque) { - if (qatomic_read(&bs->io_plugged) == 0) { - BDRVBlkioState *s = bs->opaque; + BDRVBlkioState *s = opaque; + WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_do_io(s->blkioq, NULL, 0, 0, NULL); } } +/* + * Schedule I/O submission after enqueuing a new request. Called without + * blkio_lock. + */ +static void blkio_submit_io(BlockDriverState *bs) +{ + BDRVBlkioState *s = bs->opaque; + + blk_io_plug_call(blkio_unplug_fn, s); +} + static int coroutine_fn blkio_co_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes) { @@ -XXX,XX +XXX,XX @@ blkio_co_pdiscard(BlockDriverState *bs, int64_t offset, int64_t bytes) WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_discard(s->blkioq, offset, bytes, &cod, 0); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); return cod.ret; } @@ -XXX,XX +XXX,XX @@ blkio_co_preadv(BlockDriverState *bs, int64_t offset, int64_t bytes, WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_readv(s->blkioq, offset, iov, iovcnt, &cod, 0); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); if (use_bounce_buffer) { @@ -XXX,XX +XXX,XX @@ static int coroutine_fn blkio_co_pwritev(BlockDriverState *bs, int64_t offset, WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_writev(s->blkioq, offset, iov, iovcnt, &cod, blkio_flags); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); if (use_bounce_buffer) { @@ -XXX,XX +XXX,XX @@ static int coroutine_fn blkio_co_flush(BlockDriverState *bs) WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_flush(s->blkioq, &cod, 0); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); return cod.ret; } @@ -XXX,XX +XXX,XX @@ static int coroutine_fn blkio_co_pwrite_zeroes(BlockDriverState *bs, WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { blkioq_write_zeroes(s->blkioq, offset, bytes, &cod, blkio_flags); - blkio_submit_io(bs); } + blkio_submit_io(bs); qemu_coroutine_yield(); return cod.ret; } -static void coroutine_fn blkio_co_io_unplug(BlockDriverState *bs) -{ - BDRVBlkioState *s = bs->opaque; - - WITH_QEMU_LOCK_GUARD(&s->blkio_lock) { - blkio_submit_io(bs); - } -} - typedef enum { BMRR_OK, BMRR_SKIP, @@ -XXX,XX +XXX,XX @@ static void blkio_refresh_limits(BlockDriverState *bs, Error **errp) .bdrv_co_pwritev = blkio_co_pwritev, \ .bdrv_co_flush_to_disk = blkio_co_flush, \ .bdrv_co_pwrite_zeroes = blkio_co_pwrite_zeroes, \ - .bdrv_co_io_unplug = blkio_co_io_unplug, \ .bdrv_refresh_limits = blkio_refresh_limits, \ .bdrv_register_buf = blkio_register_buf, \ .bdrv_unregister_buf = blkio_unregister_buf, \ -- 2.40.1
Stop using the .bdrv_co_io_plug() API because it is not multi-queue block layer friendly. Use the new blk_io_plug_call() API to batch I/O submission instead. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> --- v2 - Removed whitespace hunk [Eric] --- include/block/raw-aio.h | 7 ------- block/file-posix.c | 10 ---------- block/io_uring.c | 44 ++++++++++++++++------------------------- block/trace-events | 5 ++--- 4 files changed, 19 insertions(+), 47 deletions(-) diff --git a/include/block/raw-aio.h b/include/block/raw-aio.h index XXXXXXX..XXXXXXX 100644 --- a/include/block/raw-aio.h +++ b/include/block/raw-aio.h @@ -XXX,XX +XXX,XX @@ int coroutine_fn luring_co_submit(BlockDriverState *bs, int fd, uint64_t offset, QEMUIOVector *qiov, int type); void luring_detach_aio_context(LuringState *s, AioContext *old_context); void luring_attach_aio_context(LuringState *s, AioContext *new_context); - -/* - * luring_io_plug/unplug work in the thread's current AioContext, therefore the - * caller must ensure that they are paired in the same IOThread. - */ -void luring_io_plug(void); -void luring_io_unplug(void); #endif #ifdef _WIN32 diff --git a/block/file-posix.c b/block/file-posix.c index XXXXXXX..XXXXXXX 100644 --- a/block/file-posix.c +++ b/block/file-posix.c @@ -XXX,XX +XXX,XX @@ static void coroutine_fn raw_co_io_plug(BlockDriverState *bs) laio_io_plug(); } #endif -#ifdef CONFIG_LINUX_IO_URING - if (s->use_linux_io_uring) { - luring_io_plug(); - } -#endif } static void coroutine_fn raw_co_io_unplug(BlockDriverState *bs) @@ -XXX,XX +XXX,XX @@ static void coroutine_fn raw_co_io_unplug(BlockDriverState *bs) laio_io_unplug(s->aio_max_batch); } #endif -#ifdef CONFIG_LINUX_IO_URING - if (s->use_linux_io_uring) { - luring_io_unplug(); - } -#endif } static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs) diff --git a/block/io_uring.c b/block/io_uring.c index XXXXXXX..XXXXXXX 100644 --- a/block/io_uring.c +++ b/block/io_uring.c @@ -XXX,XX +XXX,XX @@ #include "block/raw-aio.h" #include "qemu/coroutine.h" #include "qapi/error.h" +#include "sysemu/block-backend.h" #include "trace.h" /* Only used for assertions. */ @@ -XXX,XX +XXX,XX @@ typedef struct LuringAIOCB { } LuringAIOCB; typedef struct LuringQueue { - int plugged; unsigned int in_queue; unsigned int in_flight; bool blocked; @@ -XXX,XX +XXX,XX @@ static void luring_process_completions_and_submit(LuringState *s) { luring_process_completions(s); - if (!s->io_q.plugged && s->io_q.in_queue > 0) { + if (s->io_q.in_queue > 0) { ioq_submit(s); } } @@ -XXX,XX +XXX,XX @@ static void qemu_luring_poll_ready(void *opaque) static void ioq_init(LuringQueue *io_q) { QSIMPLEQ_INIT(&io_q->submit_queue); - io_q->plugged = 0; io_q->in_queue = 0; io_q->in_flight = 0; io_q->blocked = false; } -void luring_io_plug(void) +static void luring_unplug_fn(void *opaque) { - AioContext *ctx = qemu_get_current_aio_context(); - LuringState *s = aio_get_linux_io_uring(ctx); - trace_luring_io_plug(s); - s->io_q.plugged++; -} - -void luring_io_unplug(void) -{ - AioContext *ctx = qemu_get_current_aio_context(); - LuringState *s = aio_get_linux_io_uring(ctx); - assert(s->io_q.plugged); - trace_luring_io_unplug(s, s->io_q.blocked, s->io_q.plugged, - s->io_q.in_queue, s->io_q.in_flight); - if (--s->io_q.plugged == 0 && - !s->io_q.blocked && s->io_q.in_queue > 0) { + LuringState *s = opaque; + trace_luring_unplug_fn(s, s->io_q.blocked, s->io_q.in_queue, + s->io_q.in_flight); + if (!s->io_q.blocked && s->io_q.in_queue > 0) { ioq_submit(s); } } @@ -XXX,XX +XXX,XX @@ static int luring_do_submit(int fd, LuringAIOCB *luringcb, LuringState *s, QSIMPLEQ_INSERT_TAIL(&s->io_q.submit_queue, luringcb, next); s->io_q.in_queue++; - trace_luring_do_submit(s, s->io_q.blocked, s->io_q.plugged, - s->io_q.in_queue, s->io_q.in_flight); - if (!s->io_q.blocked && - (!s->io_q.plugged || - s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES)) { - ret = ioq_submit(s); - trace_luring_do_submit_done(s, ret); - return ret; + trace_luring_do_submit(s, s->io_q.blocked, s->io_q.in_queue, + s->io_q.in_flight); + if (!s->io_q.blocked) { + if (s->io_q.in_flight + s->io_q.in_queue >= MAX_ENTRIES) { + ret = ioq_submit(s); + trace_luring_do_submit_done(s, ret); + return ret; + } + + blk_io_plug_call(luring_unplug_fn, s); } return 0; } diff --git a/block/trace-events b/block/trace-events index XXXXXXX..XXXXXXX 100644 --- a/block/trace-events +++ b/block/trace-events @@ -XXX,XX +XXX,XX @@ file_paio_submit(void *acb, void *opaque, int64_t offset, int count, int type) " # io_uring.c luring_init_state(void *s, size_t size) "s %p size %zu" luring_cleanup_state(void *s) "%p freed" -luring_io_plug(void *s) "LuringState %p plug" -luring_io_unplug(void *s, int blocked, int plugged, int queued, int inflight) "LuringState %p blocked %d plugged %d queued %d inflight %d" -luring_do_submit(void *s, int blocked, int plugged, int queued, int inflight) "LuringState %p blocked %d plugged %d queued %d inflight %d" +luring_unplug_fn(void *s, int blocked, int queued, int inflight) "LuringState %p blocked %d queued %d inflight %d" +luring_do_submit(void *s, int blocked, int queued, int inflight) "LuringState %p blocked %d queued %d inflight %d" luring_do_submit_done(void *s, int ret) "LuringState %p submitted to kernel %d" luring_co_submit(void *bs, void *s, void *luringcb, int fd, uint64_t offset, size_t nbytes, int type) "bs %p s %p luringcb %p fd %d offset %" PRId64 " nbytes %zd type %d" luring_process_completion(void *s, void *aiocb, int ret) "LuringState %p luringcb %p ret %d" -- 2.40.1
Stop using the .bdrv_co_io_plug() API because it is not multi-queue block layer friendly. Use the new blk_io_plug_call() API to batch I/O submission instead. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> --- include/block/raw-aio.h | 7 ------- block/file-posix.c | 28 ---------------------------- block/linux-aio.c | 41 +++++++++++------------------------------ 3 files changed, 11 insertions(+), 65 deletions(-) diff --git a/include/block/raw-aio.h b/include/block/raw-aio.h index XXXXXXX..XXXXXXX 100644 --- a/include/block/raw-aio.h +++ b/include/block/raw-aio.h @@ -XXX,XX +XXX,XX @@ int coroutine_fn laio_co_submit(int fd, uint64_t offset, QEMUIOVector *qiov, void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context); void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context); - -/* - * laio_io_plug/unplug work in the thread's current AioContext, therefore the - * caller must ensure that they are paired in the same IOThread. - */ -void laio_io_plug(void); -void laio_io_unplug(uint64_t dev_max_batch); #endif /* io_uring.c - Linux io_uring implementation */ #ifdef CONFIG_LINUX_IO_URING diff --git a/block/file-posix.c b/block/file-posix.c index XXXXXXX..XXXXXXX 100644 --- a/block/file-posix.c +++ b/block/file-posix.c @@ -XXX,XX +XXX,XX @@ static int coroutine_fn raw_co_pwritev(BlockDriverState *bs, int64_t offset, return raw_co_prw(bs, offset, bytes, qiov, QEMU_AIO_WRITE); } -static void coroutine_fn raw_co_io_plug(BlockDriverState *bs) -{ - BDRVRawState __attribute__((unused)) *s = bs->opaque; -#ifdef CONFIG_LINUX_AIO - if (s->use_linux_aio) { - laio_io_plug(); - } -#endif -} - -static void coroutine_fn raw_co_io_unplug(BlockDriverState *bs) -{ - BDRVRawState __attribute__((unused)) *s = bs->opaque; -#ifdef CONFIG_LINUX_AIO - if (s->use_linux_aio) { - laio_io_unplug(s->aio_max_batch); - } -#endif -} - static int coroutine_fn raw_co_flush_to_disk(BlockDriverState *bs) { BDRVRawState *s = bs->opaque; @@ -XXX,XX +XXX,XX @@ BlockDriver bdrv_file = { .bdrv_co_copy_range_from = raw_co_copy_range_from, .bdrv_co_copy_range_to = raw_co_copy_range_to, .bdrv_refresh_limits = raw_refresh_limits, - .bdrv_co_io_plug = raw_co_io_plug, - .bdrv_co_io_unplug = raw_co_io_unplug, .bdrv_attach_aio_context = raw_aio_attach_aio_context, .bdrv_co_truncate = raw_co_truncate, @@ -XXX,XX +XXX,XX @@ static BlockDriver bdrv_host_device = { .bdrv_co_copy_range_from = raw_co_copy_range_from, .bdrv_co_copy_range_to = raw_co_copy_range_to, .bdrv_refresh_limits = raw_refresh_limits, - .bdrv_co_io_plug = raw_co_io_plug, - .bdrv_co_io_unplug = raw_co_io_unplug, .bdrv_attach_aio_context = raw_aio_attach_aio_context, .bdrv_co_truncate = raw_co_truncate, @@ -XXX,XX +XXX,XX @@ static BlockDriver bdrv_host_cdrom = { .bdrv_co_pwritev = raw_co_pwritev, .bdrv_co_flush_to_disk = raw_co_flush_to_disk, .bdrv_refresh_limits = cdrom_refresh_limits, - .bdrv_co_io_plug = raw_co_io_plug, - .bdrv_co_io_unplug = raw_co_io_unplug, .bdrv_attach_aio_context = raw_aio_attach_aio_context, .bdrv_co_truncate = raw_co_truncate, @@ -XXX,XX +XXX,XX @@ static BlockDriver bdrv_host_cdrom = { .bdrv_co_pwritev = raw_co_pwritev, .bdrv_co_flush_to_disk = raw_co_flush_to_disk, .bdrv_refresh_limits = cdrom_refresh_limits, - .bdrv_co_io_plug = raw_co_io_plug, - .bdrv_co_io_unplug = raw_co_io_unplug, .bdrv_attach_aio_context = raw_aio_attach_aio_context, .bdrv_co_truncate = raw_co_truncate, diff --git a/block/linux-aio.c b/block/linux-aio.c index XXXXXXX..XXXXXXX 100644 --- a/block/linux-aio.c +++ b/block/linux-aio.c @@ -XXX,XX +XXX,XX @@ #include "qemu/event_notifier.h" #include "qemu/coroutine.h" #include "qapi/error.h" +#include "sysemu/block-backend.h" /* Only used for assertions. */ #include "qemu/coroutine_int.h" @@ -XXX,XX +XXX,XX @@ struct qemu_laiocb { }; typedef struct { - int plugged; unsigned int in_queue; unsigned int in_flight; bool blocked; @@ -XXX,XX +XXX,XX @@ static void qemu_laio_process_completions_and_submit(LinuxAioState *s) { qemu_laio_process_completions(s); - if (!s->io_q.plugged && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { + if (!QSIMPLEQ_EMPTY(&s->io_q.pending)) { ioq_submit(s); } } @@ -XXX,XX +XXX,XX @@ static void qemu_laio_poll_ready(EventNotifier *opaque) static void ioq_init(LaioQueue *io_q) { QSIMPLEQ_INIT(&io_q->pending); - io_q->plugged = 0; io_q->in_queue = 0; io_q->in_flight = 0; io_q->blocked = false; @@ -XXX,XX +XXX,XX @@ static uint64_t laio_max_batch(LinuxAioState *s, uint64_t dev_max_batch) return max_batch; } -void laio_io_plug(void) +static void laio_unplug_fn(void *opaque) { - AioContext *ctx = qemu_get_current_aio_context(); - LinuxAioState *s = aio_get_linux_aio(ctx); + LinuxAioState *s = opaque; - s->io_q.plugged++; -} - -void laio_io_unplug(uint64_t dev_max_batch) -{ - AioContext *ctx = qemu_get_current_aio_context(); - LinuxAioState *s = aio_get_linux_aio(ctx); - - assert(s->io_q.plugged); - s->io_q.plugged--; - - /* - * Why max batch checking is performed here: - * Another BDS may have queued requests with a higher dev_max_batch and - * therefore in_queue could now exceed our dev_max_batch. Re-check the max - * batch so we can honor our device's dev_max_batch. - */ - if (s->io_q.in_queue >= laio_max_batch(s, dev_max_batch) || - (!s->io_q.plugged && - !s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending))) { + if (!s->io_q.blocked && !QSIMPLEQ_EMPTY(&s->io_q.pending)) { ioq_submit(s); } } @@ -XXX,XX +XXX,XX @@ static int laio_do_submit(int fd, struct qemu_laiocb *laiocb, off_t offset, QSIMPLEQ_INSERT_TAIL(&s->io_q.pending, laiocb, next); s->io_q.in_queue++; - if (!s->io_q.blocked && - (!s->io_q.plugged || - s->io_q.in_queue >= laio_max_batch(s, dev_max_batch))) { - ioq_submit(s); + if (!s->io_q.blocked) { + if (s->io_q.in_queue >= laio_max_batch(s, dev_max_batch)) { + ioq_submit(s); + } else { + blk_io_plug_call(laio_unplug_fn, s); + } } return 0; -- 2.40.1
No block driver implements .bdrv_co_io_plug() anymore. Get rid of the function pointers. Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com> Reviewed-by: Eric Blake <eblake@redhat.com> --- include/block/block-io.h | 3 --- include/block/block_int-common.h | 11 ---------- block/io.c | 37 -------------------------------- 3 files changed, 51 deletions(-) diff --git a/include/block/block-io.h b/include/block/block-io.h index XXXXXXX..XXXXXXX 100644 --- a/include/block/block-io.h +++ b/include/block/block-io.h @@ -XXX,XX +XXX,XX @@ void coroutine_fn bdrv_co_leave(BlockDriverState *bs, AioContext *old_ctx); AioContext *child_of_bds_get_parent_aio_context(BdrvChild *c); -void coroutine_fn GRAPH_RDLOCK bdrv_co_io_plug(BlockDriverState *bs); -void coroutine_fn GRAPH_RDLOCK bdrv_co_io_unplug(BlockDriverState *bs); - bool coroutine_fn GRAPH_RDLOCK bdrv_co_can_store_new_dirty_bitmap(BlockDriverState *bs, const char *name, uint32_t granularity, Error **errp); diff --git a/include/block/block_int-common.h b/include/block/block_int-common.h index XXXXXXX..XXXXXXX 100644 --- a/include/block/block_int-common.h +++ b/include/block/block_int-common.h @@ -XXX,XX +XXX,XX @@ struct BlockDriver { void coroutine_fn GRAPH_RDLOCK_PTR (*bdrv_co_debug_event)( BlockDriverState *bs, BlkdebugEvent event); - /* io queue for linux-aio */ - void coroutine_fn GRAPH_RDLOCK_PTR (*bdrv_co_io_plug)(BlockDriverState *bs); - void coroutine_fn GRAPH_RDLOCK_PTR (*bdrv_co_io_unplug)( - BlockDriverState *bs); - /** * bdrv_drain_begin is called if implemented in the beginning of a * drain operation to drain and stop any internal sources of requests in @@ -XXX,XX +XXX,XX @@ struct BlockDriverState { unsigned int in_flight; unsigned int serialising_in_flight; - /* - * counter for nested bdrv_io_plug. - * Accessed with atomic ops. - */ - unsigned io_plugged; - /* do we need to tell the quest if we have a volatile write cache? */ int enable_write_cache; diff --git a/block/io.c b/block/io.c index XXXXXXX..XXXXXXX 100644 --- a/block/io.c +++ b/block/io.c @@ -XXX,XX +XXX,XX @@ void *qemu_try_blockalign0(BlockDriverState *bs, size_t size) return mem; } -void coroutine_fn bdrv_co_io_plug(BlockDriverState *bs) -{ - BdrvChild *child; - IO_CODE(); - assert_bdrv_graph_readable(); - - QLIST_FOREACH(child, &bs->children, next) { - bdrv_co_io_plug(child->bs); - } - - if (qatomic_fetch_inc(&bs->io_plugged) == 0) { - BlockDriver *drv = bs->drv; - if (drv && drv->bdrv_co_io_plug) { - drv->bdrv_co_io_plug(bs); - } - } -} - -void coroutine_fn bdrv_co_io_unplug(BlockDriverState *bs) -{ - BdrvChild *child; - IO_CODE(); - assert_bdrv_graph_readable(); - - assert(bs->io_plugged); - if (qatomic_fetch_dec(&bs->io_plugged) == 1) { - BlockDriver *drv = bs->drv; - if (drv && drv->bdrv_co_io_unplug) { - drv->bdrv_co_io_unplug(bs); - } - } - - QLIST_FOREACH(child, &bs->children, next) { - bdrv_co_io_unplug(child->bs); - } -} - /* Helper that undoes bdrv_register_buf() when it fails partway through */ static void GRAPH_RDLOCK bdrv_register_buf_rollback(BlockDriverState *bs, void *host, size_t size, -- 2.40.1