From: Christoph Böhmwalder
To: Jens Axboe
Cc: drbd-dev@lists.linbit.com, linux-kernel@vger.kernel.org, Lars Ellenberg, Philipp Reisner, linux-block@vger.kernel.org, Christoph Böhmwalder, Joel Colledge
Subject: [PATCH 16/20] drbd: rework module core for DRBD 9 transport and multi-peer
Date: Fri, 27 Mar 2026 23:38:16 +0100
Message-ID: <20260327223820.2244227-17-christoph.boehmwalder@linbit.com>
X-Mailer: git-send-email 2.53.0
In-Reply-To: <20260327223820.2244227-1-christoph.boehmwalder@linbit.com>
References: <20260327223820.2244227-1-christoph.boehmwalder@linbit.com>

Rework drbd_main.c to align the module core with the DRBD 9 multi-peer
architecture introduced by the surrounding header and transport commits.

Refactor all packet sending around a page-based send buffer with
explicit cork/uncork semantics driven by the transport layer, replacing
the old per-socket static buffer and direct socket calls.

Move the transfer log from per-connection to per-resource scope, and
switch its traversal to RCU, allowing safe concurrent walks without the
coarse req_lock spinlock.
Rewrite UUID management for multi-peer: the fixed 4-slot layout is
replaced by a per-device current UUID, per-peer bitmap UUIDs, and a
history array. This enables DRBD 9 to track resyncs across more than
one peer simultaneously. The on-disk metadata format is extended to
match.

Separate the resource and connection lifecycles so that resources and
connections are created, torn down, and reference-counted
independently, with threads scoped appropriately to each object.

Add quorum-aware auto-promote semantics to the block device
open/release path.

Co-developed-by: Philipp Reisner
Signed-off-by: Philipp Reisner
Co-developed-by: Lars Ellenberg
Signed-off-by: Lars Ellenberg
Co-developed-by: Joel Colledge
Signed-off-by: Joel Colledge
Co-developed-by: Christoph Böhmwalder
Signed-off-by: Christoph Böhmwalder
---
 drivers/block/drbd/drbd_main.c | 6008 ++++++++++++++++++++++----------
 1 file changed, 4180 insertions(+), 1828 deletions(-)

diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 200d464e984b..acce6c4b4a16 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -1,6 +1,6 @@
 // SPDX-License-Identifier: GPL-2.0-only
 /*
-   drbd.c
+   drbd_main.c

   This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
@@ -19,41 +19,47 @@
 #include
 #include
 #include
-#include
 #include
+#include
 #include
 #include
-#include
 #include
 #include
 #include
 #include
 #include
-#include
+#include	/* needed on kernels <4.3 */
 #include
 #include
 #include
-#include
-#include
 #include
-#include
 #include
-#include
+#include
 #include
-#include
+#include
+#include
+#include
+#include

 #include
 #include "drbd_int.h"
 #include "drbd_protocol.h"
-#include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
+#include "drbd_req.h"
 #include "drbd_vli.h"
 #include "drbd_debugfs.h"
+#include "drbd_meta_data.h"
+#include "drbd_legacy_84.h"
+#include "drbd_dax_pmem.h"

-static DEFINE_MUTEX(drbd_main_mutex);
-static int drbd_open(struct gendisk *disk, blk_mode_t mode);
+static int drbd_open(struct gendisk *gd, blk_mode_t mode);
 static void drbd_release(struct gendisk *gd);
 static void md_sync_timer_fn(struct timer_list *t);
 static int w_bitmap_io(struct drbd_work *w, int unused);
+static int flush_send_buffer(struct drbd_connection *connection, enum drbd_stream drbd_stream);
+static u64 __set_bitmap_slots(struct drbd_device *device, u64 bitmap_uuid, u64 do_nodes);
+static u64 __test_bitmap_slots(struct drbd_device *device);
+static void drbd_send_ping_ack_wf(struct work_struct *ws);
+static void __net_exit __drbd_net_exit(struct net *net);

 MODULE_AUTHOR("Philipp Reisner , "
	      "Lars Ellenberg ");
@@ -63,16 +69,16 @@ MODULE_LICENSE("GPL");
 MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
	__stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
 MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
+MODULE_SOFTDEP("post: handshake");

 #include
-/* thanks to these macros, if compiled into the kernel (not-module),
- * these become boot parameters (e.g., drbd.minor_count) */

 #ifdef CONFIG_DRBD_FAULT_INJECTION
 int drbd_enable_faults;
 int drbd_fault_rate;
 static int drbd_fault_count;
 static int drbd_fault_devs;
+
 /* bitmap of enabled faults */
 module_param_named(enable_faults, drbd_enable_faults, int, 0664);
 /* fault rate % value - applies to all enabled faults */
@@ -84,15 +90,12 @@ module_param_named(fault_devs, drbd_fault_devs, int, 0644);
 #endif

 /* module parameters we can keep static */
-static bool drbd_allow_oos; /* allow_open_on_secondary */
 static bool drbd_disable_sendpage;
+static bool drbd_allow_oos; /* allow_open_on_secondary */
 MODULE_PARM_DESC(allow_oos, "DONT USE!");
-module_param_named(allow_oos, drbd_allow_oos, bool, 0);
 module_param_named(disable_sendpage, drbd_disable_sendpage, bool, 0644);
+module_param_named(allow_oos, drbd_allow_oos, bool, 0);

-/* module parameters we share */
-int drbd_proc_details; /* Detail level in proc drbd*/
-module_param_named(proc_details, drbd_proc_details, int, 0644);
 /* module parameters shared with defaults */
 unsigned int drbd_minor_count = DRBD_MINOR_COUNT_DEF;
 /* Module parameter for setting the user mode helper program
@@ -101,16 +104,60 @@ char drbd_usermode_helper[80] = "/sbin/drbdadm";
 module_param_named(minor_count, drbd_minor_count, uint, 0444);
 module_param_string(usermode_helper, drbd_usermode_helper, sizeof(drbd_usermode_helper), 0644);

+static int param_set_drbd_protocol_version(const char *s, const struct kernel_param *kp)
+{
+	unsigned long long tmp;
+	unsigned int *res = kp->arg;
+	int rv;
+
+	rv = kstrtoull(s, 0, &tmp);
+	if (rv < 0)
+		return rv;
+	if (!drbd_protocol_version_acceptable(tmp))
+		return -ERANGE;
+	*res = tmp;
+	return 0;
+}
+
+#define param_check_drbd_protocol_version param_check_uint
+#define param_get_drbd_protocol_version param_get_uint
+
+static const struct kernel_param_ops param_ops_drbd_protocol_version = {
+	.set = param_set_drbd_protocol_version,
+	.get = param_get_drbd_protocol_version,
+};
+
+unsigned int drbd_protocol_version_min = PRO_VERSION_8_MIN;
+module_param_named(protocol_version_min, drbd_protocol_version_min, drbd_protocol_version, 0644);
+#define protocol_version_min_desc \
+	"\n\t\tReject DRBD dialects older than this.\n\t\t" \
+	"Supported: " \
+	"DRBD 8 [" __stringify(PRO_VERSION_8_MIN) "-" __stringify(PRO_VERSION_8_MAX) "]; " \
+	"DRBD 9 [" __stringify(PRO_VERSION_MIN) "-" __stringify(PRO_VERSION_MAX) "].\n\t\t" \
+	"Default: " __stringify(PRO_VERSION_8_MIN)
+MODULE_PARM_DESC(protocol_version_min, protocol_version_min_desc);
+
+#define param_check_drbd_strict_names param_check_bool
+#define param_get_drbd_strict_names param_get_bool
+const struct kernel_param_ops param_ops_drbd_strict_names = {
+	.set = param_set_drbd_strict_names,
+	.get = param_get_drbd_strict_names,
+};
+bool drbd_strict_names = true;
+MODULE_PARM_DESC(strict_names, "restrict resource and connection names to ascii alnum and a subset of punct");
+module_param_named(strict_names, drbd_strict_names, drbd_strict_names, 0644);
+
 /* in 2.6.x, our device mapping and config info contains our virtual gendisks
  * as member "struct gendisk *vdisk;" */
 struct idr drbd_devices;
 struct list_head drbd_resources;
-struct mutex resources_mutex;
+static DEFINE_SPINLOCK(drbd_devices_lock);
+DEFINE_MUTEX(resources_mutex);

+struct workqueue_struct *ping_ack_sender;
 struct kmem_cache *drbd_request_cache;
 struct kmem_cache *drbd_ee_cache;	/* peer requests */
-struct kmem_cache *drbd_bm_ext_cache;	/* bitmap extents */
 struct kmem_cache *drbd_al_ext_cache;	/* activity log extents */
 mempool_t drbd_request_mempool;
 mempool_t drbd_ee_mempool;
@@ -119,8 +166,6 @@ mempool_t drbd_buffer_page_pool;
 struct bio_set drbd_md_io_bio_set;
 struct bio_set drbd_io_bio_set;

-DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
-
 static const struct block_device_operations drbd_ops = {
	.owner		= THIS_MODULE,
	.submit_bio	= drbd_submit_bio,
@@ -128,71 +173,241 @@ static const struct block_device_operations drbd_ops = {
	.release	= drbd_release,
 };

-#ifdef __CHECKER__
-/* When checking with sparse, and this is an inline function, sparse will
-   give tons of false positives. When this is a real functions sparse works.
- */
-int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins)
+static struct pernet_operations drbd_pernet_ops = {
+	.exit = __drbd_net_exit,
+};
+
+struct drbd_connection *__drbd_next_connection_ref(u64 *visited,
+						   struct drbd_connection *connection,
+						   struct drbd_resource *resource)
+{
+	int node_id;
+
+	rcu_read_lock();
+	if (!connection) {
+		connection = list_first_or_null_rcu(&resource->connections,
+						    struct drbd_connection,
+						    connections);
+		*visited = 0;
+	} else {
+		struct list_head *pos;
+		bool previous_visible; /* on the resources connections list */
+
+		pos = list_next_rcu(&connection->connections);
+		/* follow the pointer first, then check if the previous element was
+		   still an element on the list of visible connections. */
+		smp_rmb();
+		previous_visible = !test_bit(C_UNREGISTERED, &connection->flags);
+
+		kref_put(&connection->kref, drbd_destroy_connection);
+
+		if (pos == &resource->connections) {
+			connection = NULL;
+		} else if (previous_visible) { /* visible -> we are now on a vital element */
+			connection = list_entry_rcu(pos, struct drbd_connection, connections);
+		} else { /* not visible -> pos might point to a dead element now */
+			for_each_connection_rcu(connection, resource) {
+				node_id = connection->peer_node_id;
+				if (!(*visited & NODE_MASK(node_id)))
+					goto found;
+			}
+			connection = NULL;
+		}
+	}
+
+	if (connection) {
+found:
+		node_id = connection->peer_node_id;
+		*visited |= NODE_MASK(node_id);
+
+		kref_get(&connection->kref);
+	}
+
+	rcu_read_unlock();
+	return connection;
+}
+
+
+struct drbd_peer_device *__drbd_next_peer_device_ref(u64 *visited,
+						     struct drbd_peer_device *peer_device,
+						     struct drbd_device *device)
 {
-	int io_allowed;
+	rcu_read_lock();
+	if (!peer_device) {
+		peer_device = list_first_or_null_rcu(&device->peer_devices,
+						     struct drbd_peer_device,
+						     peer_devices);
+		*visited = 0;
+	} else {
+		struct list_head *pos;
+		bool previous_visible;
+
+		pos = list_next_rcu(&peer_device->peer_devices);
+		smp_rmb();
+		previous_visible = !test_bit(C_UNREGISTERED, &peer_device->connection->flags);
+
+		kref_put(&peer_device->connection->kref, drbd_destroy_connection);

-	atomic_inc(&device->local_cnt);
-	io_allowed = (device->state.disk >= mins);
-	if (!io_allowed) {
-		if (atomic_dec_and_test(&device->local_cnt))
-			wake_up(&device->misc_wait);
+		if (pos == &device->peer_devices) {
+			peer_device = NULL;
+		} else if (previous_visible) {
+			peer_device = list_entry_rcu(pos, struct drbd_peer_device, peer_devices);
+		} else {
+			for_each_peer_device_rcu(peer_device, device) {
+				if (!(*visited & NODE_MASK(peer_device->node_id)))
+					goto found;
+			}
+			peer_device = NULL;
+		}
+	}
+
+	if (peer_device) {
+found:
+		*visited |= NODE_MASK(peer_device->node_id);
+
+		kref_get(&peer_device->connection->kref);
	}
-	return io_allowed;
+
+	rcu_read_unlock();
+	return peer_device;
 }

-#endif
+static void dump_epoch(struct drbd_resource *resource, int node_id, int epoch)
+{
+	struct drbd_request *req;
+	bool found_epoch = false;
+
+	list_for_each_entry_rcu(req, &resource->transfer_log, tl_requests) {
+		if (!found_epoch && req->epoch == epoch)
+			found_epoch = true;
+
+		if (found_epoch) {
+			if (req->epoch != epoch)
+				break;
+			drbd_info(req->device, "XXX %u %llu+%u 0x%x 0x%x\n",
+				  req->epoch,
+				  (unsigned long long)req->i.sector, req->i.size >> 9,
+				  req->local_rq_state, req->net_rq_state[node_id]
+				  );
+		}
+	}
+}

 /**
  * tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
  * @connection:	DRBD connection.
+ * @o_block_id:	"block id" aka expected pointer address of the oldest request
+ * @y_block_id:	"block id" aka expected pointer address of the youngest request
+ *		confirmed to be on stable storage.
  * @barrier_nr:	Expected identifier of the DRBD write barrier packet.
- * @set_size:	Expected number of requests before that barrier.
+ * @set_size:	Expected number of requests before that barrier, respectively
+ *		number of requests in the interval [o_block_id;y_block_id]
+ *
+ * Called for both P_BARRIER_ACK and P_CONFIRM_STABLE,
+ * which is similar to an unsolicited partial barrier ack.
+ *
+ * Either barrier_nr (for barrier acks) or both o_block_id and y_block_id (for
+ * confirm stable) are given. For barrier acks, all requests in the epoch
+ * designated by "barrier_nr" are confirmed to be on stable storage.
+ *
+ * For confirm stable, both o_block_id and y_block_id are given, barrier_nr is
+ * ignored, and all requests from "o_block_id" up to and including y_block_id
+ * are confirmed to be on stable storage on the reporting peer.
 *
 * In case the passed barrier_nr or set_size does not match the oldest
 * epoch of not yet barrier-acked requests, this function will cause a
 * termination of the connection.
 */
-void tl_release(struct drbd_connection *connection, unsigned int barrier_nr,
+int tl_release(struct drbd_connection *connection,
+		uint64_t o_block_id,
+		uint64_t y_block_id,
+		unsigned int barrier_nr,
		unsigned int set_size)
 {
+	struct drbd_resource *resource = connection->resource;
+	const int idx = connection->peer_node_id;
	struct drbd_request *r;
-	struct drbd_request *req = NULL, *tmp = NULL;
+	struct drbd_request *req = NULL;
+	struct drbd_request *req_y = NULL;
	int expect_epoch = 0;
	int expect_size = 0;

-	spin_lock_irq(&connection->resource->req_lock);
-
+	rcu_read_lock();
	/* find oldest not yet barrier-acked write request,
	 * count writes in its epoch.
	 */
-	list_for_each_entry(r, &connection->transfer_log, tl_requests) {
-		const unsigned s = r->rq_state;
+	r = READ_ONCE(connection->req_not_net_done);
+	if (r == NULL) {
+		drbd_err(connection, "BarrierAck #%u received, but req_not_net_done = NULL\n",
+			 barrier_nr);
+		goto bail;
+	}
+	smp_rmb(); /* paired with smp_wmb() in set_cache_ptr_if_null() */
+	list_for_each_entry_from_rcu(r, &resource->transfer_log, tl_requests) {
+		unsigned int local_rq_state, net_rq_state;
+
+		spin_lock_irq(&r->rq_lock);
+		local_rq_state = r->local_rq_state;
+		net_rq_state = r->net_rq_state[idx];
+		spin_unlock_irq(&r->rq_lock);
+
		if (!req) {
-			if (!(s & RQ_WRITE))
+			if (!(local_rq_state & RQ_WRITE))
				continue;
-			if (!(s & RQ_NET_MASK))
+			if (!(net_rq_state & RQ_NET_MASK))
				continue;
-			if (s & RQ_NET_DONE)
+			if (net_rq_state & RQ_NET_DONE)
				continue;
			req = r;
			expect_epoch = req->epoch;
-			expect_size ++;
+			expect_size++;
		} else {
+			const u16 s = r->net_rq_state[idx];
			if (r->epoch != expect_epoch)
				break;
-			if (!(s & RQ_WRITE))
+			if (!(local_rq_state & RQ_WRITE))
				continue;
-			/* if (s & RQ_DONE): not expected */
-			/* if (!(s & RQ_NET_MASK)): not expected */
+			/* probably a "send_out_of_sync", during Ahead/Behind mode,
			 * while at least one volume already started to resync again.
			 * Or a write that was not replicated during a resync, and
			 * replication has been enabled since it was submitted.
			 */
+			if ((s & RQ_NET_MASK) && !(s & RQ_EXP_BARR_ACK))
+				continue;
+			if (s & RQ_NET_DONE || (s & RQ_NET_MASK) == 0) {
+				drbd_warn(connection, "unexpected state flags: 0x%x during BarrierAck #%u\n",
+					  s, barrier_nr);
+			}
			expect_size++;
		}
+		if (y_block_id && (struct drbd_request *)(unsigned long)y_block_id == r) {
+			req_y = r;
+			break;
+		}
	}

	/* first some paranoia code */
+	if (o_block_id) {
+		if ((struct drbd_request *)(unsigned long)o_block_id != req) {
+			drbd_err(connection, "BAD! ConfirmedStable: expected %p, found %p\n",
+				 (struct drbd_request *)(unsigned long)o_block_id, req);
+			goto bail;
+		}
+		if (!req_y) {
+			drbd_err(connection, "BAD! ConfirmedStable: expected youngest request %p NOT found\n",
+				 (struct drbd_request *)(unsigned long)y_block_id);
+			goto bail;
+		}
+		/* A P_CONFIRM_STABLE cannot tell me the to-be-expected barrier nr,
+		 * it does not know it yet. But we just confirmed it knew the
+		 * expected request, so just use that one. */
+		barrier_nr = expect_epoch;
+		/* Both requests referenced must be in the same epoch. */
+		if (req_y->epoch != expect_epoch) {
+			drbd_err(connection, "BAD! ConfirmedStable: reported requests not in the same epoch (%u != %u)\n",
+				 req->epoch, req_y->epoch);
+			goto bail;
+		}
+	}
	if (req == NULL) {
		drbd_err(connection, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
			 barrier_nr);
@@ -205,111 +420,135 @@ void tl_release(struct drbd_connection *connection, unsigned int barrier_nr,
	}

	if (expect_size != set_size) {
-		drbd_err(connection, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
-			barrier_nr, set_size, expect_size);
+		if (!o_block_id) {
+			DEFINE_DYNAMIC_DEBUG_METADATA(ddm, "Bad barrier ack dump");
+
+			drbd_err(connection, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
+				 barrier_nr, set_size, expect_size);
+
+			if (DYNAMIC_DEBUG_BRANCH(ddm))
+				dump_epoch(resource, connection->peer_node_id, expect_epoch);
+		} else
+			drbd_err(connection, "BAD! ConfirmedStable [%p,%p] received with n_writes=%u, expected n_writes=%u!\n",
+				 req, req_y, set_size, expect_size);
		goto bail;
	}

	/* Clean up list of requests processed during current epoch. */
-	/* this extra list walk restart is paranoia,
-	 * to catch requests being barrier-acked "unexpectedly".
-	 * It usually should find the same req again, or some READ preceding it. */
-	list_for_each_entry(req, &connection->transfer_log, tl_requests)
-		if (req->epoch == expect_epoch) {
-			tmp = req;
-			break;
-		}
-	req = list_prepare_entry(tmp, &connection->transfer_log, tl_requests);
-	list_for_each_entry_safe_from(req, r, &connection->transfer_log, tl_requests) {
+	list_for_each_entry_from_rcu(req, &resource->transfer_log, tl_requests) {
		struct drbd_peer_device *peer_device;
+
		if (req->epoch != expect_epoch)
			break;
		peer_device = conn_peer_device(connection, req->device->vnr);
-		_req_mod(req, BARRIER_ACKED, peer_device);
+		req_mod(req, BARRIER_ACKED, peer_device);
+		if (req == req_y)
+			break;
+	}
+	rcu_read_unlock();
+
+	/* urgently flush out peer acks for P_CONFIRM_STABLE */
+	if (req_y) {
+		drbd_flush_peer_acks(resource);
+	} else if (barrier_nr == connection->send.last_sent_epoch_nr) {
+		clear_bit(BARRIER_ACK_PENDING, &connection->flags);
+		wake_up(&resource->barrier_wait);
	}
-	spin_unlock_irq(&connection->resource->req_lock);

-	return;
+	return 0;

 bail:
-	spin_unlock_irq(&connection->resource->req_lock);
-	conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
+	rcu_read_unlock();
+	return -EPROTO;
 }


 /**
- * _tl_restart() - Walks the transfer log, and applies an action to all requests
- * @connection:	DRBD connection to operate on.
+ * __tl_walk() - Walks the transfer log, and applies an action to all requests
+ * @resource:	DRBD resource to operate on
+ * @connection:	DRBD connection to operate on
+ * @from_req:	If set, the walk starts from the request that this points to
 * @what:       The action/event to perform with all request objects
 *
- * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
- * RESTART_FROZEN_DISK_IO.
+ * @what might be one of CONNECTION_LOST, CONNECTION_LOST_WHILE_SUSPENDED,
+ * RESEND, CANCEL_SUSPENDED_IO, COMPLETION_RESUMED.
 */
-/* must hold resource->req_lock */
-void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
+void __tl_walk(struct drbd_resource *const resource,
+	       struct drbd_connection *const connection,
+	       struct drbd_request **from_req,
+	       const enum drbd_req_event what)
 {
	struct drbd_peer_device *peer_device;
-	struct drbd_request *req, *r;
+	struct drbd_request *req = NULL;

-	list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests) {
-		peer_device = conn_peer_device(connection, req->device->vnr);
+	rcu_read_lock();
+	if (from_req)
+		req = READ_ONCE(*from_req);
+	if (!req)
+		req = list_entry_rcu(resource->transfer_log.next, struct drbd_request, tl_requests);
+	smp_rmb(); /* paired with smp_wmb() in set_cache_ptr_if_null() */
+	list_for_each_entry_from_rcu(req, &resource->transfer_log, tl_requests) {
+		/* Skip if the request has already been destroyed. */
+		if (!kref_get_unless_zero(&req->kref))
+			continue;
+
+		peer_device = connection == NULL ? NULL :
			conn_peer_device(connection, req->device->vnr);
		_req_mod(req, what, peer_device);
+		kref_put(&req->kref, drbd_req_destroy);
	}
+	rcu_read_unlock();
 }

-void tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
+void tl_walk(struct drbd_connection *connection, struct drbd_request **from_req, enum drbd_req_event what)
 {
-	spin_lock_irq(&connection->resource->req_lock);
-	_tl_restart(connection, what);
-	spin_unlock_irq(&connection->resource->req_lock);
-}
+	struct drbd_resource *resource = connection->resource;

-/**
- * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
- * @connection:	DRBD connection.
- *
- * This is called after the connection to the peer was lost. The storage covered
- * by the requests on the transfer gets marked as our of sync. Called from the
- * receiver thread and the worker thread.
- */
-void tl_clear(struct drbd_connection *connection)
-{
-	tl_restart(connection, CONNECTION_LOST_WHILE_PENDING);
+	read_lock_irq(&resource->state_rwlock);
+	__tl_walk(connection->resource, connection, from_req, what);
+	read_unlock_irq(&resource->state_rwlock);
 }

 /**
  * tl_abort_disk_io() - Abort disk I/O for all requests for a certain device in the TL
- * @device:	DRBD device.
+ * @device:	DRBD device.
 */
 void tl_abort_disk_io(struct drbd_device *device)
 {
-	struct drbd_connection *connection = first_peer_device(device)->connection;
-	struct drbd_request *req, *r;
+	struct drbd_resource *resource = device->resource;
+	struct drbd_request *req;

-	spin_lock_irq(&connection->resource->req_lock);
-	list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests) {
-		if (!(req->rq_state & RQ_LOCAL_PENDING))
+	rcu_read_lock();
+	list_for_each_entry_rcu(req, &resource->transfer_log, tl_requests) {
+		if (!(READ_ONCE(req->local_rq_state) & RQ_LOCAL_PENDING))
			continue;
		if (req->device != device)
			continue;
-		_req_mod(req, ABORT_DISK_IO, NULL);
+		/* Skip if the request has already been destroyed. */
+		if (!kref_get_unless_zero(&req->kref))
+			continue;
+
+		req_mod(req, ABORT_DISK_IO, NULL);
+		kref_put(&req->kref, drbd_req_destroy);
	}
-	spin_unlock_irq(&connection->resource->req_lock);
+	rcu_read_unlock();
 }

 static int drbd_thread_setup(void *arg)
 {
	struct drbd_thread *thi = (struct drbd_thread *) arg;
	struct drbd_resource *resource = thi->resource;
+	struct drbd_connection *connection = thi->connection;
	unsigned long flags;
	int retval;

-	snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
-		 thi->name[0],
-		 resource->name);
-
	allow_kernel_signal(DRBD_SIGKILL);
	allow_kernel_signal(SIGXCPU);
+
+	if (connection)
+		kref_get(&connection->kref);
+	else
+		kref_get(&resource->kref);
 restart:
	retval = thi->function(thi);

@@ -326,26 +565,33 @@ static int drbd_thread_setup(void *arg)
	 */

	if (thi->t_state == RESTARTING) {
-		drbd_info(resource, "Restarting %s thread\n", thi->name);
+		if (connection)
+			drbd_info(connection, "Restarting %s thread\n", thi->name);
+		else
+			drbd_info(resource, "Restarting %s thread\n", thi->name);
		thi->t_state = RUNNING;
		spin_unlock_irqrestore(&thi->t_lock, flags);
+		flush_signals(current); /* likely it got a signal to look at t_state... */
		goto restart;
	}

	thi->task = NULL;
	thi->t_state = NONE;
	smp_mb();
-	complete_all(&thi->stop);
-	spin_unlock_irqrestore(&thi->t_lock, flags);

-	drbd_info(resource, "Terminating %s\n", current->comm);
+	if (connection)
+		drbd_info(connection, "Terminating %s thread\n", thi->name);
+	else
+		drbd_info(resource, "Terminating %s thread\n", thi->name);

-	/* Release mod reference taken when thread was started */
+	complete(&thi->stop);
+	spin_unlock_irqrestore(&thi->t_lock, flags);
+
+	if (connection)
+		kref_put(&connection->kref, drbd_destroy_connection);
+	else
+		kref_put(&resource->kref, drbd_destroy_resource);

-	if (thi->connection)
-		kref_put(&thi->connection->kref, drbd_destroy_connection);
-	kref_put(&resource->kref, drbd_destroy_resource);
-	module_put(THIS_MODULE);
	return retval;
 }

@@ -364,6 +610,7 @@ static void drbd_thread_init(struct drbd_resource *resource, struct drbd_thread
 int drbd_thread_start(struct drbd_thread *thi)
 {
	struct drbd_resource *resource = thi->resource;
+	struct drbd_connection *connection = thi->connection;
	struct task_struct *nt;
	unsigned long flags;

@@ -373,36 +620,29 @@ int drbd_thread_start(struct drbd_thread *thi)

	switch (thi->t_state) {
	case NONE:
-		drbd_info(resource, "Starting %s thread (from %s [%d])\n",
-			 thi->name, current->comm, current->pid);
-
-		/* Get ref on module for thread - this is released when thread exits */
-		if (!try_module_get(THIS_MODULE)) {
-			drbd_err(resource, "Failed to get module reference in drbd_thread_start\n");
-			spin_unlock_irqrestore(&thi->t_lock, flags);
-			return false;
-		}
-
-		kref_get(&resource->kref);
-		if (thi->connection)
-			kref_get(&thi->connection->kref);
+		if (connection)
+			drbd_info(connection, "Starting %s thread (peer-node-id %d)\n",
+				  thi->name, connection->peer_node_id);
+		else
+			drbd_info(resource, "Starting %s thread (node-id %d)\n",
+				  thi->name, resource->res_opts.node_id);

		init_completion(&thi->stop);
+		D_ASSERT(resource, thi->task == NULL);
		thi->reset_cpu_mask = 1;
		thi->t_state = RUNNING;
		spin_unlock_irqrestore(&thi->t_lock, flags);
		flush_signals(current); /* otherw. may get -ERESTARTNOINTR */

		nt = kthread_create(drbd_thread_setup, (void *) thi,
-				    "drbd_%c_%s", thi->name[0], thi->resource->name);
+				    "drbd_%c_%s", thi->name[0], resource->name);

		if (IS_ERR(nt)) {
-			drbd_err(resource, "Couldn't start thread\n");
+			if (connection)
+				drbd_err(connection, "Couldn't start thread: %ld\n", PTR_ERR(nt));
+			else
+				drbd_err(resource, "Couldn't start thread: %ld\n", PTR_ERR(nt));

-			if (thi->connection)
-				kref_put(&thi->connection->kref, drbd_destroy_connection);
-			kref_put(&resource->kref, drbd_destroy_resource);
-			module_put(THIS_MODULE);
			return false;
		}
		spin_lock_irqsave(&thi->t_lock, flags);
@@ -413,8 +653,10 @@ int drbd_thread_start(struct drbd_thread *thi)
		break;
	case EXITING:
		thi->t_state = RESTARTING;
-		drbd_info(resource, "Restarting %s thread (from %s [%d])\n",
-			 thi->name, current->comm, current->pid);
+		if (connection)
+			drbd_info(connection, "Restarting %s thread\n", thi->name);
+		else
+			drbd_info(resource, "Restarting %s thread\n", thi->name);
		fallthrough;
	case RUNNING:
	case RESTARTING:
@@ -443,6 +685,12 @@ void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
		return;
	}

+	if (thi->t_state == EXITING && ns == RESTARTING) {
+		/* Do not abort a stop request, otherwise a waiter might never wake up */
+		spin_unlock_irqrestore(&thi->t_lock, flags);
+		return;
+	}
+
	if (thi->t_state != ns) {
		if (thi->task == NULL) {
			spin_unlock_irqrestore(&thi->t_lock, flags);
@@ -455,7 +703,6 @@ void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
		if (thi->task != current)
			send_sig(DRBD_SIGKILL, thi->task, 1);
	}
-
	spin_unlock_irqrestore(&thi->t_lock, flags);

	if (wait)
@@ -473,8 +720,7 @@ static void drbd_calc_cpu_mask(cpumask_var_t *cpu_mask)
 {
	unsigned int *resources_per_cpu, min_index = ~0;

-	resources_per_cpu = kcalloc(nr_cpu_ids, sizeof(*resources_per_cpu),
-				    GFP_KERNEL);
+	resources_per_cpu = kzalloc(nr_cpu_ids * sizeof(*resources_per_cpu), GFP_KERNEL);
	if (resources_per_cpu) {
		struct drbd_resource *resource;
		unsigned int cpu, min = ~0;
@@ -521,6 +767,46 @@ void drbd_thread_current_set_cpu(struct drbd_thread *thi)
 #define drbd_calc_cpu_mask(A) ({})
 #endif

+static bool drbd_all_neighbor_secondary(struct drbd_device *device, u64 *authoritative_ptr)
+{
+	struct drbd_peer_device *peer_device;
+	bool all_secondary = true;
+	u64 authoritative = 0;
+	int id;
+
+	rcu_read_lock();
+	for_each_peer_device_rcu(peer_device, device) {
+		if (peer_device->repl_state[NOW] >= L_ESTABLISHED &&
+		    peer_device->connection->peer_role[NOW] == R_PRIMARY) {
+			all_secondary = false;
+			id = peer_device->node_id;
+			authoritative |= NODE_MASK(id);
+		}
+	}
+	rcu_read_unlock();
+	if (authoritative_ptr)
+		*authoritative_ptr = authoritative;
+	return all_secondary;
+}
+
+/* This function is supposed to have the same semantics as calc_device_stable() in drbd_state.c
+   A primary is stable since it is authoritative.
+   Unstable are neighbors of a primary and resync target nodes.
+   Nodes further away from a primary are stable! */
+bool drbd_device_stable(struct drbd_device *device, u64 *authoritative_ptr)
+{
+	struct drbd_resource *resource = device->resource;
+	bool device_stable = true;
+
+	if (resource->role[NOW] == R_PRIMARY)
+		return true;
+
+	if (!drbd_all_neighbor_secondary(device, authoritative_ptr))
+		return false;
+
+	return device_stable;
+}
+
 /*
 * drbd_header_size - size of a packet header
 *
@@ -532,177 +818,370 @@ unsigned int drbd_header_size(struct drbd_connection *connection)
 {
	if (connection->agreed_pro_version >= 100) {
		BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
-		return sizeof(struct p_header100);
+		return sizeof(struct p_header100); /* 16 */
	} else {
		BUILD_BUG_ON(sizeof(struct p_header80) != sizeof(struct p_header95));
		BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
-		return sizeof(struct p_header80);
+		return sizeof(struct p_header80); /* 8 */
	}
 }

-static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
+static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
 {
	h->magic = cpu_to_be32(DRBD_MAGIC);
	h->command = cpu_to_be16(cmd);
-	h->length = cpu_to_be16(size);
-	return sizeof(struct p_header80);
+	h->length = cpu_to_be16(size - sizeof(struct p_header80));
 }

-static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
+static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
 {
	h->magic = cpu_to_be16(DRBD_MAGIC_BIG);
	h->command = cpu_to_be16(cmd);
-	h->length = cpu_to_be32(size);
-	return sizeof(struct p_header95);
+	h->length = cpu_to_be32(size - sizeof(struct p_header95));
 }

-static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
+static void prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
				      int size, int vnr)
 {
	h->magic = cpu_to_be32(DRBD_MAGIC_100);
	h->volume = cpu_to_be16(vnr);
	h->command = cpu_to_be16(cmd);
-	h->length =
cpu_to_be32(size); + h->length =3D cpu_to_be32(size - sizeof(struct p_header100)); h->pad =3D 0; - return sizeof(struct p_header100); } =20 -static unsigned int prepare_header(struct drbd_connection *connection, int= vnr, - void *buffer, enum drbd_packet cmd, int size) +static void prepare_header(struct drbd_connection *connection, int vnr, + void *buffer, enum drbd_packet cmd, int size) { if (connection->agreed_pro_version >=3D 100) - return prepare_header100(buffer, cmd, size, vnr); + prepare_header100(buffer, cmd, size, vnr); else if (connection->agreed_pro_version >=3D 95 && size > DRBD_MAX_SIZE_H80_PACKET) - return prepare_header95(buffer, cmd, size); + prepare_header95(buffer, cmd, size); else - return prepare_header80(buffer, cmd, size); + prepare_header80(buffer, cmd, size); +} + +static void new_or_recycle_send_buffer_page(struct drbd_send_buffer *sbuf) +{ + while (1) { + struct page *page; + int count =3D page_count(sbuf->page); + + BUG_ON(count =3D=3D 0); + if (count =3D=3D 1) + goto have_page; + + page =3D alloc_page(GFP_NOIO | __GFP_NORETRY | __GFP_NOWARN); + if (page) { + put_page(sbuf->page); + sbuf->page =3D page; + goto have_page; + } + + schedule_timeout_uninterruptible(HZ / 10); + } +have_page: + sbuf->unsent =3D + sbuf->pos =3D page_address(sbuf->page); +} + +static char * __must_check alloc_send_buffer(struct drbd_connection *conne= ction, int size, + enum drbd_stream drbd_stream) +{ + struct drbd_send_buffer *sbuf =3D &connection->send_buffer[drbd_stream]; + char *page_start =3D page_address(sbuf->page); + int err; + + if (sbuf->pos - page_start + size > PAGE_SIZE) { + err =3D flush_send_buffer(connection, drbd_stream); + if (err) + return ERR_PTR(err); + new_or_recycle_send_buffer_page(sbuf); + } + + sbuf->allocated_size =3D size; + sbuf->additional_size =3D 0; + + return sbuf->pos; +} + +/* If we called alloc_send_buffer(), possibly indirectly via __conn_prepar= e_command(), + * but then decide that we actually don't want to use it. 
+ */ +static void cancel_send_buffer(struct drbd_connection *connection, + enum drbd_stream drbd_stream) +{ + connection->send_buffer[drbd_stream].allocated_size =3D 0; +} + +/* Only used the shrink the previously allocated size. */ +static void resize_prepared_command(struct drbd_connection *connection, + enum drbd_stream drbd_stream, + int size) +{ + connection->send_buffer[drbd_stream].allocated_size =3D + size + drbd_header_size(connection); } =20 -static void *__conn_prepare_command(struct drbd_connection *connection, - struct drbd_socket *sock) +static void additional_size_command(struct drbd_connection *connection, + enum drbd_stream drbd_stream, + int additional_size) { - if (!sock->socket) + connection->send_buffer[drbd_stream].additional_size =3D additional_size; +} + +void *__conn_prepare_command(struct drbd_connection *connection, int size, + enum drbd_stream drbd_stream) +{ + struct drbd_transport *transport =3D &connection->transport; + int header_size; + void *p; + + if (connection->cstate[NOW] < C_CONNECTING) + return NULL; + + if (!transport->class->ops.stream_ok(transport, drbd_stream)) + return NULL; + + header_size =3D drbd_header_size(connection); + p =3D alloc_send_buffer(connection, header_size + size, drbd_stream) + he= ader_size; + if (IS_ERR(p)) return NULL; - return sock->sbuf + drbd_header_size(connection); + return p; } =20 -void *conn_prepare_command(struct drbd_connection *connection, struct drbd= _socket *sock) +/** + * conn_prepare_command() - Allocate a send buffer for a packet/command + * @connection: the connections the packet will be sent through + * @size: number of bytes to allocate + * @drbd_stream: DATA_STREAM or CONTROL_STREAM + * + * This allocates a buffer with capacity to hold the header, and + * the requested size. Upon success is return a pointer that points + * to the first byte behind the header. The caller is expected to + * call xxx_send_command() soon. 
+ */ +void *conn_prepare_command(struct drbd_connection *connection, int size, + enum drbd_stream drbd_stream) { void *p; =20 - mutex_lock(&sock->mutex); - p =3D __conn_prepare_command(connection, sock); + mutex_lock(&connection->mutex[drbd_stream]); + p =3D __conn_prepare_command(connection, size, drbd_stream); if (!p) - mutex_unlock(&sock->mutex); + mutex_unlock(&connection->mutex[drbd_stream]); =20 return p; } =20 -void *drbd_prepare_command(struct drbd_peer_device *peer_device, struct dr= bd_socket *sock) +/** + * drbd_prepare_command() - Allocate a send buffer for a packet/command + * @peer_device: the DRBD peer device the packet will be sent to + * @size: number of bytes to allocate + * @drbd_stream: DATA_STREAM or CONTROL_STREAM + * + * This allocates a buffer with capacity to hold the header, and + * the requested size. Upon success is return a pointer that points + * to the first byte behind the header. The caller is expected to + * call xxx_send_command() soon. + */ +void *drbd_prepare_command(struct drbd_peer_device *peer_device, int size,= enum drbd_stream drbd_stream) { - return conn_prepare_command(peer_device->connection, sock); + return conn_prepare_command(peer_device->connection, size, drbd_stream); } =20 -static int __send_command(struct drbd_connection *connection, int vnr, - struct drbd_socket *sock, enum drbd_packet cmd, - unsigned int header_size, void *data, - unsigned int size) +static int flush_send_buffer(struct drbd_connection *connection, enum drbd= _stream drbd_stream) { - int msg_flags; + struct drbd_send_buffer *sbuf =3D &connection->send_buffer[drbd_stream]; + struct drbd_transport *transport =3D &connection->transport; + struct drbd_transport_ops *tr_ops =3D &transport->class->ops; + unsigned int flags, offset, size; int err; =20 - /* - * Called with @data =3D=3D NULL and the size of the data blocks in @size - * for commands that send data blocks. 
For those commands, omit the - * MSG_MORE flag: this will increase the likelihood that data blocks - * which are page aligned on the sender will end up page aligned on the - * receiver. - */ - msg_flags =3D data ? MSG_MORE : 0; - - header_size +=3D prepare_header(connection, vnr, sock->sbuf, cmd, - header_size + size); - err =3D drbd_send_all(connection, sock->socket, sock->sbuf, header_size, - msg_flags); - if (data && !err) - err =3D drbd_send_all(connection, sock->socket, data, size, 0); - /* DRBD protocol "pings" are latency critical. - * This is supposed to trigger tcp_push_pending_frames() */ - if (!err && (cmd =3D=3D P_PING || cmd =3D=3D P_PING_ACK)) - tcp_sock_set_nodelay(sock->socket->sk); + size =3D sbuf->pos - sbuf->unsent + sbuf->allocated_size; + if (size =3D=3D 0) + return 0; + + if (drbd_stream =3D=3D CONTROL_STREAM) { + connection->ctl_packets++; + if (check_add_overflow(connection->ctl_bytes, size, &connection->ctl_byt= es)) { + connection->ctl_bytes =3D size; + connection->ctl_packets =3D 1; + } + } + + if (drbd_stream =3D=3D DATA_STREAM) { + rcu_read_lock(); + connection->transport.ko_count =3D rcu_dereference(connection->transport= .net_conf)->ko_count; + rcu_read_unlock(); + } + + flags =3D (connection->cstate[NOW] < C_CONNECTING ? MSG_DONTWAIT : 0) | + (sbuf->additional_size ? MSG_MORE : 0); + offset =3D sbuf->unsent - (char *)page_address(sbuf->page); + err =3D tr_ops->send_page(transport, drbd_stream, sbuf->page, offset, siz= e, flags); + if (err) { + change_cstate(connection, C_NETWORK_FAILURE, CS_HARD); + } else { + sbuf->unsent =3D + sbuf->pos +=3D sbuf->allocated_size; /* send buffer submitted! 
*/ + } + + sbuf->allocated_size =3D 0; =20 return err; } =20 -static int __conn_send_command(struct drbd_connection *connection, struct = drbd_socket *sock, - enum drbd_packet cmd, unsigned int header_size, - void *data, unsigned int size) +/* + * SFLAG_FLUSH makes sure the packet (and everything queued in front + * of it) gets sent immediately independently if it is currently + * corked. + * + * This is used for P_PING, P_PING_ACK, P_TWOPC_PREPARE, P_TWOPC_ABORT, + * P_TWOPC_YES, P_TWOPC_NO, P_TWOPC_RETRY and P_TWOPC_COMMIT. + * + * This quirk is necessary because it is corked while the worker + * thread processes work items. When it stops processing items, it + * uncorks. That works perfectly to coalesce ack packets etc.. + * A work item doing two-phase commits needs to override that behavior. + */ +#define SFLAG_FLUSH 0x10 +#define DRBD_STREAM_FLAGS (SFLAG_FLUSH) + +static inline enum drbd_stream extract_stream(int stream_and_flags) { - return __send_command(connection, 0, sock, cmd, header_size, data, size); + return stream_and_flags & ~DRBD_STREAM_FLAGS; } =20 -int conn_send_command(struct drbd_connection *connection, struct drbd_sock= et *sock, - enum drbd_packet cmd, unsigned int header_size, - void *data, unsigned int size) +int __send_command(struct drbd_connection *connection, int vnr, + enum drbd_packet cmd, int stream_and_flags) { + enum drbd_stream drbd_stream =3D extract_stream(stream_and_flags); + struct drbd_send_buffer *sbuf =3D &connection->send_buffer[drbd_stream]; + struct drbd_transport *transport =3D &connection->transport; + struct drbd_transport_ops *tr_ops =3D &transport->class->ops; + /* CORKED + drbd_stream is either DATA_CORKED or CONTROL_CORKED */ + bool corked =3D test_bit(CORKED + drbd_stream, &connection->flags); + bool flush =3D stream_and_flags & SFLAG_FLUSH; int err; =20 - err =3D __conn_send_command(connection, sock, cmd, header_size, data, siz= e); - mutex_unlock(&sock->mutex); + if (connection->cstate[NOW] < C_CONNECTING) + 
return -EIO; + prepare_header(connection, vnr, sbuf->pos, cmd, + sbuf->allocated_size + sbuf->additional_size); + + if (corked && !flush) { + sbuf->pos +=3D sbuf->allocated_size; + sbuf->allocated_size =3D 0; + err =3D 0; + } else { + err =3D flush_send_buffer(connection, drbd_stream); + + /* DRBD protocol "pings" are latency critical. + * This is supposed to trigger tcp_push_pending_frames() */ + if (!err && flush) + tr_ops->hint(transport, drbd_stream, NODELAY); + + } + return err; } =20 -int drbd_send_command(struct drbd_peer_device *peer_device, struct drbd_so= cket *sock, - enum drbd_packet cmd, unsigned int header_size, - void *data, unsigned int size) +void drbd_cork(struct drbd_connection *connection, enum drbd_stream stream) { + struct drbd_transport *transport =3D &connection->transport; + struct drbd_transport_ops *tr_ops =3D &transport->class->ops; + + mutex_lock(&connection->mutex[stream]); + set_bit(CORKED + stream, &connection->flags); + /* only call into transport, if we expect it to work */ + if (connection->cstate[NOW] >=3D C_CONNECTING) + tr_ops->hint(transport, stream, CORK); + mutex_unlock(&connection->mutex[stream]); +} + +int drbd_uncork(struct drbd_connection *connection, enum drbd_stream strea= m) +{ + struct drbd_transport *transport =3D &connection->transport; + struct drbd_transport_ops *tr_ops =3D &transport->class->ops; int err; =20 - err =3D __send_command(peer_device->connection, peer_device->device->vnr, - sock, cmd, header_size, data, size); - mutex_unlock(&sock->mutex); + mutex_lock(&connection->mutex[stream]); + err =3D flush_send_buffer(connection, stream); + if (!err) { + clear_bit(CORKED + stream, &connection->flags); + /* only call into transport, if we expect it to work */ + if (connection->cstate[NOW] >=3D C_CONNECTING) + tr_ops->hint(transport, stream, UNCORK); + } + mutex_unlock(&connection->mutex[stream]); return err; } =20 -int drbd_send_ping(struct drbd_connection *connection) +int send_command(struct drbd_connection 
*connection, int vnr, + enum drbd_packet cmd, int stream_and_flags) { - struct drbd_socket *sock; + enum drbd_stream drbd_stream =3D extract_stream(stream_and_flags); + int err; =20 - sock =3D &connection->meta; - if (!conn_prepare_command(connection, sock)) - return -EIO; - return conn_send_command(connection, sock, P_PING, 0, NULL, 0); + err =3D __send_command(connection, vnr, cmd, stream_and_flags); + mutex_unlock(&connection->mutex[drbd_stream]); + return err; } =20 -int drbd_send_ping_ack(struct drbd_connection *connection) +int drbd_send_command(struct drbd_peer_device *peer_device, + enum drbd_packet cmd, enum drbd_stream drbd_stream) { - struct drbd_socket *sock; + return send_command(peer_device->connection, peer_device->device->vnr, + cmd, drbd_stream); +} =20 - sock =3D &connection->meta; - if (!conn_prepare_command(connection, sock)) +int drbd_send_ping(struct drbd_connection *connection) +{ + if (!conn_prepare_command(connection, 0, CONTROL_STREAM)) return -EIO; - return conn_send_command(connection, sock, P_PING_ACK, 0, NULL, 0); + return send_command(connection, -1, P_PING, CONTROL_STREAM | SFLAG_FLUSH); } =20 -int drbd_send_sync_param(struct drbd_peer_device *peer_device) +void drbd_send_ping_ack_wf(struct work_struct *ws) { - struct drbd_socket *sock; - struct p_rs_param_95 *p; - int size; - const int apv =3D peer_device->connection->agreed_pro_version; - enum drbd_packet cmd; - struct net_conf *nc; - struct disk_conf *dc; - - sock =3D &peer_device->connection->data; - p =3D drbd_prepare_command(peer_device, sock); - if (!p) - return -EIO; + struct drbd_connection *connection =3D + container_of(ws, struct drbd_connection, send_ping_ack_work); + int err; =20 - rcu_read_lock(); - nc =3D rcu_dereference(peer_device->connection->net_conf); + err =3D conn_prepare_command(connection, 0, CONTROL_STREAM) ? 
0 : -EIO; + if (!err) + err =3D send_command(connection, -1, P_PING_ACK, CONTROL_STREAM | SFLAG_= FLUSH); + if (err) + change_cstate(connection, C_NETWORK_FAILURE, CS_HARD); +} + +int drbd_send_peer_ack(struct drbd_connection *connection, u64 mask, u64 d= agtag_sector) +{ + struct p_peer_ack *p; + + p =3D conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM); + if (!p) + return -EIO; + p->mask =3D cpu_to_be64(mask); + p->dagtag =3D cpu_to_be64(dagtag_sector); + + return send_command(connection, -1, P_PEER_ACK, CONTROL_STREAM); +} + +int drbd_send_sync_param(struct drbd_peer_device *peer_device) +{ + struct p_rs_param_95 *p; + int size; + const int apv =3D peer_device->connection->agreed_pro_version; + enum drbd_packet cmd; + struct net_conf *nc; + struct peer_device_conf *pdc; + + rcu_read_lock(); + nc =3D rcu_dereference(peer_device->connection->transport.net_conf); =20 size =3D apv <=3D 87 ? sizeof(struct p_rs_param) : apv =3D=3D 88 ? sizeof(struct p_rs_param) @@ -711,18 +1190,30 @@ int drbd_send_sync_param(struct drbd_peer_device *pe= er_device) : /* apv >=3D 95 */ sizeof(struct p_rs_param_95); =20 cmd =3D apv >=3D 89 ? 
P_SYNC_PARAM89 : P_SYNC_PARAM; + rcu_read_unlock(); + + p =3D drbd_prepare_command(peer_device, size, DATA_STREAM); + if (!p) + return -EIO; =20 /* initialize verify_alg and csums_alg */ - BUILD_BUG_ON(sizeof(p->algs) !=3D 2 * SHARED_SECRET_MAX); - memset(&p->algs, 0, sizeof(p->algs)); + memset(p->verify_alg, 0, sizeof(p->verify_alg)); + memset(p->csums_alg, 0, sizeof(p->csums_alg)); + + rcu_read_lock(); + nc =3D rcu_dereference(peer_device->connection->transport.net_conf); =20 if (get_ldev(peer_device->device)) { - dc =3D rcu_dereference(peer_device->device->ldev->disk_conf); - p->resync_rate =3D cpu_to_be32(dc->resync_rate); - p->c_plan_ahead =3D cpu_to_be32(dc->c_plan_ahead); - p->c_delay_target =3D cpu_to_be32(dc->c_delay_target); - p->c_fill_target =3D cpu_to_be32(dc->c_fill_target); - p->c_max_rate =3D cpu_to_be32(dc->c_max_rate); + pdc =3D rcu_dereference(peer_device->conf); + /* These values will be ignored by peers running DRBD 9.2+, but + * we have to send something, so send the real values. We + * cannot omit the entire packet because we must verify that + * the algorithms match. 
*/ + p->resync_rate =3D cpu_to_be32(pdc->resync_rate); + p->c_plan_ahead =3D cpu_to_be32(pdc->c_plan_ahead); + p->c_delay_target =3D cpu_to_be32(pdc->c_delay_target); + p->c_fill_target =3D cpu_to_be32(pdc->c_fill_target); + p->c_max_rate =3D cpu_to_be32(pdc->c_max_rate); put_ldev(peer_device->device); } else { p->resync_rate =3D cpu_to_be32(DRBD_RESYNC_RATE_DEF); @@ -738,36 +1229,37 @@ int drbd_send_sync_param(struct drbd_peer_device *pe= er_device) strscpy(p->csums_alg, nc->csums_alg); rcu_read_unlock(); =20 - return drbd_send_command(peer_device, sock, cmd, size, NULL, 0); + return drbd_send_command(peer_device, cmd, DATA_STREAM); } =20 int __drbd_send_protocol(struct drbd_connection *connection, enum drbd_pac= ket cmd) { - struct drbd_socket *sock; struct p_protocol *p; struct net_conf *nc; size_t integrity_alg_len; int size, cf; =20 - sock =3D &connection->data; - p =3D __conn_prepare_command(connection, sock); - if (!p) - return -EIO; - - rcu_read_lock(); - nc =3D rcu_dereference(connection->net_conf); - - if (nc->tentative && connection->agreed_pro_version < 92) { - rcu_read_unlock(); + if (test_bit(CONN_DRY_RUN, &connection->flags) && connection->agreed_pro_= version < 92) { + clear_bit(CONN_DRY_RUN, &connection->flags); drbd_err(connection, "--dry-run is not supported by peer"); return -EOPNOTSUPP; } =20 size =3D sizeof(*p); + rcu_read_lock(); + nc =3D rcu_dereference(connection->transport.net_conf); if (connection->agreed_pro_version >=3D 87) { integrity_alg_len =3D strlen(nc->integrity_alg) + 1; size +=3D integrity_alg_len; } + rcu_read_unlock(); + + p =3D __conn_prepare_command(connection, size, DATA_STREAM); + if (!p) + return -EIO; + + rcu_read_lock(); + nc =3D rcu_dereference(connection->transport.net_conf); =20 p->protocol =3D cpu_to_be32(nc->wire_protocol); p->after_sb_0p =3D cpu_to_be32(nc->after_sb_0p); @@ -775,9 +1267,9 @@ int __drbd_send_protocol(struct drbd_connection *conne= ction, enum drbd_packet cm p->after_sb_2p =3D 
cpu_to_be32(nc->after_sb_2p); p->two_primaries =3D cpu_to_be32(nc->two_primaries); cf =3D 0; - if (nc->discard_my_data) + if (test_bit(CONN_DISCARD_MY_DATA, &connection->flags)) cf |=3D CF_DISCARD_MY_DATA; - if (nc->tentative) + if (test_bit(CONN_DRY_RUN, &connection->flags)) cf |=3D CF_DRY_RUN; p->conn_flags =3D cpu_to_be32(cf); =20 @@ -785,133 +1277,301 @@ int __drbd_send_protocol(struct drbd_connection *c= onnection, enum drbd_packet cm strscpy(p->integrity_alg, nc->integrity_alg, integrity_alg_len); rcu_read_unlock(); =20 - return __conn_send_command(connection, sock, cmd, size, NULL, 0); -} - -int drbd_send_protocol(struct drbd_connection *connection) -{ - int err; - - mutex_lock(&connection->data.mutex); - err =3D __drbd_send_protocol(connection, P_PROTOCOL); - mutex_unlock(&connection->data.mutex); - - return err; + return __send_command(connection, -1, cmd, DATA_STREAM); } =20 static int _drbd_send_uuids(struct drbd_peer_device *peer_device, u64 uuid= _flags) { struct drbd_device *device =3D peer_device->device; - struct drbd_socket *sock; struct p_uuids *p; int i; =20 if (!get_ldev_if_state(device, D_NEGOTIATING)) return 0; =20 - sock =3D &peer_device->connection->data; - p =3D drbd_prepare_command(peer_device, sock); + p =3D drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM); if (!p) { put_ldev(device); return -EIO; } + spin_lock_irq(&device->ldev->md.uuid_lock); - for (i =3D UI_CURRENT; i < UI_SIZE; i++) - p->uuid[i] =3D cpu_to_be64(device->ldev->md.uuid[i]); + p->current_uuid =3D cpu_to_be64(drbd_current_uuid(device)); + p->bitmap_uuid =3D cpu_to_be64(drbd_bitmap_uuid(peer_device)); + for (i =3D 0; i < ARRAY_SIZE(p->history_uuids); i++) + p->history_uuids[i] =3D cpu_to_be64(drbd_history_uuid(device, i)); spin_unlock_irq(&device->ldev->md.uuid_lock); =20 - device->comm_bm_set =3D drbd_bm_total_weight(device); - p->uuid[UI_SIZE] =3D cpu_to_be64(device->comm_bm_set); + peer_device->comm_bm_set =3D drbd_bm_total_weight(peer_device); + p->dirty_bits 
=3D cpu_to_be64(peer_device->comm_bm_set); + + if (test_bit(DISCARD_MY_DATA, &peer_device->flags)) + uuid_flags |=3D UUID_FLAG_DISCARD_MY_DATA; + if (test_bit(CRASHED_PRIMARY, &device->flags)) + uuid_flags |=3D UUID_FLAG_CRASHED_PRIMARY; + if (!drbd_md_test_flag(device->ldev, MDF_CONSISTENT)) + uuid_flags |=3D UUID_FLAG_INCONSISTENT; + + /* Silently mask out any "too recent" flags, + * we cannot communicate those in old DRBD + * protocol versions. */ + uuid_flags &=3D UUID_FLAG_MASK_COMPAT_84; + + peer_device->comm_uuid_flags =3D uuid_flags; + p->uuid_flags =3D cpu_to_be64(uuid_flags); + + put_ldev(device); + + return drbd_send_command(peer_device, P_UUIDS, DATA_STREAM); +} + +static u64 __bitmap_uuid(struct drbd_device *device, int node_id) +{ + struct drbd_peer_device *peer_device; + struct drbd_peer_md *peer_md =3D device->ldev->md.peers; + u64 bitmap_uuid =3D peer_md[node_id].bitmap_uuid; + + /* Sending a bitmap_uuid of 0 means that we are in sync with that peer. + The recipient of this message might use this assumption to throw away = it's + bitmap to that peer. + + Send -1 instead if we are (resync target from that peer) not at the sa= me + current uuid. + This corner case is relevant if we finish resync from an UpToDate peer= first, + and the second resync (which was paused first) is from an Outdated nod= e. + And that second resync gets canceled by the resync target due to the f= irst + resync finished successfully. + + Exceptions to the above are when the peer's UUID is not known yet + */ + rcu_read_lock(); - uuid_flags |=3D rcu_dereference(peer_device->connection->net_conf)->disca= rd_my_data ? 
1 : 0; + peer_device =3D peer_device_by_node_id(device, node_id); + if (peer_device) { + enum drbd_repl_state repl_state =3D peer_device->repl_state[NOW]; + if (bitmap_uuid =3D=3D 0 && + (repl_state =3D=3D L_SYNC_TARGET || repl_state =3D=3D L_PAUSED_SYNC_= T) && + peer_device->current_uuid !=3D 0 && + (peer_device->current_uuid & ~UUID_PRIMARY) !=3D + (drbd_current_uuid(device) & ~UUID_PRIMARY)) + bitmap_uuid =3D -1; + } rcu_read_unlock(); - uuid_flags |=3D test_bit(CRASHED_PRIMARY, &device->flags) ? 2 : 0; - uuid_flags |=3D device->new_state_tmp.disk =3D=3D D_INCONSISTENT ? 4 : 0; - p->uuid[UI_FLAGS] =3D cpu_to_be64(uuid_flags); =20 - put_ldev(device); - return drbd_send_command(peer_device, sock, P_UUIDS, sizeof(*p), NULL, 0); + return bitmap_uuid; } =20 -int drbd_send_uuids(struct drbd_peer_device *peer_device) +u64 drbd_collect_local_uuid_flags(struct drbd_peer_device *peer_device, u6= 4 *authoritative_mask) { - return _drbd_send_uuids(peer_device, 0); + struct drbd_device *device =3D peer_device->device; + u64 uuid_flags =3D 0; + + if (test_bit(DISCARD_MY_DATA, &peer_device->flags)) + uuid_flags |=3D UUID_FLAG_DISCARD_MY_DATA; + if (test_bit(CRASHED_PRIMARY, &device->flags)) + uuid_flags |=3D UUID_FLAG_CRASHED_PRIMARY; + if (!drbd_md_test_flag(device->ldev, MDF_CONSISTENT)) + uuid_flags |=3D UUID_FLAG_INCONSISTENT; + if (test_bit(RECONNECT, &peer_device->connection->flags)) + uuid_flags |=3D UUID_FLAG_RECONNECT; + if (test_bit(PRIMARY_LOST_QUORUM, &device->flags)) + uuid_flags |=3D UUID_FLAG_PRIMARY_LOST_QUORUM; + if (drbd_device_stable(device, authoritative_mask)) + uuid_flags |=3D UUID_FLAG_STABLE; + + return uuid_flags; +} + +/* sets UUID_FLAG_SYNC_TARGET on uuid_flags as appropriate (may be NULL) */ +u64 drbd_resolved_uuid(struct drbd_peer_device *peer_device_base, u64 *uui= d_flags) +{ + struct drbd_device *device =3D peer_device_base->device; + struct drbd_peer_device *peer_device; + u64 uuid =3D drbd_current_uuid(device); + + rcu_read_lock(); + 
for_each_peer_device_rcu(peer_device, device) { + if (peer_device->node_id =3D=3D peer_device_base->node_id) + continue; + if (peer_device->repl_state[NOW] =3D=3D L_SYNC_TARGET) { + uuid =3D peer_device->current_uuid; + if (uuid_flags) + *uuid_flags |=3D UUID_FLAG_SYNC_TARGET; + break; + } + } + rcu_read_unlock(); + + return uuid; +} + +static int _drbd_send_uuids110(struct drbd_peer_device *peer_device, u64 u= uid_flags, u64 node_mask) +{ + struct drbd_device *device =3D peer_device->device; + const int my_node_id =3D device->resource->res_opts.node_id; + struct drbd_peer_md *peer_md; + struct p_uuids110 *p; + bool sent_one_unallocated; + int i, pos =3D 0; + u64 local_uuid_flags =3D 0, authoritative_mask, bitmap_uuids_mask =3D 0; + int p_size =3D sizeof(*p); + + if (!get_ldev_if_state(device, D_NEGOTIATING)) + return drbd_send_current_uuid(peer_device, device->exposed_data_uuid, + drbd_weak_nodes_device(device)); + + peer_md =3D device->ldev->md.peers; + + p_size +=3D (DRBD_PEERS_MAX + HISTORY_UUIDS) * sizeof(p->other_uuids[0]); + p =3D drbd_prepare_command(peer_device, p_size, DATA_STREAM); + if (!p) { + put_ldev(device); + return -EIO; + } + + spin_lock_irq(&device->ldev->md.uuid_lock); + peer_device->comm_current_uuid =3D drbd_resolved_uuid(peer_device, &local= _uuid_flags); + p->current_uuid =3D cpu_to_be64(peer_device->comm_current_uuid); + + sent_one_unallocated =3D peer_device->connection->agreed_pro_version < 11= 6; + for (i =3D 0; i < DRBD_NODE_ID_MAX; i++) { + u64 val =3D __bitmap_uuid(device, i); + bool send_this =3D peer_md[i].flags & (MDF_HAVE_BITMAP | MDF_NODE_EXISTS= ); + if (!send_this && !sent_one_unallocated && + i !=3D my_node_id && i !=3D peer_device->node_id && val) { + send_this =3D true; + sent_one_unallocated =3D true; + uuid_flags |=3D (u64)i << UUID_FLAG_UNALLOC_SHIFT; + uuid_flags |=3D UUID_FLAG_HAS_UNALLOC; + } + if (send_this) { + bitmap_uuids_mask |=3D NODE_MASK(i); + p->other_uuids[pos++] =3D cpu_to_be64(val); + } + } + 
 	peer_device->comm_bitmap_uuid = drbd_bitmap_uuid(peer_device);
+
+	for (i = 0; i < HISTORY_UUIDS; i++)
+		p->other_uuids[pos++] = cpu_to_be64(drbd_history_uuid(device, i));
+	spin_unlock_irq(&device->ldev->md.uuid_lock);
+
+	p->bitmap_uuids_mask = cpu_to_be64(bitmap_uuids_mask);
+
+	peer_device->comm_bm_set = drbd_bm_total_weight(peer_device);
+	p->dirty_bits = cpu_to_be64(peer_device->comm_bm_set);
+	local_uuid_flags |= drbd_collect_local_uuid_flags(peer_device, &authoritative_mask);
+	peer_device->comm_uuid_flags = local_uuid_flags;
+	uuid_flags |= local_uuid_flags;
+	if (uuid_flags & UUID_FLAG_STABLE) {
+		p->node_mask = cpu_to_be64(node_mask);
+	} else {
+		D_ASSERT(peer_device, node_mask == 0);
+		p->node_mask = cpu_to_be64(authoritative_mask);
+	}
+
+	p->uuid_flags = cpu_to_be64(uuid_flags);
+
+	put_ldev(device);
+
+	p_size = sizeof(*p) +
+		(hweight64(bitmap_uuids_mask) + HISTORY_UUIDS) * sizeof(p->other_uuids[0]);
+	resize_prepared_command(peer_device->connection, DATA_STREAM, p_size);
+	return drbd_send_command(peer_device, P_UUIDS110, DATA_STREAM);
 }
 
-int drbd_send_uuids_skip_initial_sync(struct drbd_peer_device *peer_device)
+int drbd_send_uuids(struct drbd_peer_device *peer_device, u64 uuid_flags, u64 node_mask)
 {
-	return _drbd_send_uuids(peer_device, 8);
+	if (peer_device->connection->agreed_pro_version >= 110)
+		return _drbd_send_uuids110(peer_device, uuid_flags, node_mask);
+	else
+		return _drbd_send_uuids(peer_device, uuid_flags);
 }
 
-void drbd_print_uuids(struct drbd_device *device, const char *text)
+void drbd_print_uuids(struct drbd_peer_device *peer_device, const char *text)
 {
+	struct drbd_device *device = peer_device->device;
+
 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
-		u64 *uuid = device->ldev->md.uuid;
-		drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX\n",
-			  text,
-			  (unsigned long long)uuid[UI_CURRENT],
-			  (unsigned long long)uuid[UI_BITMAP],
-			  (unsigned long long)uuid[UI_HISTORY_START],
-			  (unsigned long long)uuid[UI_HISTORY_END]);
+		drbd_info(peer_device, "%s %016llX:%016llX:%016llX:%016llX\n",
+			  text,
+			  (unsigned long long)drbd_current_uuid(device),
+			  (unsigned long long)drbd_bitmap_uuid(peer_device),
+			  (unsigned long long)drbd_history_uuid(device, 0),
+			  (unsigned long long)drbd_history_uuid(device, 1));
 		put_ldev(device);
 	} else {
-		drbd_info(device, "%s effective data uuid: %016llX\n",
-			  text,
-			  (unsigned long long)device->ed_uuid);
+		drbd_info(peer_device, "%s exposed data uuid: %016llX\n",
+			  text,
+			  (unsigned long long)device->exposed_data_uuid);
	}
 }
 
+int drbd_send_current_uuid(struct drbd_peer_device *peer_device, u64 current_uuid, u64 weak_nodes)
+{
+	struct p_current_uuid *p;
+
+	p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
+	if (!p)
+		return -EIO;
+
+	peer_device->comm_current_uuid = current_uuid;
+	p->uuid = cpu_to_be64(current_uuid);
+	p->weak_nodes = cpu_to_be64(weak_nodes);
+	return drbd_send_command(peer_device, P_CURRENT_UUID, DATA_STREAM);
+}
+
 void drbd_gen_and_send_sync_uuid(struct drbd_peer_device *peer_device)
 {
 	struct drbd_device *device = peer_device->device;
-	struct drbd_socket *sock;
-	struct p_rs_uuid *p;
+	struct p_uuid *p;
 	u64 uuid;
 
-	D_ASSERT(device, device->state.disk == D_UP_TO_DATE);
+	D_ASSERT(device, device->disk_state[NOW] == D_UP_TO_DATE);
 
-	uuid = device->ldev->md.uuid[UI_BITMAP];
+	down_write(&device->uuid_sem);
+	uuid = drbd_bitmap_uuid(peer_device);
 	if (uuid && uuid != UUID_JUST_CREATED)
 		uuid = uuid + UUID_NEW_BM_OFFSET;
 	else
 		get_random_bytes(&uuid, sizeof(u64));
-	drbd_uuid_set(device, UI_BITMAP, uuid);
-	drbd_print_uuids(device, "updated sync UUID");
+	drbd_uuid_set_bitmap(peer_device, uuid);
+	drbd_print_uuids(peer_device, "updated sync UUID");
 	drbd_md_sync(device);
+	downgrade_write(&device->uuid_sem);
 
-	sock = &peer_device->connection->data;
-	p = drbd_prepare_command(peer_device, sock);
+	p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
 	if (p) {
 		p->uuid = cpu_to_be64(uuid);
-		drbd_send_command(peer_device, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
+		drbd_send_command(peer_device, P_SYNC_UUID, DATA_STREAM);
 	}
+	up_read(&device->uuid_sem);
 }
 
-int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enum dds_flags flags)
+int drbd_send_sizes(struct drbd_peer_device *peer_device,
+		    uint64_t u_size_diskless, enum dds_flags flags)
 {
+	struct drbd_connection *connection = peer_device->connection;
 	struct drbd_device *device = peer_device->device;
-	struct drbd_socket *sock;
 	struct p_sizes *p;
 	sector_t d_size, u_size;
 	int q_order_type;
 	unsigned int max_bio_size;
 	unsigned int packet_size;
 
-	sock = &peer_device->connection->data;
-	p = drbd_prepare_command(peer_device, sock);
-	if (!p)
-		return -EIO;
-
 	packet_size = sizeof(*p);
-	if (peer_device->connection->agreed_features & DRBD_FF_WSAME)
+	if (connection->agreed_features & DRBD_FF_WSAME)
 		packet_size += sizeof(p->qlim[0]);
 
+	p = drbd_prepare_command(peer_device, packet_size, DATA_STREAM);
+	if (!p)
+		return -EIO;
+
+	memset(p, 0, packet_size);
 	if (get_ldev_if_state(device, D_NEGOTIATING)) {
 		struct block_device *bdev = device->ldev->backing_bdev;
 		struct request_queue *q = bdev_get_queue(bdev);
 
-		d_size = drbd_get_max_capacity(device->ldev);
+		d_size = drbd_get_max_capacity(device, device->ldev, false);
 		rcu_read_lock();
 		u_size = rcu_dereference(device->ldev->disk_conf)->disk_size;
 		rcu_read_unlock();
@@ -927,6 +1587,10 @@ int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enu
 		p->qlim->io_min = cpu_to_be32(bdev_io_min(bdev));
 		p->qlim->io_opt = cpu_to_be32(bdev_io_opt(bdev));
 		p->qlim->discard_enabled = !!bdev_max_discard_sectors(bdev);
+		p->qlim->write_same_capable = 0;
+		if (connection->agreed_features & DRBD_FF_BM_BLOCK_SHIFT)
+			p->qlim->bm_block_shift_minus_12 =
+				device->bitmap->bm_block_shift - BM_BLOCK_SHIFT_4k;
 		put_ldev(device);
 	} else {
 		struct request_queue *q = device->rq_queue;
@@ -939,128 +1603,307 @@ int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enu
 		p->qlim->io_min = cpu_to_be32(queue_io_min(q));
 		p->qlim->io_opt = cpu_to_be32(queue_io_opt(q));
 		p->qlim->discard_enabled = 0;
+		p->qlim->write_same_capable = 0;
 
 		d_size = 0;
-		u_size = 0;
+		u_size = u_size_diskless;
 		q_order_type = QUEUE_ORDERED_NONE;
 		max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
 	}
 
-	if (peer_device->connection->agreed_pro_version <= 94)
+	if (connection->agreed_pro_version <= 94)
 		max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
-	else if (peer_device->connection->agreed_pro_version < 100)
+	else if (connection->agreed_pro_version < 100)
 		max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE_P95);
 
+	/* 9.0.4 bumped pro_version to 112 and introduced 2PC resizes */
+	if (connection->agreed_pro_version >= 112)
+		d_size = drbd_partition_data_capacity(device);
+
 	p->d_size = cpu_to_be64(d_size);
 	p->u_size = cpu_to_be64(u_size);
-	if (trigger_reply)
-		p->c_size = 0;
-	else
-		p->c_size = cpu_to_be64(get_capacity(device->vdisk));
+	/*
+	   TODO verify: this may be needed for v8 compatibility still.
+	   p->c_size = cpu_to_be64(trigger_reply ? 0 : get_capacity(device->vdisk));
+	 */
+	p->c_size = cpu_to_be64(get_capacity(device->vdisk));
 	p->max_bio_size = cpu_to_be32(max_bio_size);
 	p->queue_order_type = cpu_to_be16(q_order_type);
 	p->dds_flags = cpu_to_be16(flags);
 
-	return drbd_send_command(peer_device, sock, P_SIZES, packet_size, NULL, 0);
+	return drbd_send_command(peer_device, P_SIZES, DATA_STREAM);
 }
 
-/**
- * drbd_send_current_state() - Sends the drbd state to the peer
- * @peer_device: DRBD peer device.
- */
 int drbd_send_current_state(struct drbd_peer_device *peer_device)
 {
-	struct drbd_socket *sock;
+	return drbd_send_state(peer_device, drbd_get_peer_device_state(peer_device, NOW));
+}
+
+static int send_state(struct drbd_connection *connection, int vnr, union drbd_state state)
+{
 	struct p_state *p;
 
-	sock = &peer_device->connection->data;
-	p = drbd_prepare_command(peer_device, sock);
+	p = conn_prepare_command(connection, sizeof(*p), DATA_STREAM);
 	if (!p)
 		return -EIO;
-	p->state = cpu_to_be32(peer_device->device->state.i); /* Within the send mutex */
-	return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
+
+	if (connection->agreed_pro_version < 110) {
+		/* D_DETACHING was introduced with drbd-9.0 */
+		if (state.disk > D_DETACHING)
+			state.disk--;
+		if (state.pdsk > D_DETACHING)
+			state.pdsk--;
+	}
+
+	p->state = cpu_to_be32(state.i); /* Within the send mutex */
+	return send_command(connection, vnr, P_STATE, DATA_STREAM);
+}
+
+int conn_send_state(struct drbd_connection *connection, union drbd_state state)
+{
+	BUG_ON(connection->agreed_pro_version < 100);
+	return send_state(connection, -1, state);
 }
 
 /**
- * drbd_send_state() - After a state change, sends the new state to the peer
- * @peer_device: DRBD peer device.
- * @state: the state to send, not necessarily the current state.
- *
- * Each state change queues an "after_state_ch" work, which will eventually
- * send the resulting new state to the peer. If more state changes happen
- * between queuing and processing of the after_state_ch work, we still
- * want to send each intermediary state in the order it occurred.
+ * drbd_send_state() - Sends the drbd state to the peer
+ * @peer_device: Peer DRBD device to send the state to.
+ * @state: state to send */ int drbd_send_state(struct drbd_peer_device *peer_device, union drbd_state= state) { - struct drbd_socket *sock; - struct p_state *p; - - sock =3D &peer_device->connection->data; - p =3D drbd_prepare_command(peer_device, sock); - if (!p) - return -EIO; - p->state =3D cpu_to_be32(state.i); /* Within the send mutex */ - return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0); + peer_device->comm_state =3D state; + return send_state(peer_device->connection, peer_device->device->vnr, stat= e); } =20 -int drbd_send_state_req(struct drbd_peer_device *peer_device, union drbd_s= tate mask, union drbd_state val) +int conn_send_state_req(struct drbd_connection *connection, int vnr, enum = drbd_packet cmd, + union drbd_state mask, union drbd_state val) { - struct drbd_socket *sock; struct p_req_state *p; =20 - sock =3D &peer_device->connection->data; - p =3D drbd_prepare_command(peer_device, sock); + /* Protocols before version 100 only support one volume and connection. + * All state change requests are via P_STATE_CHG_REQ. */ + if (connection->agreed_pro_version < 100) + cmd =3D P_STATE_CHG_REQ; + + p =3D conn_prepare_command(connection, sizeof(*p), DATA_STREAM); if (!p) return -EIO; p->mask =3D cpu_to_be32(mask.i); p->val =3D cpu_to_be32(val.i); - return drbd_send_command(peer_device, sock, P_STATE_CHG_REQ, sizeof(*p), = NULL, 0); + + return send_command(connection, vnr, cmd, DATA_STREAM); } =20 -int conn_send_state_req(struct drbd_connection *connection, union drbd_sta= te mask, union drbd_state val) +int conn_send_twopc_request(struct drbd_connection *connection, struct two= pc_request *request) { - enum drbd_packet cmd; - struct drbd_socket *sock; - struct p_req_state *p; + struct drbd_resource *resource =3D connection->resource; + struct p_twopc_request *p; =20 - cmd =3D connection->agreed_pro_version < 100 ? 
P_STATE_CHG_REQ : P_CONN_S= T_CHG_REQ; - sock =3D &connection->data; - p =3D conn_prepare_command(connection, sock); + dynamic_drbd_dbg(connection, "Sending %s request for state change %u\n", + drbd_packet_name(request->cmd), + request->tid); + + p =3D conn_prepare_command(connection, sizeof(*p), DATA_STREAM); if (!p) return -EIO; - p->mask =3D cpu_to_be32(mask.i); - p->val =3D cpu_to_be32(val.i); - return conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0); + p->tid =3D cpu_to_be32(request->tid); + if (connection->agreed_features & DRBD_FF_2PC_V2) { + p->flags =3D cpu_to_be32(TWOPC_HAS_FLAGS | request->flags); + p->_pad =3D 0; + p->s8_initiator_node_id =3D request->initiator_node_id; + p->s8_target_node_id =3D request->target_node_id; + } else { + p->u32_initiator_node_id =3D cpu_to_be32(request->initiator_node_id); + p->u32_target_node_id =3D cpu_to_be32(request->target_node_id); + } + p->nodes_to_reach =3D cpu_to_be64(request->nodes_to_reach); + switch (resource->twopc.type) { + case TWOPC_STATE_CHANGE: + if (request->cmd =3D=3D P_TWOPC_PREPARE) { + p->_compat_pad =3D 0; + p->mask =3D cpu_to_be32(resource->twopc.state_change.mask.i); + p->val =3D cpu_to_be32(resource->twopc.state_change.val.i); + } else { /* P_TWOPC_COMMIT */ + p->primary_nodes =3D cpu_to_be64(resource->twopc.state_change.primary_n= odes); + if (request->flags & TWOPC_HAS_REACHABLE && + connection->agreed_features & DRBD_FF_2PC_V2) { + p->reachable_nodes =3D cpu_to_be64( + resource->twopc.state_change.reachable_nodes); + } else { + p->mask =3D cpu_to_be32(resource->twopc.state_change.mask.i); + p->val =3D cpu_to_be32(resource->twopc.state_change.val.i); + } + } + break; + case TWOPC_RESIZE: + if (request->cmd =3D=3D P_TWOPC_PREP_RSZ) { + p->user_size =3D cpu_to_be64(resource->twopc.resize.user_size); + p->dds_flags =3D cpu_to_be16(resource->twopc.resize.dds_flags); + } else { /* P_TWOPC_COMMIT */ + p->diskful_primary_nodes =3D + 
cpu_to_be64(resource->twopc.resize.diskful_primary_nodes); + p->exposed_size =3D cpu_to_be64(resource->twopc.resize.new_size); + } + } + return send_command(connection, request->vnr, request->cmd, DATA_STREAM |= SFLAG_FLUSH); } =20 -void drbd_send_sr_reply(struct drbd_peer_device *peer_device, enum drbd_st= ate_rv retcode) +void drbd_send_sr_reply(struct drbd_connection *connection, int vnr, enum = drbd_state_rv retcode) { - struct drbd_socket *sock; struct p_req_state_reply *p; =20 - sock =3D &peer_device->connection->meta; - p =3D drbd_prepare_command(peer_device, sock); + p =3D conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM); if (p) { + enum drbd_packet cmd =3D P_STATE_CHG_REPLY; + + if (connection->agreed_pro_version >=3D 100 && vnr < 0) + cmd =3D P_CONN_ST_CHG_REPLY; + p->retcode =3D cpu_to_be32(retcode); - drbd_send_command(peer_device, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL= , 0); + send_command(connection, vnr, cmd, CONTROL_STREAM); } } =20 -void conn_send_sr_reply(struct drbd_connection *connection, enum drbd_stat= e_rv retcode) +void drbd_send_twopc_reply(struct drbd_connection *connection, + enum drbd_packet cmd, struct twopc_reply *reply) { - struct drbd_socket *sock; - struct p_req_state_reply *p; - enum drbd_packet cmd =3D connection->agreed_pro_version < 100 ? 
P_STATE_C= HG_REPLY : P_CONN_ST_CHG_REPLY; + struct p_twopc_reply *p; =20 - sock =3D &connection->meta; - p =3D conn_prepare_command(connection, sock); + p =3D conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM); if (p) { - p->retcode =3D cpu_to_be32(retcode); - conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0); + p->tid =3D cpu_to_be32(reply->tid); + p->initiator_node_id =3D cpu_to_be32(reply->initiator_node_id); + p->reachable_nodes =3D cpu_to_be64(reply->reachable_nodes); + switch (connection->resource->twopc.type) { + case TWOPC_STATE_CHANGE: + p->primary_nodes =3D cpu_to_be64(reply->primary_nodes); + p->weak_nodes =3D cpu_to_be64(reply->weak_nodes); + break; + case TWOPC_RESIZE: + p->diskful_primary_nodes =3D cpu_to_be64(reply->diskful_primary_nodes); + p->max_possible_size =3D cpu_to_be64(reply->max_possible_size); + break; + } + send_command(connection, reply->vnr, cmd, CONTROL_STREAM | SFLAG_FLUSH); + } +} + +void drbd_send_peers_in_sync(struct drbd_peer_device *peer_device, u64 mas= k, sector_t sector, int size) +{ + struct p_peer_block_desc *p; + + p =3D drbd_prepare_command(peer_device, sizeof(*p), CONTROL_STREAM); + if (p) { + p->sector =3D cpu_to_be64(sector); + p->mask =3D cpu_to_be64(mask); + p->size =3D cpu_to_be32(size); + p->pad =3D 0; + drbd_send_command(peer_device, P_PEERS_IN_SYNC, CONTROL_STREAM); } } =20 +int drbd_send_peer_dagtag(struct drbd_connection *connection, struct drbd_= connection *lost_peer) +{ + struct p_peer_dagtag *p; + + p =3D conn_prepare_command(connection, sizeof(*p), DATA_STREAM); + if (!p) + return -EIO; + + p->dagtag =3D cpu_to_be64(atomic64_read(&lost_peer->last_dagtag_sector)); + p->node_id =3D cpu_to_be32(lost_peer->peer_node_id); + + return send_command(connection, -1, P_PEER_DAGTAG, DATA_STREAM); +} + +int drbd_send_flush_requests(struct drbd_connection *connection, u64 flush= _sequence) +{ + struct p_flush_requests *p; + + p =3D conn_prepare_command(connection, sizeof(*p), DATA_STREAM); + if 
(!p) + return -EIO; + + p->flush_sequence =3D cpu_to_be64(flush_sequence); + + return send_command(connection, -1, P_FLUSH_REQUESTS, DATA_STREAM); +} + +int drbd_send_flush_forward(struct drbd_connection *connection, u64 flush_= sequence, + int initiator_node_id) +{ + struct p_flush_forward *p; + + p =3D conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM); + if (!p) + return -EIO; + + p->flush_sequence =3D cpu_to_be64(flush_sequence); + p->initiator_node_id =3D cpu_to_be32(initiator_node_id); + + return send_command(connection, -1, P_FLUSH_FORWARD, CONTROL_STREAM); +} + +int drbd_send_flush_requests_ack(struct drbd_connection *connection, u64 f= lush_sequence, + int primary_node_id) +{ + struct p_flush_ack *p; + + p =3D conn_prepare_command(connection, sizeof(*p), DATA_STREAM); + if (!p) + return -EIO; + + p->flush_sequence =3D cpu_to_be64(flush_sequence); + p->primary_node_id =3D cpu_to_be32(primary_node_id); + + return send_command(connection, -1, P_FLUSH_REQUESTS_ACK, DATA_STREAM); +} + +int drbd_send_enable_replication_next(struct drbd_peer_device *peer_device) +{ + struct p_enable_replication *p; + struct peer_device_conf *pdc; + bool resync_without_replication; + + set_bit(PEER_REPLICATION_NEXT, &peer_device->flags); + if (!(peer_device->connection->agreed_features & DRBD_FF_RESYNC_WITHOUT_R= EPLICATION)) + return 0; + + p =3D drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM); + if (!p) + return -EIO; + + rcu_read_lock(); + pdc =3D rcu_dereference(peer_device->conf); + resync_without_replication =3D pdc->resync_without_replication; + rcu_read_unlock(); + + if (resync_without_replication) + clear_bit(PEER_REPLICATION_NEXT, &peer_device->flags); + + p->enable =3D !resync_without_replication; + p->_pad1 =3D 0; + p->_pad2 =3D 0; + + return drbd_send_command(peer_device, P_ENABLE_REPLICATION_NEXT, DATA_STR= EAM); +} + +int drbd_send_enable_replication(struct drbd_peer_device *peer_device, boo= l enable) +{ + struct p_enable_replication *p; + + p 
=3D drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM); + if (!p) + return -EIO; + + p->enable =3D enable; + p->_pad1 =3D 0; + p->_pad2 =3D 0; + + return drbd_send_command(peer_device, P_ENABLE_REPLICATION, DATA_STREAM); +} + static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code= code) { BUG_ON(code & ~0xf); @@ -1078,24 +1921,28 @@ static void dcbp_set_pad_bits(struct p_compressed_b= m *p, int n) p->encoding =3D (p->encoding & (~0x7 << 4)) | (n << 4); } =20 -static int fill_bitmap_rle_bits(struct drbd_device *device, - struct p_compressed_bm *p, - unsigned int size, - struct bm_xfer_ctx *c) +/* For compat reasons, encode bitmap as if it was 4k per bit! + * Easy: just scale the run length. + */ +static int fill_bitmap_rle_bits(struct drbd_peer_device *peer_device, + struct p_compressed_bm *p, + unsigned int size, + struct bm_xfer_ctx *c) { struct bitstream bs; unsigned long plain_bits; unsigned long tmp; unsigned long rl; + unsigned long rl_4k; unsigned len; unsigned toggle; int bits, use_rle; =20 /* may we use this feature? */ rcu_read_lock(); - use_rle =3D rcu_dereference(first_peer_device(device)->connection->net_co= nf)->use_rle; + use_rle =3D rcu_dereference(peer_device->connection->transport.net_conf)-= >use_rle; rcu_read_unlock(); - if (!use_rle || first_peer_device(device)->connection->agreed_pro_version= < 90) + if (!use_rle || peer_device->connection->agreed_pro_version < 90) return 0; =20 if (c->bit_offset >=3D c->bm_bits) @@ -1115,11 +1962,16 @@ static int fill_bitmap_rle_bits(struct drbd_device = *device, /* see how much plain bits we can stuff into one packet * using RLE and VLI. */ do { - tmp =3D (toggle =3D=3D 0) ? _drbd_bm_find_next_zero(device, c->bit_offse= t) - : _drbd_bm_find_next(device, c->bit_offset); - if (tmp =3D=3D -1UL) + tmp =3D (toggle =3D=3D 0) ? 
_drbd_bm_find_next_zero(peer_device, c->bit_= offset) + : _drbd_bm_find_next(peer_device, c->bit_offset); + if (tmp =3D=3D -1UL) { tmp =3D c->bm_bits; - rl =3D tmp - c->bit_offset; + rl =3D tmp - c->bit_offset; + rl_4k =3D c->bm_bits_4k - (c->bit_offset << c->scale); + } else { + rl =3D tmp - c->bit_offset; + rl_4k =3D rl << c->scale; + } =20 if (toggle =3D=3D 2) { /* first iteration */ if (rl =3D=3D 0) { @@ -1136,16 +1988,16 @@ static int fill_bitmap_rle_bits(struct drbd_device = *device, /* paranoia: catch zero runlength. * can only happen if bitmap is modified while we scan it. */ if (rl =3D=3D 0) { - drbd_err(device, "unexpected zero runlength while encoding bitmap " + drbd_err(peer_device, "unexpected zero runlength while encoding bitmap " "t:%u bo:%lu\n", toggle, c->bit_offset); return -1; } =20 - bits =3D vli_encode_bits(&bs, rl); + bits =3D vli_encode_bits(&bs, rl_4k); if (bits =3D=3D -ENOBUFS) /* buffer full */ break; if (bits <=3D 0) { - drbd_err(device, "error while encoding bitmap: %d\n", bits); + drbd_err(peer_device, "error while encoding bitmap: %d\n", bits); return 0; } =20 @@ -1156,7 +2008,7 @@ static int fill_bitmap_rle_bits(struct drbd_device *d= evice, =20 len =3D bs.cur.b - p->code + !!bs.cur.bit; =20 - if (plain_bits < (len << 3)) { + if (plain_bits << c->scale < (len << 3)) { /* incompressible with this method. * we need to rewind both word and bit position. */ c->bit_offset -=3D plain_bits; @@ -1175,33 +2027,69 @@ static int fill_bitmap_rle_bits(struct drbd_device = *device, return len; } =20 +/* Repeat extracted bits by "peeling off" words from the end. + * scale !=3D 0 implies that repeat >=3D 2. + * Feel free to optimize ... 
+ */ +static void repeat_bits(unsigned long *base, unsigned long num, unsigned i= nt scale) +{ + unsigned long *src, *dst; + unsigned int repeat =3D 1 << scale; + unsigned int n; + int sbit, dbit, i; + + for (n =3D num - 1; n > 0; n--) { + src =3D &base[n]; + for (i =3D 0; i < repeat; i++) { + dst =3D &base[n*repeat + i]; + *dst =3D 0; + for (dbit =3D 0; dbit < BITS_PER_LONG; dbit++) { + sbit =3D (i * BITS_PER_LONG + dbit) >> scale; + if (test_bit(sbit, src)) + *dst |=3D 1UL << dbit; + } + } + } +} + /* * send_bitmap_rle_or_plain * * Return 0 when done, 1 when another iteration is needed, and a negative = error * code upon failure. + * + * For compat reasons, send bitmap as if it was 4k per bit! + * Good thing that a "scaled" bitmap will always "compress". */ static int send_bitmap_rle_or_plain(struct drbd_peer_device *peer_device, struct bm_x= fer_ctx *c) { struct drbd_device *device =3D peer_device->device; - struct drbd_socket *sock =3D &peer_device->connection->data; unsigned int header_size =3D drbd_header_size(peer_device->connection); - struct p_compressed_bm *p =3D sock->sbuf + header_size; + struct p_compressed_bm *pc; + char *p; int len, err; =20 - len =3D fill_bitmap_rle_bits(device, p, - DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c); - if (len < 0) + p =3D alloc_send_buffer(peer_device->connection, DRBD_SOCKET_BUFFER_SIZE,= DATA_STREAM); + if (IS_ERR(p)) return -EIO; =20 + pc =3D (struct p_compressed_bm *)(p + header_size); + + len =3D fill_bitmap_rle_bits(peer_device, pc, + DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*pc), c); + if (len < 0) { + cancel_send_buffer(peer_device->connection, DATA_STREAM); + return -EIO; + } + if (len) { - dcbp_set_code(p, RLE_VLI_Bits); - err =3D __send_command(peer_device->connection, device->vnr, sock, - P_COMPRESSED_BITMAP, sizeof(*p) + len, - NULL, 0); + dcbp_set_code(pc, RLE_VLI_Bits); + resize_prepared_command(peer_device->connection, DATA_STREAM, sizeof(*pc= ) + len); + err =3D 
__send_command(peer_device->connection, device->vnr, + P_COMPRESSED_BITMAP, DATA_STREAM); c->packets[0]++; - c->bytes[0] +=3D header_size + sizeof(*p) + len; + c->bytes[0] +=3D header_size + sizeof(*pc) + len; =20 if (c->bit_offset >=3D c->bm_bits) len =3D 0; /* DONE */ @@ -1210,16 +2098,40 @@ send_bitmap_rle_or_plain(struct drbd_peer_device *p= eer_device, struct bm_xfer_ct * send a buffer full of plain text bits instead. */ unsigned int data_size; unsigned long num_words; - unsigned long *p =3D sock->sbuf + header_size; - + unsigned long words_left =3D c->bm_words - c->word_offset; + unsigned long *pu =3D (unsigned long *)pc; + + /* Only send full native bitmap words (actual granularity), + * scaled to what they would look like at 4k granularity. + * At maximum scale, which is (20 - 12), factor 256, + * to transfer at least one word of unscaled bitmap, + * we need data_size >=3D 256 (unsigned long) words, + * that is >=3D 2048 byte. Which we always have. + */ data_size =3D DRBD_SOCKET_BUFFER_SIZE - header_size; - num_words =3D min_t(size_t, data_size / sizeof(*p), - c->bm_words - c->word_offset); - len =3D num_words * sizeof(*p); - if (len) - drbd_bm_get_lel(device, c->word_offset, num_words, p); - err =3D __send_command(peer_device->connection, device->vnr, sock, P_BIT= MAP, - len, NULL, 0); + data_size =3D ALIGN_DOWN(data_size, sizeof(*pu) * (1UL << c->scale)); + num_words =3D (data_size / sizeof(*pu)) >> c->scale; + num_words =3D min_t(size_t, num_words, words_left); + + len =3D num_words * sizeof(*pu); + if (len) { + drbd_bm_get_lel(peer_device, c->word_offset, num_words, pu); + + if (c->scale) { + repeat_bits(pu, num_words, c->scale); + len <<=3D c->scale; + } + } else if (words_left !=3D 0) { + drbd_err(peer_device, + "failed to scale %lu words by %u while sending bitmap\n", + words_left, c->scale); + cancel_send_buffer(peer_device->connection, DATA_STREAM); + return -ERANGE; + } + + resize_prepared_command(peer_device->connection, DATA_STREAM, len); + err 
=3D __send_command(peer_device->connection, device->vnr, P_BITMAP, D= ATA_STREAM); + c->word_offset +=3D num_words; c->bit_offset =3D c->word_offset * BITS_PER_LONG; =20 @@ -1240,396 +2152,233 @@ send_bitmap_rle_or_plain(struct drbd_peer_device = *peer_device, struct bm_xfer_ct } =20 /* See the comment at receive_bitmap() */ -static int _drbd_send_bitmap(struct drbd_device *device, - struct drbd_peer_device *peer_device) +static bool _drbd_send_bitmap(struct drbd_device *device, + struct drbd_peer_device *peer_device) { struct bm_xfer_ctx c; - int err; - - if (!expect(device, device->bitmap)) - return false; + int res; =20 if (get_ldev(device)) { - if (drbd_md_test_flag(device->ldev, MDF_FULL_SYNC)) { + if (drbd_md_test_peer_flag(peer_device, MDF_PEER_FULL_SYNC)) { drbd_info(device, "Writing the whole bitmap, MDF_FullSync was set.\n"); - drbd_bm_set_all(device); - if (drbd_bm_write(device, peer_device)) { + drbd_bm_set_many_bits(peer_device, 0, -1UL); + if (drbd_bm_write(device, NULL)) { /* write_bm did fail! Leave full sync flag set in Meta P_DATA * but otherwise process as per normal - need to tell other * side that a full resync is required! 
*/ drbd_err(device, "Failed to write bitmap to disk!\n"); } else { - drbd_md_clear_flag(device, MDF_FULL_SYNC); + drbd_md_clear_peer_flag(peer_device, MDF_PEER_FULL_SYNC); drbd_md_sync(device); } } + c =3D (struct bm_xfer_ctx) { + .bm_bits_4k =3D drbd_bm_bits_4k(device), + .bm_bits =3D drbd_bm_bits(device), + .bm_words =3D drbd_bm_words(device), + .scale =3D device->bitmap->bm_block_shift - BM_BLOCK_SHIFT_4k, + }; + put_ldev(device); + } else { + return false; } =20 - c =3D (struct bm_xfer_ctx) { - .bm_bits =3D drbd_bm_bits(device), - .bm_words =3D drbd_bm_words(device), - }; - do { - err =3D send_bitmap_rle_or_plain(peer_device, &c); - } while (err > 0); + if (get_ldev(device)) { + res =3D send_bitmap_rle_or_plain(peer_device, &c); + put_ldev(device); + } else { + return false; + } + } while (res > 0); =20 - return err =3D=3D 0; + return res =3D=3D 0; } =20 int drbd_send_bitmap(struct drbd_device *device, struct drbd_peer_device *= peer_device) { - struct drbd_socket *sock =3D &peer_device->connection->data; + struct drbd_transport *peer_transport =3D &peer_device->connection->trans= port; int err =3D -1; =20 - mutex_lock(&sock->mutex); - if (sock->socket) - err =3D !_drbd_send_bitmap(device, peer_device); - mutex_unlock(&sock->mutex); - return err; -} - -void drbd_send_b_ack(struct drbd_connection *connection, u32 barrier_nr, u= 32 set_size) -{ - struct drbd_socket *sock; - struct p_barrier_ack *p; - - if (connection->cstate < C_WF_REPORT_PARAMS) - return; - - sock =3D &connection->meta; - p =3D conn_prepare_command(connection, sock); - if (!p) - return; - p->barrier =3D barrier_nr; - p->set_size =3D cpu_to_be32(set_size); - conn_send_command(connection, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0); -} - -/** - * _drbd_send_ack() - Sends an ack packet - * @peer_device: DRBD peer device. - * @cmd: Packet command code. 
- * @sector: sector, needs to be in big endian byte order - * @blksize: size in byte, needs to be in big endian byte order - * @block_id: Id, big endian byte order - */ -static int _drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_= packet cmd, - u64 sector, u32 blksize, u64 block_id) -{ - struct drbd_socket *sock; - struct p_block_ack *p; - - if (peer_device->device->state.conn < C_CONNECTED) + if (peer_device->bitmap_index =3D=3D -1) { + drbd_err(peer_device, "No bitmap allocated in drbd_send_bitmap()!\n"); return -EIO; + } =20 - sock =3D &peer_device->connection->meta; - p =3D drbd_prepare_command(peer_device, sock); - if (!p) - return -EIO; - p->sector =3D sector; - p->block_id =3D block_id; - p->blksize =3D blksize; - p->seq_num =3D cpu_to_be32(atomic_inc_return(&peer_device->device->packet= _seq)); - return drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0); -} - -/* dp->sector and dp->block_id already/still in network byte order, - * data_size is payload size according to dp->head, - * and may need to be corrected for digest size. 
*/ -void drbd_send_ack_dp(struct drbd_peer_device *peer_device, enum drbd_pack= et cmd, - struct p_data *dp, int data_size) -{ - if (peer_device->connection->peer_integrity_tfm) - data_size -=3D crypto_shash_digestsize(peer_device->connection->peer_int= egrity_tfm); - _drbd_send_ack(peer_device, cmd, dp->sector, cpu_to_be32(data_size), - dp->block_id); -} - -void drbd_send_ack_rp(struct drbd_peer_device *peer_device, enum drbd_pack= et cmd, - struct p_block_req *rp) -{ - _drbd_send_ack(peer_device, cmd, rp->sector, rp->blksize, rp->block_id); -} - -/** - * drbd_send_ack() - Sends an ack packet - * @peer_device: DRBD peer device - * @cmd: packet command code - * @peer_req: peer request - */ -int drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet c= md, - struct drbd_peer_request *peer_req) -{ - return _drbd_send_ack(peer_device, cmd, - cpu_to_be64(peer_req->i.sector), - cpu_to_be32(peer_req->i.size), - peer_req->block_id); -} + mutex_lock(&peer_device->connection->mutex[DATA_STREAM]); + if (peer_transport->class->ops.stream_ok(peer_transport, DATA_STREAM)) + err =3D !_drbd_send_bitmap(device, peer_device); + mutex_unlock(&peer_device->connection->mutex[DATA_STREAM]); =20 -/* This function misuses the block_id field to signal if the blocks - * are is sync or not. 
*/ -int drbd_send_ack_ex(struct drbd_peer_device *peer_device, enum drbd_packe= t cmd, - sector_t sector, int blksize, u64 block_id) -{ - return _drbd_send_ack(peer_device, cmd, - cpu_to_be64(sector), - cpu_to_be32(blksize), - cpu_to_be64(block_id)); + return err; } =20 int drbd_send_rs_deallocated(struct drbd_peer_device *peer_device, struct drbd_peer_request *peer_req) { - struct drbd_socket *sock; - struct p_block_desc *p; + struct p_block_ack *p_id; =20 - sock =3D &peer_device->connection->data; - p =3D drbd_prepare_command(peer_device, sock); - if (!p) + if (peer_device->connection->agreed_pro_version < 122) { + struct p_block_desc *p; + + p =3D drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM); + if (!p) + return -EIO; + p->sector =3D cpu_to_be64(peer_req->i.sector); + p->blksize =3D cpu_to_be32(peer_req->i.size); + p->pad =3D 0; + return drbd_send_command(peer_device, P_RS_DEALLOCATED, DATA_STREAM); + } + + p_id =3D drbd_prepare_command(peer_device, sizeof(*p_id), DATA_STREAM); + if (!p_id) return -EIO; - p->sector =3D cpu_to_be64(peer_req->i.sector); - p->blksize =3D cpu_to_be32(peer_req->i.size); - p->pad =3D 0; - return drbd_send_command(peer_device, sock, P_RS_DEALLOCATED, sizeof(*p),= NULL, 0); + p_id->sector =3D cpu_to_be64(peer_req->i.sector); + p_id->blksize =3D cpu_to_be32(peer_req->i.size); + p_id->block_id =3D peer_req->block_id; + p_id->seq_num =3D 0; + return drbd_send_command(peer_device, P_RS_DEALLOCATED_ID, DATA_STREAM); } =20 -int drbd_send_drequest(struct drbd_peer_device *peer_device, int cmd, +int drbd_send_drequest(struct drbd_peer_device *peer_device, sector_t sector, int size, u64 block_id) { - struct drbd_socket *sock; struct p_block_req *p; =20 - sock =3D &peer_device->connection->data; - p =3D drbd_prepare_command(peer_device, sock); + p =3D drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM); if (!p) return -EIO; p->sector =3D cpu_to_be64(sector); p->block_id =3D block_id; p->blksize =3D cpu_to_be32(size); - return 
drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0);
-}
-
-int drbd_send_drequest_csum(struct drbd_peer_device *peer_device, sector_t sector, int size,
-			    void *digest, int digest_size, enum drbd_packet cmd)
-{
-	struct drbd_socket *sock;
-	struct p_block_req *p;
+	p->pad = 0;
+	return drbd_send_command(peer_device, P_DATA_REQUEST, DATA_STREAM);
+}
+
+static void *drbd_prepare_rs_req(struct drbd_peer_device *peer_device, enum drbd_packet cmd, int payload_size,
+		sector_t sector, int blksize, u64 block_id, unsigned int dagtag_node_id, u64 dagtag)
+{
+	void *payload;
+	struct p_block_req_common *req_common;
+
+	if (cmd == P_RS_DAGTAG_REQ || cmd == P_RS_CSUM_DAGTAG_REQ || cmd == P_RS_THIN_DAGTAG_REQ ||
+	    cmd == P_OV_DAGTAG_REQ || cmd == P_OV_DAGTAG_REPLY) {
+		struct p_rs_req *p;
+		/* Due to the slightly complicated nested struct definition,
+		 * verify that the packet size is as expected. */
+		BUILD_BUG_ON(sizeof(struct p_rs_req) != 32);
+		p = drbd_prepare_command(peer_device, sizeof(*p) + payload_size, DATA_STREAM);
+		if (!p)
+			return NULL;
+		payload = p + 1;
+		req_common = &p->req_common;
+		p->dagtag_node_id = cpu_to_be32(dagtag_node_id);
+		p->dagtag = cpu_to_be64(dagtag);
+	} else {
+		struct p_block_req *p;
+		/* Due to the slightly complicated nested struct definition,
+		 * verify that the packet size is as expected. */
+		BUILD_BUG_ON(sizeof(struct p_block_req) != 24);
+		p = drbd_prepare_command(peer_device, sizeof(*p) + payload_size, DATA_STREAM);
+		if (!p)
+			return NULL;
+		payload = p + 1;
+		req_common = &p->req_common;
+		p->pad = 0;
+	}
 
-	/* FIXME: Put the digest into the preallocated socket buffer. */
+	req_common->sector = cpu_to_be64(sector);
+	req_common->block_id = block_id;
+	req_common->blksize = cpu_to_be32(blksize);
 
-	sock = &peer_device->connection->data;
-	p = drbd_prepare_command(peer_device, sock);
-	if (!p)
-		return -EIO;
-	p->sector = cpu_to_be64(sector);
-	p->block_id = ID_SYNCER /* unused */;
-	p->blksize = cpu_to_be32(size);
-	return drbd_send_command(peer_device, sock, cmd, sizeof(*p), digest, digest_size);
+	return payload;
 }
 
-int drbd_send_ov_request(struct drbd_peer_device *peer_device, sector_t sector, int size)
+int drbd_send_rs_request(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
+		sector_t sector, int size, u64 block_id,
+		unsigned int dagtag_node_id, u64 dagtag)
 {
-	struct drbd_socket *sock;
-	struct p_block_req *p;
-
-	sock = &peer_device->connection->data;
-	p = drbd_prepare_command(peer_device, sock);
-	if (!p)
+	if (!drbd_prepare_rs_req(peer_device, cmd, 0,
+			sector, size, block_id, dagtag_node_id, dagtag))
 		return -EIO;
-	p->sector = cpu_to_be64(sector);
-	p->block_id = ID_SYNCER /* unused */;
-	p->blksize = cpu_to_be32(size);
-	return drbd_send_command(peer_device, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
+	return drbd_send_command(peer_device, cmd, DATA_STREAM);
 }
 
-/* called on sndtimeo
- * returns false if we should retry,
- * true if we think connection is dead
- */
-static int we_should_drop_the_connection(struct drbd_connection *connection, struct socket *sock)
+void *drbd_prepare_drequest_csum(struct drbd_peer_request *peer_req, enum drbd_packet cmd,
+		int digest_size, unsigned int dagtag_node_id, u64 dagtag)
 {
-	int drop_it;
-	/* long elapsed = (long)(jiffies - device->last_received); */
-
-	drop_it =   connection->meta.socket == sock
-		|| !connection->ack_receiver.task
-		|| get_t_state(&connection->ack_receiver) != RUNNING
-		|| connection->cstate < C_WF_REPORT_PARAMS;
-
-	if (drop_it)
-		return true;
-
-	drop_it = !--connection->ko_count;
-	if (!drop_it) {
-		drbd_err(connection, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
-			 current->comm, current->pid, connection->ko_count);
-		request_ping(connection);
-	}
-
-	return drop_it; /* && (device->state == R_PRIMARY) */;
+	struct drbd_peer_device *peer_device = peer_req->peer_device;
+	return drbd_prepare_rs_req(peer_device, cmd, digest_size,
+			peer_req->i.sector, peer_req->i.size, peer_req->block_id,
+			dagtag_node_id, dagtag);
 }
 
-static void drbd_update_congested(struct drbd_connection *connection)
-{
-	struct sock *sk = connection->data.socket->sk;
-	if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
-		set_bit(NET_CONGESTED, &connection->flags);
-}
 
-/* The idea of sendpage seems to be to put some kind of reference
- * to the page into the skb, and to hand it over to the NIC. In
- * this process get_page() gets called.
- *
- * As soon as the page was really sent over the network put_page()
- * gets called by some part of the network layer. [ NIC driver? ]
- *
- * [ get_page() / put_page() increment/decrement the count. If count
- *   reaches 0 the page will be freed. ]
- *
- * This works nicely with pages from FSs.
- * But this means that in protocol A we might signal IO completion too early!
- *
- * In order not to corrupt data during a resync we must make sure
- * that we do not reuse our own buffer pages (EEs) to early, therefore
- * we have the net_ee list.
- *
- * XFS seems to have problems, still, it submits pages with page_count == 0!
- * As a workaround, we disable sendpage on pages
- * with page_count == 0 or PageSlab.
- */
-static int _drbd_no_send_page(struct drbd_peer_device *peer_device, struct page *page,
-			      int offset, size_t size, unsigned msg_flags)
+static int __send_bio(struct drbd_peer_device *peer_device, struct bio *bio, unsigned int msg_flags)
 {
-	struct socket *socket;
-	void *addr;
+	struct drbd_connection *connection = peer_device->connection;
+	struct drbd_transport *transport = &connection->transport;
+	struct drbd_transport_ops *tr_ops = &transport->class->ops;
 	int err;
 
-	socket = peer_device->connection->data.socket;
-	addr = kmap(page) + offset;
-	err = drbd_send_all(peer_device->connection, socket, addr, size, msg_flags);
-	kunmap(page);
-	if (!err)
-		peer_device->device->send_cnt += size >> 9;
+	err = flush_send_buffer(connection, DATA_STREAM);
+	if (!err) {
+		err = tr_ops->send_bio(transport, bio, msg_flags);
+		if (!err)
+			peer_device->send_cnt += bio->bi_iter.bi_size >> 9;
+	}
+
 	return err;
 }
 
-static int _drbd_send_page(struct drbd_peer_device *peer_device, struct page *page,
-			   int offset, size_t size, unsigned msg_flags)
+/* sendmsg(MSG_SPLICE_PAGES) (formerly sendpage()) increases the page ref_count
+ * and hands it to the network stack. After the NIC DMA sends the data, it
+ * decreases that page's ref_count.
+ * We may not do this for protocol A, where we could complete a write operation
+ * before the network stack sends the data.
+ */
+static int
+drbd_send_bio(struct drbd_peer_device *peer_device, struct bio *bio, unsigned int msg_flags)
 {
-	struct socket *socket = peer_device->connection->data.socket;
-	struct msghdr msg = { .msg_flags = msg_flags, };
-	struct bio_vec bvec;
-	int len = size;
-	int err = -EIO;
+	if (drbd_disable_sendpage)
+		msg_flags &= ~MSG_SPLICE_PAGES;
 
-	/* e.g. XFS meta- & log-data is in slab pages, which have a
-	 * page_count of 0 and/or have PageSlab() set.
-	 * we cannot use send_page for those, as that does get_page();
-	 * put_page(); and would cause either a VM_BUG directly, or
-	 * __page_cache_release a page that would actually still be referenced
-	 * by someone, leading to some obscure delayed Oops somewhere else. */
-	if (!drbd_disable_sendpage && sendpages_ok(page, len, offset))
-		msg.msg_flags |= MSG_NOSIGNAL | MSG_SPLICE_PAGES;
+	/* e.g. XFS meta- & log-data is in slab pages, which have !sendpage_ok(page) */
+	if (msg_flags & MSG_SPLICE_PAGES) {
+		struct bvec_iter iter;
+		struct bio_vec bvec;
 
-	drbd_update_congested(peer_device->connection);
-	do {
-		int sent;
-
-		bvec_set_page(&bvec, page, len, offset);
-		iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
+		bio_for_each_segment(bvec, bio, iter) {
+			struct page *page = bvec.bv_page;
 
-		sent = sock_sendmsg(socket, &msg);
-		if (sent <= 0) {
-			if (sent == -EAGAIN) {
-				if (we_should_drop_the_connection(peer_device->connection, socket))
-					break;
-				continue;
+			if (!sendpage_ok(page)) {
+				msg_flags &= ~MSG_SPLICE_PAGES;
+				break;
 			}
-			drbd_warn(peer_device->device, "%s: size=%d len=%d sent=%d\n",
-				  __func__, (int)size, len, sent);
-			if (sent < 0)
-				err = sent;
-			break;
 		}
-		len    -= sent;
-		offset += sent;
-	} while (len > 0 /* THINK && device->cstate >= C_CONNECTED*/);
-	clear_bit(NET_CONGESTED, &peer_device->connection->flags);
-
-	if (len == 0) {
-		err = 0;
-		peer_device->device->send_cnt += size >> 9;
 	}
-	return err;
-}
-
-static int _drbd_send_bio(struct drbd_peer_device *peer_device, struct bio *bio)
-{
-	struct bio_vec bvec;
-	struct bvec_iter iter;
 
-	/* hint all but last page with MSG_MORE */
-	bio_for_each_segment(bvec, bio, iter) {
-		int err;
-
-		err = _drbd_no_send_page(peer_device, bvec.bv_page,
-					 bvec.bv_offset, bvec.bv_len,
-					 bio_iter_last(bvec, iter)
-					 ? 0 : MSG_MORE);
-		if (err)
-			return err;
-	}
-	return 0;
+	return __send_bio(peer_device, bio, msg_flags);
 }
 
-static int _drbd_send_zc_bio(struct drbd_peer_device *peer_device, struct bio *bio)
+static int drbd_send_ee(struct drbd_peer_device *peer_device, struct drbd_peer_request *peer_req)
 {
-	struct bio_vec bvec;
-	struct bvec_iter iter;
+	struct bio *bio;
+	int err = 0;
 
-	/* hint all but last page with MSG_MORE */
-	bio_for_each_segment(bvec, bio, iter) {
-		int err;
-
-		err = _drbd_send_page(peer_device, bvec.bv_page,
-				      bvec.bv_offset, bvec.bv_len,
-				      bio_iter_last(bvec, iter) ? 0 : MSG_MORE);
+	bio_list_for_each(bio, &peer_req->bios) {
+		err = __send_bio(peer_device, bio,
+				peer_req->flags & EE_RELEASE_TO_MEMPOOL ? 0 : MSG_SPLICE_PAGES);
 		if (err)
-			return err;
+			break;
 	}
-	return 0;
-}
 
-static int _drbd_send_zc_ee(struct drbd_peer_device *peer_device,
-			    struct drbd_peer_request *peer_req)
-{
-	bool use_sendpage = !(peer_req->flags & EE_RELEASE_TO_MEMPOOL);
-	struct page *page = peer_req->pages;
-	unsigned len = peer_req->i.size;
-	int err;
-
-	/* hint all but last page with MSG_MORE */
-	page_chain_for_each(page) {
-		unsigned l = min_t(unsigned, len, PAGE_SIZE);
-
-		if (likely(use_sendpage))
-			err = _drbd_send_page(peer_device, page, 0, l,
-					      page_chain_next(page) ? MSG_MORE : 0);
-		else
-			err = _drbd_no_send_page(peer_device, page, 0, l,
-						 page_chain_next(page) ? MSG_MORE : 0);
-
-		if (err)
-			return err;
-		len -= l;
-	}
-	return 0;
+	return err;
 }
 
-static u32 bio_flags_to_wire(struct drbd_connection *connection,
-			     struct bio *bio)
+/* see also wire_flags_to_bio() */
+static u32 bio_flags_to_wire(struct drbd_connection *connection, struct bio *bio)
 {
 	if (connection->agreed_pro_version >= 95)
 		return  (bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0) |
@@ -1637,12 +2386,13 @@ static u32 bio_flags_to_wire(struct drbd_connection *connection,
 			(bio->bi_opf & REQ_PREFLUSH ? DP_FLUSH : 0) |
 			(bio_op(bio) == REQ_OP_DISCARD ? DP_DISCARD : 0) |
 			(bio_op(bio) == REQ_OP_WRITE_ZEROES ?
-			  ((connection->agreed_features & DRBD_FF_WZEROES) ?
-			   (DP_ZEROES |(!(bio->bi_opf & REQ_NOUNMAP) ? DP_DISCARD : 0))
-			   : DP_DISCARD)
-			  : 0);
-	else
-		return bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0;
+			((connection->agreed_features & DRBD_FF_WZEROES) ?
+			(DP_ZEROES | (!(bio->bi_opf & REQ_NOUNMAP) ? DP_DISCARD : 0))
+			: DP_DISCARD)
+			: 0);
+
+	/* else: we used to communicate one bit only in older DRBD */
+	return bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0;
 }
 
 /* Used to send write or TRIM aka REQ_OP_DISCARD requests
@@ -1651,53 +2401,62 @@ static u32 bio_flags_to_wire(struct drbd_connection *connection,
 int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request *req)
 {
 	struct drbd_device *device = peer_device->device;
-	struct drbd_socket *sock;
+	struct drbd_connection *connection = peer_device->connection;
+	char *const before = connection->scratch_buffer.d.before;
+	char *const after = connection->scratch_buffer.d.after;
+	struct p_trim *trim = NULL;
 	struct p_data *p;
-	void *digest_out;
+	void *digest_out = NULL;
 	unsigned int dp_flags = 0;
-	int digest_size;
+	int digest_size = 0;
 	int err;
+	const unsigned s = req->net_rq_state[peer_device->node_id];
+	const enum req_op op = bio_op(req->master_bio);
+
+	if (op == REQ_OP_DISCARD || op == REQ_OP_WRITE_ZEROES) {
+		trim = drbd_prepare_command(peer_device, sizeof(*trim), DATA_STREAM);
+		if (!trim)
+			return -EIO;
+		p = &trim->p_data;
+		trim->size = cpu_to_be32(req->i.size);
+	} else {
+		if (connection->integrity_tfm)
+			digest_size = crypto_shash_digestsize(connection->integrity_tfm);
 
-	sock = &peer_device->connection->data;
-	p = drbd_prepare_command(peer_device, sock);
-	digest_size = peer_device->connection->integrity_tfm ?
-		      crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;
+		p = drbd_prepare_command(peer_device, sizeof(*p) + digest_size, DATA_STREAM);
+		if (!p)
+			return -EIO;
+		digest_out = p + 1;
+	}
 
-	if (!p)
-		return -EIO;
 	p->sector = cpu_to_be64(req->i.sector);
 	p->block_id = (unsigned long)req;
-	p->seq_num = cpu_to_be32(atomic_inc_return(&device->packet_seq));
-	dp_flags = bio_flags_to_wire(peer_device->connection, req->master_bio);
-	if (device->state.conn >= C_SYNC_SOURCE &&
-	    device->state.conn <= C_PAUSED_SYNC_T)
+	p->seq_num = cpu_to_be32(atomic_inc_return(&peer_device->packet_seq));
+	dp_flags = bio_flags_to_wire(connection, req->master_bio);
+	if (peer_device->repl_state[NOW] >= L_SYNC_SOURCE && peer_device->repl_state[NOW] <= L_PAUSED_SYNC_T)
 		dp_flags |= DP_MAY_SET_IN_SYNC;
-	if (peer_device->connection->agreed_pro_version >= 100) {
-		if (req->rq_state & RQ_EXP_RECEIVE_ACK)
+	if (connection->agreed_pro_version >= 100) {
+		if (s & RQ_EXP_RECEIVE_ACK)
 			dp_flags |= DP_SEND_RECEIVE_ACK;
-		/* During resync, request an explicit write ack,
-		 * even in protocol != C */
-		if (req->rq_state & RQ_EXP_WRITE_ACK
-		    || (dp_flags & DP_MAY_SET_IN_SYNC))
+		if (s & RQ_EXP_WRITE_ACK || dp_flags & DP_MAY_SET_IN_SYNC)
 			dp_flags |= DP_SEND_WRITE_ACK;
 	}
 	p->dp_flags = cpu_to_be32(dp_flags);
 
-	if (dp_flags & (DP_DISCARD|DP_ZEROES)) {
-		enum drbd_packet cmd = (dp_flags & DP_ZEROES) ? P_ZEROES : P_TRIM;
-		struct p_trim *t = (struct p_trim*)p;
-		t->size = cpu_to_be32(req->i.size);
-		err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*t), NULL, 0);
+	if (trim) {
+		err = __send_command(connection, device->vnr,
+				(dp_flags & DP_ZEROES) ? P_ZEROES : P_TRIM, DATA_STREAM);
 		goto out;
 	}
-	digest_out = p + 1;
 
-	/* our digest is still only over the payload.
-	 * TRIM does not carry any payload. */
-	if (digest_size)
-		drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest_out);
-	err = __send_command(peer_device->connection, device->vnr, sock, P_DATA,
-			     sizeof(*p) + digest_size, NULL, req->i.size);
+	if (digest_size && digest_out) {
+		WARN_ON(digest_size > sizeof(connection->scratch_buffer.d.before));
+		drbd_csum_bio(connection->integrity_tfm, req->master_bio, before);
+		memcpy(digest_out, before, digest_size);
+	}
+
+	additional_size_command(connection, DATA_STREAM, req->i.size);
+	err = __send_command(connection, device->vnr, P_DATA, DATA_STREAM);
 	if (!err) {
 		/* For protocol A, we have to memcpy the payload into
 		 * socket buffers, as we may complete right away
@@ -1710,50 +2469,43 @@ int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request *
 		 * out ok after sending on this side, but does not fit on the
 		 * receiving side, we sure have detected corruption elsewhere. */
-		if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || digest_size)
-			err = _drbd_send_bio(peer_device, req->master_bio);
-		else
-			err = _drbd_send_zc_bio(peer_device, req->master_bio);
+		bool proto_b_or_c = (s & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK));
+		int msg_flags = proto_b_or_c && !digest_size ? MSG_SPLICE_PAGES : 0;
+
+		err = drbd_send_bio(peer_device, req->master_bio, msg_flags);
 
 		/* double check digest, sometimes buffers have been modified in flight. */
-		if (digest_size > 0 && digest_size <= 64) {
-			/* 64 byte, 512 bit, is the largest digest size
-			 * currently supported in kernel crypto. */
-			unsigned char digest[64];
-			drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest);
-			if (memcmp(p + 1, digest, digest_size)) {
+		if (digest_size > 0) {
+			drbd_csum_bio(connection->integrity_tfm, req->master_bio, after);
+			if (memcmp(before, after, digest_size)) {
 				drbd_warn(device,
 					"Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
 					(unsigned long long)req->i.sector, req->i.size);
 			}
-		} /* else if (digest_size > 64) {
-		     ... Be noisy about digest too large ...
-		} */
+		}
 	}
 out:
-	mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
+	mutex_unlock(&connection->mutex[DATA_STREAM]);
 
 	return err;
 }
 
 /* answer packet, used to send data back for read requests:
 *  Peer       -> (diskless) R_PRIMARY   (P_DATA_REPLY)
-*  C_SYNC_SOURCE -> C_SYNC_TARGET         (P_RS_DATA_REPLY)
+*  L_SYNC_SOURCE -> L_SYNC_TARGET         (P_RS_DATA_REPLY)
 */
 int drbd_send_block(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
 		    struct drbd_peer_request *peer_req)
 {
-	struct drbd_device *device = peer_device->device;
-	struct drbd_socket *sock;
+	struct drbd_connection *connection = peer_device->connection;
 	struct p_data *p;
 	int err;
 	int digest_size;
 
-	sock = &peer_device->connection->data;
-	p = drbd_prepare_command(peer_device, sock);
+	digest_size = connection->integrity_tfm ?
+		crypto_shash_digestsize(connection->integrity_tfm) : 0;
 
-	digest_size = peer_device->connection->integrity_tfm ?
-		      crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;
+	p = drbd_prepare_command(peer_device, sizeof(*p) + digest_size, DATA_STREAM);
 
 	if (!p)
 		return -EIO;
@@ -1761,314 +2513,721 @@ int drbd_send_block(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
 	p->block_id = peer_req->block_id;
 	p->seq_num = 0;  /* unused */
 	p->dp_flags = 0;
+
+	/* Older peers expect block_id for P_RS_DATA_REPLY to be ID_SYNCER. */
+	if (connection->agreed_pro_version < 122 && cmd == P_RS_DATA_REPLY)
+		p->block_id = ID_SYNCER;
+
 	if (digest_size)
-		drbd_csum_ee(peer_device->connection->integrity_tfm, peer_req, p + 1);
-	err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*p) + digest_size, NULL, peer_req->i.size);
+		drbd_csum_bios(connection->integrity_tfm, &peer_req->bios, p + 1);
+	additional_size_command(connection, DATA_STREAM, peer_req->i.size);
+	err = __send_command(connection,
+			peer_device->device->vnr, cmd, DATA_STREAM);
 	if (!err)
-		err = _drbd_send_zc_ee(peer_device, peer_req);
-	mutex_unlock(&sock->mutex);  /* locked by drbd_prepare_command() */
+		err = drbd_send_ee(peer_device, peer_req);
+	mutex_unlock(&connection->mutex[DATA_STREAM]);
 
 	return err;
 }
 
-int drbd_send_out_of_sync(struct drbd_peer_device *peer_device, struct drbd_request *req)
+int drbd_send_out_of_sync(struct drbd_peer_device *peer_device, sector_t sector, unsigned int size)
 {
-	struct drbd_socket *sock;
 	struct p_block_desc *p;
 
-	sock = &peer_device->connection->data;
-	p = drbd_prepare_command(peer_device, sock);
+	p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
 	if (!p)
 		return -EIO;
-	p->sector = cpu_to_be64(req->i.sector);
-	p->blksize = cpu_to_be32(req->i.size);
-	return drbd_send_command(peer_device, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
+	p->sector = cpu_to_be64(sector);
+	p->blksize = cpu_to_be32(size);
+	return drbd_send_command(peer_device, P_OUT_OF_SYNC, DATA_STREAM);
}
 
-/*
-  drbd_send distinguishes two cases:
+int drbd_send_dagtag(struct drbd_connection *connection, u64 dagtag)
+{
+	struct p_dagtag *p;
 
-  Packets sent via the data socket "sock"
-  and packets sent via the meta data socket "msock"
+	if (connection->agreed_pro_version < 110)
+		return 0;
 
-		    sock                      msock
-  -----------------+-------------------------+------------------------------
-  timeout           conf.timeout / 2          conf.timeout / 2
-  timeout action    send a ping via msock     Abort communication
-					      and close all sockets
-*/
+	p = conn_prepare_command(connection, sizeof(*p), DATA_STREAM);
+	if (!p)
+		return -EIO;
+	p->dagtag = cpu_to_be64(dagtag);
+	return send_command(connection, -1, P_DAGTAG, DATA_STREAM);
+}
 
-/*
- * you must have down()ed the appropriate [m]sock_mutex elsewhere!
- */
-int drbd_send(struct drbd_connection *connection, struct socket *sock,
-	      void *buf, size_t size, unsigned msg_flags)
+/* primary_peer_present_and_not_two_primaries_allowed() */
+static bool primary_peer_present(struct drbd_resource *resource)
 {
-	struct kvec iov = {.iov_base = buf, .iov_len = size};
-	struct msghdr msg = {.msg_flags = msg_flags | MSG_NOSIGNAL};
-	int rv, sent = 0;
+	struct drbd_connection *connection;
+	struct net_conf *nc;
+	bool two_primaries, rv = false;
 
-	if (!sock)
-		return -EBADR;
+	rcu_read_lock();
+	for_each_connection_rcu(connection, resource) {
+		nc = rcu_dereference(connection->transport.net_conf);
+		two_primaries = nc ? nc->two_primaries : false;
 
-	/* THINK  if (signal_pending) return ... ? */
+		if (connection->peer_role[NOW] == R_PRIMARY && !two_primaries) {
+			rv = true;
+			break;
+		}
+	}
+	rcu_read_unlock();
 
-	iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, &iov, 1, size);
+	return rv;
+}
 
-	if (sock == connection->data.socket) {
-		rcu_read_lock();
-		connection->ko_count = rcu_dereference(connection->net_conf)->ko_count;
-		rcu_read_unlock();
-		drbd_update_congested(connection);
-	}
-	do {
-		rv = sock_sendmsg(sock, &msg);
-		if (rv == -EAGAIN) {
-			if (we_should_drop_the_connection(connection, sock))
+static bool any_disk_is_uptodate(struct drbd_device *device)
+{
+	bool ret = false;
+
+	rcu_read_lock();
+	if (device->disk_state[NOW] == D_UP_TO_DATE)
+		ret = true;
+	else {
+		struct drbd_peer_device *peer_device;
+
+		for_each_peer_device_rcu(peer_device, device) {
+			if (peer_device->disk_state[NOW] == D_UP_TO_DATE) {
+				ret = true;
 				break;
-			else
-				continue;
-		}
-		if (rv == -EINTR) {
-			flush_signals(current);
-			rv = 0;
+			}
 		}
-		if (rv < 0)
-			break;
-		sent += rv;
-	} while (sent < size);
-
-	if (sock == connection->data.socket)
-		clear_bit(NET_CONGESTED, &connection->flags);
-
-	if (rv <= 0) {
-		if (rv != -EAGAIN) {
-			drbd_err(connection, "%s_sendmsg returned %d\n",
-				 sock == connection->meta.socket ? "msock" : "sock",
-				 rv);
-			conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
-		} else
-			conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
 	}
+	rcu_read_unlock();
 
-	return sent;
+	return ret;
 }
 
-/*
- * drbd_send_all - Send an entire buffer
- *
- * Returns 0 upon success and a negative error value otherwise.
+/* If we are trying to (re-)establish some connection,
+ * it may be useful to re-try the conditions in drbd_open().
+ * But if we have no connection at all (yet/anymore),
+ * or are disconnected and not trying to (re-)establish,
+ * or are established already, retrying won't help at all.
+ * Asking the same peer(s) the same question
+ * is unlikely to change their answer.
+ * Almost always triggered by udev (and the configured probes) while bringing
+ * the resource "up", just after "new-minor", even before "attach" or any
+ * "peers"/"paths" are configured.
  */
-int drbd_send_all(struct drbd_connection *connection, struct socket *sock, void *buffer,
-		  size_t size, unsigned msg_flags)
+static bool connection_state_may_improve_soon(struct drbd_resource *resource)
 {
-	int err;
-
-	err = drbd_send(connection, sock, buffer, size, msg_flags);
-	if (err < 0)
-		return err;
-	if (err != size)
-		return -EIO;
-	return 0;
+	struct drbd_connection *connection;
+	bool ret = false;
+	rcu_read_lock();
+	for_each_connection_rcu(connection, resource) {
+		enum drbd_conn_state cstate = connection->cstate[NOW];
+		if (C_DISCONNECTING < cstate && cstate < C_CONNECTED) {
+			ret = true;
+			break;
+		}
+	}
+	rcu_read_unlock();
+	return ret;
 }
 
-static int drbd_open(struct gendisk *disk, blk_mode_t mode)
+/* TASK_COMM_LEN reserves one '\0', sizeof("") both include '\0',
+ * that's room enough for ':' and ' ' separators and the EOS.
+ */
+union comm_pid_tag_buf {
+	char comm[TASK_COMM_LEN];
+	char buf[TASK_COMM_LEN + sizeof("2147483647") + sizeof("auto-promote")];
+};
+
+static void snprintf_current_comm_pid_tag(union comm_pid_tag_buf *s, const char *tag)
 {
-	struct drbd_device *device = disk->private_data;
-	unsigned long flags;
-	int rv = 0;
+	int len;
 
-	mutex_lock(&drbd_main_mutex);
-	spin_lock_irqsave(&device->resource->req_lock, flags);
-	/* to have a stable device->state.role
-	 * and no race with updating open_cnt */
+	get_task_comm(s->comm, current);
+	len = strlen(s->buf);
+	snprintf(s->buf + len, sizeof(s->buf)-len, ":%d %s", task_pid_nr(current), tag);
+}
 
-	if (device->state.role != R_PRIMARY) {
-		if (mode & BLK_OPEN_WRITE)
-			rv = -EROFS;
-		else if (!drbd_allow_oos)
-			rv = -EMEDIUMTYPE;
-	}
+static int try_to_promote(struct drbd_device *device, long timeout, bool ndelay)
+{
+	struct drbd_resource *resource = device->resource;
+	int rv;
 
-	if (!rv)
-		device->open_cnt++;
-	spin_unlock_irqrestore(&device->resource->req_lock, flags);
-	mutex_unlock(&drbd_main_mutex);
+	do {
+		union comm_pid_tag_buf tag;
+		unsigned long start = jiffies;
+		long t;
 
+		snprintf_current_comm_pid_tag(&tag, "auto-promote");
+		rv = drbd_set_role(resource, R_PRIMARY, false, tag.buf, NULL);
+		timeout -= jiffies - start;
+
+		if (ndelay || rv >= SS_SUCCESS || timeout <= 0) {
+			break;
+		} else if (rv == SS_CW_FAILED_BY_PEER) {
+			/* Probably udev has it open read-only on one of the peers;
+			   since commit cbcbb50a65 from 2017 it waits on the peer;
+			   retry only if the timeout permits */
+			if (jiffies - start < HZ / 10) {
+				t = schedule_timeout_interruptible(HZ / 10);
+				if (t)
+					break;
+				timeout -= HZ / 10;
+			}
+		} else if (rv == SS_TWO_PRIMARIES) {
+			/* Wait till the peer demoted itself */
+			t = wait_event_interruptible_timeout(resource->state_wait,
+				resource->role[NOW] == R_PRIMARY ||
+				(!primary_peer_present(resource) && any_disk_is_uptodate(device)),
+				timeout);
+			if (t <= 0)
+				break;
+			timeout -= t;
+		} else if (rv == SS_NO_UP_TO_DATE_DISK && connection_state_may_improve_soon(resource)) {
+			/* Wait until we get a connection established */
+			t = wait_event_interruptible_timeout(resource->state_wait,
+				any_disk_is_uptodate(device), timeout);
+			if (t <= 0)
+				break;
+			timeout -= t;
+		} else {
+			break;
+		}
+	} while (timeout > 0);
 	return rv;
 }
 
-static void drbd_release(struct gendisk *gd)
+static int ro_open_cond(struct drbd_device *device)
 {
-	struct drbd_device *device = gd->private_data;
+	struct drbd_resource *resource = device->resource;
 
-	mutex_lock(&drbd_main_mutex);
-	device->open_cnt--;
-	mutex_unlock(&drbd_main_mutex);
+	if (!device->have_quorum[NOW])
+		return -ENODATA;
+	else if (resource->role[NOW] != R_PRIMARY &&
+		 primary_peer_present(resource) && !drbd_allow_oos)
+		return -EMEDIUMTYPE;
+	else if (any_disk_is_uptodate(device))
+		return 0;
+	else if (connection_state_may_improve_soon(resource))
+		return -EAGAIN;
+	else
+		return -ENODATA;
 }
 
-/* need to hold resource->req_lock */
-void drbd_queue_unplug(struct drbd_device *device)
+enum ioc_rv {
+	IOC_SLEEP = 0,
+	IOC_OK = 1,
+	IOC_ABORT = 2,
+};
+
+/* If we are in the middle of a cluster wide state change, we don't want
+ * to change (open_cnt == 0), as that then could cause a failure to commit
+ * some already promised peer auto-promote locally.
+ * So we wait until the pending remote_state_change is finalized,
+ * or give up when the timeout is reached.
+ *
+ * But we don't want to fail an open on a Primary just because it happens
+ * during some unrelated remote state change.
+ * If we are already Primary, or already have an open count != 0,
+ * we don't need to wait, it won't change anything.
+ */
+static enum ioc_rv inc_open_count(struct drbd_device *device, blk_mode_t mode)
 {
-	if (device->state.pdsk >= D_INCONSISTENT && device->state.conn >= C_CONNECTED) {
-		D_ASSERT(device, device->state.role == R_PRIMARY);
-		if (test_and_clear_bit(UNPLUG_REMOTE, &device->flags)) {
-			drbd_queue_work_if_unqueued(
-				&first_peer_device(device)->connection->sender_work,
-				&device->unplug_work);
-		}
+	struct drbd_resource *resource = device->resource;
+	enum ioc_rv r;
+
+	if (test_bit(DOWN_IN_PROGRESS, &resource->flags))
+		return IOC_ABORT;
+
+	read_lock_irq(&resource->state_rwlock);
+	if (test_bit(UNREGISTERED, &device->flags))
+		r = IOC_ABORT;
+	else if (resource->remote_state_change &&
+		 resource->role[NOW] != R_PRIMARY &&
+		 (device->open_cnt == 0 || mode & BLK_OPEN_WRITE)) {
+		if (mode & BLK_OPEN_NDELAY)
+			r = IOC_ABORT;
+		else
+			r = IOC_SLEEP;
+	} else {
+		r = IOC_OK;
+		device->open_cnt++;
+		if (mode & BLK_OPEN_WRITE)
+			device->writable = true;
 	}
-}
+	read_unlock_irq(&resource->state_rwlock);
 
-static void drbd_set_defaults(struct drbd_device *device)
-{
-	/* Beware! The actual layout differs
-	 * between big endian and little endian */
-	device->state = (union drbd_dev_state) {
-		{ .role = R_SECONDARY,
-		  .peer = R_UNKNOWN,
-		  .conn = C_STANDALONE,
-		  .disk = D_DISKLESS,
-		  .pdsk = D_UNKNOWN,
-		} };
+	return r;
 }
 
-void drbd_init_set_defaults(struct drbd_device *device)
+static void __prune_or_free_openers(struct drbd_device *device, pid_t pid)
 {
-	/* the memset(,0,) did most of this.
-	 * note: only assignments, no allocation in here */
+	struct opener *pos, *tmp;
 
-	drbd_set_defaults(device);
+	list_for_each_entry_safe(pos, tmp, &device->openers, list) {
+		// if pid == 0, i.e., counts were 0, delete all entries, else the matching one
+		if (pid == 0 || pid == pos->pid) {
+			dynamic_drbd_dbg(device, "%sopeners del: %s(%d)\n", pid == 0 ? "" : "all ",
+					pos->comm, pos->pid);
+			list_del(&pos->list);
+			kfree(pos);
 
-	atomic_set(&device->ap_bio_cnt, 0);
-	atomic_set(&device->ap_actlog_cnt, 0);
-	atomic_set(&device->ap_pending_cnt, 0);
-	atomic_set(&device->rs_pending_cnt, 0);
-	atomic_set(&device->unacked_cnt, 0);
-	atomic_set(&device->local_cnt, 0);
-	atomic_set(&device->pp_in_use_by_net, 0);
-	atomic_set(&device->rs_sect_in, 0);
-	atomic_set(&device->rs_sect_ev, 0);
-	atomic_set(&device->ap_in_flight, 0);
-	atomic_set(&device->md_io.in_use, 0);
+			/* in case we remove a real process, stop here, there might be multiple openers with the same pid */
+			/* this assumes that the oldest opener with the same pid releases first. "as good as it gets" */
+			if (pid != 0)
+				break;
+		}
+	}
+}
 
-	mutex_init(&device->own_state_mutex);
-	device->state_mutex = &device->own_state_mutex;
+static void free_openers(struct drbd_device *device)
+{
+	__prune_or_free_openers(device, 0);
+}
 
-	spin_lock_init(&device->al_lock);
-	spin_lock_init(&device->peer_seq_lock);
-
-	INIT_LIST_HEAD(&device->active_ee);
-	INIT_LIST_HEAD(&device->sync_ee);
-	INIT_LIST_HEAD(&device->done_ee);
-	INIT_LIST_HEAD(&device->read_ee);
-	INIT_LIST_HEAD(&device->resync_reads);
-	INIT_LIST_HEAD(&device->resync_work.list);
-	INIT_LIST_HEAD(&device->unplug_work.list);
-	INIT_LIST_HEAD(&device->bm_io_work.w.list);
-	INIT_LIST_HEAD(&device->pending_master_completion[0]);
-	INIT_LIST_HEAD(&device->pending_master_completion[1]);
-	INIT_LIST_HEAD(&device->pending_completion[0]);
-	INIT_LIST_HEAD(&device->pending_completion[1]);
+static void prune_or_free_openers(struct drbd_device *device, pid_t pid)
+{
+	spin_lock(&device->openers_lock);
+	__prune_or_free_openers(device, pid);
+	spin_unlock(&device->openers_lock);
+}
 
-	device->resync_work.cb  = w_resync_timer;
-	device->unplug_work.cb  = w_send_write_hint;
-	device->bm_io_work.w.cb = w_bitmap_io;
+static void add_opener(struct drbd_device *device, bool did_auto_promote)
+{
+	struct opener *opener, *tmp;
+	ktime_t now = ktime_get_real();
+	int len = 0;
 
-	timer_setup(&device->resync_timer, resync_timer_fn, 0);
-	timer_setup(&device->md_sync_timer, md_sync_timer_fn, 0);
-	timer_setup(&device->start_resync_timer, start_resync_timer_fn, 0);
-	timer_setup(&device->request_timer, request_timer_fn, 0);
+	if (did_auto_promote) {
+		struct drbd_resource *resource = device->resource;
 
-	init_waitqueue_head(&device->misc_wait);
-	init_waitqueue_head(&device->state_wait);
-	init_waitqueue_head(&device->ee_wait);
-	init_waitqueue_head(&device->al_wait);
-	init_waitqueue_head(&device->seq_wait);
+		resource->auto_promoted_by.minor = device->minor;
+		resource->auto_promoted_by.pid = task_pid_nr(current);
+		resource->auto_promoted_by.opened = now;
+		get_task_comm(resource->auto_promoted_by.comm, current);
+	}
+	opener = kmalloc_obj(*opener, GFP_NOIO);
+	if (!opener)
+		return;
+	get_task_comm(opener->comm, current);
+	opener->pid = task_pid_nr(current);
+	opener->opened = now;
+
+	spin_lock(&device->openers_lock);
+	list_for_each_entry(tmp, &device->openers, list)
+		if (++len > 100) { /* 100 ought to be enough for everybody */
+			dynamic_drbd_dbg(device, "openers: list full, do not add new opener\n");
+			kfree(opener);
+			goto out;
+		}
 
-	device->resync_wenr = LC_FREE;
-	device->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
-	device->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
+	list_add(&opener->list, &device->openers);
+	dynamic_drbd_dbg(device, "openers add: %s(%d)\n", opener->comm, opener->pid);
+out:
+	spin_unlock(&device->openers_lock);
 }
 
-void drbd_set_my_capacity(struct drbd_device *device, sector_t size)
+static int drbd_open(struct gendisk *gd, blk_mode_t mode)
 {
-	char ppb[10];
+	struct drbd_device *device = gd->private_data;
+	struct drbd_resource *resource = device->resource;
+	long timeout = resource->res_opts.auto_promote_timeout * HZ / 10;
+	enum drbd_state_rv rv = SS_UNKNOWN_ERROR;
+	bool was_writable;
+	enum ioc_rv r;
+	int err = 0;
+
+	/* Fail read-only open from systemd-udev (version <= 238) */
+	if (!(mode & BLK_OPEN_WRITE) && !drbd_allow_oos) {
+		char comm[TASK_COMM_LEN];
+		get_task_comm(comm, current);
+		if (!strcmp("systemd-udevd", comm))
+			return -EACCES;
+	}
 
-	set_capacity_and_notify(device->vdisk, size);
+	/* Fail read-write open early,
+	 * in case someone explicitly set us read-only (blockdev --setro) */
+	if (bdev_read_only(gd->part0) && (mode & BLK_OPEN_WRITE))
+		return -EACCES;
 
-	drbd_info(device, "size = %s (%llu KB)\n",
-		ppsize(ppb, size>>1), (unsigned long long)size>>1);
-}
+	if (resource->fail_io[NOW])
+		return -ENOTRECOVERABLE;
 
-void drbd_device_cleanup(struct drbd_device *device)
-{
-	int i;
-	if (first_peer_device(device)->connection->receiver.t_state != NONE)
-		drbd_err(device, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
-				first_peer_device(device)->connection->receiver.t_state);
-
-	device->al_writ_cnt  =
-	device->bm_writ_cnt  =
-	device->read_cnt     =
-	device->recv_cnt     =
-	device->send_cnt     =
-	device->writ_cnt     =
-	device->p_size       =
-	device->rs_start     =
-	device->rs_total     =
-	device->rs_failed    = 0;
-	device->rs_last_events = 0;
-	device->rs_last_sect_ev = 0;
-	for (i = 0; i < DRBD_SYNC_MARKS; i++) {
-		device->rs_mark_left[i] = 0;
-		device->rs_mark_time[i] = 0;
-	}
-	D_ASSERT(device, first_peer_device(device)->connection->net_conf == NULL);
-
-	set_capacity_and_notify(device->vdisk, 0);
-	if (device->bitmap) {
-		/* maybe never allocated. */
-		drbd_bm_resize(device, 0, 1);
-		drbd_bm_cleanup(device);
+	kref_get(&device->kref);
+
+	mutex_lock(&resource->open_release);
+	was_writable = device->writable;
+
+	timeout = wait_event_interruptible_timeout(resource->twopc_wait,
+			(r = inc_open_count(device, mode)),
+			timeout);
+
+	if (r == IOC_ABORT || (r == IOC_SLEEP && timeout <= 0)) {
+		mutex_unlock(&resource->open_release);
+
+		kref_put(&device->kref, drbd_destroy_device);
+		return -EAGAIN;
 	}
 
-	drbd_backing_dev_free(device, device->ldev);
-	device->ldev = NULL;
+	if (resource->res_opts.auto_promote) {
+		/* Allow opening in read-only mode on an unconnected secondary.
+		   This avoids split brain when the drbd volume gets opened
+		   temporarily by udev while it scans for PV signatures. */
+
+		if (mode & BLK_OPEN_WRITE) {
+			if (resource->role[NOW] == R_SECONDARY) {
+				rv = try_to_promote(device, timeout, (mode & BLK_OPEN_NDELAY));
+				if (rv < SS_SUCCESS)
+					drbd_info(resource, "Auto-promote failed: %s (%d)\n",
+						  drbd_set_st_err_str(rv), rv);
+			}
+		} else if ((mode & BLK_OPEN_NDELAY) == 0) {
+			/* Double check peers
+			 *
+			 * Some services may try to first open ro, and only if that
+			 * works open rw. An attempt to failover immediately after
+			 * primary crash, before DRBD has noticed that the primary peer
+			 * is gone, would result in open failure, thus failure to take
+			 * over services. */
+			err = ro_open_cond(device);
+			if (err == -EMEDIUMTYPE) {
+				drbd_check_peers(resource);
+				err = -EAGAIN;
+			}
+			if (err == -EAGAIN) {
+				wait_event_interruptible_timeout(resource->state_wait,
+					ro_open_cond(device) != -EAGAIN,
+					resource->res_opts.auto_promote_timeout * HZ / 10);
+			}
+		}
+	} else if (resource->role[NOW] != R_PRIMARY &&
+		   !(mode & BLK_OPEN_WRITE) && !drbd_allow_oos) {
+		err = -EMEDIUMTYPE;
+		goto out;
+	}
 
-	clear_bit(AL_SUSPENDED, &device->flags);
+	if (test_bit(UNREGISTERED, &device->flags)) {
+		err = -ENODEV;
+	} else if (mode & BLK_OPEN_WRITE) {
+		if (resource->role[NOW] != R_PRIMARY)
+			err = rv == SS_INTERRUPTED ? -ERESTARTSYS : -EROFS;
+	} else /* READ access only */ {
+		err = ro_open_cond(device);
+	}
+out:
+	/* still keep mutex, but release ASAP */
+	if (!err) {
+		add_opener(device, rv >= SS_SUCCESS);
+		/* Only interested in first open and last close. */
+		if (device->open_cnt == 1) {
+			struct device_info info;
+
+			device_to_info(&info, device);
+			mutex_lock(&notification_mutex);
+			notify_device_state(NULL, 0, device, &info, NOTIFY_CHANGE);
+			mutex_unlock(&notification_mutex);
+		}
+	} else
+		device->writable = was_writable;
 
-	D_ASSERT(device, list_empty(&device->active_ee));
-	D_ASSERT(device, list_empty(&device->sync_ee));
-	D_ASSERT(device, list_empty(&device->done_ee));
-	D_ASSERT(device, list_empty(&device->read_ee));
-	D_ASSERT(device, list_empty(&device->resync_reads));
-	D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q));
-	D_ASSERT(device, list_empty(&device->resync_work.list));
-	D_ASSERT(device, list_empty(&device->unplug_work.list));
+	mutex_unlock(&resource->open_release);
+	if (err) {
+		drbd_release(gd);
+		if (err == -EAGAIN && !(mode & BLK_OPEN_NDELAY))
+			err = -EMEDIUMTYPE;
+	}
 
-	drbd_set_defaults(device);
+	return err;
 }
 
+void drbd_open_counts(struct drbd_resource *resource, int *rw_count_ptr, int *ro_count_ptr)
+{
+	struct drbd_device
*device; + int vnr, rw_count =3D 0, ro_count =3D 0; + + rcu_read_lock(); + idr_for_each_entry(&resource->devices, device, vnr) { + if (device->writable) + rw_count +=3D device->open_cnt; + else + ro_count +=3D device->open_cnt; + } + rcu_read_unlock(); + *rw_count_ptr =3D rw_count; + *ro_count_ptr =3D ro_count; +} =20 -static void drbd_destroy_mempools(void) +static void wait_for_peer_disk_updates(struct drbd_resource *resource) +{ + struct drbd_peer_device *peer_device; + struct drbd_device *device; + int vnr; + +restart: + rcu_read_lock(); + idr_for_each_entry(&resource->devices, device, vnr) { + for_each_peer_device_rcu(peer_device, device) { + if (test_bit(GOT_NEG_ACK, &peer_device->flags)) { + clear_bit(GOT_NEG_ACK, &peer_device->flags); + rcu_read_unlock(); + wait_event(resource->state_wait, peer_device->disk_state[NOW] < D_UP_T= O_DATE); + goto restart; + } + } + } + rcu_read_unlock(); +} + +static void drbd_fsync_device(struct drbd_device *device) +{ + struct drbd_resource *resource =3D device->resource; + + sync_blockdev(device->vdisk->part0); + /* Prevent writes occurring after demotion, at least + * the writes already submitted in this context. This + * covers the case where DRBD auto-demotes on release, + * which is important because it often occurs + * immediately after a write. */ + wait_event(device->misc_wait, !atomic_read(&device->ap_bio_cnt[WRITE])); + + if (start_new_tl_epoch(resource)) { + struct drbd_connection *connection; + u64 im; + + for_each_connection_ref(connection, im, resource) + drbd_flush_workqueue(&connection->sender_work); + } + wait_event(resource->barrier_wait, !barrier_pending(resource)); + /* After waiting for pending barriers, we got any possible NEG_ACKs, + and see them in wait_for_peer_disk_updates() */ + wait_for_peer_disk_updates(resource); + + /* In case switching from R_PRIMARY to R_SECONDARY works + out, there is no rw opener at this point. Thus, no new + writes can come in. 
-> Flushing queued peer acks is + necessary and sufficient. + The cluster wide role change required packets to be + received by the sender. -> We can be sure that the + peer_acks queued on a sender's TODO list go out before + we send the two phase commit packet. + */ + drbd_flush_peer_acks(resource); +} + +static void drbd_release(struct gendisk *gd) +{ + struct drbd_device *device =3D gd->private_data; + struct drbd_resource *resource =3D device->resource; + int open_rw_cnt, open_ro_cnt; + + mutex_lock(&resource->open_release); + /* The last one to close already called sync_blockdevice(), generic + * bdev_release() respectively blkdev_put_whole() takes care of that. + * We still want our side effects of drbd_fsync_device(): + * wait until all peers confirmed they have all the data, regardless of + * replication protocol, even if that is asynchronous. + * Still, do it before decreasing the open_cnt, just in case, so we + * won't confuse drbd_reject_write_early() or other code paths that may + * check for open_cnt !=3D 0 when they see write requests. + */ + if (device->writable && device->open_cnt =3D=3D 1) { + drbd_fsync_device(device); + device->writable =3D false; + } + device->open_cnt--; + drbd_open_counts(resource, &open_rw_cnt, &open_ro_cnt); + + if (open_ro_cnt =3D=3D 0) + wake_up_all(&resource->state_wait); + + if (test_bit(UNREGISTERED, &device->flags) && device->open_cnt =3D=3D 0 && + !test_and_set_bit(DESTROYING_DEV, &device->flags)) + call_rcu(&device->rcu, drbd_reclaim_device); + + if (resource->res_opts.auto_promote && + open_rw_cnt =3D=3D 0 && + resource->role[NOW] =3D=3D R_PRIMARY && + !test_bit(EXPLICIT_PRIMARY, &resource->flags)) { + union comm_pid_tag_buf tag; + sigset_t mask, oldmask; + int rv; + + snprintf_current_comm_pid_tag(&tag, "auto-demote"); + + /* + * Auto-demote is triggered by the last opener releasing the + * DRBD device. However, it is an implicit action, so it should + * not be affected by the state of the process. 
In particular, + * it should ignore any pending signals. It may be the case + * that the process is releasing DRBD because it is being + * terminated using a signal. + */ + sigfillset(&mask); + sigprocmask(SIG_BLOCK, &mask, &oldmask); + + rv =3D drbd_set_role(resource, R_SECONDARY, false, tag.buf, NULL); + if (rv < SS_SUCCESS) + drbd_warn(resource, "Auto-demote failed: %s (%d)\n", + drbd_set_st_err_str(rv), rv); + + sigprocmask(SIG_SETMASK, &oldmask, NULL); + } + + if (open_ro_cnt =3D=3D 0 && open_rw_cnt =3D=3D 0 && resource->fail_io[NOW= ]) { + unsigned long irq_flags; + + begin_state_change(resource, &irq_flags, CS_VERBOSE); + resource->fail_io[NEW] =3D false; + end_state_change(resource, &irq_flags, "release"); + } + + /* if the open count is 0, we free the whole list, otherwise we remove th= e specific pid */ + prune_or_free_openers(device, (device->open_cnt =3D=3D 0) ? 0 : task_pid_= nr(current)); + if (open_rw_cnt =3D=3D 0 && open_ro_cnt =3D=3D 0 && resource->auto_promot= ed_by.pid !=3D 0) + memset(&resource->auto_promoted_by, 0, sizeof(resource->auto_promoted_by= )); + if (device->open_cnt =3D=3D 0) { + struct device_info info; + + device_to_info(&info, device); + mutex_lock(¬ification_mutex); + notify_device_state(NULL, 0, device, &info, NOTIFY_CHANGE); + mutex_unlock(¬ification_mutex); + } + mutex_unlock(&resource->open_release); + + kref_put(&device->kref, drbd_destroy_device); /* might destroy the resou= rce as well */ +} + +static void drbd_remove_all_paths(struct drbd_connection *connection) +{ + struct drbd_resource *resource =3D connection->resource; + struct drbd_transport *transport =3D &connection->transport; + struct drbd_path *path, *tmp; + + lockdep_assert_held(&resource->conf_update); + + list_for_each_entry(path, &transport->paths, list) + set_bit(TR_UNREGISTERED, &path->flags); + + /* Ensure flag visible before list manipulation. 
*/ + smp_wmb(); + + list_for_each_entry_safe(path, tmp, &transport->paths, list) { + /* Exclusive with reading state, in particular remember_state_change() */ + write_lock_irq(&resource->state_rwlock); + list_del_rcu(&path->list); + write_unlock_irq(&resource->state_rwlock); + + transport->class->ops.remove_path(path); + notify_path(connection, path, NOTIFY_DESTROY); + call_rcu(&path->rcu, drbd_reclaim_path); + } +} + +/** __drbd_net_exit is called when a network namespace is removed. + * + * For DRBD this means it needs to remove any sockets assigned to that nam= espace, + * i.e. it needs to disconnect some connections. It also needs to remove t= hose + * paths associated with the to be removed namespace, so the connection ca= n be + * reconfigured from a new namespace. + */ +static void __net_exit __drbd_net_exit(struct net *net) +{ + struct drbd_resource *resource; + struct drbd_connection *connection, *n; + enum drbd_state_rv rv; + LIST_HEAD(connections_wait_list); + + /* Disconnect and removal of paths works in 3 steps: + * 1. Find all connections associated with the namespace, add it to a sep= arate list. + * 2. Iterate over all connections in the new list and start the disconne= ct. + * 3. Iterate again over all connections, waiting for them to disconnect = and remove the path configuration.*/ + + /* Step 1 */ + rcu_read_lock(); + for_each_resource_rcu(resource, &drbd_resources) { + for_each_connection_rcu(connection, resource) { + /* We don't have to worry about any races here: + * For a connection to be "missed", it would need to be configured + * from the namespace to be removed. Since netlink does keep the + * namespace alive for the duration of it's connection, we can + * assume the namespace assignment can no longer be changed. 
*/ + if (net_eq(net, drbd_net_assigned_to_connection(connection))) { + drbd_info(connection, "Disconnect because network namespace is exiting= \n"); + + kref_get(&connection->kref); + + list_add(&connection->remove_net_list, &connections_wait_list); + } + } + } + rcu_read_unlock(); + + /* Step 2 */ + list_for_each_entry(connection, &connections_wait_list, remove_net_list) { + /* We just start the disconnect here. We have to use force=3Dtrue here, + * otherwise the disconnect might fail waiting for some openers to disap= pear. + * + * Actually waiting for the disconnect is relegated to step 3, so we dis= connect + * in parallel. */ + rv =3D change_cstate(connection, C_DISCONNECTING, CS_HARD); + if (rv < SS_SUCCESS && rv !=3D SS_ALREADY_STANDALONE) + drbd_err(connection, "Failed to disconnect: %s\n", drbd_set_st_err_str(= rv)); + } + + /* Step 3 */ + list_for_each_entry_safe(connection, n, &connections_wait_list, remove_ne= t_list) { + list_del_init(&connection->remove_net_list); + + /* Wait here for StandAlone: a path can only be removed if it's not esta= blished */ + wait_event(connection->resource->state_wait, connection->cstate[NOW] =3D= =3D C_STANDALONE); + + mutex_lock(&connection->resource->adm_mutex); + mutex_lock(&connection->resource->conf_update); + drbd_remove_all_paths(connection); + mutex_unlock(&connection->resource->conf_update); + mutex_unlock(&connection->resource->adm_mutex); + + kref_put(&connection->kref, drbd_destroy_connection); + } +} + +void drbd_queue_unplug(struct drbd_device *device) +{ + struct drbd_resource *resource =3D device->resource; + struct drbd_connection *connection; + u64 dagtag_sector; + + dagtag_sector =3D resource->dagtag_sector; + + rcu_read_lock(); + for_each_connection_rcu(connection, resource) { + /* use the "next" slot */ + unsigned int i =3D !connection->todo.unplug_slot; + connection->todo.unplug_dagtag_sector[i] =3D dagtag_sector; + wake_up(&connection->sender_work.q_wait); + } + rcu_read_unlock(); +} + +static void 
drbd_set_defaults(struct drbd_device *device) { - /* D_ASSERT(device, atomic_read(&drbd_pp_vacant)=3D=3D0); */ + device->disk_state[NOW] =3D D_DISKLESS; +} =20 +static void drbd_destroy_mempools(void) +{ bioset_exit(&drbd_io_bio_set); bioset_exit(&drbd_md_io_bio_set); mempool_exit(&drbd_buffer_page_pool); mempool_exit(&drbd_md_io_page_pool); mempool_exit(&drbd_ee_mempool); mempool_exit(&drbd_request_mempool); - kmem_cache_destroy(drbd_ee_cache); - kmem_cache_destroy(drbd_request_cache); - kmem_cache_destroy(drbd_bm_ext_cache); - kmem_cache_destroy(drbd_al_ext_cache); + if (drbd_ee_cache) + kmem_cache_destroy(drbd_ee_cache); + if (drbd_request_cache) + kmem_cache_destroy(drbd_request_cache); + if (drbd_al_ext_cache) + kmem_cache_destroy(drbd_al_ext_cache); =20 drbd_ee_cache =3D NULL; drbd_request_cache =3D NULL; - drbd_bm_ext_cache =3D NULL; drbd_al_ext_cache =3D NULL; =20 return; @@ -2090,11 +3249,6 @@ static int drbd_create_mempools(void) if (drbd_ee_cache =3D=3D NULL) goto Enomem; =20 - drbd_bm_ext_cache =3D kmem_cache_create( - "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL); - if (drbd_bm_ext_cache =3D=3D NULL) - goto Enomem; - drbd_al_ext_cache =3D kmem_cache_create( "drbd_al", sizeof(struct lc_element), 0, 0, NULL); if (drbd_al_ext_cache =3D=3D NULL) @@ -2113,7 +3267,6 @@ static int drbd_create_mempools(void) ret =3D mempool_init_page_pool(&drbd_md_io_page_pool, DRBD_MIN_POOL_PAGES= , 0); if (ret) goto Enomem; - ret =3D mempool_init_page_pool(&drbd_buffer_page_pool, number, 0); if (ret) goto Enomem; @@ -2134,70 +3287,77 @@ static int drbd_create_mempools(void) return -ENOMEM; } =20 -static void drbd_release_all_peer_reqs(struct drbd_device *device) +static void free_peer_device(struct drbd_peer_device *peer_device) { - int rr; + if (test_and_clear_bit(HOLDING_UUID_READ_LOCK, &peer_device->flags)) + up_read_non_owner(&peer_device->device->uuid_sem); =20 - rr =3D drbd_free_peer_reqs(device, &device->active_ee); - if (rr) - drbd_err(device, "%d EEs in active 
list found!\n", rr); + kfree(peer_device->rs_plan_s); + kfree(peer_device->conf); + kfree(peer_device); +} =20 - rr =3D drbd_free_peer_reqs(device, &device->sync_ee); - if (rr) - drbd_err(device, "%d EEs in sync list found!\n", rr); +static void drbd_device_finalize_work_fn(struct work_struct *work) +{ + struct drbd_device *device =3D container_of(work, struct drbd_device, fin= alize_work); + struct drbd_resource *resource =3D device->resource; =20 - rr =3D drbd_free_peer_reqs(device, &device->read_ee); - if (rr) - drbd_err(device, "%d EEs in read list found!\n", rr); + /* ldev_safe: no other contexts can access */ + drbd_bm_free(device); =20 - rr =3D drbd_free_peer_reqs(device, &device->done_ee); - if (rr) - drbd_err(device, "%d EEs in done list found!\n", rr); + put_disk(device->vdisk); + + kfree(device); + + kref_put(&resource->kref, drbd_destroy_resource); } =20 -/* caution. no locking. */ +/* may not sleep, called from call_rcu. */ void drbd_destroy_device(struct kref *kref) { struct drbd_device *device =3D container_of(kref, struct drbd_device, kre= f); - struct drbd_resource *resource =3D device->resource; - struct drbd_peer_device *peer_device, *tmp_peer_device; - - timer_shutdown_sync(&device->request_timer); - - /* paranoia asserts */ - D_ASSERT(device, device->open_cnt =3D=3D 0); - /* end paranoia asserts */ + struct drbd_peer_device *peer_device, *tmp; =20 /* cleanup stuff that may have been allocated during * device (re-)configuration or state changes */ =20 - drbd_backing_dev_free(device, device->ldev); - device->ldev =3D NULL; +#ifdef CONFIG_DRBD_COMPAT_84 + if (device->resource->res_opts.drbd8_compat_mode) + atomic_dec(&nr_drbd8_devices); +#endif =20 - drbd_release_all_peer_reqs(device); + free_openers(device); =20 lc_destroy(device->act_log); - lc_destroy(device->resync); - - kfree(device->p_uuid); - /* device->p_uuid =3D NULL; */ + for_each_peer_device_safe(peer_device, tmp, device) { + kref_put(&peer_device->connection->kref, 
drbd_destroy_connection); + free_peer_device(peer_device); + } =20 - if (device->bitmap) /* should no longer be there. */ - drbd_bm_cleanup(device); __free_page(device->md_io.page); - put_disk(device->vdisk); - kfree(device->rs_plan_s); =20 - /* not for_each_connection(connection, resource): - * those may have been cleaned up and disassociated already. - */ - for_each_peer_device_safe(peer_device, tmp_peer_device, device) { - kref_put(&peer_device->connection->kref, drbd_destroy_connection); - kfree(peer_device); - } - if (device->submit.wq) - destroy_workqueue(device->submit.wq); - kfree(device); + INIT_WORK(&device->finalize_work, drbd_device_finalize_work_fn); + schedule_work(&device->finalize_work); +} + +void drbd_destroy_resource(struct kref *kref) +{ + struct drbd_resource *resource =3D container_of(kref, struct drbd_resourc= e, kref); + + idr_destroy(&resource->devices); + free_cpumask_var(resource->cpu_mask); + kfree(resource->name); + kfree(resource); + module_put(THIS_MODULE); +} + +void drbd_reclaim_resource(struct rcu_head *rp) +{ + struct drbd_resource *resource =3D container_of(rp, struct drbd_resource,= rcu); + + drbd_thread_stop_nowait(&resource->worker); + + mempool_free(resource->peer_ack_req, &drbd_request_mempool); kref_put(&resource->kref, drbd_destroy_resource); } =20 @@ -2222,96 +3382,88 @@ static void do_retry(struct work_struct *ws) list_splice_init(&retry->writes, &writes); spin_unlock_irq(&retry->lock); =20 - list_for_each_entry_safe(req, tmp, &writes, tl_requests) { + list_for_each_entry_safe(req, tmp, &writes, list) { struct drbd_device *device =3D req->device; + struct drbd_resource *resource =3D device->resource; struct bio *bio =3D req->master_bio; + unsigned long start_jif =3D req->start_jif; bool expected; + ktime_get_accounting_assign(ktime_t start_kt, req->start_kt); =20 + + /* No locking when accessing local_rq_state & net_rq_state, since + * this request is not active at the moment. 
*/ expected =3D expect(device, atomic_read(&req->completion_ref) =3D=3D 0) && - expect(device, req->rq_state & RQ_POSTPONED) && - expect(device, (req->rq_state & RQ_LOCAL_PENDING) =3D=3D 0 || - (req->rq_state & RQ_LOCAL_ABORTED) !=3D 0); + expect(device, req->local_rq_state & RQ_POSTPONED) && + expect(device, (req->local_rq_state & RQ_LOCAL_PENDING) =3D=3D 0 || + (req->local_rq_state & RQ_LOCAL_ABORTED) !=3D 0); =20 if (!expected) drbd_err(device, "req=3D%p completion_ref=3D%d rq_state=3D%x\n", req, atomic_read(&req->completion_ref), - req->rq_state); + req->local_rq_state); =20 - /* We still need to put one kref associated with the + /* We still need to put one done reference associated with the * "completion_ref" going zero in the code path that queued it * here. The request object may still be referenced by a * frozen local req->private_bio, in case we force-detached. */ - kref_put(&req->kref, drbd_req_destroy); + read_lock_irq(&resource->state_rwlock); + drbd_put_ref_tl_walk(req, 1, 0); + read_unlock_irq(&resource->state_rwlock); =20 /* A single suspended or otherwise blocking device may stall - * all others as well. Fortunately, this code path is to - * recover from a situation that "should not happen": - * concurrent writes in multi-primary setup. - * In a "normal" lifecycle, this workqueue is supposed to be - * destroyed without ever doing anything. - * If it turns out to be an issue anyways, we can do per + * all others as well. This code path is to recover from a + * situation that "should not happen": concurrent writes in + * multi-primary setup. It is also used for retrying failed + * reads. If it turns out to be an issue, we can do per * resource (replication group) or per device (minor) retry * workqueues instead. */ =20 /* We are not just doing submit_bio_noacct(), * as we want to keep the start_time information. 
*/ - inc_ap_bio(device); - __drbd_make_request(device, bio); + __drbd_make_request(device, bio, start_kt, start_jif); } } =20 -/* called via drbd_req_put_completion_ref(), - * holds resource->req_lock */ +/* called via drbd_req_put_completion_ref() */ void drbd_restart_request(struct drbd_request *req) { + struct drbd_device *device =3D req->device; + struct drbd_resource *resource =3D device->resource; + bool susp =3D drbd_suspended(device); unsigned long flags; + spin_lock_irqsave(&retry.lock, flags); - list_move_tail(&req->tl_requests, &retry.writes); + list_move_tail(&req->list, susp ? &resource->suspended_reqs : &retry.writ= es); spin_unlock_irqrestore(&retry.lock, flags); =20 /* Drop the extra reference that would otherwise * have been dropped by complete_master_bio. * do_retry() needs to grab a new one. */ - dec_ap_bio(req->device); + dec_ap_bio(req->device, bio_data_dir(req->master_bio)); =20 - queue_work(retry.wq, &retry.worker); + if (!susp) + queue_work(retry.wq, &retry.worker); } =20 -void drbd_destroy_resource(struct kref *kref) +void drbd_restart_suspended_reqs(struct drbd_resource *resource) { - struct drbd_resource *resource =3D - container_of(kref, struct drbd_resource, kref); - - idr_destroy(&resource->devices); - free_cpumask_var(resource->cpu_mask); - kfree(resource->name); - kfree(resource); -} + unsigned long flags; =20 -void drbd_free_resource(struct drbd_resource *resource) -{ - struct drbd_connection *connection, *tmp; + spin_lock_irqsave(&retry.lock, flags); + list_splice_init(&resource->suspended_reqs, &retry.writes); + spin_unlock_irqrestore(&retry.lock, flags); =20 - for_each_connection_safe(connection, tmp, resource) { - list_del(&connection->connections); - drbd_debugfs_connection_cleanup(connection); - kref_put(&connection->kref, drbd_destroy_connection); - } - drbd_debugfs_resource_cleanup(resource); - kref_put(&resource->kref, drbd_destroy_resource); + queue_work(retry.wq, &retry.worker); } =20 static void drbd_cleanup(void) { - 
unsigned int i; - struct drbd_device *device; - struct drbd_resource *resource, *tmp; - /* first remove proc, - * drbdsetup uses it's presence to detect + * drbdsetup uses its presence to detect * whether DRBD is loaded. * If we would get stuck in proc removal, * but have netlink already deregistered, @@ -2325,19 +3477,13 @@ static void drbd_cleanup(void) destroy_workqueue(retry.wq); =20 drbd_genl_unregister(); - - idr_for_each_entry(&drbd_devices, device, i) - drbd_delete_device(device); - - /* not _rcu since, no other updater anymore. Genl already unregistered */ - for_each_resource_safe(resource, tmp, &drbd_resources) { - list_del(&resource->resources); - drbd_free_resource(resource); - } - drbd_debugfs_cleanup(); =20 + unregister_pernet_device(&drbd_pernet_ops); + drbd_destroy_mempools(); + if (ping_ack_sender) + destroy_workqueue(ping_ack_sender); unregister_blkdev(DRBD_MAJOR, "drbd"); =20 idr_destroy(&drbd_devices); @@ -2366,6 +3512,16 @@ static int w_complete(struct drbd_work *w, int cance= l) return 0; } =20 +void drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w) +{ + unsigned long flags; + + spin_lock_irqsave(&q->q_lock, flags); + list_add_tail(&w->list, &q->q); + spin_unlock_irqrestore(&q->q_lock, flags); + wake_up(&q->q_wait); +} + void drbd_flush_workqueue(struct drbd_work_queue *work_queue) { struct completion_work completion_work; @@ -2376,6 +3532,23 @@ void drbd_flush_workqueue(struct drbd_work_queue *wo= rk_queue) wait_for_completion(&completion_work.done); } =20 +void drbd_flush_workqueue_interruptible(struct drbd_device *device) +{ + struct completion_work completion_work; + int err; + + completion_work.w.cb =3D w_complete; + init_completion(&completion_work.done); + drbd_queue_work(&device->resource->work, &completion_work.w); + err =3D wait_for_completion_interruptible(&completion_work.done); + if (err =3D=3D -ERESTARTSYS) { + set_bit(ABORT_MDIO, &device->flags); + wake_up_all(&device->misc_wait); + 
wait_for_completion(&completion_work.done); + clear_bit(ABORT_MDIO, &device->flags); + } +} + struct drbd_resource *drbd_find_resource(const char *name) { struct drbd_resource *resource; @@ -2396,51 +3569,58 @@ struct drbd_resource *drbd_find_resource(const char= *name) return resource; } =20 -struct drbd_connection *conn_get_by_addrs(void *my_addr, int my_addr_len, - void *peer_addr, int peer_addr_len) +static void drbd_put_send_buffers(struct drbd_connection *connection) { - struct drbd_resource *resource; - struct drbd_connection *connection; + unsigned int i; =20 - rcu_read_lock(); - for_each_resource_rcu(resource, &drbd_resources) { - for_each_connection_rcu(connection, resource) { - if (connection->my_addr_len =3D=3D my_addr_len && - connection->peer_addr_len =3D=3D peer_addr_len && - !memcmp(&connection->my_addr, my_addr, my_addr_len) && - !memcmp(&connection->peer_addr, peer_addr, peer_addr_len)) { - kref_get(&connection->kref); - goto found; - } + for (i =3D DATA_STREAM; i <=3D CONTROL_STREAM ; i++) { + if (connection->send_buffer[i].page) { + put_page(connection->send_buffer[i].page); + connection->send_buffer[i].page =3D NULL; } } - connection =3D NULL; -found: - rcu_read_unlock(); - return connection; } =20 -static int drbd_alloc_socket(struct drbd_socket *socket) +static int drbd_alloc_send_buffers(struct drbd_connection *connection) { - socket->rbuf =3D (void *) __get_free_page(GFP_KERNEL); - if (!socket->rbuf) - return -ENOMEM; - socket->sbuf =3D (void *) __get_free_page(GFP_KERNEL); - if (!socket->sbuf) - return -ENOMEM; + unsigned int i; + + for (i =3D DATA_STREAM; i <=3D CONTROL_STREAM ; i++) { + struct page *page; + + page =3D alloc_page(GFP_KERNEL); + if (!page) { + drbd_put_send_buffers(connection); + return -ENOMEM; + } + connection->send_buffer[i].page =3D page; + connection->send_buffer[i].unsent =3D + connection->send_buffer[i].pos =3D page_address(page); + } + return 0; } =20 -static void drbd_free_socket(struct drbd_socket *socket) +void 
drbd_flush_peer_acks(struct drbd_resource *resource) { - free_page((unsigned long) socket->sbuf); - free_page((unsigned long) socket->rbuf); + spin_lock_irq(&resource->peer_ack_lock); + if (resource->peer_ack_req) { + resource->last_peer_acked_dagtag =3D resource->peer_ack_req->dagtag_sect= or; + drbd_queue_peer_ack(resource, resource->peer_ack_req); + resource->peer_ack_req =3D NULL; + } + spin_unlock_irq(&resource->peer_ack_lock); } =20 -void conn_free_crypto(struct drbd_connection *connection) +static void peer_ack_timer_fn(struct timer_list *t) { - drbd_free_sock(connection); + struct drbd_resource *resource =3D timer_container_of(resource, t, peer_a= ck_timer); + + drbd_flush_peer_acks(resource); +} =20 +void conn_free_crypto(struct drbd_connection *connection) +{ crypto_free_shash(connection->csums_tfm); crypto_free_shash(connection->verify_tfm); crypto_free_shash(connection->cram_hmac_tfm); @@ -2458,11 +3638,25 @@ void conn_free_crypto(struct drbd_connection *conne= ction) connection->int_dig_vv =3D NULL; } =20 -int set_resource_options(struct drbd_resource *resource, struct res_opts *= res_opts) +static void wake_all_device_misc(struct drbd_resource *resource) +{ + struct drbd_device *device; + int vnr; + rcu_read_lock(); + idr_for_each_entry(&resource->devices, device, vnr) + wake_up(&device->misc_wait); + rcu_read_unlock(); +} + +int set_resource_options(struct drbd_resource *resource, struct res_opts *= res_opts, const char *tag) { struct drbd_connection *connection; cpumask_var_t new_cpu_mask; int err; + bool wake_device_misc =3D false; + bool force_state_recalc =3D false; + unsigned long irq_flags; + struct res_opts *old_opts =3D &resource->res_opts; =20 if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL)) return -ENOMEM; @@ -2491,26 +3685,47 @@ int set_resource_options(struct drbd_resource *reso= urce, struct res_opts *res_op goto fail; } } + if (res_opts->nr_requests < DRBD_NR_REQUESTS_MIN) + res_opts->nr_requests =3D DRBD_NR_REQUESTS_MIN; + + if 
(old_opts->quorum !=3D res_opts->quorum || + old_opts->on_no_quorum !=3D res_opts->on_no_quorum) + force_state_recalc =3D true; + + if (resource->res_opts.nr_requests < res_opts->nr_requests) + wake_device_misc =3D true; + resource->res_opts =3D *res_opts; if (cpumask_empty(new_cpu_mask)) drbd_calc_cpu_mask(&new_cpu_mask); if (!cpumask_equal(resource->cpu_mask, new_cpu_mask)) { cpumask_copy(resource->cpu_mask, new_cpu_mask); + resource->worker.reset_cpu_mask =3D 1; + rcu_read_lock(); for_each_connection_rcu(connection, resource) { connection->receiver.reset_cpu_mask =3D 1; - connection->ack_receiver.reset_cpu_mask =3D 1; - connection->worker.reset_cpu_mask =3D 1; + connection->sender.reset_cpu_mask =3D 1; } + rcu_read_unlock(); } err =3D 0; =20 + if (force_state_recalc) { + begin_state_change(resource, &irq_flags, CS_VERBOSE | CS_FORCE_RECALC); + end_state_change(resource, &irq_flags, tag); + } + + if (wake_device_misc) + wake_all_device_misc(resource); + fail: free_cpumask_var(new_cpu_mask); return err; =20 } =20 -struct drbd_resource *drbd_create_resource(const char *name) +struct drbd_resource *drbd_create_resource(const char *name, + struct res_opts *res_opts) { struct drbd_resource *resource; =20 @@ -2525,12 +3740,52 @@ struct drbd_resource *drbd_create_resource(const ch= ar *name) kref_init(&resource->kref); idr_init(&resource->devices); INIT_LIST_HEAD(&resource->connections); - resource->write_ordering =3D WO_BDEV_FLUSH; - list_add_tail_rcu(&resource->resources, &drbd_resources); + spin_lock_init(&resource->tl_update_lock); + INIT_LIST_HEAD(&resource->transfer_log); + spin_lock_init(&resource->peer_ack_lock); + INIT_LIST_HEAD(&resource->peer_ack_req_list); + INIT_LIST_HEAD(&resource->peer_ack_list); + INIT_LIST_HEAD(&resource->peer_ack_work.list); + resource->peer_ack_work.cb =3D w_queue_peer_ack; + timer_setup(&resource->peer_ack_timer, peer_ack_timer_fn, 0); + spin_lock_init(&resource->initiator_flush_lock); + sema_init(&resource->state_sem, 1); + 
resource->role[NOW] =3D R_SECONDARY; + resource->max_node_id =3D res_opts->drbd8_compat_mode ? 1 : res_opts->nod= e_id; + resource->twopc_reply.initiator_node_id =3D -1; mutex_init(&resource->conf_update); mutex_init(&resource->adm_mutex); - spin_lock_init(&resource->req_lock); + mutex_init(&resource->open_release); + rwlock_init(&resource->state_rwlock); + INIT_LIST_HEAD(&resource->listeners); + spin_lock_init(&resource->listeners_lock); + init_waitqueue_head(&resource->state_wait); + init_waitqueue_head(&resource->twopc_wait); + init_waitqueue_head(&resource->barrier_wait); + timer_setup(&resource->twopc_timer, twopc_timer_fn, 0); + INIT_WORK(&resource->twopc_work, nested_twopc_work); + drbd_init_workqueue(&resource->work); + drbd_thread_init(resource, &resource->worker, drbd_worker, "worker"); + spin_lock_init(&resource->current_tle_lock); drbd_debugfs_resource_add(resource); + resource->cached_min_aggreed_protocol_version =3D drbd_protocol_version_m= in; + /* members is a bit mask of the "seen" nodes in this resource. + * In drbd8 compatibility mode, we only have one peer, so we can + * set this to 1. */ + resource->members =3D res_opts->drbd8_compat_mode ? 
1 : NODE_MASK(res_opt= s->node_id); + INIT_WORK(&resource->empty_twopc, drbd_empty_twopc_work_fn); + INIT_LIST_HEAD(&resource->suspended_reqs); + + ratelimit_state_init(&resource->ratelimit[D_RL_R_GENERIC], 5*HZ, 10); + + + if (set_resource_options(resource, res_opts, "create-resource")) + goto fail_free_name; + + drbd_thread_start(&resource->worker); + + list_add_tail_rcu(&resource->resources, &drbd_resources); + return resource; =20 fail_free_name: @@ -2542,128 +3797,291 @@ struct drbd_resource *drbd_create_resource(const = char *name) } =20 /* caller must be under adm_mutex */ -struct drbd_connection *conn_create(const char *name, struct res_opts *res= _opts) +struct drbd_connection *drbd_create_connection(struct drbd_resource *resou= rce, + struct drbd_transport_class *tc) { - struct drbd_resource *resource; struct drbd_connection *connection; + int size; =20 - connection =3D kzalloc_obj(struct drbd_connection); + size =3D sizeof(*connection) - sizeof(connection->transport) + tc->instan= ce_size; + connection =3D kzalloc(size, GFP_KERNEL); if (!connection) return NULL; =20 - if (drbd_alloc_socket(&connection->data)) - goto fail; - if (drbd_alloc_socket(&connection->meta)) + ratelimit_state_init(&connection->ratelimit[D_RL_C_GENERIC], 5*HZ, /* no = burst */ 1); + + if (drbd_alloc_send_buffers(connection)) goto fail; =20 connection->current_epoch =3D kzalloc_obj(struct drbd_epoch); if (!connection->current_epoch) goto fail; =20 - INIT_LIST_HEAD(&connection->transfer_log); - INIT_LIST_HEAD(&connection->current_epoch->list); connection->epochs =3D 1; spin_lock_init(&connection->epoch_lock); =20 + INIT_LIST_HEAD(&connection->todo.work_list); + connection->todo.req =3D NULL; + + atomic_set(&connection->ap_in_flight, 0); + atomic_set(&connection->rs_in_flight, 0); connection->send.seen_any_write_yet =3D false; connection->send.current_epoch_nr =3D 0; connection->send.current_epoch_writes =3D 0; + connection->send.current_dagtag_sector =3D + resource->dagtag_sector - 
((BIO_MAX_VECS << PAGE_SHIFT) >> SECTOR_SHIFT)= - 1; =20 - resource =3D drbd_create_resource(name); - if (!resource) - goto fail; - - connection->cstate =3D C_STANDALONE; - mutex_init(&connection->cstate_mutex); - init_waitqueue_head(&connection->ping_wait); + connection->cstate[NOW] =3D C_STANDALONE; + connection->peer_role[NOW] =3D R_UNKNOWN; idr_init(&connection->peer_devices); =20 drbd_init_workqueue(&connection->sender_work); - mutex_init(&connection->data.mutex); - mutex_init(&connection->meta.mutex); + mutex_init(&connection->mutex[DATA_STREAM]); + mutex_init(&connection->mutex[CONTROL_STREAM]); + + INIT_LIST_HEAD(&connection->connect_timer_work.list); + timer_setup(&connection->connect_timer, connect_timer_fn, 0); =20 drbd_thread_init(resource, &connection->receiver, drbd_receiver, "receive= r"); connection->receiver.connection =3D connection; - drbd_thread_init(resource, &connection->worker, drbd_worker, "worker"); - connection->worker.connection =3D connection; - drbd_thread_init(resource, &connection->ack_receiver, drbd_ack_receiver, = "ack_recv"); - connection->ack_receiver.connection =3D connection; + drbd_thread_init(resource, &connection->sender, drbd_sender, "sender"); + connection->sender.connection =3D connection; + spin_lock_init(&connection->primary_flush_lock); + spin_lock_init(&connection->flush_ack_lock); + spin_lock_init(&connection->peer_reqs_lock); + spin_lock_init(&connection->send_oos_lock); + INIT_LIST_HEAD(&connection->peer_requests); + INIT_LIST_HEAD(&connection->peer_reads); + INIT_LIST_HEAD(&connection->send_oos); + INIT_LIST_HEAD(&connection->connections); + INIT_LIST_HEAD(&connection->done_ee); + INIT_LIST_HEAD(&connection->dagtag_wait_ee); + INIT_LIST_HEAD(&connection->remove_net_list); + init_waitqueue_head(&connection->ee_wait); =20 kref_init(&connection->kref); =20 - connection->resource =3D resource; + INIT_WORK(&connection->peer_ack_work, drbd_send_peer_ack_wf); + INIT_LIST_HEAD(&connection->send_oos_work.list); + 
connection->send_oos_work.cb =3D drbd_send_out_of_sync_wf; + INIT_LIST_HEAD(&connection->flush_ack_work.list); + connection->flush_ack_work.cb =3D drbd_flush_ack_wf; + INIT_WORK(&connection->send_acks_work, drbd_send_acks_wf); + INIT_WORK(&connection->send_ping_ack_work, drbd_send_ping_ack_wf); + INIT_WORK(&connection->send_ping_work, drbd_send_ping_wf); + + INIT_LIST_HEAD(&connection->send_dagtag_work.list); + connection->send_dagtag_work.cb =3D w_send_dagtag; =20 - if (set_resource_options(resource, res_opts)) - goto fail_resource; + spin_lock_init(&connection->advance_cache_ptr_lock); =20 kref_get(&resource->kref); - list_add_tail_rcu(&connection->connections, &resource->connections); - drbd_debugfs_connection_add(connection); + connection->resource =3D resource; + connection->after_reconciliation.lost_node_id =3D -1; + + connection->reassemble_buffer.buffer =3D connection->reassemble_buffer_by= tes.bytes; + + INIT_LIST_HEAD(&connection->transport.paths); + connection->transport.log_prefix =3D resource->name; + if (tc->ops.init(&connection->transport)) + goto fail; + return connection; =20 -fail_resource: - list_del(&resource->resources); - drbd_free_resource(resource); fail: + drbd_put_send_buffers(connection); kfree(connection->current_epoch); - drbd_free_socket(&connection->meta); - drbd_free_socket(&connection->data); kfree(connection); + return NULL; } =20 +/** + * drbd_transport_shutdown() - Free the transport specific members (e.g., = sockets) of a connection + * @connection: The connection to shut down + * @op: The operation. Only close the connection or destroy the whole tran= sport + * + * Must be called with conf_update held. 
+ */ +void drbd_transport_shutdown(struct drbd_connection *connection, enum drbd= _tr_free_op op) +{ + struct drbd_transport *transport =3D &connection->transport; + + lockdep_assert_held(&connection->resource->conf_update); + + mutex_lock(&connection->mutex[DATA_STREAM]); + mutex_lock(&connection->mutex[CONTROL_STREAM]); + + /* Ignore send errors, if any: we are shutting down. */ + flush_send_buffer(connection, DATA_STREAM); + flush_send_buffer(connection, CONTROL_STREAM); + + /* Holding conf_update ensures that paths list is not modified concurrent= ly. */ + transport->class->ops.free(transport, op); + if (op =3D=3D DESTROY_TRANSPORT) { + drbd_remove_all_paths(connection); + + /* Wait for the delayed drbd_reclaim_path() calls. */ + rcu_barrier(); + drbd_put_transport_class(transport->class); + } + + mutex_unlock(&connection->mutex[CONTROL_STREAM]); + mutex_unlock(&connection->mutex[DATA_STREAM]); +} + +void drbd_destroy_path(struct kref *kref) +{ + struct drbd_path *path =3D container_of(kref, struct drbd_path, kref); + struct drbd_connection *connection =3D + container_of(path->transport, struct drbd_connection, transport); + + kref_put(&connection->kref, drbd_destroy_connection); + kfree(path); +} + void drbd_destroy_connection(struct kref *kref) { struct drbd_connection *connection =3D container_of(kref, struct drbd_con= nection, kref); struct drbd_resource *resource =3D connection->resource; + struct drbd_peer_device *peer_device; + int vnr; =20 if (atomic_read(&connection->current_epoch->epoch_size) !=3D 0) drbd_err(connection, "epoch_size:%d\n", atomic_read(&connection->current= _epoch->epoch_size)); kfree(connection->current_epoch); =20 + idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { + struct drbd_device *device =3D peer_device->device; + free_peer_device(peer_device); + kref_put(&device->kref, drbd_destroy_device); + } idr_destroy(&connection->peer_devices); =20 - drbd_free_socket(&connection->meta); - 
drbd_free_socket(&connection->data); - kfree(connection->int_dig_in); - kfree(connection->int_dig_vv); + kfree(connection->transport.net_conf); kfree(connection); kref_put(&resource->kref, drbd_destroy_resource); } =20 +struct drbd_peer_device *create_peer_device(struct drbd_device *device, st= ruct drbd_connection *connection) +{ + struct drbd_peer_device *peer_device; + int err; + + peer_device =3D kzalloc_obj(struct drbd_peer_device); + if (!peer_device) + return NULL; + + peer_device->connection =3D connection; + peer_device->device =3D device; + peer_device->disk_state[NOW] =3D D_UNKNOWN; + peer_device->repl_state[NOW] =3D L_OFF; + peer_device->replication[NOW] =3D true; + peer_device->peer_replication[NOW] =3D true; + spin_lock_init(&peer_device->peer_seq_lock); + + ratelimit_state_init(&peer_device->ratelimit[D_RL_PD_GENERIC], 5*HZ, /* n= o burst */ 1); + + err =3D drbd_create_peer_device_default_config(peer_device); + if (err) { + kfree(peer_device); + return NULL; + } + + timer_setup(&peer_device->start_resync_timer, start_resync_timer_fn, 0); + + INIT_LIST_HEAD(&peer_device->resync_work.list); + peer_device->resync_work.cb =3D w_resync_timer; + timer_setup(&peer_device->resync_timer, resync_timer_fn, 0); + + INIT_LIST_HEAD(&peer_device->propagate_uuids_work.list); + peer_device->propagate_uuids_work.cb =3D w_send_uuids; + + atomic_set(&peer_device->ap_pending_cnt, 0); + atomic_set(&peer_device->unacked_cnt, 0); + atomic_set(&peer_device->rs_pending_cnt, 0); + + INIT_LIST_HEAD(&peer_device->resync_requests); + + atomic_set(&peer_device->rs_sect_in, 0); + + peer_device->bitmap_index =3D -1; + peer_device->resync_finished_pdsk =3D D_UNKNOWN; + + peer_device->q_limits.physical_block_size =3D SECTOR_SIZE; + peer_device->q_limits.logical_block_size =3D SECTOR_SIZE; + peer_device->q_limits.alignment_offset =3D 0; + peer_device->q_limits.io_min =3D SECTOR_SIZE; + peer_device->q_limits.io_opt =3D PAGE_SIZE; + peer_device->q_limits.max_bio_size =3D 
DRBD_MAX_BIO_SIZE; + + return peer_device; +} + +static void drbd_ldev_destroy(struct work_struct *ws) +{ + struct drbd_device *device =3D container_of(ws, struct drbd_device, ldev_= destroy_work); + + /* ldev_safe: destroying the bitmap */ + drbd_bm_free(device); + lc_destroy(device->act_log); + device->act_log =3D NULL; + /* ldev_safe: destroying ldev */ + drbd_backing_dev_free(device, device->ldev); + /* ldev_safe: final teardown, no other user possible */ + device->ldev =3D NULL; + + clear_bit(GOING_DISKLESS, &device->flags); + wake_up(&device->misc_wait); + kref_put(&device->kref, drbd_destroy_device); +} + +static int init_conflict_submitter(struct drbd_device *device) +{ + /* Short name so that it is recognizable from the first 15 characters. */ + device->submit_conflict.wq =3D + alloc_ordered_workqueue("drbd%u_sc", WQ_MEM_RECLAIM, device->minor); + if (!device->submit_conflict.wq) + return -ENOMEM; + INIT_WORK(&device->submit_conflict.worker, drbd_do_submit_conflict); + INIT_LIST_HEAD(&device->submit_conflict.resync_writes); + INIT_LIST_HEAD(&device->submit_conflict.resync_reads); + INIT_LIST_HEAD(&device->submit_conflict.writes); + INIT_LIST_HEAD(&device->submit_conflict.peer_writes); + spin_lock_init(&device->submit_conflict.lock); + return 0; +} + static int init_submitter(struct drbd_device *device) { - /* opencoded create_singlethread_workqueue(), - * to be able to say "drbd%d", ..., minor */ device->submit.wq =3D alloc_ordered_workqueue("drbd%u_submit", WQ_MEM_RECLAIM, device->minor); if (!device->submit.wq) return -ENOMEM; - INIT_WORK(&device->submit.worker, do_submit); INIT_LIST_HEAD(&device->submit.writes); + INIT_LIST_HEAD(&device->submit.peer_writes); + spin_lock_init(&device->submit.lock); return 0; } =20 -enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx,= unsigned int minor) +enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx,= unsigned int minor, + struct device_conf *device_conf, struct 
drbd_device **p_device) { struct drbd_resource *resource =3D adm_ctx->resource; - struct drbd_connection *connection, *n; + struct drbd_connection *connection; struct drbd_device *device; struct drbd_peer_device *peer_device, *tmp_peer_device; struct gendisk *disk; + LIST_HEAD(peer_devices); + LIST_HEAD(tmp); int id; int vnr =3D adm_ctx->volume; enum drbd_ret_code err =3D ERR_NOMEM; - struct queue_limits lim =3D { - /* - * Setting the max_hw_sectors to an odd value of 8kibyte here. - * This triggers a max_bio_size message upon first attach or - * connect. - */ - .max_hw_sectors =3D DRBD_MAX_BIO_SIZE_SAFE >> 8, - }; + bool locked =3D false; + + lockdep_assert_held(&resource->conf_update); =20 device =3D minor_to_device(minor); if (device) @@ -2675,24 +4093,65 @@ enum drbd_ret_code drbd_create_device(struct drbd_c= onfig_context *adm_ctx, unsig return ERR_NOMEM; kref_init(&device->kref); =20 + ratelimit_state_init(&device->ratelimit[D_RL_D_GENERIC], 5*HZ, /* no burs= t */ 1); + ratelimit_state_init(&device->ratelimit[D_RL_D_METADATA], 5*HZ, 10); + ratelimit_state_init(&device->ratelimit[D_RL_D_BACKEND], 5*HZ, 10); + kref_get(&resource->kref); device->resource =3D resource; device->minor =3D minor; device->vnr =3D vnr; + device->device_conf =3D *device_conf; + + drbd_set_defaults(device); + + atomic_set(&device->ap_bio_cnt[READ], 0); + atomic_set(&device->ap_bio_cnt[WRITE], 0); + atomic_set(&device->ap_actlog_cnt, 0); + atomic_set(&device->wait_for_actlog, 0); + atomic_set(&device->wait_for_actlog_ecnt, 0); + atomic_set(&device->local_cnt, 0); + atomic_set(&device->rs_sect_ev, 0); + atomic_set(&device->md_io.in_use, 0); + +#ifdef CONFIG_DRBD_TIMING_STATS + spin_lock_init(&device->timing_lock); +#endif + spin_lock_init(&device->al_lock); + + spin_lock_init(&device->pending_completion_lock); + INIT_LIST_HEAD(&device->pending_master_completion[0]); + INIT_LIST_HEAD(&device->pending_master_completion[1]); + INIT_LIST_HEAD(&device->pending_completion[0]); + 
INIT_LIST_HEAD(&device->pending_completion[1]); + INIT_LIST_HEAD(&device->openers); + spin_lock_init(&device->openers_lock); + spin_lock_init(&device->peer_req_bio_completion_lock); + + atomic_set(&device->pending_bitmap_work.n, 0); + spin_lock_init(&device->pending_bitmap_work.q_lock); + INIT_LIST_HEAD(&device->pending_bitmap_work.q); + + timer_setup(&device->md_sync_timer, md_sync_timer_fn, 0); + timer_setup(&device->request_timer, request_timer_fn, 0); + + init_waitqueue_head(&device->misc_wait); + init_waitqueue_head(&device->al_wait); + init_waitqueue_head(&device->seq_wait); =20 - drbd_init_set_defaults(device); + init_rwsem(&device->uuid_sem); =20 - disk =3D blk_alloc_disk(&lim, NUMA_NO_NODE); + disk =3D blk_alloc_disk(NULL, NUMA_NO_NODE); if (IS_ERR(disk)) { err =3D PTR_ERR(disk); goto out_no_disk; } =20 + INIT_WORK(&device->ldev_destroy_work, drbd_ldev_destroy); + device->vdisk =3D disk; device->rq_queue =3D disk->queue; =20 - set_disk_ro(disk, true); - disk->major =3D DRBD_MAJOR; disk->first_minor =3D minor; disk->minors =3D 1; @@ -2705,12 +4164,39 @@ enum drbd_ret_code drbd_create_device(struct drbd_c= onfig_context *adm_ctx, unsig if (!device->md_io.page) goto out_no_io_page; =20 - if (drbd_bm_init(device)) - goto out_no_bitmap; + /* Just put in some sane default; should never be used. */ + device->last_bm_block_shift =3D BM_BLOCK_SHIFT_MIN; + + spin_lock_init(&device->interval_lock); device->read_requests =3D RB_ROOT; - device->write_requests =3D RB_ROOT; + device->requests =3D RB_ROOT; + + BUG_ON(!mutex_is_locked(&resource->conf_update)); + for_each_connection(connection, resource) { + peer_device =3D create_peer_device(device, connection); + if (!peer_device) + goto out_no_peer_device; + list_add(&peer_device->peer_devices, &peer_devices); + } + + /* Insert the new device into all idrs under state_rwlock write lock + to guarantee a consistent object model. 
idr_preload() doesn't help + because it can only guarantee that a single idr_alloc() will + succeed. This fails (and will be retried) if no memory is + immediately available. + Keep in mind that RCU readers might find the device in the moment + we add it to the resources->devices IDR! + */ + + INIT_LIST_HEAD(&device->peer_devices); + spin_lock_init(&device->pending_bmio_lock); + INIT_LIST_HEAD(&device->pending_bitmap_io); =20 - id =3D idr_alloc(&drbd_devices, device, minor, minor + 1, GFP_KERNEL); + locked =3D true; + write_lock_irq(&resource->state_rwlock); + spin_lock(&drbd_devices_lock); + id =3D idr_alloc(&drbd_devices, device, minor, minor + 1, GFP_NOWAIT); + spin_unlock(&drbd_devices_lock); if (id < 0) { if (id =3D=3D -ENOSPC) err =3D ERR_MINOR_OR_VOLUME_EXISTS; @@ -2718,7 +4204,7 @@ enum drbd_ret_code drbd_create_device(struct drbd_con= fig_context *adm_ctx, unsig } kref_get(&device->kref); =20 - id =3D idr_alloc(&resource->devices, device, vnr, vnr + 1, GFP_KERNEL); + id =3D idr_alloc(&resource->devices, device, vnr, vnr + 1, GFP_NOWAIT); if (id < 0) { if (id =3D=3D -ENOSPC) err =3D ERR_MINOR_OR_VOLUME_EXISTS; @@ -2726,105 +4212,219 @@ enum drbd_ret_code drbd_create_device(struct drbd= _config_context *adm_ctx, unsig } kref_get(&device->kref); =20 - INIT_LIST_HEAD(&device->peer_devices); - INIT_LIST_HEAD(&device->pending_bitmap_io); - for_each_connection(connection, resource) { - peer_device =3D kzalloc_obj(struct drbd_peer_device); - if (!peer_device) - goto out_idr_remove_from_resource; - peer_device->connection =3D connection; - peer_device->device =3D device; - - list_add(&peer_device->peer_devices, &device->peer_devices); + list_for_each_entry_safe(peer_device, tmp_peer_device, &peer_devices, pee= r_devices) { + connection =3D peer_device->connection; + id =3D idr_alloc(&connection->peer_devices, peer_device, + device->vnr, device->vnr + 1, GFP_NOWAIT); + if (id < 0) + goto out_remove_peer_device; + list_del(&peer_device->peer_devices); + 
list_add_rcu(&peer_device->peer_devices, &device->peer_devices); + kref_get(&connection->kref); kref_get(&device->kref); + } + write_unlock_irq(&resource->state_rwlock); + locked =3D false; =20 - id =3D idr_alloc(&connection->peer_devices, peer_device, vnr, vnr + 1, G= FP_KERNEL); - if (id < 0) { - if (id =3D=3D -ENOSPC) - err =3D ERR_INVALID_REQUEST; - goto out_idr_remove_from_resource; - } - kref_get(&connection->kref); - INIT_WORK(&peer_device->send_acks_work, drbd_send_acks_wf); + if (init_conflict_submitter(device)) { + err =3D ERR_NOMEM; + goto out_remove_peer_device; } =20 if (init_submitter(device)) { err =3D ERR_NOMEM; - goto out_idr_remove_from_resource; + goto out_remove_peer_device; } =20 err =3D add_disk(disk); if (err) - goto out_destroy_workqueue; + goto out_destroy_submitter; + device->have_quorum[OLD] =3D + device->have_quorum[NEW] =3D + (resource->res_opts.quorum =3D=3D QOU_OFF); =20 - /* inherit the connection state */ - device->state.conn =3D first_connection(resource)->cstate; - if (device->state.conn =3D=3D C_WF_REPORT_PARAMS) { - for_each_peer_device(peer_device, device) + for_each_peer_device(peer_device, device) { + connection =3D peer_device->connection; + peer_device->node_id =3D connection->peer_node_id; + + if (connection->cstate[NOW] >=3D C_CONNECTED) drbd_connected(peer_device); } - /* move to create_peer_device() */ - for_each_peer_device(peer_device, device) - drbd_debugfs_peer_device_add(peer_device); + drbd_debugfs_device_add(device); + *p_device =3D device; return NO_ERROR; =20 -out_destroy_workqueue: +out_destroy_submitter: destroy_workqueue(device->submit.wq); -out_idr_remove_from_resource: - for_each_connection_safe(connection, n, resource) { - peer_device =3D idr_remove(&connection->peer_devices, vnr); - if (peer_device) - kref_put(&connection->kref, drbd_destroy_connection); - } - for_each_peer_device_safe(peer_device, tmp_peer_device, device) { + device->submit.wq =3D NULL; +out_remove_peer_device: + 
list_splice_init_rcu(&device->peer_devices, &tmp, synchronize_rcu); + list_for_each_entry_safe(peer_device, tmp_peer_device, &tmp, peer_devices= ) { + struct drbd_connection *connection =3D peer_device->connection; + + idr_remove(&connection->peer_devices, device->vnr); list_del(&peer_device->peer_devices); kfree(peer_device); + kref_put(&connection->kref, drbd_destroy_connection); } idr_remove(&resource->devices, vnr); + out_idr_remove_minor: + spin_lock(&drbd_devices_lock); idr_remove(&drbd_devices, minor); - synchronize_rcu(); + spin_unlock(&drbd_devices_lock); out_no_minor_idr: - drbd_bm_cleanup(device); -out_no_bitmap: + if (locked) + write_unlock_irq(&resource->state_rwlock); + synchronize_rcu(); + +out_no_peer_device: + list_for_each_entry_safe(peer_device, tmp_peer_device, &peer_devices, pee= r_devices) { + list_del(&peer_device->peer_devices); + kfree(peer_device); + } + __free_page(device->md_io.page); out_no_io_page: put_disk(disk); out_no_disk: kref_put(&resource->kref, drbd_destroy_resource); + /* kref debugging wants an extra put, see has_refs() */ kfree(device); return err; } =20 -void drbd_delete_device(struct drbd_device *device) +/** + * drbd_unregister_device() - make a device "invisible" + * @device: DRBD device to unregister + * + * Remove the device from the drbd object model and unregister it in the + * kernel. Keep reference counts on device->kref; they are dropped in + * drbd_reclaim_device(). 
+ */ +void drbd_unregister_device(struct drbd_device *device) { struct drbd_resource *resource =3D device->resource; struct drbd_connection *connection; struct drbd_peer_device *peer_device; =20 - /* move to free_peer_device() */ - for_each_peer_device(peer_device, device) - drbd_debugfs_peer_device_cleanup(peer_device); - drbd_debugfs_device_cleanup(device); + write_lock_irq(&resource->state_rwlock); for_each_connection(connection, resource) { idr_remove(&connection->peer_devices, device->vnr); - kref_put(&device->kref, drbd_destroy_device); } idr_remove(&resource->devices, device->vnr); - kref_put(&device->kref, drbd_destroy_device); - idr_remove(&drbd_devices, device_to_minor(device)); - kref_put(&device->kref, drbd_destroy_device); + spin_lock(&drbd_devices_lock); + idr_remove(&drbd_devices, device->minor); + spin_unlock(&drbd_devices_lock); + write_unlock_irq(&resource->state_rwlock); + + for_each_peer_device(peer_device, device) + drbd_debugfs_peer_device_cleanup(peer_device); + drbd_debugfs_device_cleanup(device); del_gendisk(device->vdisk); - synchronize_rcu(); - kref_put(&device->kref, drbd_destroy_device); + + destroy_workqueue(device->submit_conflict.wq); + device->submit_conflict.wq =3D NULL; + destroy_workqueue(device->submit.wq); + device->submit.wq =3D NULL; + timer_shutdown_sync(&device->request_timer); +} + +void drbd_reclaim_device(struct rcu_head *rp) +{ + struct drbd_device *device =3D container_of(rp, struct drbd_device, rcu); + struct drbd_peer_device *peer_device; + int i; + + for_each_peer_device(peer_device, device) { + kref_put(&device->kref, drbd_destroy_device); + } + + for (i =3D 0; i < 3; i++) { + kref_put(&device->kref, drbd_destroy_device); + } +} + +static void shutdown_connect_timer(struct drbd_connection *connection) +{ + if (timer_shutdown_sync(&connection->connect_timer)) { + kref_put(&connection->kref, drbd_destroy_connection); + } +} + +void del_connect_timer(struct drbd_connection *connection) +{ + if 
(timer_delete_sync(&connection->connect_timer)) { + kref_put(&connection->kref, drbd_destroy_connection); + } +} + +/** + * drbd_unregister_connection() - make a connection "invisible" + * @connection: DRBD connection to unregister + * + * Remove the connection from the drbd object model. Keep reference count= s on + * connection->kref; they are dropped in drbd_reclaim_connection(). + */ +void drbd_unregister_connection(struct drbd_connection *connection) +{ + struct drbd_resource *resource =3D connection->resource; + struct drbd_peer_device *peer_device; + int vnr, rr; + + idr_for_each_entry(&connection->peer_devices, peer_device, vnr) + drbd_debugfs_peer_device_cleanup(peer_device); + + write_lock_irq(&resource->state_rwlock); + set_bit(C_UNREGISTERED, &connection->flags); + smp_wmb(); + idr_for_each_entry(&connection->peer_devices, peer_device, vnr) + list_del_rcu(&peer_device->peer_devices); + list_del_rcu(&connection->connections); + write_unlock_irq(&resource->state_rwlock); + + drbd_debugfs_connection_cleanup(connection); + + shutdown_connect_timer(connection); + + rr =3D drbd_free_peer_reqs(connection, &connection->done_ee); + if (rr) + drbd_err(connection, "%d EEs in done list found!\n", rr); + + drbd_transport_shutdown(connection, DESTROY_TRANSPORT); + drbd_put_send_buffers(connection); + conn_free_crypto(connection); +} + +void drbd_reclaim_connection(struct rcu_head *rp) +{ + struct drbd_connection *connection =3D + container_of(rp, struct drbd_connection, rcu); + struct drbd_peer_device *peer_device; + int vnr; + + idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { + kref_put(&connection->kref, drbd_destroy_connection); + } + kref_put(&connection->kref, drbd_destroy_connection); +} + +void drbd_reclaim_path(struct rcu_head *rp) +{ + struct drbd_path *path =3D container_of(rp, struct drbd_path, rcu); + + INIT_LIST_HEAD(&path->list); + kref_put(&path->kref, drbd_destroy_path); } =20 static int __init drbd_init(void) { int err; =20 - if 
(drbd_minor_count < DRBD_MINOR_COUNT_MIN || drbd_minor_count > DRBD_MI= NOR_COUNT_MAX) { + + if (drbd_minor_count < DRBD_MINOR_COUNT_MIN + || drbd_minor_count > DRBD_MINOR_COUNT_MAX) { pr_err("invalid minor_count (%d)\n", drbd_minor_count); #ifdef MODULE return -EINVAL; @@ -2840,24 +4440,41 @@ static int __init drbd_init(void) return err; } =20 + /* + * allocate all necessary structs + */ drbd_proc =3D NULL; /* play safe for drbd_cleanup */ idr_init(&drbd_devices); =20 - mutex_init(&resources_mutex); INIT_LIST_HEAD(&drbd_resources); =20 + err =3D register_pernet_device(&drbd_pernet_ops); + if (err) { + pr_err("unable to register net namespace handlers\n"); + goto fail; + } + + drbd_enable_netns(); err =3D drbd_genl_register(); if (err) { pr_err("unable to register generic netlink family\n"); goto fail; } =20 + err =3D -ENOMEM; + ping_ack_sender =3D alloc_workqueue("drbd_pas", + WQ_UNBOUND | WQ_MEM_RECLAIM | WQ_HIGHPRI, 0); + if (!ping_ack_sender) + goto fail; + err =3D drbd_create_mempools(); if (err) goto fail; =20 err =3D -ENOMEM; - drbd_proc =3D proc_create_single("drbd", S_IFREG | 0444 , NULL, drbd_seq_= show); + drbd_proc =3D proc_create_single("drbd", S_IFREG | 0444, NULL, + drbd_seq_show); + if (!drbd_proc) { pr_err("unable to register proc file\n"); goto fail; @@ -2879,6 +4496,11 @@ static int __init drbd_init(void) GENL_MAGIC_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX); pr_info("%s\n", drbd_buildtag()); pr_info("registered as block device major %d\n", DRBD_MAJOR); + +#ifdef CONFIG_DRBD_COMPAT_84 + atomic_set(&nr_drbd8_devices, 0); +#endif + return 0; /* Success! 
*/ =20 fail: @@ -2890,493 +4512,1104 @@ static int __init drbd_init(void) return err; } =20 -static void drbd_free_one_sock(struct drbd_socket *ds) -{ - struct socket *s; - mutex_lock(&ds->mutex); - s =3D ds->socket; - ds->socket =3D NULL; - mutex_unlock(&ds->mutex); - if (s) { - /* so debugfs does not need to mutex_lock() */ - synchronize_rcu(); - kernel_sock_shutdown(s, SHUT_RDWR); - sock_release(s); - } -} - -void drbd_free_sock(struct drbd_connection *connection) -{ - if (connection->data.socket) - drbd_free_one_sock(&connection->data); - if (connection->meta.socket) - drbd_free_one_sock(&connection->meta); -} - /* meta data management */ =20 -void conn_md_sync(struct drbd_connection *connection) +static +void drbd_md_encode_9(struct drbd_device *device, struct meta_data_on_disk= _9 *buffer) { - struct drbd_peer_device *peer_device; - int vnr; + int i; =20 - rcu_read_lock(); - idr_for_each_entry(&connection->peer_devices, peer_device, vnr) { - struct drbd_device *device =3D peer_device->device; - - kref_get(&device->kref); - rcu_read_unlock(); - drbd_md_sync(device); - kref_put(&device->kref, drbd_destroy_device); - rcu_read_lock(); - } - rcu_read_unlock(); -} - -/* aligned 4kByte */ -struct meta_data_on_disk { - u64 la_size_sect; /* last agreed size. */ - u64 uuid[UI_SIZE]; /* UUIDs. 
*/ - u64 device_uuid; - u64 reserved_u64_1; - u32 flags; /* MDF */ - u32 magic; - u32 md_size_sect; - u32 al_offset; /* offset to this block */ - u32 al_nr_extents; /* important for restoring the AL (userspace) */ - /* `-- act_log->nr_elements <-- ldev->dc.al_extents */ - u32 bm_offset; /* offset to the bitmap, from here */ - u32 bm_bytes_per_bit; /* BM_BLOCK_SIZE */ - u32 la_peer_max_bio_size; /* last peer max_bio_size */ - - /* see al_tr_number_to_on_disk_sector() */ - u32 al_stripes; - u32 al_stripe_size_4k; - - u8 reserved_u8[4096 - (7*8 + 10*4)]; -} __packed; - - - -void drbd_md_write(struct drbd_device *device, void *b) -{ - struct meta_data_on_disk *buffer =3D b; - sector_t sector; - int i; - - memset(buffer, 0, sizeof(*buffer)); - - buffer->la_size_sect =3D cpu_to_be64(get_capacity(device->vdisk)); - for (i =3D UI_CURRENT; i < UI_SIZE; i++) - buffer->uuid[i] =3D cpu_to_be64(device->ldev->md.uuid[i]); + buffer->effective_size =3D cpu_to_be64(device->ldev->md.effective_size); + buffer->current_uuid =3D cpu_to_be64(device->ldev->md.current_uuid); + buffer->members =3D cpu_to_be64(device->ldev->md.members); buffer->flags =3D cpu_to_be32(device->ldev->md.flags); - buffer->magic =3D cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN); + buffer->magic =3D cpu_to_be32(DRBD_MD_MAGIC_09); =20 buffer->md_size_sect =3D cpu_to_be32(device->ldev->md.md_size_sect); buffer->al_offset =3D cpu_to_be32(device->ldev->md.al_offset); buffer->al_nr_extents =3D cpu_to_be32(device->act_log->nr_elements); - buffer->bm_bytes_per_bit =3D cpu_to_be32(BM_BLOCK_SIZE); + buffer->bm_bytes_per_bit =3D cpu_to_be32(device->ldev->md.bm_block_size); buffer->device_uuid =3D cpu_to_be64(device->ldev->md.device_uuid); =20 buffer->bm_offset =3D cpu_to_be32(device->ldev->md.bm_offset); - buffer->la_peer_max_bio_size =3D cpu_to_be32(device->peer_max_bio_size); + buffer->la_peer_max_bio_size =3D cpu_to_be32(device->device_conf.max_bio_= size); + buffer->bm_max_peers =3D cpu_to_be32(device->ldev->md.max_peers); + 
buffer->node_id =3D cpu_to_be32(device->ldev->md.node_id); + for (i =3D 0; i < DRBD_NODE_ID_MAX; i++) { + struct drbd_peer_md *peer_md =3D &device->ldev->md.peers[i]; + + buffer->peers[i].bitmap_uuid =3D cpu_to_be64(peer_md->bitmap_uuid); + buffer->peers[i].bitmap_dagtag =3D cpu_to_be64(peer_md->bitmap_dagtag); + buffer->peers[i].flags =3D cpu_to_be32(peer_md->flags & ~MDF_HAVE_BITMAP= ); + buffer->peers[i].bitmap_index =3D cpu_to_be32(peer_md->bitmap_index); + } + BUILD_BUG_ON(ARRAY_SIZE(device->ldev->md.history_uuids) !=3D ARRAY_SIZE(b= uffer->history_uuids)); + for (i =3D 0; i < ARRAY_SIZE(buffer->history_uuids); i++) + buffer->history_uuids[i] =3D cpu_to_be64(device->ldev->md.history_uuids[= i]); =20 buffer->al_stripes =3D cpu_to_be32(device->ldev->md.al_stripes); buffer->al_stripe_size_4k =3D cpu_to_be32(device->ldev->md.al_stripe_size= _4k); =20 + if (device->bitmap =3D=3D NULL) + for (i =3D 0; i < DRBD_PEERS_MAX; i++) + buffer->peers[i].flags |=3D cpu_to_be32(MDF_PEER_FULL_SYNC); +} + +static void drbd_md_encode(struct drbd_device *device, void *buffer) +{ + if (test_bit(LEGACY_84_MD, &device->flags)) + drbd_md_encode_84(device, buffer); + else + drbd_md_encode_9(device, buffer); +} + +int drbd_md_write(struct drbd_device *device, struct meta_data_on_disk_9 *= buffer) +{ + sector_t sector; + int err; + + if (drbd_md_dax_active(device->ldev)) { + drbd_md_encode(device, drbd_dax_md_addr(device->ldev)); + arch_wb_cache_pmem(drbd_dax_md_addr(device->ldev), + sizeof(struct meta_data_on_disk_9)); + return 0; + } + + memset(buffer, 0, sizeof(*buffer)); + + drbd_md_encode(device, buffer); + D_ASSERT(device, drbd_md_ss(device->ldev) =3D=3D device->ldev->md.md_offs= et); sector =3D device->ldev->md.md_offset; =20 - if (drbd_md_sync_page_io(device, device->ldev, sector, REQ_OP_WRITE)) { - /* this was a try anyways ... 
*/ + err =3D drbd_md_sync_page_io(device, device->ldev, sector, REQ_OP_WRITE); + if (err) { drbd_err(device, "meta data update failed!\n"); - drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR); + drbd_handle_io_error(device, DRBD_META_IO_ERROR); } + + return err; } =20 /** - * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag = bit is set + * __drbd_md_sync() - Writes the meta data super block (conditionally) if = the MD_DIRTY flag bit is set * @device: DRBD device. + * @maybe: meta data may in fact be "clean", the actual write may be skipp= ed. */ -void drbd_md_sync(struct drbd_device *device) +static int __drbd_md_sync(struct drbd_device *device, bool maybe) { - struct meta_data_on_disk *buffer; + struct meta_data_on_disk_9 *buffer; + int err =3D -EIO; =20 /* Don't accidentally change the DRBD meta data layout. */ - BUILD_BUG_ON(UI_SIZE !=3D 4); - BUILD_BUG_ON(sizeof(struct meta_data_on_disk) !=3D 4096); - - timer_delete(&device->md_sync_timer); - /* timer may be rearmed by drbd_md_mark_dirty() now. */ - if (!test_and_clear_bit(MD_DIRTY, &device->flags)) - return; + BUILD_BUG_ON(DRBD_PEERS_MAX !=3D 32); + BUILD_BUG_ON(HISTORY_UUIDS !=3D 32); + BUILD_BUG_ON(sizeof(struct meta_data_on_disk_9) !=3D 4096); =20 - /* We use here D_FAILED and not D_ATTACHING because we try to write - * metadata even if we detach due to a disk failure! */ - if (!get_ldev_if_state(device, D_FAILED)) - return; + if (!get_ldev_if_state(device, D_DETACHING)) + return -EIO; =20 buffer =3D drbd_md_get_buffer(device, __func__); if (!buffer) goto out; =20 - drbd_md_write(device, buffer); + timer_delete(&device->md_sync_timer); + /* timer may be rearmed by drbd_md_mark_dirty() now. */ =20 - /* Update device->ldev->md.la_size_sect, - * since we updated it on metadata. 
*/ - device->ldev->md.la_size_sect =3D get_capacity(device->vdisk); + if (test_and_clear_bit(MD_DIRTY, &device->flags) || !maybe) { + err =3D drbd_md_write(device, buffer); + if (err) + set_bit(MD_DIRTY, &device->flags); + } =20 drbd_md_put_buffer(device); out: put_ldev(device); + + return err; +} + +int drbd_md_sync(struct drbd_device *device) +{ + return __drbd_md_sync(device, false); +} + +int drbd_md_sync_if_dirty(struct drbd_device *device) +{ + return __drbd_md_sync(device, true); +} + +/** + * drbd_md_mark_dirty() - Mark meta data super block as dirty + * @device: DRBD device. + * + * Call this function if you change anything that should be written to + * the meta-data super block. This function sets MD_DIRTY, and starts a + * timer that ensures that within five seconds you have to call drbd_md_sy= nc(). + */ +void drbd_md_mark_dirty(struct drbd_device *device) +{ + if (!test_and_set_bit(MD_DIRTY, &device->flags)) + mod_timer(&device->md_sync_timer, jiffies + 5*HZ); +} + +void _drbd_uuid_push_history(struct drbd_device *device, u64 val) +{ + struct drbd_md *md =3D &device->ldev->md; + int node_id, i; + + if (val =3D=3D UUID_JUST_CREATED || val =3D=3D 0) + return; + + val &=3D ~UUID_PRIMARY; + + if (val =3D=3D (md->current_uuid & ~UUID_PRIMARY)) + return; + + for (node_id =3D 0; node_id < DRBD_NODE_ID_MAX; node_id++) { + if (node_id =3D=3D md->node_id) + continue; + if (val =3D=3D (md->peers[node_id].bitmap_uuid & ~UUID_PRIMARY)) + return; + } + + for (i =3D 0; i < ARRAY_SIZE(md->history_uuids); i++) { + if (md->history_uuids[i] =3D=3D val) + return; + } + + for (i =3D ARRAY_SIZE(md->history_uuids) - 1; i > 0; i--) + md->history_uuids[i] =3D md->history_uuids[i - 1]; + md->history_uuids[i] =3D val; +} + +u64 _drbd_uuid_pull_history(struct drbd_peer_device *peer_device) +{ + struct drbd_device *device =3D peer_device->device; + struct drbd_md *md =3D &device->ldev->md; + u64 first_history_uuid; + int i; + + first_history_uuid =3D md->history_uuids[0]; + for (i 
=3D 0; i < ARRAY_SIZE(md->history_uuids) - 1; i++) + md->history_uuids[i] =3D md->history_uuids[i + 1]; + md->history_uuids[i] =3D 0; + + return first_history_uuid; +} + +static void __drbd_uuid_set_current(struct drbd_device *device, u64 val) +{ + drbd_md_mark_dirty(device); + if (device->resource->role[NOW] =3D=3D R_PRIMARY) + val |=3D UUID_PRIMARY; + else + val &=3D ~UUID_PRIMARY; + + device->ldev->md.current_uuid =3D val; + drbd_uuid_set_exposed(device, val, false); +} + +static void __drbd_uuid_set_bitmap(struct drbd_peer_device *peer_device, u= 64 val) +{ + struct drbd_device *device =3D peer_device->device; + struct drbd_peer_md *peer_md =3D &device->ldev->md.peers[peer_device->nod= e_id]; + + drbd_md_mark_dirty(device); + peer_md->bitmap_uuid =3D val; + peer_md->bitmap_dagtag =3D val ? device->resource->dagtag_sector : 0; +} + +void _drbd_uuid_set_current(struct drbd_device *device, u64 val) +{ + unsigned long flags; + + spin_lock_irqsave(&device->ldev->md.uuid_lock, flags); + __drbd_uuid_set_current(device, val); + spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags); +} + +void _drbd_uuid_set_bitmap(struct drbd_peer_device *peer_device, u64 val) +{ + struct drbd_device *device =3D peer_device->device; + unsigned long flags; + + down_write(&device->uuid_sem); + spin_lock_irqsave(&device->ldev->md.uuid_lock, flags); + __drbd_uuid_set_bitmap(peer_device, val); + spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags); + up_write(&device->uuid_sem); +} + +/* call holding down_write(uuid_sem) */ +void drbd_uuid_set_bitmap(struct drbd_peer_device *peer_device, u64 uuid) +{ + struct drbd_device *device =3D peer_device->device; + unsigned long flags; + u64 previous_uuid; + + spin_lock_irqsave(&device->ldev->md.uuid_lock, flags); + previous_uuid =3D drbd_bitmap_uuid(peer_device); + __drbd_uuid_set_bitmap(peer_device, uuid); + if (previous_uuid) + _drbd_uuid_push_history(device, previous_uuid); + spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags); 
+} + +/** + * drbd_uuid_is_day0() - Check if device is in "day0" UUID state + * @device: DRBD device (caller must hold ldev reference) + * + * Returns true if the current UUID appears to be a "day0" UUID: + * a real UUID value was set (e.g. by linstor during create-md), + * but no UUID rotation has ever happened (all history and bitmap + * UUIDs are still zero). + */ +bool drbd_uuid_is_day0(struct drbd_device *device) +{ + struct drbd_md *md =3D &device->ldev->md; + int i; + + if ((md->current_uuid & ~UUID_PRIMARY) =3D=3D UUID_JUST_CREATED || + md->current_uuid =3D=3D 0) + return false; + + for (i =3D 0; i < ARRAY_SIZE(md->history_uuids); i++) + if (md->history_uuids[i]) + return false; + + for (i =3D 0; i < DRBD_NODE_ID_MAX; i++) { + if (i =3D=3D md->node_id) + continue; + if (md->peers[i].bitmap_uuid) + return false; + } + + return true; +} + +static u64 rotate_current_into_bitmap(struct drbd_device *device, u64 weak= _nodes, u64 dagtag) +{ + struct drbd_peer_md *peer_md =3D device->ldev->md.peers; + struct drbd_peer_device *peer_device; + int node_id; + u64 bm_uuid, prev_c_uuid; + u64 node_mask =3D 0; /* bit mask of node-ids processed */ + u64 slot_mask =3D 0; /* bit mask of on-disk bitmap slots processed */ + /* return value, bit mask of node-ids for which we + * actually set a new bitmap uuid */ + u64 got_new_bitmap_uuid =3D 0; + + if (device->ldev->md.current_uuid !=3D UUID_JUST_CREATED) + prev_c_uuid =3D device->ldev->md.current_uuid; + else + get_random_bytes(&prev_c_uuid, sizeof(u64)); + + rcu_read_lock(); + for_each_peer_device_rcu(peer_device, device) { + enum drbd_disk_state pdsk; + node_id =3D peer_device->node_id; + node_mask |=3D NODE_MASK(node_id); + if (peer_device->bitmap_index !=3D -1) + __set_bit(peer_device->bitmap_index, (unsigned long *)&slot_mask); + bm_uuid =3D peer_md[node_id].bitmap_uuid; + if (bm_uuid && bm_uuid !=3D prev_c_uuid) + continue; + + pdsk =3D peer_device->disk_state[NOW]; + + /* Create a new current UUID for a peer that is 
diskless but usually has= a backing disk. + * Do not create a new current UUID for a CONNECTED intentional diskless= peer. + * Create one for an intentional diskless peer that is currently away. */ + if (pdsk =3D=3D D_DISKLESS && !(peer_md[node_id].flags & MDF_HAVE_BITMAP= )) + continue; + + if ((pdsk <=3D D_UNKNOWN && pdsk !=3D D_NEGOTIATING) || + (NODE_MASK(node_id) & weak_nodes)) { + peer_md[node_id].bitmap_uuid =3D prev_c_uuid; + peer_md[node_id].bitmap_dagtag =3D dagtag; + drbd_md_mark_dirty(device); + got_new_bitmap_uuid |=3D NODE_MASK(node_id); + } + } + for (node_id =3D 0; node_id < DRBD_NODE_ID_MAX; node_id++) { + int slot_nr; + if (node_id =3D=3D device->ldev->md.node_id) + continue; + if (node_mask & NODE_MASK(node_id)) + continue; + slot_nr =3D peer_md[node_id].bitmap_index; + if (slot_nr !=3D -1) { + if (test_bit(slot_nr, (unsigned long *)&slot_mask)) + continue; + __set_bit(slot_nr, (unsigned long *)&slot_mask); + } + bm_uuid =3D peer_md[node_id].bitmap_uuid; + if (bm_uuid && bm_uuid !=3D prev_c_uuid) + continue; + if (slot_nr =3D=3D -1) { + slot_nr =3D find_first_zero_bit((unsigned long *)&slot_mask, sizeof(slo= t_mask) * BITS_PER_BYTE); + __set_bit(slot_nr, (unsigned long *)&slot_mask); + } + peer_md[node_id].bitmap_uuid =3D prev_c_uuid; + peer_md[node_id].bitmap_dagtag =3D dagtag; + drbd_md_mark_dirty(device); + /* count, but only if that bitmap index exists. 
*/ + if (slot_nr < device->ldev->md.max_peers) + got_new_bitmap_uuid |=3D NODE_MASK(node_id); + } + rcu_read_unlock(); + + return got_new_bitmap_uuid; +} + +static u64 initial_resync_nodes(struct drbd_device *device) +{ + struct drbd_peer_device *peer_device; + u64 nodes =3D 0; + + for_each_peer_device(peer_device, device) { + if (peer_device->disk_state[NOW] =3D=3D D_INCONSISTENT && + peer_device->repl_state[NOW] =3D=3D L_ESTABLISHED) + nodes |=3D NODE_MASK(peer_device->node_id); + } + + return nodes; +} + +u64 drbd_weak_nodes_device(struct drbd_device *device) +{ + struct drbd_peer_device *peer_device; + u64 not_weak =3D 0; + + if (device->disk_state[NOW] =3D=3D D_UP_TO_DATE) + not_weak =3D NODE_MASK(device->resource->res_opts.node_id); + + rcu_read_lock(); + for_each_peer_device_rcu(peer_device, device) { + enum drbd_disk_state pdsk =3D peer_device->disk_state[NOW]; + if (!(pdsk <=3D D_FAILED || pdsk =3D=3D D_UNKNOWN || pdsk =3D=3D D_OUTDA= TED)) + not_weak |=3D NODE_MASK(peer_device->node_id); + + } + rcu_read_unlock(); + + return ~not_weak; +} + + +static bool __new_current_uuid_prepare(struct drbd_device *device, bool fo= rced) +{ + u64 got_new_bitmap_uuid, val, old_current_uuid; + bool day0; + int err; + + spin_lock_irq(&device->ldev->md.uuid_lock); + day0 =3D drbd_uuid_is_day0(device); + got_new_bitmap_uuid =3D rotate_current_into_bitmap(device, + forced ? 
initial_resync_nodes(device) : 0, + device->resource->dagtag_sector); + + if (!got_new_bitmap_uuid && !day0) { + spin_unlock_irq(&device->ldev->md.uuid_lock); + return false; + } + + old_current_uuid =3D device->ldev->md.current_uuid; + get_random_bytes(&val, sizeof(u64)); + __drbd_uuid_set_current(device, val); + spin_unlock_irq(&device->ldev->md.uuid_lock); + + /* get it to stable storage _now_ */ + err =3D drbd_md_sync(device); + if (err) { + _drbd_uuid_set_current(device, old_current_uuid); + return false; + } + + return true; +} + +static void __new_current_uuid_info(struct drbd_device *device, u64 weak_n= odes) +{ + drbd_info(device, "new current UUID: %016llX weak: %016llX\n", + device->ldev->md.current_uuid, weak_nodes); +} + +static void __new_current_uuid_send(struct drbd_device *device, u64 weak_n= odes, bool forced) +{ + struct drbd_peer_device *peer_device; + u64 im; + + for_each_peer_device_ref(peer_device, im, device) { + if (peer_device->repl_state[NOW] >=3D L_ESTABLISHED) + drbd_send_uuids(peer_device, forced ? 
0 : UUID_FLAG_NEW_DATAGEN, weak_n= odes); + } +} + +static void __drbd_uuid_new_current_send(struct drbd_device *device, bool = forced) +{ + u64 weak_nodes; + + down_write(&device->uuid_sem); + if (!__new_current_uuid_prepare(device, forced)) { + up_write(&device->uuid_sem); + return; + } + downgrade_write(&device->uuid_sem); + weak_nodes =3D drbd_weak_nodes_device(device); + __new_current_uuid_info(device, weak_nodes); + __new_current_uuid_send(device, weak_nodes, forced); + up_read(&device->uuid_sem); +} + +static void __drbd_uuid_new_current_holding_uuid_sem(struct drbd_device *d= evice) +{ + u64 weak_nodes; + + if (!__new_current_uuid_prepare(device, false)) + return; + weak_nodes =3D drbd_weak_nodes_device(device); + __new_current_uuid_info(device, weak_nodes); +} + +static bool peer_can_fill_a_bitmap_slot(struct drbd_peer_device *peer_devi= ce) +{ + struct drbd_device *device =3D peer_device->device; + const bool intentional_diskless =3D device->device_conf.intentional_diskl= ess; + const int my_node_id =3D device->resource->res_opts.node_id; + int node_id; + + for (node_id =3D 0; node_id < DRBD_NODE_ID_MAX; node_id++) { + if (node_id =3D=3D peer_device->node_id) + continue; + if (peer_device->bitmap_uuids[node_id] =3D=3D 0) { + struct drbd_peer_device *p2; + p2 =3D peer_device_by_node_id(peer_device->device, node_id); + if (p2 && !want_bitmap(p2)) + continue; + + if (node_id =3D=3D my_node_id && intentional_diskless) + continue; + + return true; + } + } + + return false; +} + +static bool diskfull_peers_need_new_cur_uuid(struct drbd_device *device) +{ + struct drbd_peer_device *peer_device; + bool rv =3D false; + + rcu_read_lock(); + for_each_peer_device_rcu(peer_device, device) { + if (peer_device->connection->agreed_pro_version < 110) + continue; + + /* Only an up-to-date peer persists a new current uuid! 
*/ + if (peer_device->disk_state[NOW] < D_UP_TO_DATE) + continue; + if (peer_can_fill_a_bitmap_slot(peer_device)) { + rv =3D true; + break; + } + } + rcu_read_unlock(); + + return rv; +} + +static bool a_lost_peer_is_on_same_cur_uuid(struct drbd_device *device) +{ + struct drbd_peer_device *peer_device; + bool rv =3D false; + + rcu_read_lock(); + for_each_peer_device_rcu(peer_device, device) { + enum drbd_disk_state pdsk =3D peer_device->disk_state[NOW]; + + if (pdsk >=3D D_INCONSISTENT && pdsk <=3D D_UNKNOWN && + (device->exposed_data_uuid & ~UUID_PRIMARY) =3D=3D + (peer_device->current_uuid & ~UUID_PRIMARY) && + !(peer_device->uuid_flags & UUID_FLAG_SYNC_TARGET)) { + rv =3D true; + break; + } + } + rcu_read_unlock(); + + return rv; +} + +/** + * drbd_uuid_new_current() - Creates a new current UUID + * @device: DRBD device. + * @forced: Force UUID creation + * + * Creates a new current UUID, and rotates the old current UUID into + * the bitmap slot. Causes an incremental resync upon next connect. + */ +void drbd_uuid_new_current(struct drbd_device *device, bool forced) +{ + if (get_ldev_if_state(device, D_UP_TO_DATE)) { + __drbd_uuid_new_current_send(device, forced); + put_ldev(device); + } else if (diskfull_peers_need_new_cur_uuid(device) || + a_lost_peer_is_on_same_cur_uuid(device)) { + struct drbd_peer_device *peer_device; + /* The peers will store the new current UUID... 
*/ + u64 current_uuid, weak_nodes; + get_random_bytes(¤t_uuid, sizeof(u64)); + if (device->resource->role[NOW] =3D=3D R_PRIMARY) + current_uuid |=3D UUID_PRIMARY; + else + current_uuid &=3D ~UUID_PRIMARY; + + down_write(&device->uuid_sem); + drbd_uuid_set_exposed(device, current_uuid, false); + downgrade_write(&device->uuid_sem); + drbd_info(device, "sending new current UUID: %016llX\n", current_uuid); + + weak_nodes =3D drbd_weak_nodes_device(device); + for_each_peer_device(peer_device, device) { + if (peer_device->repl_state[NOW] >=3D L_ESTABLISHED) { + drbd_send_current_uuid(peer_device, current_uuid, weak_nodes); + peer_device->current_uuid =3D current_uuid; + } + } + up_read(&device->uuid_sem); + } +} + +void drbd_uuid_new_current_by_user(struct drbd_device *device) +{ + struct drbd_peer_device *peer_device; + + down_write(&device->uuid_sem); + for_each_peer_device(peer_device, device) + drbd_uuid_set_bitmap(peer_device, 0); /* Rotate UI_BITMAP to History 1, = etc... */ + + if (get_ldev(device)) { + __drbd_uuid_new_current_holding_uuid_sem(device); + put_ldev(device); + } + up_write(&device->uuid_sem); +} + +static void drbd_propagate_uuids(struct drbd_device *device, u64 nodes) +{ + struct drbd_peer_device *peer_device; + + rcu_read_lock(); + for_each_peer_device_rcu(peer_device, device) { + if (!(nodes & NODE_MASK(peer_device->node_id))) + continue; + + if (peer_device->repl_state[NOW] < L_ESTABLISHED) + continue; + + if (list_empty(&peer_device->propagate_uuids_work.list)) + drbd_queue_work(&peer_device->connection->sender_work, + &peer_device->propagate_uuids_work); + } + rcu_read_unlock(); +} + +void drbd_uuid_received_new_current(struct drbd_peer_device *from_pd, u64 = val, u64 weak_nodes) +{ + struct drbd_device *device =3D from_pd->device; + u64 dagtag =3D atomic64_read(&from_pd->connection->last_dagtag_sector); + struct drbd_peer_device *peer_device; + u64 recipients =3D 0; + bool set_current =3D true; + + down_write(&device->uuid_sem); + 
spin_lock_irq(&device->ldev->md.uuid_lock); + + rcu_read_lock(); + for_each_peer_device_rcu(peer_device, device) { + if (peer_device->repl_state[NOW] =3D=3D L_SYNC_TARGET || + peer_device->repl_state[NOW] =3D=3D L_BEHIND || + peer_device->repl_state[NOW] =3D=3D L_PAUSED_SYNC_T) { + peer_device->current_uuid =3D val; + set_current =3D false; + } + if (peer_device->repl_state[NOW] =3D=3D L_WF_BITMAP_S || + peer_device->repl_state[NOW] =3D=3D L_SYNC_SOURCE || + peer_device->repl_state[NOW] =3D=3D L_PAUSED_SYNC_S || + peer_device->repl_state[NOW] =3D=3D L_ESTABLISHED) + recipients |=3D NODE_MASK(peer_device->node_id); + + if (peer_device->disk_state[NOW] =3D=3D D_DISKLESS) + recipients |=3D NODE_MASK(peer_device->node_id); + } + rcu_read_unlock(); + + if (set_current) { + u64 old_current =3D device->ldev->md.current_uuid; + u64 upd; + + if (device->disk_state[NOW] =3D=3D D_UP_TO_DATE) + recipients |=3D rotate_current_into_bitmap(device, weak_nodes, dagtag); + + upd =3D ~weak_nodes; /* These nodes are connected to the primary */ + upd &=3D __test_bitmap_slots(device); /* of those, I have a bitmap for */ + __set_bitmap_slots(device, val, upd); + /* Setting bitmap to the (new) current-UUID, means, at this moment + we know that we are at the same data as this not connected peer. */ + + __drbd_uuid_set_current(device, val); + + /* Even when the old current UUID was not used as any bitmap + * UUID, we still add it to the history. This is relevant, in + * particular, when we afterwards perform a sync handshake with + * a peer which is not one of the "weak_nodes", but hasn't + * received the new current UUID. If we do not add the current + * UUID to the history, we will end up with a spurious + * unrelated data or split-brain decision. 
*/ + _drbd_uuid_push_history(device, old_current); + } + + spin_unlock_irq(&device->ldev->md.uuid_lock); + downgrade_write(&device->uuid_sem); + if (set_current) + drbd_propagate_uuids(device, recipients); + up_read(&device->uuid_sem); +} + +static u64 __set_bitmap_slots(struct drbd_device *device, u64 bitmap_uuid,= u64 do_nodes) +{ + struct drbd_peer_md *peer_md =3D device->ldev->md.peers; + u64 modified =3D 0; + int node_id; + + for (node_id =3D 0; node_id < DRBD_NODE_ID_MAX; node_id++) { + if (node_id =3D=3D device->ldev->md.node_id) + continue; + if (!(do_nodes & NODE_MASK(node_id))) + continue; + if (!(peer_md[node_id].flags & MDF_HAVE_BITMAP)) + continue; + if (peer_md[node_id].bitmap_uuid !=3D bitmap_uuid) { + u64 previous_bitmap_uuid =3D peer_md[node_id].bitmap_uuid; + /* drbd_info(device, "XXX bitmap[node_id=3D%d] =3D %llX\n", node_id, bi= tmap_uuid); */ + peer_md[node_id].bitmap_uuid =3D bitmap_uuid; + peer_md[node_id].bitmap_dagtag =3D + bitmap_uuid ? device->resource->dagtag_sector : 0; + _drbd_uuid_push_history(device, previous_bitmap_uuid); + drbd_md_mark_dirty(device); + modified |=3D NODE_MASK(node_id); + } + } + + return modified; +} + +static u64 __test_bitmap_slots(struct drbd_device *device) +{ + struct drbd_peer_md *peer_md =3D device->ldev->md.peers; + int node_id; + u64 rv =3D 0; + + for (node_id =3D 0; node_id < DRBD_NODE_ID_MAX; node_id++) { + if (peer_md[node_id].bitmap_uuid) + rv |=3D NODE_MASK(node_id); + } + + return rv; +} + +/* __test_bitmap_slots_of_peer() operates on view of the world I know the + SyncSource had. It might be that in the mean time some peers sent more + recent UUIDs to me. 
Remove all peers that are on the same UUID as I am + now from the set of nodes */ +static u64 __test_bitmap_slots_of_peer(struct drbd_peer_device *peer_devic= e) +{ + u64 set_bitmap_slots =3D 0; + int node_id; + + for (node_id =3D 0; node_id < DRBD_NODE_ID_MAX; node_id++) { + u64 bitmap_uuid =3D peer_device->bitmap_uuids[node_id]; + + if (bitmap_uuid !=3D 0 && bitmap_uuid !=3D -1) + set_bitmap_slots |=3D NODE_MASK(node_id); + } + + return set_bitmap_slots; +} + +static u64 +peers_with_current_uuid(struct drbd_device *device, u64 current_uuid) +{ + struct drbd_peer_device *peer_device; + u64 nodes =3D 0; + + current_uuid &=3D ~UUID_PRIMARY; + rcu_read_lock(); + for_each_peer_device_rcu(peer_device, device) { + enum drbd_disk_state peer_disk_state =3D peer_device->disk_state[NOW]; + if (peer_disk_state < D_INCONSISTENT || peer_disk_state =3D=3D D_UNKNOWN) + continue; + if (current_uuid =3D=3D (peer_device->current_uuid & ~UUID_PRIMARY)) + nodes |=3D NODE_MASK(peer_device->node_id); + } + rcu_read_unlock(); + + return nodes; +} + +void drbd_uuid_resync_starting(struct drbd_peer_device *peer_device) +{ + struct drbd_device *device =3D peer_device->device; + + peer_device->rs_start_uuid =3D drbd_current_uuid(device); + if (peer_device->uuid_flags & UUID_FLAG_CRASHED_PRIMARY) + set_bit(SYNC_SRC_CRASHED_PRI, &peer_device->flags); + rotate_current_into_bitmap(device, 0, device->resource->dagtag_sector); +} + +u64 drbd_uuid_resync_finished(struct drbd_peer_device *peer_device) +{ + struct drbd_device *device =3D peer_device->device; + unsigned long flags; + int i; + u64 ss_nz_bm; /* sync_source has non zero bitmap for. 
expressed as nodema= sk */ + u64 pwcu; /* peers with current uuid */ + u64 newer; + + spin_lock_irqsave(&device->ldev->md.uuid_lock, flags); + // Inherit history from the sync source + for (i =3D 0; i < ARRAY_SIZE(peer_device->history_uuids); i++) + _drbd_uuid_push_history(device, peer_device->history_uuids[i] & ~UUID_PR= IMARY); + + // Inherit history in bitmap UUIDs from the sync source + for (i =3D 0; i < DRBD_PEERS_MAX; i++) + if (peer_device->bitmap_uuids[i] !=3D -1) + _drbd_uuid_push_history(device, + peer_device->bitmap_uuids[i] & ~UUID_PRIMARY); + + ss_nz_bm =3D __test_bitmap_slots_of_peer(peer_device); + pwcu =3D peers_with_current_uuid(device, peer_device->current_uuid); + + newer =3D __set_bitmap_slots(device, peer_device->rs_start_uuid, ss_nz_bm= & ~pwcu); + __set_bitmap_slots(device, 0, ~ss_nz_bm | pwcu); + _drbd_uuid_push_history(device, drbd_current_uuid(device)); + __drbd_uuid_set_current(device, peer_device->current_uuid); + spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags); + + return newer; +} + +bool drbd_uuid_set_exposed(struct drbd_device *device, u64 val, bool log) +{ + if ((device->exposed_data_uuid & ~UUID_PRIMARY) =3D=3D (val & ~UUID_PRIMA= RY) || + val =3D=3D UUID_JUST_CREATED) + return false; + + if (device->resource->role[NOW] =3D=3D R_PRIMARY) + val |=3D UUID_PRIMARY; + else + val &=3D ~UUID_PRIMARY; + + device->exposed_data_uuid =3D val; + + if (log) + drbd_info(device, "Setting exposed data uuid: %016llX\n", (unsigned long= long)val); + + return true; +} + +static const char *name_of_node_id(struct drbd_resource *resource, int nod= e_id) +{ + /* Caller need to hold rcu_read_lock */ + struct drbd_connection *connection =3D drbd_connection_by_node_id(resourc= e, node_id); + + return connection ? 
rcu_dereference(connection->transport.net_conf)->name= : ""; } =20 -static int check_activity_log_stripe_size(struct drbd_device *device, - struct meta_data_on_disk *on_disk, - struct drbd_md *in_core) +static void forget_bitmap(struct drbd_device *device, int node_id) { - u32 al_stripes =3D be32_to_cpu(on_disk->al_stripes); - u32 al_stripe_size_4k =3D be32_to_cpu(on_disk->al_stripe_size_4k); - u64 al_size_4k; + int bitmap_index =3D device->ldev->md.peers[node_id].bitmap_index; + const char *name; =20 - /* both not set: default to old fixed size activity log */ - if (al_stripes =3D=3D 0 && al_stripe_size_4k =3D=3D 0) { - al_stripes =3D 1; - al_stripe_size_4k =3D MD_32kB_SECT/8; - } + if (_drbd_bm_total_weight(device, bitmap_index) =3D=3D 0) + return; =20 - /* some paranoia plausibility checks */ + spin_unlock_irq(&device->ldev->md.uuid_lock); + rcu_read_lock(); + name =3D name_of_node_id(device->resource, node_id); + drbd_info(device, "clearing bitmap UUID and content (%lu bits) for node %= d (%s)(slot %d)\n", + _drbd_bm_total_weight(device, bitmap_index), node_id, name, bitmap_ind= ex); + rcu_read_unlock(); + drbd_suspend_io(device, WRITE_ONLY); + drbd_bm_lock(device, "forget_bitmap()", BM_LOCK_TEST | BM_LOCK_SET); + _drbd_bm_clear_many_bits(device, bitmap_index, 0, -1UL); + drbd_bm_unlock(device); + drbd_resume_io(device); + drbd_md_mark_dirty(device); + spin_lock_irq(&device->ldev->md.uuid_lock); +} =20 - /* we need both values to be set */ - if (al_stripes =3D=3D 0 || al_stripe_size_4k =3D=3D 0) - goto err; +static void copy_bitmap(struct drbd_device *device, int from_id, int to_id) +{ + struct drbd_peer_device *peer_device =3D peer_device_by_node_id(device, t= o_id); + struct drbd_peer_md *peer_md =3D device->ldev->md.peers; + u64 previous_bitmap_uuid =3D peer_md[to_id].bitmap_uuid; + int from_index =3D peer_md[from_id].bitmap_index; + int to_index =3D peer_md[to_id].bitmap_index; + const char *from_name, *to_name; =20 - al_size_4k =3D (u64)al_stripes * 
al_stripe_size_4k; + peer_md[to_id].bitmap_uuid =3D peer_md[from_id].bitmap_uuid; + peer_md[to_id].bitmap_dagtag =3D peer_md[from_id].bitmap_dagtag; + _drbd_uuid_push_history(device, previous_bitmap_uuid); =20 - /* Upper limit of activity log area, to avoid potential overflow - * problems in al_tr_number_to_on_disk_sector(). As right now, more - * than 72 * 4k blocks total only increases the amount of history, - * limiting this arbitrarily to 16 GB is not a real limitation ;-) */ - if (al_size_4k > (16 * 1024 * 1024/4)) - goto err; + /* Pretending that the updated UUID was sent is a hack. + Unfortunately Necessary to not interrupt the handshake */ + if (peer_device && peer_device->comm_bitmap_uuid =3D=3D previous_bitmap_u= uid) + peer_device->comm_bitmap_uuid =3D peer_md[from_id].bitmap_uuid; =20 - /* Lower limit: we need at least 8 transaction slots (32kB) - * to not break existing setups */ - if (al_size_4k < MD_32kB_SECT/8) - goto err; + spin_unlock_irq(&device->ldev->md.uuid_lock); + rcu_read_lock(); + from_name =3D name_of_node_id(device->resource, from_id); + to_name =3D name_of_node_id(device->resource, to_id); + drbd_info(device, "Node %d (%s) synced up to node %d (%s). 
copying bitmap= slot %d to %d.\n", + to_id, to_name, from_id, from_name, from_index, to_index); + rcu_read_unlock(); + drbd_suspend_io(device, WRITE_ONLY); + drbd_bm_lock(device, "copy_bitmap()", BM_LOCK_ALL); + drbd_bm_copy_slot(device, from_index, to_index); + drbd_bm_unlock(device); + drbd_resume_io(device); + drbd_md_mark_dirty(device); + spin_lock_irq(&device->ldev->md.uuid_lock); +} =20 - in_core->al_stripe_size_4k =3D al_stripe_size_4k; - in_core->al_stripes =3D al_stripes; - in_core->al_size_4k =3D al_size_4k; +static int find_node_id_by_bitmap_uuid(struct drbd_device *device, u64 bm_= uuid) +{ + struct drbd_peer_md *peer_md =3D device->ldev->md.peers; + int node_id; =20 - return 0; -err: - drbd_err(device, "invalid activity log striping: al_stripes=3D%u, al_stri= pe_size_4k=3D%u\n", - al_stripes, al_stripe_size_4k); - return -EINVAL; -} - -static int check_offsets_and_sizes(struct drbd_device *device, struct drbd= _backing_dev *bdev) -{ - sector_t capacity =3D drbd_get_capacity(bdev->md_bdev); - struct drbd_md *in_core =3D &bdev->md; - s32 on_disk_al_sect; - s32 on_disk_bm_sect; - - /* The on-disk size of the activity log, calculated from offsets, and - * the size of the activity log calculated from the stripe settings, - * should match. - * Though we could relax this a bit: it is ok, if the striped activity log - * fits in the available on-disk activity log size. - * Right now, that would break how resize is implemented. - * TODO: make drbd_determine_dev_size() (and the drbdmeta tool) aware - * of possible unused padding space in the on disk layout. 
*/ - if (in_core->al_offset < 0) { - if (in_core->bm_offset > in_core->al_offset) - goto err; - on_disk_al_sect =3D -in_core->al_offset; - on_disk_bm_sect =3D in_core->al_offset - in_core->bm_offset; - } else { - if (in_core->al_offset !=3D MD_4kB_SECT) - goto err; - if (in_core->bm_offset < in_core->al_offset + in_core->al_size_4k * MD_4= kB_SECT) - goto err; + bm_uuid &=3D ~UUID_PRIMARY; =20 - on_disk_al_sect =3D in_core->bm_offset - MD_4kB_SECT; - on_disk_bm_sect =3D in_core->md_size_sect - in_core->bm_offset; + for (node_id =3D 0; node_id < DRBD_NODE_ID_MAX; node_id++) { + if ((peer_md[node_id].bitmap_uuid & ~UUID_PRIMARY) =3D=3D bm_uuid && + peer_md[node_id].flags & MDF_HAVE_BITMAP) + return node_id; } =20 - /* old fixed size meta data is exactly that: fixed. */ - if (in_core->meta_dev_idx >=3D 0) { - if (in_core->md_size_sect !=3D MD_128MB_SECT - || in_core->al_offset !=3D MD_4kB_SECT - || in_core->bm_offset !=3D MD_4kB_SECT + MD_32kB_SECT - || in_core->al_stripes !=3D 1 - || in_core->al_stripe_size_4k !=3D MD_32kB_SECT/8) - goto err; + for (node_id =3D 0; node_id < DRBD_NODE_ID_MAX; node_id++) { + if ((peer_md[node_id].bitmap_uuid & ~UUID_PRIMARY) =3D=3D bm_uuid) + return node_id; } =20 - if (capacity < in_core->md_size_sect) - goto err; - if (capacity - in_core->md_size_sect < drbd_md_first_sector(bdev)) - goto err; - - /* should be aligned, and at least 32k */ - if ((on_disk_al_sect & 7) || (on_disk_al_sect < MD_32kB_SECT)) - goto err; - - /* should fit (for now: exactly) into the available on-disk space; - * overflow prevention is in check_activity_log_stripe_size() above. */ - if (on_disk_al_sect !=3D in_core->al_size_4k * MD_4kB_SECT) - goto err; - - /* again, should be aligned */ - if (in_core->bm_offset & 7) - goto err; + return -1; +} =20 - /* FIXME check for device grow with flex external meta data? 
*/ +static bool node_connected(struct drbd_resource *resource, int node_id) +{ + struct drbd_connection *connection; + bool r =3D false; =20 - /* can the available bitmap space cover the last agreed device size? */ - if (on_disk_bm_sect < (in_core->la_size_sect+7)/MD_4kB_SECT/8/512) - goto err; + rcu_read_lock(); + connection =3D drbd_connection_by_node_id(resource, node_id); + if (connection) + r =3D connection->cstate[NOW] =3D=3D C_CONNECTED; + rcu_read_unlock(); =20 - return 0; + return r; +} =20 -err: - drbd_err(device, "meta data offsets don't make sense: idx=3D%d " - "al_s=3D%u, al_sz4k=3D%u, al_offset=3D%d, bm_offset=3D%d, " - "md_size_sect=3D%u, la_size=3D%llu, md_capacity=3D%llu\n", - in_core->meta_dev_idx, - in_core->al_stripes, in_core->al_stripe_size_4k, - in_core->al_offset, in_core->bm_offset, in_core->md_size_sect, - (unsigned long long)in_core->la_size_sect, - (unsigned long long)capacity); +static bool detect_copy_ops_on_peer(struct drbd_peer_device *peer_device) +{ + struct drbd_device *device =3D peer_device->device; + struct drbd_peer_md *peer_md =3D device->ldev->md.peers; + struct drbd_resource *resource =3D device->resource; + int node_id1, node_id2, from_id; + u64 peer_bm_uuid; + bool modified =3D false; =20 - return -EINVAL; -} + for (node_id1 =3D 0; node_id1 < DRBD_NODE_ID_MAX; node_id1++) { + if (device->ldev->md.peers[node_id1].bitmap_index =3D=3D -1) + continue; =20 + if (node_connected(resource, node_id1)) + continue; =20 -/** - * drbd_md_read() - Reads in the meta data super block - * @device: DRBD device. - * @bdev: Device from which the meta data should be read in. - * - * Return NO_ERROR on success, and an enum drbd_ret_code in case - * something goes wrong. - * - * Called exactly once during drbd_adm_attach(), while still being D_DISKL= ESS, - * even before @bdev is assigned to @device->ldev. 
- */ -int drbd_md_read(struct drbd_device *device, struct drbd_backing_dev *bdev) -{ - struct meta_data_on_disk *buffer; - u32 magic, flags; - int i, rv =3D NO_ERROR; + peer_bm_uuid =3D peer_device->bitmap_uuids[node_id1]; + if (peer_bm_uuid =3D=3D 0 || peer_bm_uuid =3D=3D -1ULL) + continue; =20 - if (device->state.disk !=3D D_DISKLESS) - return ERR_DISK_CONFIGURED; + peer_bm_uuid &=3D ~UUID_PRIMARY; + for (node_id2 =3D node_id1 + 1; node_id2 < DRBD_NODE_ID_MAX; node_id2++)= { + if (device->ldev->md.peers[node_id2].bitmap_index =3D=3D -1) + continue; =20 - buffer =3D drbd_md_get_buffer(device, __func__); - if (!buffer) - return ERR_NOMEM; + if (node_connected(resource, node_id2)) + continue; =20 - /* First, figure out where our meta data superblock is located, - * and read it. */ - bdev->md.meta_dev_idx =3D bdev->disk_conf->meta_dev_idx; - bdev->md.md_offset =3D drbd_md_ss(bdev); - /* Even for (flexible or indexed) external meta data, - * initially restrict us to the 4k superblock for now. - * Affects the paranoia out-of-range access check in drbd_md_sync_page_io= (). */ - bdev->md.md_size_sect =3D 8; - - if (drbd_md_sync_page_io(device, bdev, bdev->md.md_offset, - REQ_OP_READ)) { - /* NOTE: can't do normal error processing here as this is - called BEFORE disk is attached */ - drbd_err(device, "Error while reading metadata.\n"); - rv =3D ERR_IO_MD_DISK; - goto err; - } - - magic =3D be32_to_cpu(buffer->magic); - flags =3D be32_to_cpu(buffer->flags); - if (magic =3D=3D DRBD_MD_MAGIC_84_UNCLEAN || - (magic =3D=3D DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) { - /* btw: that's Activity Log clean, not "all" clean. */ - drbd_err(device, "Found unclean meta data. Did you \"drbdadm apply-al\"?= \n"); - rv =3D ERR_MD_UNCLEAN; - goto err; - } - - rv =3D ERR_MD_INVALID; - if (magic !=3D DRBD_MD_MAGIC_08) { - if (magic =3D=3D DRBD_MD_MAGIC_07) - drbd_err(device, "Found old (0.7) meta data magic. 
Did you \"drbdadm create-md\"?\n");
-	else
-		drbd_err(device, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
-	goto err;
+			if (peer_bm_uuid == (peer_device->bitmap_uuids[node_id2] & ~UUID_PRIMARY))
+				goto found;
+		}
+	}
+	return false;
 
-	if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
-		drbd_err(device, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
-			 be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
-		goto err;
+found:
+	from_id = find_node_id_by_bitmap_uuid(device, peer_bm_uuid);
+	if (from_id == -1) {
+		if (peer_md[node_id1].bitmap_uuid == 0 && peer_md[node_id2].bitmap_uuid == 0)
+			return false;
+		drbd_err(peer_device, "unexpected\n");
+		drbd_err(peer_device, "In UUIDs from node %d found equal UUID (%llX) for nodes %d %d\n",
+			 peer_device->node_id, peer_bm_uuid, node_id1, node_id2);
+		drbd_err(peer_device, "I have %llX for node_id=%d\n",
+			 peer_md[node_id1].bitmap_uuid, node_id1);
+		drbd_err(peer_device, "I have %llX for node_id=%d\n",
+			 peer_md[node_id2].bitmap_uuid, node_id2);
+		return false;
 	}
 
+	if (!(peer_md[from_id].flags & MDF_HAVE_BITMAP))
+		return false;
 
-	/* convert to in_core endian */
-	bdev->md.la_size_sect = be64_to_cpu(buffer->la_size_sect);
-	for (i = UI_CURRENT; i < UI_SIZE; i++)
-		bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
-	bdev->md.flags = be32_to_cpu(buffer->flags);
-	bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
-
-	bdev->md.md_size_sect = be32_to_cpu(buffer->md_size_sect);
-	bdev->md.al_offset = be32_to_cpu(buffer->al_offset);
-	bdev->md.bm_offset = be32_to_cpu(buffer->bm_offset);
-
-	if (check_activity_log_stripe_size(device, buffer, &bdev->md))
-		goto err;
-	if (check_offsets_and_sizes(device, bdev))
-		goto err;
+	if (from_id != node_id1 &&
+	    peer_md[node_id1].bitmap_uuid != peer_bm_uuid) {
+		copy_bitmap(device, from_id, node_id1);
+		modified = true;
 
-	if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
-		drbd_err(device, "unexpected bm_offset: %d (expected %d)\n",
-			 be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
-		goto err;
-	}
-	if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
-		drbd_err(device, "unexpected md_size: %u (expected %u)\n",
-			 be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
-		goto err;
 	}
-
-	rv = NO_ERROR;
-
-	spin_lock_irq(&device->resource->req_lock);
-	if (device->state.conn < C_CONNECTED) {
-		unsigned int peer;
-		peer = be32_to_cpu(buffer->la_peer_max_bio_size);
-		peer = max(peer, DRBD_MAX_BIO_SIZE_SAFE);
-		device->peer_max_bio_size = peer;
+	if (from_id != node_id2 &&
+	    peer_md[node_id2].bitmap_uuid != peer_bm_uuid) {
+		copy_bitmap(device, from_id, node_id2);
+		modified = true;
 	}
-	spin_unlock_irq(&device->resource->req_lock);
 
- err:
-	drbd_md_put_buffer(device);
-
-	return rv;
+	return modified;
 }
 
-/**
- * drbd_md_mark_dirty() - Mark meta data super block as dirty
- * @device: DRBD device.
- *
- * Call this function if you change anything that should be written to
- * the meta-data super block. This function sets MD_DIRTY, and starts a
- * timer that ensures that within five seconds you have to call drbd_md_sync().
- */
-void drbd_md_mark_dirty(struct drbd_device *device)
+void drbd_uuid_detect_finished_resyncs(struct drbd_peer_device *peer_device)
 {
-	if (!test_and_set_bit(MD_DIRTY, &device->flags))
-		mod_timer(&device->md_sync_timer, jiffies + 5*HZ);
-}
+	u64 peer_current_uuid = peer_device->current_uuid & ~UUID_PRIMARY;
+	struct drbd_device *device = peer_device->device;
+	struct drbd_peer_md *peer_md = device->ldev->md.peers;
+	const int my_node_id = device->resource->res_opts.node_id;
+	bool write_bm = false;
+	bool filled = false;
+	bool current_equal;
+	int node_id;
 
-void drbd_uuid_move_history(struct drbd_device *device) __must_hold(local)
-{
-	int i;
+	current_equal = peer_current_uuid == (drbd_resolved_uuid(peer_device, NULL) & ~UUID_PRIMARY) &&
+			!(peer_device->uuid_flags & UUID_FLAG_SYNC_TARGET) &&
+			!(peer_device->comm_uuid_flags & UUID_FLAG_SYNC_TARGET);
 
-	for (i = UI_HISTORY_START; i < UI_HISTORY_END; i++)
-		device->ldev->md.uuid[i+1] = device->ldev->md.uuid[i];
-}
+	spin_lock_irq(&device->ldev->md.uuid_lock);
 
-void __drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
-{
-	if (idx == UI_CURRENT) {
-		if (device->state.role == R_PRIMARY)
-			val |= 1;
-		else
-			val &= ~((u64)1);
+	if (peer_device->repl_state[NOW] == L_OFF && current_equal) {
+		u64 bm_to_peer = peer_device->comm_bitmap_uuid & ~UUID_PRIMARY;
+		u64 bm_towards_me = peer_device->bitmap_uuids[my_node_id] & ~UUID_PRIMARY;
 
-		drbd_set_ed_uuid(device, val);
+		if (bm_towards_me != 0 && bm_to_peer == 0 &&
+		    bm_towards_me != peer_current_uuid) {
+			if (peer_device->comm_bm_set == 0 && peer_device->dirty_bits == 0) {
+				drbd_info(peer_device, "Peer missed end of resync, 0 to sync\n");
+				if (peer_device->connection->agreed_pro_version < 124)
+					set_bit(RS_PEER_MISSED_END, &peer_device->flags);
+			} else {
+				drbd_info(peer_device, "Peer missed end of resync\n");
+				set_bit(RS_PEER_MISSED_END, &peer_device->flags);
+			}
+		}
+		if (bm_towards_me == 0 && bm_to_peer != 0 &&
+		    bm_to_peer != peer_current_uuid) {
+			if (peer_device->comm_bm_set == 0 && peer_device->dirty_bits == 0) {
+				int peer_node_id = peer_device->node_id;
+				u64 previous = peer_md[peer_node_id].bitmap_uuid;
+
+				drbd_info(peer_device,
+					  "Missed end of resync as sync-source, no bits to sync\n");
+				peer_md[peer_node_id].bitmap_uuid = 0;
+				_drbd_uuid_push_history(device, previous);
+				peer_device->comm_bitmap_uuid = 0;
+				drbd_md_mark_dirty(device);
+				if (peer_device->connection->agreed_pro_version < 124)
+					set_bit(RS_SOURCE_MISSED_END, &peer_device->flags);
+			} else {
+				drbd_info(peer_device, "Missed end of resync as sync-source\n");
+				set_bit(RS_SOURCE_MISSED_END, &peer_device->flags);
+			}
+		}
+		spin_unlock_irq(&device->ldev->md.uuid_lock);
+		return;
 	}
 
-	device->ldev->md.uuid[idx] = val;
-	drbd_md_mark_dirty(device);
-}
-
-void _drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
-{
-	unsigned long flags;
-	spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
-	__drbd_uuid_set(device, idx, val);
-	spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
-}
+	for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+		struct drbd_peer_device *pd2;
 
-void drbd_uuid_set(struct drbd_device *device, int idx, u64 val) __must_hold(local)
-{
-	unsigned long flags;
-	spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
-	if (device->ldev->md.uuid[idx]) {
-		drbd_uuid_move_history(device);
-		device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[idx];
-	}
-	__drbd_uuid_set(device, idx, val);
-	spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
-}
+		if (node_id == device->ldev->md.node_id)
+			continue;
 
-/**
- * drbd_uuid_new_current() - Creates a new current UUID
- * @device: DRBD device.
- *
- * Creates a new current UUID, and rotates the old current UUID into
- * the bitmap slot. Causes an incremental resync upon next connect.
- */
-void drbd_uuid_new_current(struct drbd_device *device) __must_hold(local)
-{
-	u64 val;
-	unsigned long long bm_uuid;
+		if (!(peer_md[node_id].flags & MDF_HAVE_BITMAP) && !(peer_md[node_id].flags & MDF_NODE_EXISTS))
+			continue;
 
-	get_random_bytes(&val, sizeof(u64));
+		pd2 = peer_device_by_node_id(device, node_id);
+		if (pd2 && pd2 != peer_device && pd2->repl_state[NOW] > L_ESTABLISHED)
+			continue;
 
-	spin_lock_irq(&device->ldev->md.uuid_lock);
-	bm_uuid = device->ldev->md.uuid[UI_BITMAP];
+		if (peer_device->bitmap_uuids[node_id] == 0 && peer_md[node_id].bitmap_uuid != 0) {
+			int from_node_id;
+
+			if (current_equal) {
+				u64 previous_bitmap_uuid = peer_md[node_id].bitmap_uuid;
+				peer_md[node_id].bitmap_uuid = 0;
+				_drbd_uuid_push_history(device, previous_bitmap_uuid);
+				if (node_id == peer_device->node_id)
+					drbd_print_uuids(peer_device, "updated UUIDs");
+				else if (peer_md[node_id].flags & MDF_HAVE_BITMAP)
+					forget_bitmap(device, node_id);
+				else
+					drbd_info(device, "Clearing bitmap UUID for node %d\n",
+						  node_id);
+				drbd_md_mark_dirty(device);
+				write_bm = true;
+			}
 
-	if (bm_uuid)
-		drbd_warn(device, "bm UUID was already set: %llX\n", bm_uuid);
+			from_node_id = find_node_id_by_bitmap_uuid(device, peer_current_uuid);
+			if (from_node_id != -1 && node_id != from_node_id &&
+			    dagtag_newer(peer_md[from_node_id].bitmap_dagtag,
+					 peer_md[node_id].bitmap_dagtag)) {
+				if (peer_md[node_id].flags & MDF_HAVE_BITMAP &&
+				    peer_md[from_node_id].flags & MDF_HAVE_BITMAP)
+					copy_bitmap(device, from_node_id, node_id);
+				else
+					drbd_info(device, "Node %d synced up to node %d.\n",
+						  node_id, from_node_id);
+				drbd_md_mark_dirty(device);
+				filled = true;
+			}
+		}
+	}
 
-	device->ldev->md.uuid[UI_BITMAP] = device->ldev->md.uuid[UI_CURRENT];
-	__drbd_uuid_set(device, UI_CURRENT, val);
+	write_bm |= detect_copy_ops_on_peer(peer_device);
 	spin_unlock_irq(&device->ldev->md.uuid_lock);
 
-	drbd_print_uuids(device, "new current UUID");
-	/* get it to stable storage _now_ */
-	drbd_md_sync(device);
+	if (write_bm || filled) {
+		u64 to_nodes = filled ? -1 : ~NODE_MASK(peer_device->node_id);
+		drbd_propagate_uuids(device, to_nodes);
+		drbd_suspend_io(device, WRITE_ONLY);
+		drbd_bm_lock(device, "detect_finished_resyncs()", BM_LOCK_BULK);
+		drbd_bm_write(device, NULL);
+		drbd_bm_unlock(device);
+		drbd_resume_io(device);
+	}
 }
 
-void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local)
+int drbd_bmio_set_all_n_write(struct drbd_device *device,
+			      struct drbd_peer_device *peer_device)
 {
-	unsigned long flags;
-	spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
-	if (device->ldev->md.uuid[UI_BITMAP] == 0 && val == 0) {
-		spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
-		return;
-	}
-
-	if (val == 0) {
-		drbd_uuid_move_history(device);
-		device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
-		device->ldev->md.uuid[UI_BITMAP] = 0;
-	} else {
-		unsigned long long bm_uuid = device->ldev->md.uuid[UI_BITMAP];
-		if (bm_uuid)
-			drbd_warn(device, "bm UUID was already set: %llX\n", bm_uuid);
-
-		device->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
-	}
-	spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
-
-	drbd_md_mark_dirty(device);
+	drbd_bm_set_all(device);
+	return drbd_bm_write(device, NULL);
 }
 
 /**
@@ -3384,22 +5617,21 @@ void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local)
  * @device: DRBD device.
  * @peer_device: Peer DRBD device.
  *
- * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
+ * Sets all bits in the bitmap towards one peer and writes the whole bitmap to stable storage.
  */
 int drbd_bmio_set_n_write(struct drbd_device *device,
-			  struct drbd_peer_device *peer_device) __must_hold(local)
-
+			  struct drbd_peer_device *peer_device)
 {
 	int rv = -EIO;
 
-	drbd_md_set_flag(device, MDF_FULL_SYNC);
+	drbd_md_set_peer_flag(peer_device, MDF_PEER_FULL_SYNC);
 	drbd_md_sync(device);
-	drbd_bm_set_all(device);
+	drbd_bm_set_many_bits(peer_device, 0, -1UL);
 
-	rv = drbd_bm_write(device, peer_device);
+	rv = drbd_bm_write(device, NULL);
 
 	if (!rv) {
-		drbd_md_clear_flag(device, MDF_FULL_SYNC);
+		drbd_md_clear_peer_flag(peer_device, MDF_PEER_FULL_SYNC);
 		drbd_md_sync(device);
 	}
 
@@ -3407,67 +5639,109 @@ int drbd_bmio_set_n_write(struct drbd_device *device,
 }
 
 /**
- * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
+ * drbd_bmio_set_allocated_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
+ * @device: DRBD device.
+ * @peer_device: parameter ignored
+ *
+ * Sets all bits in all allocated bitmap slots and writes it to stable storage.
+ */
+int drbd_bmio_set_allocated_n_write(struct drbd_device *device,
+				    struct drbd_peer_device *peer_device)
+{
+	const int my_node_id = device->resource->res_opts.node_id;
+	struct drbd_md *md = &device->ldev->md;
+	int rv = -EIO;
+	int node_id, bitmap_index;
+
+	for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+		if (node_id == my_node_id)
+			continue;
+		bitmap_index = md->peers[node_id].bitmap_index;
+		if (bitmap_index == -1)
+			continue;
+		_drbd_bm_set_many_bits(device, bitmap_index, 0, -1UL);
+	}
+	rv = drbd_bm_write(device, NULL);
+
+	return rv;
+}
+
+/**
+ * drbd_bmio_clear_all_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
  * @device: DRBD device.
 * @peer_device: Peer DRBD device.
 *
 * Clears all bits in the bitmap and writes the whole bitmap to stable storage.
  */
-int drbd_bmio_clear_n_write(struct drbd_device *device,
-			    struct drbd_peer_device *peer_device) __must_hold(local)
-
+int drbd_bmio_clear_all_n_write(struct drbd_device *device,
+				struct drbd_peer_device *peer_device)
 {
 	drbd_resume_al(device);
 	drbd_bm_clear_all(device);
-	return drbd_bm_write(device, peer_device);
+	return drbd_bm_write(device, NULL);
+}
+
+int drbd_bmio_clear_one_peer(struct drbd_device *device,
+			     struct drbd_peer_device *peer_device)
+{
+	drbd_bm_clear_many_bits(peer_device, 0, -1UL);
+	return drbd_bm_write(device, NULL);
 }
 
 static int w_bitmap_io(struct drbd_work *w, int unused)
 {
-	struct drbd_device *device =
-		container_of(w, struct drbd_device, bm_io_work.w);
-	struct bm_io_work *work = &device->bm_io_work;
+	struct bm_io_work *work =
+		container_of(w, struct bm_io_work, w);
+	struct drbd_device *device = work->device;
 	int rv = -EIO;
 
-	if (work->flags != BM_LOCKED_CHANGE_ALLOWED) {
-		int cnt = atomic_read(&device->ap_bio_cnt);
-		if (cnt)
-			drbd_err(device, "FIXME: ap_bio_cnt %d, expected 0; queued for '%s'\n",
-				 cnt, work->why);
-	}
-
 	if (get_ldev(device)) {
-		drbd_bm_lock(device, work->why, work->flags);
+		if (work->flags & BM_LOCK_SINGLE_SLOT)
+			drbd_bm_slot_lock(work->peer_device, work->why, work->flags);
+		else
+			drbd_bm_lock(device, work->why, work->flags);
 		rv = work->io_fn(device, work->peer_device);
-		drbd_bm_unlock(device);
+		if (work->flags & BM_LOCK_SINGLE_SLOT)
+			drbd_bm_slot_unlock(work->peer_device);
+		else
+			drbd_bm_unlock(device);
 		put_ldev(device);
 	}
 
-	clear_bit_unlock(BITMAP_IO, &device->flags);
-	wake_up(&device->misc_wait);
-	if (work->done)
-		work->done(device, rv);
+	work->done(device, work->peer_device, rv);
 
-	clear_bit(BITMAP_IO_QUEUED, &device->flags);
-	work->why = NULL;
-	work->flags = 0;
+	if (atomic_dec_and_test(&device->pending_bitmap_work.n))
+		wake_up(&device->misc_wait);
+	kfree(work);
 
 	return 0;
 }
 
+void drbd_queue_pending_bitmap_work(struct drbd_device *device)
+{
+	unsigned long flags;
+
+	spin_lock_irqsave(&device->pending_bitmap_work.q_lock, flags);
+	spin_lock(&device->resource->work.q_lock);
+	list_splice_tail_init(&device->pending_bitmap_work.q, &device->resource->work.q);
+	spin_unlock(&device->resource->work.q_lock);
+	spin_unlock_irqrestore(&device->pending_bitmap_work.q_lock, flags);
+	wake_up(&device->resource->work.q_wait);
+}
+
 /**
  * drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
  * @device: DRBD device.
  * @io_fn: IO callback to be called when bitmap IO is possible
  * @done: callback to be called after the bitmap IO was performed
  * @why: Descriptive text of the reason for doing the IO
- * @flags: Bitmap flags
+ * @flags: Bitmap operation flags
  * @peer_device: Peer DRBD device.
 *
 * While IO on the bitmap happens we freeze application IO thus we ensure
 * that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
- * called from worker context. It MUST NOT be used while a previous such
+ * called from sender context. It MUST NOT be used while a previous such
* * Its worker function encloses the call of io_fn() by get_ldev() and @@ -3475,35 +5749,63 @@ static int w_bitmap_io(struct drbd_work *w, int unu= sed) */ void drbd_queue_bitmap_io(struct drbd_device *device, int (*io_fn)(struct drbd_device *, struct drbd_peer_device *), - void (*done)(struct drbd_device *, int), + void (*done)(struct drbd_device *, struct drbd_peer_device *, int), char *why, enum bm_flag flags, struct drbd_peer_device *peer_device) { - D_ASSERT(device, current =3D=3D peer_device->connection->worker.task); - - D_ASSERT(device, !test_bit(BITMAP_IO_QUEUED, &device->flags)); - D_ASSERT(device, !test_bit(BITMAP_IO, &device->flags)); - D_ASSERT(device, list_empty(&device->bm_io_work.w.list)); - if (device->bm_io_work.why) - drbd_err(device, "FIXME going to queue '%s' but '%s' still pending?\n", - why, device->bm_io_work.why); - - device->bm_io_work.peer_device =3D peer_device; - device->bm_io_work.io_fn =3D io_fn; - device->bm_io_work.done =3D done; - device->bm_io_work.why =3D why; - device->bm_io_work.flags =3D flags; - - spin_lock_irq(&device->resource->req_lock); - set_bit(BITMAP_IO, &device->flags); - /* don't wait for pending application IO if the caller indicates that - * application IO does not conflict anyways. 
-	 */
-	if (flags == BM_LOCKED_CHANGE_ALLOWED || atomic_read(&device->ap_bio_cnt) == 0) {
-		if (!test_and_set_bit(BITMAP_IO_QUEUED, &device->flags))
-			drbd_queue_work(&peer_device->connection->sender_work,
-					&device->bm_io_work.w);
+	struct bm_io_work *bm_io_work;
+
+	D_ASSERT(device, current == device->resource->worker.task);
+
+	bm_io_work = kmalloc_obj(*bm_io_work, GFP_NOIO);
+	if (!bm_io_work) {
+		if (done)
+			done(device, peer_device, -ENOMEM);
+		return;
 	}
-	spin_unlock_irq(&device->resource->req_lock);
+	bm_io_work->w.cb = w_bitmap_io;
+	bm_io_work->device = device;
+	bm_io_work->peer_device = peer_device;
+	bm_io_work->io_fn = io_fn;
+	bm_io_work->done = done;
+	bm_io_work->why = why;
+	bm_io_work->flags = flags;
+
+	/*
+	 * Whole-bitmap operations can only take place when there is no
+	 * concurrent application I/O. We ensure exclusion between the two
+	 * types of I/O with the following mechanism:
+	 *
+	 * - device->ap_bio_cnt keeps track of the number of application I/O
+	 *   requests in progress.
+	 *
+	 * - A non-empty device->pending_bitmap_work list indicates that
+	 *   whole-bitmap I/O operations are pending, and no new application
+	 *   I/O should be started. We make sure that the list doesn't appear
+	 *   empty system wide before trying to queue the whole-bitmap I/O.
+	 *
+	 * - In dec_ap_bio(), we decrement device->ap_bio_cnt. If it reaches
+	 *   zero and the device->pending_bitmap_work list is non-empty, we
+	 *   queue the whole-bitmap operations.
+	 *
+	 * - In inc_ap_bio(), we increment device->ap_bio_cnt before checking
+	 *   if the device->pending_bitmap_work list is non-empty. If
+	 *   device->pending_bitmap_work is non-empty, we immediately call
+	 *   dec_ap_bio().
+	 *
+	 * This ensures that whenever there is pending whole-bitmap I/O, we
+	 * realize in dec_ap_bio().
+	 *
+	 */
+
+	/* no one should accidentally schedule the next bitmap IO
+	 * when it is only half-queued yet */
+	atomic_inc(&device->ap_bio_cnt[WRITE]);
+	atomic_inc(&device->pending_bitmap_work.n);
+	spin_lock_irq(&device->pending_bitmap_work.q_lock);
+	list_add_tail(&bm_io_work->w.list, &device->pending_bitmap_work.q);
+	spin_unlock_irq(&device->pending_bitmap_work.q_lock);
+	dec_ap_bio(device, WRITE);	/* may move to actual work queue */
 }
 
 /**
@@ -3511,11 +5813,11 @@ void drbd_queue_bitmap_io(struct drbd_device *device,
  * @device: DRBD device.
  * @io_fn: IO callback to be called when bitmap IO is possible
  * @why: Descriptive text of the reason for doing the IO
- * @flags: Bitmap flags
+ * @flags: Bitmap operation flags
  * @peer_device: Peer DRBD device.
 *
 * freezes application IO while that the actual IO operations runs. This
- * functions MAY NOT be called from worker context.
+ * functions MAY NOT be called from sender context.
  */
 int drbd_bitmap_io(struct drbd_device *device,
 		   int (*io_fn)(struct drbd_device *, struct drbd_peer_device *),
@@ -3523,17 +5825,28 @@ int drbd_bitmap_io(struct drbd_device *device,
 		   struct drbd_peer_device *peer_device)
 {
 	/* Only suspend io, if some operation is supposed to be locked out */
-	const bool do_suspend_io = flags & (BM_DONT_CLEAR|BM_DONT_SET|BM_DONT_TEST);
+	const bool do_suspend_io = flags & (BM_LOCK_CLEAR|BM_LOCK_SET|BM_LOCK_TEST);
 	int rv;
 
-	D_ASSERT(device, current != first_peer_device(device)->connection->worker.task);
+	D_ASSERT(device, current != device->resource->worker.task);
+
+	if (!device->bitmap)
+		return 0;
 
 	if (do_suspend_io)
-		drbd_suspend_io(device);
+		drbd_suspend_io(device, WRITE_ONLY);
+
+	if (flags & BM_LOCK_SINGLE_SLOT)
+		drbd_bm_slot_lock(peer_device, why, flags);
+	else
+		drbd_bm_lock(device, why, flags);
 
-	drbd_bm_lock(device, why, flags);
 	rv = io_fn(device, peer_device);
-	drbd_bm_unlock(device);
+
+	if (flags & BM_LOCK_SINGLE_SLOT)
+		drbd_bm_slot_unlock(peer_device);
+	else
+		drbd_bm_unlock(device);
 
 	if (do_suspend_io)
 		drbd_resume_io(device);
@@ -3541,142 +5854,52 @@ int drbd_bitmap_io(struct drbd_device *device,
 	return rv;
 }
 
-void drbd_md_set_flag(struct drbd_device *device, int flag) __must_hold(local)
+void drbd_md_set_peer_flag(struct drbd_peer_device *peer_device,
+			   enum mdf_peer_flag flag)
 {
-	if ((device->ldev->md.flags & flag) != flag) {
+	struct drbd_device *device = peer_device->device;
+	struct drbd_md *md = &device->ldev->md;
+
+	if (!(md->peers[peer_device->node_id].flags & flag)) {
 		drbd_md_mark_dirty(device);
-		device->ldev->md.flags |= flag;
+		md->peers[peer_device->node_id].flags |= flag;
 	}
 }
 
-void drbd_md_clear_flag(struct drbd_device *device, int flag) __must_hold(local)
+void drbd_md_clear_peer_flag(struct drbd_peer_device *peer_device,
+			     enum mdf_peer_flag flag)
 {
-	if ((device->ldev->md.flags & flag) != 0) {
+	struct drbd_device *device = peer_device->device;
+	struct drbd_md *md = &device->ldev->md;
+
+	if (md->peers[peer_device->node_id].flags & flag) {
 		drbd_md_mark_dirty(device);
-		device->ldev->md.flags &= ~flag;
+		md->peers[peer_device->node_id].flags &= ~flag;
 	}
 }
-int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
+
+int drbd_md_test_flag(struct drbd_backing_dev *bdev, enum mdf_flag flag)
 {
 	return (bdev->md.flags & flag) != 0;
 }
 
-static void md_sync_timer_fn(struct timer_list *t)
+bool drbd_md_test_peer_flag(struct drbd_peer_device *peer_device, enum mdf_peer_flag flag)
 {
-	struct drbd_device *device = timer_container_of(device, t,
-							md_sync_timer);
-	drbd_device_post_work(device, MD_SYNC);
-}
+	struct drbd_md *md = &peer_device->device->ldev->md;
 
-const char *cmdname(enum drbd_packet cmd)
-{
-	/* THINK may need to become several global tables
-	 * when we want to support more than
-	 * one PRO_VERSION */
-	static const char *cmdnames[] = {
-
-		[P_DATA] = "Data",
-		[P_DATA_REPLY] = "DataReply",
-		[P_RS_DATA_REPLY] = "RSDataReply",
-		[P_BARRIER] = "Barrier",
-		[P_BITMAP] = "ReportBitMap",
-		[P_BECOME_SYNC_TARGET] = "BecomeSyncTarget",
-		[P_BECOME_SYNC_SOURCE] = "BecomeSyncSource",
-		[P_UNPLUG_REMOTE] = "UnplugRemote",
-		[P_DATA_REQUEST] = "DataRequest",
-		[P_RS_DATA_REQUEST] = "RSDataRequest",
-		[P_SYNC_PARAM] = "SyncParam",
-		[P_PROTOCOL] = "ReportProtocol",
-		[P_UUIDS] = "ReportUUIDs",
-		[P_SIZES] = "ReportSizes",
-		[P_STATE] = "ReportState",
-		[P_SYNC_UUID] = "ReportSyncUUID",
-		[P_AUTH_CHALLENGE] = "AuthChallenge",
-		[P_AUTH_RESPONSE] = "AuthResponse",
-		[P_STATE_CHG_REQ] = "StateChgRequest",
-		[P_PING] = "Ping",
-		[P_PING_ACK] = "PingAck",
-		[P_RECV_ACK] = "RecvAck",
-		[P_WRITE_ACK] = "WriteAck",
-		[P_RS_WRITE_ACK] = "RSWriteAck",
-		[P_SUPERSEDED] = "Superseded",
-		[P_NEG_ACK] = "NegAck",
-		[P_NEG_DREPLY] = "NegDReply",
-		[P_NEG_RS_DREPLY] = "NegRSDReply",
-		[P_BARRIER_ACK] = "BarrierAck",
-		[P_STATE_CHG_REPLY] = "StateChgReply",
-		[P_OV_REQUEST] = "OVRequest",
-		[P_OV_REPLY] = "OVReply",
-		[P_OV_RESULT] = "OVResult",
-		[P_CSUM_RS_REQUEST] = "CsumRSRequest",
-		[P_RS_IS_IN_SYNC] = "CsumRSIsInSync",
-		[P_SYNC_PARAM89] = "SyncParam89",
-		[P_COMPRESSED_BITMAP] = "CBitmap",
-		[P_DELAY_PROBE] = "DelayProbe",
-		[P_OUT_OF_SYNC] = "OutOfSync",
-		[P_RS_CANCEL] = "RSCancel",
-		[P_CONN_ST_CHG_REQ] = "conn_st_chg_req",
-		[P_CONN_ST_CHG_REPLY] = "conn_st_chg_reply",
-		[P_PROTOCOL_UPDATE] = "protocol_update",
-		[P_TRIM] = "Trim",
-		[P_RS_THIN_REQ] = "rs_thin_req",
-		[P_RS_DEALLOCATED] = "rs_deallocated",
-		[P_WSAME] = "WriteSame",
-		[P_ZEROES] = "Zeroes",
-
-		/* enum drbd_packet, but not commands - obsoleted flags:
-		 *	P_MAY_IGNORE
-		 *	P_MAX_OPT_CMD
-		 */
-	};
+	if (peer_device->bitmap_index == -1)
+		return false;
 
-	/* too big for the array: 0xfffX */
-	if (cmd == P_INITIAL_META)
-		return "InitialMeta";
-	if (cmd == P_INITIAL_DATA)
-		return "InitialData";
-	if (cmd == P_CONNECTION_FEATURES)
-		return "ConnectionFeatures";
-	if (cmd >= ARRAY_SIZE(cmdnames))
-		return "Unknown";
-	return cmdnames[cmd];
+	return md->peers[peer_device->node_id].flags & flag;
 }
 
-/**
- * drbd_wait_misc - wait for a request to make progress
- * @device:	device associated with the request
- * @i:		the struct drbd_interval embedded in struct drbd_request or
- *		struct drbd_peer_request
- */
-int drbd_wait_misc(struct drbd_device *device, struct drbd_interval *i)
+static void md_sync_timer_fn(struct timer_list *t)
 {
-	struct net_conf *nc;
-	DEFINE_WAIT(wait);
-	long timeout;
-
-	rcu_read_lock();
-	nc = rcu_dereference(first_peer_device(device)->connection->net_conf);
-	if (!nc) {
-		rcu_read_unlock();
-		return -ETIMEDOUT;
-	}
-	timeout = nc->ko_count ? nc->timeout * HZ / 10 * nc->ko_count : MAX_SCHEDULE_TIMEOUT;
-	rcu_read_unlock();
-
-	/* Indicate to wake up device->misc_wait on progress. */
-	i->waiting = true;
-	prepare_to_wait(&device->misc_wait, &wait, TASK_INTERRUPTIBLE);
-	spin_unlock_irq(&device->resource->req_lock);
-	timeout = schedule_timeout(timeout);
-	finish_wait(&device->misc_wait, &wait);
-	spin_lock_irq(&device->resource->req_lock);
-	if (!timeout || device->state.conn < C_CONNECTED)
-		return -ETIMEDOUT;
-	if (signal_pending(current))
-		return -ERESTARTSYS;
-	return 0;
+	struct drbd_device *device = timer_container_of(device, t, md_sync_timer);
+	drbd_device_post_work(device, MD_SYNC);
 }
 
+
 void lock_all_resources(void)
 {
 	struct drbd_resource *resource;
@@ -3685,7 +5908,7 @@ void lock_all_resources(void)
 	mutex_lock(&resources_mutex);
 	local_irq_disable();
 	for_each_resource(resource, &drbd_resources)
-		spin_lock_nested(&resource->req_lock, i++);
+		read_lock(&resource->state_rwlock);
 }
 
 void unlock_all_resources(void)
@@ -3693,11 +5916,141 @@ void unlock_all_resources(void)
 	struct drbd_resource *resource;
 
 	for_each_resource(resource, &drbd_resources)
-		spin_unlock(&resource->req_lock);
+		read_unlock(&resource->state_rwlock);
 	local_irq_enable();
 	mutex_unlock(&resources_mutex);
 }
 
+long twopc_timeout(struct drbd_resource *resource)
+{
+	return resource->res_opts.twopc_timeout * HZ/10;
+}
+
+u64 directly_connected_nodes(struct drbd_resource *resource, enum which_state which)
+{
+	u64 directly_connected = 0;
+	struct drbd_connection *connection;
+
+	rcu_read_lock();
+	for_each_connection_rcu(connection, resource) {
+		if (connection->cstate[which] < C_CONNECTED)
+			continue;
+		directly_connected |= NODE_MASK(connection->peer_node_id);
+	}
+	rcu_read_unlock();
+
+	return directly_connected;
+}
+
+static sector_t bm_sect_to_max_capacity(const struct drbd_md *md, sector_t bm_sect)
+{
+	/* we do our meta data IO in 4k units */
+	u64 bm_bytes = ALIGN_DOWN(bm_sect << SECTOR_SHIFT, 4096);
+	u64 bm_bytes_per_peer = div_u64(bm_bytes, md->max_peers);
+	u64 bm_bits_per_peer = bm_bytes_per_peer * BITS_PER_BYTE;
+	return bm_bits_per_peer << (md->bm_block_shift - SECTOR_SHIFT);
+}
+
+
+/**
+ * drbd_get_max_capacity() - Returns the capacity for user-data on the local backing device
+ * @device: The DRBD device.
+ * @bdev: Meta data block device.
+ * @warn: Whether to warn when size is clipped.
+ *
+ * This function returns the capacity for user-data on the local backing
+ * device. In the case of internal meta-data, this is the backing disk size
+ * reduced by the meta-data size. In the case of external meta-data, this is
+ * the size of the backing disk.
+ */
+sector_t drbd_get_max_capacity(
+		struct drbd_device *device, struct drbd_backing_dev *bdev, bool warn)
+{
+	unsigned int bm_max_peers = bdev->md.max_peers;
+	unsigned int bm_block_size = bdev->md.bm_block_size;
+	sector_t backing_bdev_capacity = drbd_get_capacity(bdev->backing_bdev);
+	sector_t bm_sect;
+	sector_t backing_capacity_remaining;
+	sector_t metadata_limit;
+	sector_t max_capacity;
+
+	switch (bdev->md.meta_dev_idx) {
+	case DRBD_MD_INDEX_INTERNAL:
+	case DRBD_MD_INDEX_FLEX_INT:
+		bm_sect = bdev->md.al_offset - bdev->md.bm_offset;
+		backing_capacity_remaining = drbd_md_first_sector(bdev);
+		break;
+	case DRBD_MD_INDEX_FLEX_EXT:
+		bm_sect = bdev->md.md_size_sect - bdev->md.bm_offset;
+		backing_capacity_remaining = backing_bdev_capacity;
+		break;
+	default:
+		bm_sect = DRBD_BM_SECTORS_INDEXED;
+		backing_capacity_remaining = backing_bdev_capacity;
+	}
+
+	metadata_limit = bm_sect_to_max_capacity(&bdev->md, bm_sect);
+
+	dynamic_drbd_dbg(device,
+		"Backing device capacity: %llus, remaining: %llus, bitmap sectors: %llus\n",
+		(unsigned long long) backing_bdev_capacity,
+		(unsigned long long) backing_capacity_remaining,
+		(unsigned long long) bm_sect);
+	dynamic_drbd_dbg(device,
+		"Max peers: %u, bytes_per_bit: %u, metadata limit: %llus, hard limit: %llus\n",
+		bm_max_peers, bm_block_size,
+		(unsigned long long) metadata_limit,
+		(unsigned long long) DRBD_MAX_SECTORS);
+
+	max_capacity = backing_capacity_remaining;
+	if (max_capacity > DRBD_MAX_SECTORS) {
+		if (warn)
+			drbd_warn(device, "Device size clipped from %llus to %llus due to DRBD limitations\n",
+				  (unsigned long long) max_capacity,
+				  (unsigned long long) DRBD_MAX_SECTORS);
+		max_capacity = DRBD_MAX_SECTORS;
+	}
+	if (max_capacity > metadata_limit) {
+		if (warn)
+			drbd_warn(device, "Device size clipped from %llus to %llus due to metadata size\n",
				  (unsigned long long) max_capacity,
+				  (unsigned long long) metadata_limit);
+		max_capacity = metadata_limit;
+	}
+
+	return max_capacity;
+}
+
+/* this is about cluster partitions, not block device partitions */
+sector_t drbd_partition_data_capacity(struct drbd_device *device)
+{
+	struct drbd_peer_device *peer_device;
+	sector_t capacity = (sector_t)(-1);
+
+	rcu_read_lock();
+	for_each_peer_device_rcu(peer_device, device) {
+		if (test_bit(HAVE_SIZES, &peer_device->flags)) {
+			dynamic_drbd_dbg(peer_device, "d_size: %llus\n",
+					 (unsigned long long)peer_device->d_size);
+			capacity = min_not_zero(capacity, peer_device->d_size);
+		}
+	}
+	rcu_read_unlock();
+
+	if (get_ldev_if_state(device, D_ATTACHING)) {
+		/* In case we somehow end up here while attaching, but before
+		 * we even assigned the ldev, pretend to still be diskless.
+		 */
+		if (device->ldev != NULL) {
+			sector_t local_capacity = drbd_local_max_size(device);
+
+			capacity = min_not_zero(capacity, local_capacity);
+		}
+		put_ldev(device);
+	}
+
+	return capacity != (sector_t)(-1) ? capacity : 0;
+}
+
 #ifdef CONFIG_DRBD_FAULT_INJECTION
 /* Fault insertion support including random number generator shamelessly
  * stolen from kernel/rcutorture.c */
@@ -3741,6 +6094,7 @@ _drbd_fault_str(unsigned int type) {
 	[DRBD_FAULT_BM_ALLOC] = "BM allocation",
 	[DRBD_FAULT_AL_EE] = "EE allocation",
 	[DRBD_FAULT_RECEIVE] = "receive data corruption",
+	[DRBD_FAULT_BIO_TOO_SMALL] = "BIO too small",
 	};
 
 	return (type < DRBD_FAULT_MAX) ?
 		_faults[type] : "**Unknown**";
@@ -3753,14 +6107,13 @@ _drbd_insert_fault(struct drbd_device *device, unsigned int type)
 
 	unsigned int ret = (
 		(drbd_fault_devs == 0 ||
-			((1 << device_to_minor(device)) & drbd_fault_devs) != 0) &&
+			((1 << device->minor) & drbd_fault_devs) != 0) &&
 		(((_drbd_fault_random(&rrs) % 100) + 1) <= drbd_fault_rate));
 
 	if (ret) {
 		drbd_fault_count++;
 
-		if (drbd_ratelimit())
-			drbd_warn(device, "***Simulating %s failure\n",
+		drbd_warn_ratelimit(device, "***Simulating %s failure\n",
 				_drbd_fault_str(type));
 	}
 
@@ -3771,7 +6124,6 @@ _drbd_insert_fault(struct drbd_device *device, unsigned int type)
 module_init(drbd_init)
 module_exit(drbd_cleanup)
 
-EXPORT_SYMBOL(drbd_conn_str);
-EXPORT_SYMBOL(drbd_role_str);
-EXPORT_SYMBOL(drbd_disk_str);
-EXPORT_SYMBOL(drbd_set_st_err_str);
+/* For transport layer */
+EXPORT_SYMBOL(drbd_destroy_connection);
+EXPORT_SYMBOL(drbd_destroy_path);
-- 
2.53.0