Implement reconnect. To achieve this:

1. add new modes:
connecting-wait: reconnecting is in progress, and there have been only
a few reconnect attempts so far, so all requests wait for the
connection.
connecting-nowait: reconnecting is in progress, there have already been
many reconnect attempts, so all requests return errors.

two old modes are used too:
connected: normal state
quit: exiting after a fatal error or on close

Possible transitions are:

* -> quit
connecting-* -> connected
connecting-wait -> connecting-nowait (the transition happens after
reconnect-delay seconds in connecting-wait mode)
connected -> connecting-wait

2. Implement reconnect in connection_co. So, in connecting-* mode,
connection_co tries to reconnect an unlimited number of times.

3. Retry NBD queries on channel error if we are in connecting-wait
state.
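
The sketch below is not part of the patch; it only restates the
transition rules above as one predicate, using the NBDClientState
values that the patch adds:

    /*
     * Sketch only: condensed view of the allowed state transitions
     * described above.  Assumes the NBDClientState enum added below.
     */
    static bool nbd_state_transition_allowed(NBDClientState from, NBDClientState to)
    {
        switch (to) {
        case NBD_CLIENT_QUIT:                  /* "* -> quit" */
            return true;
        case NBD_CLIENT_CONNECTED:             /* "connecting-* -> connected" */
            return from == NBD_CLIENT_CONNECTING_WAIT ||
                   from == NBD_CLIENT_CONNECTING_NOWAIT;
        case NBD_CLIENT_CONNECTING_NOWAIT:     /* after reconnect-delay seconds */
            return from == NBD_CLIENT_CONNECTING_WAIT;
        case NBD_CLIENT_CONNECTING_WAIT:       /* on channel error while connected */
            return from == NBD_CLIENT_CONNECTED;
        default:
            return false;
        }
    }
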
Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
---
block/nbd.c | 332 ++++++++++++++++++++++++++++++++++++++++++----------
1 file changed, 269 insertions(+), 63 deletions(-)
diff --git a/block/nbd.c b/block/nbd.c
index 813c40d8f0..bec01d85c7 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -1,6 +1,7 @@
/*
* QEMU Block driver for NBD
*
+ * Copyright (c) 2019 Virtuozzo International GmbH.
* Copyright (C) 2016 Red Hat, Inc.
* Copyright (C) 2008 Bull S.A.S.
* Author: Laurent Vivier <Laurent.Vivier@bull.net>
@@ -55,6 +56,8 @@ typedef struct {
} NBDClientRequest;
typedef enum NBDClientState {
+ NBD_CLIENT_CONNECTING_WAIT,
+ NBD_CLIENT_CONNECTING_NOWAIT,
NBD_CLIENT_CONNECTED,
NBD_CLIENT_QUIT
} NBDClientState;
@@ -67,8 +70,14 @@ typedef struct BDRVNBDState {
CoMutex send_mutex;
CoQueue free_sema;
Coroutine *connection_co;
+ QemuCoSleepState *connection_co_sleep_ns_state;
+ bool drained;
+ bool wait_drained_end;
int in_flight;
NBDClientState state;
+ int connect_status;
+ Error *connect_err;
+ bool wait_in_flight;
NBDClientRequest requests[MAX_NBD_REQUESTS];
NBDReply reply;
@@ -83,10 +92,21 @@ typedef struct BDRVNBDState {
char *x_dirty_bitmap;
} BDRVNBDState;
-/* @ret will be used for reconnect in future */
+static int nbd_client_connect(BlockDriverState *bs, Error **errp);
+
static void nbd_channel_error(BDRVNBDState *s, int ret)
{
- s->state = NBD_CLIENT_QUIT;
+ if (ret == -EIO) {
+ if (s->state == NBD_CLIENT_CONNECTED) {
+ s->state = s->reconnect_delay ? NBD_CLIENT_CONNECTING_WAIT :
+ NBD_CLIENT_CONNECTING_NOWAIT;
+ }
+ } else {
+ if (s->state == NBD_CLIENT_CONNECTED) {
+ qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ }
+ s->state = NBD_CLIENT_QUIT;
+ }
}
static void nbd_recv_coroutines_wake_all(BDRVNBDState *s)
@@ -129,7 +149,13 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
+ /*
+ * s->connection_co is either yielded from nbd_receive_reply or from
+ * nbd_reconnect_loop()
+ */
+ if (s->state == NBD_CLIENT_CONNECTED) {
+ qio_channel_attach_aio_context(QIO_CHANNEL(s->ioc), new_context);
+ }
bdrv_inc_in_flight(bs);
@@ -140,24 +166,151 @@ static void nbd_client_attach_aio_context(BlockDriverState *bs,
aio_wait_bh_oneshot(new_context, nbd_client_attach_aio_context_bh, bs);
}
+static void coroutine_fn nbd_client_co_drain_begin(BlockDriverState *bs)
+{
+ BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
-static void nbd_teardown_connection(BlockDriverState *bs)
+ s->drained = true;
+ if (s->connection_co_sleep_ns_state) {
+ qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
+ }
+}
+
+static void coroutine_fn nbd_client_co_drain_end(BlockDriverState *bs)
{
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
- assert(s->ioc);
+ s->drained = false;
+ if (s->wait_drained_end) {
+ s->wait_drained_end = false;
+ aio_co_wake(s->connection_co);
+ }
+}
+
- /* finish any pending coroutines */
- qio_channel_shutdown(s->ioc,
- QIO_CHANNEL_SHUTDOWN_BOTH,
- NULL);
+static void nbd_teardown_connection(BlockDriverState *bs)
+{
+ BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
+
+ if (s->state == NBD_CLIENT_CONNECTED) {
+ /* finish any pending coroutines */
+ assert(s->ioc);
+ qio_channel_shutdown(s->ioc, QIO_CHANNEL_SHUTDOWN_BOTH, NULL);
+ }
+ s->state = NBD_CLIENT_QUIT;
+ if (s->connection_co) {
+ if (s->connection_co_sleep_ns_state) {
+ qemu_co_sleep_wake(s->connection_co_sleep_ns_state);
+ }
+ }
BDRV_POLL_WHILE(bs, s->connection_co);
+}
- nbd_client_detach_aio_context(bs);
- object_unref(OBJECT(s->sioc));
- s->sioc = NULL;
- object_unref(OBJECT(s->ioc));
- s->ioc = NULL;
+static bool nbd_client_connecting(BDRVNBDState *s)
+{
+ return s->state == NBD_CLIENT_CONNECTING_WAIT ||
+ s->state == NBD_CLIENT_CONNECTING_NOWAIT;
+}
+
+static bool nbd_client_connecting_wait(BDRVNBDState *s)
+{
+ return s->state == NBD_CLIENT_CONNECTING_WAIT;
+}
+
+static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
+{
+ Error *local_err = NULL;
+
+ if (!nbd_client_connecting(s)) {
+ return;
+ }
+
+ /* Wait for completion of all in-flight requests */
+
+ qemu_co_mutex_lock(&s->send_mutex);
+
+ while (s->in_flight > 0) {
+ qemu_co_mutex_unlock(&s->send_mutex);
+ nbd_recv_coroutines_wake_all(s);
+ s->wait_in_flight = true;
+ qemu_coroutine_yield();
+ s->wait_in_flight = false;
+ qemu_co_mutex_lock(&s->send_mutex);
+ }
+
+ qemu_co_mutex_unlock(&s->send_mutex);
+
+ if (!nbd_client_connecting(s)) {
+ return;
+ }
+
+ /*
+ * Now we are sure that nobody is accessing the channel, and no one will
+ * try until we set the state to CONNECTED.
+ */
+
+ /* Finalize previous connection if any */
+ if (s->ioc) {
+ nbd_client_detach_aio_context(s->bs);
+ object_unref(OBJECT(s->sioc));
+ s->sioc = NULL;
+ object_unref(OBJECT(s->ioc));
+ s->ioc = NULL;
+ }
+
+ s->connect_status = nbd_client_connect(s->bs, &local_err);
+ error_free(s->connect_err);
+ s->connect_err = NULL;
+ error_propagate(&s->connect_err, local_err);
+ local_err = NULL;
+
+ if (s->connect_status < 0) {
+ /* failed attempt */
+ return;
+ }
+
+ /* successfully connected */
+ s->state = NBD_CLIENT_CONNECTED;
+ qemu_co_queue_restart_all(&s->free_sema);
+}
+
+static coroutine_fn void nbd_reconnect_loop(BDRVNBDState *s)
+{
+ uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
+ uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
+ uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
+ uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
+
+ nbd_reconnect_attempt(s);
+
+ while (nbd_client_connecting(s)) {
+ if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
+ qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
+ {
+ s->state = NBD_CLIENT_CONNECTING_NOWAIT;
+ qemu_co_queue_restart_all(&s->free_sema);
+ }
+
+ qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
+ &s->connection_co_sleep_ns_state);
+ if (s->drained) {
+ bdrv_dec_in_flight(s->bs);
+ s->wait_drained_end = true;
+ while (s->drained) {
+ /*
+ * We may be entered once from nbd_client_attach_aio_context_bh
+ * and then from nbd_client_co_drain_end. So here is a loop.
+ */
+ qemu_coroutine_yield();
+ }
+ bdrv_inc_in_flight(s->bs);
+ }
+ if (timeout < max_timeout) {
+ timeout *= 2;
+ }
+
+ nbd_reconnect_attempt(s);
+ }
}
static coroutine_fn void nbd_connection_entry(void *opaque)
@@ -177,16 +330,26 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
* Therefore we keep an additional in_flight reference all the time and
* only drop it temporarily here.
*/
+
+ if (nbd_client_connecting(s)) {
+ nbd_reconnect_loop(s);
+ }
+
+ if (s->state != NBD_CLIENT_CONNECTED) {
+ continue;
+ }
+
assert(s->reply.handle == 0);
ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
if (local_err) {
trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
error_free(local_err);
+ local_err = NULL;
}
if (ret <= 0) {
nbd_channel_error(s, ret ? ret : -EIO);
- break;
+ continue;
}
/*
@@ -201,7 +364,7 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
(nbd_reply_is_structured(&s->reply) && !s->info.structured_reply))
{
nbd_channel_error(s, -EINVAL);
- break;
+ continue;
}
/*
@@ -220,10 +383,19 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
qemu_coroutine_yield();
}
+ qemu_co_queue_restart_all(&s->free_sema);
nbd_recv_coroutines_wake_all(s);
bdrv_dec_in_flight(s->bs);
s->connection_co = NULL;
+ if (s->ioc) {
+ nbd_client_detach_aio_context(s->bs);
+ object_unref(OBJECT(s->sioc));
+ s->sioc = NULL;
+ object_unref(OBJECT(s->ioc));
+ s->ioc = NULL;
+ }
+
aio_wait_kick();
}
@@ -235,7 +407,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
int rc, i = -1;
qemu_co_mutex_lock(&s->send_mutex);
- while (s->in_flight == MAX_NBD_REQUESTS) {
+ while (s->in_flight == MAX_NBD_REQUESTS || nbd_client_connecting_wait(s)) {
qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
}
@@ -286,7 +458,11 @@ err:
s->requests[i].coroutine = NULL;
s->in_flight--;
}
- qemu_co_queue_next(&s->free_sema);
+ if (s->in_flight == 0 && s->wait_in_flight) {
+ aio_co_wake(s->connection_co);
+ } else {
+ qemu_co_queue_next(&s->free_sema);
+ }
}
qemu_co_mutex_unlock(&s->send_mutex);
return rc;
@@ -666,10 +842,15 @@ static coroutine_fn int nbd_co_receive_one_chunk(
} else {
/* For assert at loop start in nbd_connection_entry */
*reply = s->reply;
- s->reply.handle = 0;
}
+ s->reply.handle = 0;
- if (s->connection_co) {
+ if (s->connection_co && !s->wait_in_flight) {
+ /*
+ * We must check s->wait_in_flight, because we may have been entered
+ * from nbd_recv_coroutines_wake_all(); in that case we should not
+ * wake connection_co here, it will be woken by the last request.
+ */
aio_co_wake(s->connection_co);
}
@@ -781,7 +962,11 @@ break_loop:
qemu_co_mutex_lock(&s->send_mutex);
s->in_flight--;
- qemu_co_queue_next(&s->free_sema);
+ if (s->in_flight == 0 && s->wait_in_flight) {
+ aio_co_wake(s->connection_co);
+ } else {
+ qemu_co_queue_next(&s->free_sema);
+ }
qemu_co_mutex_unlock(&s->send_mutex);
return false;
@@ -927,20 +1112,26 @@ static int nbd_co_request(BlockDriverState *bs, NBDRequest *request,
} else {
assert(request->type != NBD_CMD_WRITE);
}
- ret = nbd_co_send_request(bs, request, write_qiov);
- if (ret < 0) {
- return ret;
- }
- ret = nbd_co_receive_return_code(s, request->handle,
- &request_ret, &local_err);
- if (local_err) {
- trace_nbd_co_request_fail(request->from, request->len, request->handle,
- request->flags, request->type,
- nbd_cmd_lookup(request->type),
- ret, error_get_pretty(local_err));
- error_free(local_err);
- }
+ do {
+ ret = nbd_co_send_request(bs, request, write_qiov);
+ if (ret < 0) {
+ continue;
+ }
+
+ ret = nbd_co_receive_return_code(s, request->handle,
+ &request_ret, &local_err);
+ if (local_err) {
+ trace_nbd_co_request_fail(request->from, request->len,
+ request->handle, request->flags,
+ request->type,
+ nbd_cmd_lookup(request->type),
+ ret, error_get_pretty(local_err));
+ error_free(local_err);
+ local_err = NULL;
+ }
+ } while (ret < 0 && nbd_client_connecting_wait(s));
+
return ret ? ret : request_ret;
}
@@ -981,20 +1172,24 @@ static int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
request.len -= slop;
}
- ret = nbd_co_send_request(bs, &request, NULL);
- if (ret < 0) {
- return ret;
- }
+ do {
+ ret = nbd_co_send_request(bs, &request, NULL);
+ if (ret < 0) {
+ continue;
+ }
+
+ ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
+ &request_ret, &local_err);
+ if (local_err) {
+ trace_nbd_co_request_fail(request.from, request.len, request.handle,
+ request.flags, request.type,
+ nbd_cmd_lookup(request.type),
+ ret, error_get_pretty(local_err));
+ error_free(local_err);
+ local_err = NULL;
+ }
+ } while (ret < 0 && nbd_client_connecting_wait(s));
- ret = nbd_co_receive_cmdread_reply(s, request.handle, offset, qiov,
- &request_ret, &local_err);
- if (local_err) {
- trace_nbd_co_request_fail(request.from, request.len, request.handle,
- request.flags, request.type,
- nbd_cmd_lookup(request.type),
- ret, error_get_pretty(local_err));
- error_free(local_err);
- }
return ret ? ret : request_ret;
}
@@ -1131,20 +1326,25 @@ static int coroutine_fn nbd_client_co_block_status(
if (s->info.min_block) {
assert(QEMU_IS_ALIGNED(request.len, s->info.min_block));
}
- ret = nbd_co_send_request(bs, &request, NULL);
- if (ret < 0) {
- return ret;
- }
+ do {
+ ret = nbd_co_send_request(bs, &request, NULL);
+ if (ret < 0) {
+ continue;
+ }
+
+ ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
+ &extent, &request_ret,
+ &local_err);
+ if (local_err) {
+ trace_nbd_co_request_fail(request.from, request.len, request.handle,
+ request.flags, request.type,
+ nbd_cmd_lookup(request.type),
+ ret, error_get_pretty(local_err));
+ error_free(local_err);
+ local_err = NULL;
+ }
+ } while (ret < 0 && nbd_client_connecting_wait(s));
- ret = nbd_co_receive_blockstatus_reply(s, request.handle, bytes,
- &extent, &request_ret, &local_err);
- if (local_err) {
- trace_nbd_co_request_fail(request.from, request.len, request.handle,
- request.flags, request.type,
- nbd_cmd_lookup(request.type),
- ret, error_get_pretty(local_err));
- error_free(local_err);
- }
if (ret < 0 || request_ret < 0) {
return ret ? ret : request_ret;
}
@@ -1163,9 +1363,9 @@ static void nbd_client_close(BlockDriverState *bs)
BDRVNBDState *s = (BDRVNBDState *)bs->opaque;
NBDRequest request = { .type = NBD_CMD_DISC };
- assert(s->ioc);
-
- nbd_send_request(s->ioc, &request);
+ if (s->ioc) {
+ nbd_send_request(s->ioc, &request);
+ }
nbd_teardown_connection(bs);
}
@@ -1808,6 +2008,8 @@ static BlockDriver bdrv_nbd = {
.bdrv_getlength = nbd_getlength,
.bdrv_detach_aio_context = nbd_client_detach_aio_context,
.bdrv_attach_aio_context = nbd_client_attach_aio_context,
+ .bdrv_co_drain_begin = nbd_client_co_drain_begin,
+ .bdrv_co_drain_end = nbd_client_co_drain_end,
.bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname,
@@ -1830,6 +2032,8 @@ static BlockDriver bdrv_nbd_tcp = {
.bdrv_getlength = nbd_getlength,
.bdrv_detach_aio_context = nbd_client_detach_aio_context,
.bdrv_attach_aio_context = nbd_client_attach_aio_context,
+ .bdrv_co_drain_begin = nbd_client_co_drain_begin,
+ .bdrv_co_drain_end = nbd_client_co_drain_end,
.bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname,
@@ -1852,6 +2056,8 @@ static BlockDriver bdrv_nbd_unix = {
.bdrv_getlength = nbd_getlength,
.bdrv_detach_aio_context = nbd_client_detach_aio_context,
.bdrv_attach_aio_context = nbd_client_attach_aio_context,
+ .bdrv_co_drain_begin = nbd_client_co_drain_begin,
+ .bdrv_co_drain_end = nbd_client_co_drain_end,
.bdrv_refresh_filename = nbd_refresh_filename,
.bdrv_co_block_status = nbd_client_co_block_status,
.bdrv_dirname = nbd_dirname,
--
2.21.0
On 9/17/19 12:13 PM, Vladimir Sementsov-Ogievskiy wrote:
> Implement reconnect. To achieve this:
>
> 1. add new modes:
> connecting-wait: means, that reconnecting is in progress, and there
> were small number of reconnect attempts, so all requests are
> waiting for the connection.
> connecting-nowait: reconnecting is in progress, there were a lot of
> attempts of reconnect, all requests will return errors.
>
> two old modes are used too:
> connected: normal state
> quit: exiting after fatal error or on close
>
> Possible transitions are:
>
> * -> quit
> connecting-* -> connected
> connecting-wait -> connecting-nowait (transition is done after
> reconnect-delay seconds in connecting-wait mode)
> connected -> connecting-wait
>
> 2. Implement reconnect in connection_co. So, in connecting-* mode,
> connection_co, tries to reconnect unlimited times.
>
> 3. Retry nbd queries on channel error, if we are in connecting-wait
> state.
>
> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
> ---
> block/nbd.c | 332 ++++++++++++++++++++++++++++++++++++++++++----------
> 1 file changed, 269 insertions(+), 63 deletions(-)
>
> +
> +static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
> +{
> + Error *local_err = NULL;
> +
> + if (!nbd_client_connecting(s)) {
> + return;
> + }
> +
> + /* Wait for completion of all in-flight requests */
> +
> + qemu_co_mutex_lock(&s->send_mutex);
> +
> + while (s->in_flight > 0) {
> + qemu_co_mutex_unlock(&s->send_mutex);
> + nbd_recv_coroutines_wake_all(s);
> + s->wait_in_flight = true;
> + qemu_coroutine_yield();
> + s->wait_in_flight = false;
> + qemu_co_mutex_lock(&s->send_mutex);
> + }
> +
> + qemu_co_mutex_unlock(&s->send_mutex);
> +
> + if (!nbd_client_connecting(s)) {
> + return;
> + }
> +
> + /*
> + * Now we are sure that nobody is accessing the channel, and no one will
> + * try until we set the state to CONNECTED.
> + */
> +
> + /* Finalize previous connection if any */
> + if (s->ioc) {
> + nbd_client_detach_aio_context(s->bs);
> + object_unref(OBJECT(s->sioc));
> + s->sioc = NULL;
> + object_unref(OBJECT(s->ioc));
> + s->ioc = NULL;
> + }
> +
> + s->connect_status = nbd_client_connect(s->bs, &local_err);
> + error_free(s->connect_err);
> + s->connect_err = NULL;
> + error_propagate(&s->connect_err, local_err);
> + local_err = NULL;
> +
Looks like a dead assignment to local_err. But I see elsewhere you add
it, because you convert straight-line code into loops where it matters.
> + if (s->connect_status < 0) {
> + /* failed attempt */
> + return;
> + }
> +
> + /* successfully connected */
> + s->state = NBD_CLIENT_CONNECTED;
> + qemu_co_queue_restart_all(&s->free_sema);
> +}
> +
> +static coroutine_fn void nbd_reconnect_loop(BDRVNBDState *s)
Coroutine functions should generally have '_co_' in their name. I'd
prefer nbd_co_reconnect_loop.
> +{
> + uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
> + uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
> + uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
> + uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
> +
> + nbd_reconnect_attempt(s);
> +
> + while (nbd_client_connecting(s)) {
> + if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
> + qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
> + {
> + s->state = NBD_CLIENT_CONNECTING_NOWAIT;
> + qemu_co_queue_restart_all(&s->free_sema);
> + }
> +
> + qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
> + &s->connection_co_sleep_ns_state);
> + if (s->drained) {
> + bdrv_dec_in_flight(s->bs);
> + s->wait_drained_end = true;
> + while (s->drained) {
> + /*
> + * We may be entered once from nbd_client_attach_aio_context_bh
> + * and then from nbd_client_co_drain_end. So here is a loop.
> + */
> + qemu_coroutine_yield();
> + }
> + bdrv_inc_in_flight(s->bs);
> + }
> + if (timeout < max_timeout) {
> + timeout *= 2;
> + }
Exponential backoff, ok. If I read the loop correctly, you've hardcoded
the max_timeout at 16s, which means the overall timeout is about 30s
when adding in the time of the earlier iterations. Does that need to be
tunable? But for now I can live with it.
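
To put numbers on it, here is a tiny standalone sketch (mine, not part
of the patch) that mirrors the doubling logic in nbd_reconnect_loop():

    #include <stdio.h>

    int main(void)
    {
        /* start at 1s, double after each attempt, cap at 16s */
        unsigned timeout = 1, max_timeout = 16, total = 0;

        for (int attempt = 1; attempt <= 6; attempt++) {
            total += timeout;
            printf("attempt %d: sleep %2us (cumulative %2us)\n",
                   attempt, timeout, total);
            if (timeout < max_timeout) {
                timeout *= 2;
            }
        }
        return 0;
    }

which gives sleeps of 1, 2, 4, 8, 16 seconds (31s total) before the cap
is reached, and then 16s per further attempt.
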
> +
> + nbd_reconnect_attempt(s);
> + }
> }
>
> static coroutine_fn void nbd_connection_entry(void *opaque)
> @@ -177,16 +330,26 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
> * Therefore we keep an additional in_flight reference all the time and
> * only drop it temporarily here.
> */
> +
> + if (nbd_client_connecting(s)) {
> + nbd_reconnect_loop(s);
> + }
> +
> + if (s->state != NBD_CLIENT_CONNECTED) {
> + continue;
> + }
Is 'continue' right, even if s->state == QUIT?
> +
> assert(s->reply.handle == 0);
> ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
>
> if (local_err) {
> trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
> error_free(local_err);
> + local_err = NULL;
Could be fun in concert with your proposal to get rid of local_err ;)
But here, we aren't using error_propagate().
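
For reference, the pattern that makes the reset necessary, as a minimal
fragment (keep_going() and do_something() are placeholders, not QEMU
functions; only the Error handling is the point):

    Error *local_err = NULL;

    while (keep_going()) {
        do_something(&local_err);      /* may set local_err */
        if (local_err) {
            error_free(local_err);
            /*
             * Without this reset, the next call that tries to set
             * *errp would trip the "*errp must be NULL" assertion,
             * and the already-freed error would be left dangling.
             */
            local_err = NULL;
        }
    }
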
> }
> if (ret <= 0) {
> nbd_channel_error(s, ret ? ret : -EIO);
> - break;
> + continue;
> }
>
> /*
We're getting really close. If you can answer my questions above, and
the only thing left is adding _co_ in the function name, I could tweak
that locally to spare you a v10. At any rate, I'm tentatively queuing
this on my NBD tree; I'll probably do a pull request today without it,
and save it for next week's PR after I've had a week to hammer on it in
local tests.
--
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3226
Virtualization: qemu.org | libvirt.org
23.09.2019 22:23, Eric Blake wrote:
> On 9/17/19 12:13 PM, Vladimir Sementsov-Ogievskiy wrote:
>> Implement reconnect. To achieve this:
>>
>> 1. add new modes:
>> connecting-wait: means, that reconnecting is in progress, and there
>> were small number of reconnect attempts, so all requests are
>> waiting for the connection.
>> connecting-nowait: reconnecting is in progress, there were a lot of
>> attempts of reconnect, all requests will return errors.
>>
>> two old modes are used too:
>> connected: normal state
>> quit: exiting after fatal error or on close
>>
>> Possible transitions are:
>>
>> * -> quit
>> connecting-* -> connected
>> connecting-wait -> connecting-nowait (transition is done after
>> reconnect-delay seconds in connecting-wait mode)
>> connected -> connecting-wait
>>
>> 2. Implement reconnect in connection_co. So, in connecting-* mode,
>> connection_co, tries to reconnect unlimited times.
>>
>> 3. Retry nbd queries on channel error, if we are in connecting-wait
>> state.
>>
>> Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
>> ---
>> block/nbd.c | 332 ++++++++++++++++++++++++++++++++++++++++++----------
>> 1 file changed, 269 insertions(+), 63 deletions(-)
>>
>
>> +
>> +static coroutine_fn void nbd_reconnect_attempt(BDRVNBDState *s)
>> +{
>> + Error *local_err = NULL;
>> +
>> + if (!nbd_client_connecting(s)) {
>> + return;
>> + }
>> +
>> + /* Wait for completion of all in-flight requests */
>> +
>> + qemu_co_mutex_lock(&s->send_mutex);
>> +
>> + while (s->in_flight > 0) {
>> + qemu_co_mutex_unlock(&s->send_mutex);
>> + nbd_recv_coroutines_wake_all(s);
>> + s->wait_in_flight = true;
>> + qemu_coroutine_yield();
>> + s->wait_in_flight = false;
>> + qemu_co_mutex_lock(&s->send_mutex);
>> + }
>> +
>> + qemu_co_mutex_unlock(&s->send_mutex);
>> +
>> + if (!nbd_client_connecting(s)) {
>> + return;
>> + }
>> +
>> + /*
>> + * Now we are sure that nobody is accessing the channel, and no one will
>> + * try until we set the state to CONNECTED.
>> + */
>> +
>> + /* Finalize previous connection if any */
>> + if (s->ioc) {
>> + nbd_client_detach_aio_context(s->bs);
>> + object_unref(OBJECT(s->sioc));
>> + s->sioc = NULL;
>> + object_unref(OBJECT(s->ioc));
>> + s->ioc = NULL;
>> + }
>> +
>> + s->connect_status = nbd_client_connect(s->bs, &local_err);
>> + error_free(s->connect_err);
>> + s->connect_err = NULL;
>> + error_propagate(&s->connect_err, local_err);
>> + local_err = NULL;
>> +
>
> Looks like a dead assignment to local_err.
Hmm, agree, it's dead.
> But I see elsewhere you add
> it, because you convert straight-line code into loops where it matters.
>
>> + if (s->connect_status < 0) {
>> + /* failed attempt */
>> + return;
>> + }
>> +
>> + /* successfully connected */
>> + s->state = NBD_CLIENT_CONNECTED;
>> + qemu_co_queue_restart_all(&s->free_sema);
>> +}
>> +
>> +static coroutine_fn void nbd_reconnect_loop(BDRVNBDState *s)
>
> Coroutine functions should generally have '_co_' in their name. I'd
> prefer nbd_co_reconnect_loop.
OK, agreed.
>
>> +{
>> + uint64_t start_time_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
>> + uint64_t delay_ns = s->reconnect_delay * NANOSECONDS_PER_SECOND;
>> + uint64_t timeout = 1 * NANOSECONDS_PER_SECOND;
>> + uint64_t max_timeout = 16 * NANOSECONDS_PER_SECOND;
>> +
>> + nbd_reconnect_attempt(s);
>> +
>> + while (nbd_client_connecting(s)) {
>> + if (s->state == NBD_CLIENT_CONNECTING_WAIT &&
>> + qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start_time_ns > delay_ns)
>> + {
>> + s->state = NBD_CLIENT_CONNECTING_NOWAIT;
>> + qemu_co_queue_restart_all(&s->free_sema);
>> + }
>> +
>> + qemu_co_sleep_ns_wakeable(QEMU_CLOCK_REALTIME, timeout,
>> + &s->connection_co_sleep_ns_state);
>> + if (s->drained) {
>> + bdrv_dec_in_flight(s->bs);
>> + s->wait_drained_end = true;
>> + while (s->drained) {
>> + /*
>> + * We may be entered once from nbd_client_attach_aio_context_bh
>> + * and then from nbd_client_co_drain_end. So here is a loop.
>> + */
>> + qemu_coroutine_yield();
>> + }
>> + bdrv_inc_in_flight(s->bs);
>> + }
>> + if (timeout < max_timeout) {
>> + timeout *= 2;
>> + }
>
> Exponential backup, ok. If I read the loop correctly, you've hardcoded
> the max_timeout at 16s, which means the overall timeout is about 30s
> when adding in the time of the earlier iterations. Does that need to be
> tunable? But for now I can live with it.
I think we can add an option later if needed.
>
>> +
>> + nbd_reconnect_attempt(s);
>> + }
>> }
>>
>> static coroutine_fn void nbd_connection_entry(void *opaque)
>> @@ -177,16 +330,26 @@ static coroutine_fn void nbd_connection_entry(void *opaque)
>> * Therefore we keep an additional in_flight reference all the time and
>> * only drop it temporarily here.
>> */
>> +
>> + if (nbd_client_connecting(s)) {
>> + nbd_reconnect_loop(s);
>> + }
>> +
>> + if (s->state != NBD_CLIENT_CONNECTED) {
>> + continue;
>> + }
>
> Is 'continue' right, even if s->state == QUIT?
It doesn't matter, as we jump back to "while (s->state != NBD_CLIENT_QUIT) {".
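
Condensed, the coroutine now behaves like this (a paraphrase of the
patched nbd_connection_entry(), not a literal excerpt):

    while (s->state != NBD_CLIENT_QUIT) {
        if (nbd_client_connecting(s)) {
            nbd_reconnect_loop(s);      /* loops until connected or QUIT */
        }
        if (s->state != NBD_CLIENT_CONNECTED) {
            continue;                   /* on QUIT the while condition ends us */
        }
        ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
        if (ret <= 0) {
            nbd_channel_error(s, ret ? ret : -EIO);
            continue;                   /* back to reconnecting (or exit on QUIT) */
        }
        /* ... hand the reply to the waiting request coroutine ... */
    }

so 'continue' on QUIT simply falls out of the loop.
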
>
>> +
>> assert(s->reply.handle == 0);
>> ret = nbd_receive_reply(s->bs, s->ioc, &s->reply, &local_err);
>>
>> if (local_err) {
>> trace_nbd_read_reply_entry_fail(ret, error_get_pretty(local_err));
>> error_free(local_err);
>> + local_err = NULL;
>
> Could be fun in concert with your proposal to get rid of local_err ;)
> But here, we aren't using error_propagate().
And we don't have an Error **errp parameter here either.
>
>> }
>> if (ret <= 0) {
>> nbd_channel_error(s, ret ? ret : -EIO);
>> - break;
>> + continue;
>> }
>>
>> /*
>
> We're getting really close. If you can answer my questions above, and
> the only thing left is adding _co_ in the function name, I could tweak
> that locally to spare you a v10. At any rate, I'm tentatively queuing
> this on my NBD tree; I'll probably do a pull request today without it,
> and save it for next week's PR after I've had a week to hammer on it in
> local tests.
>
Thank you! That's great!
--
Best regards,
Vladimir
A kind reminder. Not a problem if you remember it but just haven't had time for it yet.
Best regards,
Vladimir