Rework drbd_main.c to align the module core with the DRBD 9 multi-peer
architecture introduced by the surrounding header and transport commits.
Refactor all packet sending around a page-based send buffer with
explicit cork/uncork semantics driven by the transport layer,
replacing the old per-socket static buffer and direct socket calls.
Move the transfer log from per-connection to per-resource scope, and
switch its traversal to RCU, allowing safe concurrent walks without
the coarse req_lock spinlock.
Rewrite UUID management for multi-peer: the fixed 4-slot layout is
replaced by a per-device current UUID, per-peer bitmap UUIDs, and a
history array.
This enables DRBD 9 to track resyncs across more than one peer
simultaneously. The on-disk metadata format is extended to match.
Separate the resource and connection lifecycles so that resources and
connections are created, torn down, and reference-counted
independently, with threads scoped appropriately to each object.
Add quorum-aware auto-promote semantics to the block device
open/release path.
Co-developed-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Co-developed-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Co-developed-by: Joel Colledge <joel.colledge@linbit.com>
Signed-off-by: Joel Colledge <joel.colledge@linbit.com>
Co-developed-by: Christoph Böhmwalder <christoph.boehmwalder@linbit.com>
Signed-off-by: Christoph Böhmwalder <christoph.boehmwalder@linbit.com>
---
drivers/block/drbd/drbd_main.c | 6008 ++++++++++++++++++++++----------
1 file changed, 4180 insertions(+), 1828 deletions(-)
diff --git a/drivers/block/drbd/drbd_main.c b/drivers/block/drbd/drbd_main.c
index 200d464e984b..acce6c4b4a16 100644
--- a/drivers/block/drbd/drbd_main.c
+++ b/drivers/block/drbd/drbd_main.c
@@ -1,6 +1,6 @@
// SPDX-License-Identifier: GPL-2.0-only
/*
- drbd.c
+ drbd_main.c
This file is part of DRBD by Philipp Reisner and Lars Ellenberg.
@@ -19,41 +19,47 @@
#include <linux/module.h>
#include <linux/jiffies.h>
#include <linux/drbd.h>
-#include <linux/uaccess.h>
#include <asm/types.h>
+#include <net/net_namespace.h>
#include <net/sock.h>
#include <linux/ctype.h>
-#include <linux/mutex.h>
#include <linux/fs.h>
#include <linux/file.h>
#include <linux/proc_fs.h>
#include <linux/init.h>
#include <linux/mm.h>
-#include <linux/memcontrol.h>
+#include <linux/memcontrol.h> /* needed on kernels <4.3 */
#include <linux/mm_inline.h>
#include <linux/slab.h>
#include <linux/string.h>
-#include <linux/random.h>
-#include <linux/reboot.h>
#include <linux/notifier.h>
-#include <linux/kthread.h>
#include <linux/workqueue.h>
-#include <linux/unistd.h>
+#include <linux/kthread.h>
#include <linux/vmalloc.h>
-#include <linux/sched/signal.h>
+#include <linux/dynamic_debug.h>
+#include <linux/libnvdimm.h>
+#include <linux/swab.h>
+#include <linux/overflow.h>
#include <linux/drbd_limits.h>
#include "drbd_int.h"
#include "drbd_protocol.h"
-#include "drbd_req.h" /* only for _req_mod in tl_release and tl_clear */
+#include "drbd_req.h"
#include "drbd_vli.h"
#include "drbd_debugfs.h"
+#include "drbd_meta_data.h"
+#include "drbd_legacy_84.h"
+#include "drbd_dax_pmem.h"
-static DEFINE_MUTEX(drbd_main_mutex);
-static int drbd_open(struct gendisk *disk, blk_mode_t mode);
+static int drbd_open(struct gendisk *gd, blk_mode_t mode);
static void drbd_release(struct gendisk *gd);
static void md_sync_timer_fn(struct timer_list *t);
static int w_bitmap_io(struct drbd_work *w, int unused);
+static int flush_send_buffer(struct drbd_connection *connection, enum drbd_stream drbd_stream);
+static u64 __set_bitmap_slots(struct drbd_device *device, u64 bitmap_uuid, u64 do_nodes);
+static u64 __test_bitmap_slots(struct drbd_device *device);
+static void drbd_send_ping_ack_wf(struct work_struct *ws);
+static void __net_exit __drbd_net_exit(struct net *net);
MODULE_AUTHOR("Philipp Reisner <phil@linbit.com>, "
"Lars Ellenberg <lars@linbit.com>");
@@ -63,16 +69,16 @@ MODULE_LICENSE("GPL");
MODULE_PARM_DESC(minor_count, "Approximate number of drbd devices ("
__stringify(DRBD_MINOR_COUNT_MIN) "-" __stringify(DRBD_MINOR_COUNT_MAX) ")");
MODULE_ALIAS_BLOCKDEV_MAJOR(DRBD_MAJOR);
+MODULE_SOFTDEP("post: handshake");
#include <linux/moduleparam.h>
-/* thanks to these macros, if compiled into the kernel (not-module),
- * these become boot parameters (e.g., drbd.minor_count) */
#ifdef CONFIG_DRBD_FAULT_INJECTION
int drbd_enable_faults;
int drbd_fault_rate;
static int drbd_fault_count;
static int drbd_fault_devs;
+
/* bitmap of enabled faults */
module_param_named(enable_faults, drbd_enable_faults, int, 0664);
/* fault rate % value - applies to all enabled faults */
@@ -84,15 +90,12 @@ module_param_named(fault_devs, drbd_fault_devs, int, 0644);
#endif
/* module parameters we can keep static */
-static bool drbd_allow_oos; /* allow_open_on_secondary */
static bool drbd_disable_sendpage;
+static bool drbd_allow_oos; /* allow_open_on_secondary */
MODULE_PARM_DESC(allow_oos, "DONT USE!");
-module_param_named(allow_oos, drbd_allow_oos, bool, 0);
module_param_named(disable_sendpage, drbd_disable_sendpage, bool, 0644);
+module_param_named(allow_oos, drbd_allow_oos, bool, 0);
-/* module parameters we share */
-int drbd_proc_details; /* Detail level in proc drbd*/
-module_param_named(proc_details, drbd_proc_details, int, 0644);
/* module parameters shared with defaults */
unsigned int drbd_minor_count = DRBD_MINOR_COUNT_DEF;
/* Module parameter for setting the user mode helper program
@@ -101,16 +104,60 @@ char drbd_usermode_helper[80] = "/sbin/drbdadm";
module_param_named(minor_count, drbd_minor_count, uint, 0444);
module_param_string(usermode_helper, drbd_usermode_helper, sizeof(drbd_usermode_helper), 0644);
+static int param_set_drbd_protocol_version(const char *s, const struct kernel_param *kp)
+{
+ unsigned long long tmp;
+ unsigned int *res = kp->arg;
+ int rv;
+
+ rv = kstrtoull(s, 0, &tmp);
+ if (rv < 0)
+ return rv;
+ if (!drbd_protocol_version_acceptable(tmp))
+ return -ERANGE;
+ *res = tmp;
+ return 0;
+}
+
+#define param_check_drbd_protocol_version param_check_uint
+#define param_get_drbd_protocol_version param_get_uint
+
+static const struct kernel_param_ops param_ops_drbd_protocol_version = {
+ .set = param_set_drbd_protocol_version,
+ .get = param_get_drbd_protocol_version,
+};
+
+unsigned int drbd_protocol_version_min = PRO_VERSION_8_MIN;
+module_param_named(protocol_version_min, drbd_protocol_version_min, drbd_protocol_version, 0644);
+#define protocol_version_min_desc \
+ "\n\t\tReject DRBD dialects older than this.\n\t\t" \
+ "Supported: " \
+ "DRBD 8 [" __stringify(PRO_VERSION_8_MIN) "-" __stringify(PRO_VERSION_8_MAX) "]; " \
+ "DRBD 9 [" __stringify(PRO_VERSION_MIN) "-" __stringify(PRO_VERSION_MAX) "].\n\t\t" \
+ "Default: " __stringify(PRO_VERSION_8_MIN)
+MODULE_PARM_DESC(protocol_version_min, protocol_version_min_desc);
+
+#define param_check_drbd_strict_names param_check_bool
+#define param_get_drbd_strict_names param_get_bool
+const struct kernel_param_ops param_ops_drbd_strict_names = {
+ .set = param_set_drbd_strict_names,
+ .get = param_get_drbd_strict_names,
+};
+bool drbd_strict_names = true;
+MODULE_PARM_DESC(strict_names, "restrict resource and connection names to ascii alnum and a subset of punct");
+module_param_named(strict_names, drbd_strict_names, drbd_strict_names, 0644);
+
/* in 2.6.x, our device mapping and config info contains our virtual gendisks
* as member "struct gendisk *vdisk;"
*/
struct idr drbd_devices;
struct list_head drbd_resources;
-struct mutex resources_mutex;
+static DEFINE_SPINLOCK(drbd_devices_lock);
+DEFINE_MUTEX(resources_mutex);
+struct workqueue_struct *ping_ack_sender;
struct kmem_cache *drbd_request_cache;
struct kmem_cache *drbd_ee_cache; /* peer requests */
-struct kmem_cache *drbd_bm_ext_cache; /* bitmap extents */
struct kmem_cache *drbd_al_ext_cache; /* activity log extents */
mempool_t drbd_request_mempool;
mempool_t drbd_ee_mempool;
@@ -119,8 +166,6 @@ mempool_t drbd_buffer_page_pool;
struct bio_set drbd_md_io_bio_set;
struct bio_set drbd_io_bio_set;
-DEFINE_RATELIMIT_STATE(drbd_ratelimit_state, 5 * HZ, 5);
-
static const struct block_device_operations drbd_ops = {
.owner = THIS_MODULE,
.submit_bio = drbd_submit_bio,
@@ -128,71 +173,241 @@ static const struct block_device_operations drbd_ops = {
.release = drbd_release,
};
-#ifdef __CHECKER__
-/* When checking with sparse, and this is an inline function, sparse will
- give tons of false positives. When this is a real functions sparse works.
- */
-int _get_ldev_if_state(struct drbd_device *device, enum drbd_disk_state mins)
+static struct pernet_operations drbd_pernet_ops = {
+ .exit = __drbd_net_exit,
+};
+
+struct drbd_connection *__drbd_next_connection_ref(u64 *visited,
+ struct drbd_connection *connection,
+ struct drbd_resource *resource)
+{
+ int node_id;
+
+ rcu_read_lock();
+ if (!connection) {
+ connection = list_first_or_null_rcu(&resource->connections,
+ struct drbd_connection,
+ connections);
+ *visited = 0;
+ } else {
+ struct list_head *pos;
+		bool previous_visible; /* on the resource's connections list */
+
+ pos = list_next_rcu(&connection->connections);
+ /* follow the pointer first, then check if the previous element was
+ still an element on the list of visible connections. */
+ smp_rmb();
+ previous_visible = !test_bit(C_UNREGISTERED, &connection->flags);
+
+ kref_put(&connection->kref, drbd_destroy_connection);
+
+ if (pos == &resource->connections) {
+ connection = NULL;
+ } else if (previous_visible) { /* visible -> we are now on a vital element */
+ connection = list_entry_rcu(pos, struct drbd_connection, connections);
+ } else { /* not visible -> pos might point to a dead element now */
+ for_each_connection_rcu(connection, resource) {
+ node_id = connection->peer_node_id;
+ if (!(*visited & NODE_MASK(node_id)))
+ goto found;
+ }
+ connection = NULL;
+ }
+ }
+
+ if (connection) {
+ found:
+ node_id = connection->peer_node_id;
+ *visited |= NODE_MASK(node_id);
+
+ kref_get(&connection->kref);
+ }
+
+ rcu_read_unlock();
+ return connection;
+}
+
+
+struct drbd_peer_device *__drbd_next_peer_device_ref(u64 *visited,
+ struct drbd_peer_device *peer_device,
+ struct drbd_device *device)
{
- int io_allowed;
+ rcu_read_lock();
+ if (!peer_device) {
+ peer_device = list_first_or_null_rcu(&device->peer_devices,
+ struct drbd_peer_device,
+ peer_devices);
+ *visited = 0;
+ } else {
+ struct list_head *pos;
+ bool previous_visible;
+
+ pos = list_next_rcu(&peer_device->peer_devices);
+ smp_rmb();
+ previous_visible = !test_bit(C_UNREGISTERED, &peer_device->connection->flags);
+
+ kref_put(&peer_device->connection->kref, drbd_destroy_connection);
- atomic_inc(&device->local_cnt);
- io_allowed = (device->state.disk >= mins);
- if (!io_allowed) {
- if (atomic_dec_and_test(&device->local_cnt))
- wake_up(&device->misc_wait);
+ if (pos == &device->peer_devices) {
+ peer_device = NULL;
+ } else if (previous_visible) {
+ peer_device = list_entry_rcu(pos, struct drbd_peer_device, peer_devices);
+ } else {
+ for_each_peer_device_rcu(peer_device, device) {
+ if (!(*visited & NODE_MASK(peer_device->node_id)))
+ goto found;
+ }
+ peer_device = NULL;
+ }
+ }
+
+ if (peer_device) {
+ found:
+ *visited |= NODE_MASK(peer_device->node_id);
+
+ kref_get(&peer_device->connection->kref);
}
- return io_allowed;
+
+ rcu_read_unlock();
+ return peer_device;
}
-#endif
+static void dump_epoch(struct drbd_resource *resource, int node_id, int epoch)
+{
+ struct drbd_request *req;
+ bool found_epoch = false;
+
+ list_for_each_entry_rcu(req, &resource->transfer_log, tl_requests) {
+ if (!found_epoch && req->epoch == epoch)
+ found_epoch = true;
+
+ if (found_epoch) {
+ if (req->epoch != epoch)
+ break;
+ drbd_info(req->device, "XXX %u %llu+%u 0x%x 0x%x\n",
+ req->epoch,
+ (unsigned long long)req->i.sector, req->i.size >> 9,
+ req->local_rq_state, req->net_rq_state[node_id]
+ );
+ }
+ }
+}
/**
* tl_release() - mark as BARRIER_ACKED all requests in the corresponding transfer log epoch
* @connection: DRBD connection.
+ * @o_block_id: "block id" aka expected pointer address of the oldest request
+ * @y_block_id: "block id" aka expected pointer address of the youngest request
+ * confirmed to be on stable storage.
* @barrier_nr: Expected identifier of the DRBD write barrier packet.
- * @set_size: Expected number of requests before that barrier.
+ * @set_size: Expected number of requests before that barrier, respectively
+ * number of requests in the interval [o_block_id;y_block_id]
+ *
+ * Called for both P_BARRIER_ACK and P_CONFIRM_STABLE,
+ * which is similar to an unsolicited partial barrier ack.
+ *
+ * Either barrier_nr (for barrier acks) or both o_block_id and y_block_id (for
+ * confirm stable) are given. For barrier acks, all requests in the epoch
+ * designated by "barrier_nr" are confirmed to be on stable storage.
+ *
+ * For confirm stable, both o_block_id and y_block_id are given, barrier_nr is
+ * ignored, and all requests from "o_block_id" up to and including y_block_id
+ * are confirmed to be on stable storage on the reporting peer.
*
* In case the passed barrier_nr or set_size does not match the oldest
* epoch of not yet barrier-acked requests, this function will cause a
* termination of the connection.
*/
-void tl_release(struct drbd_connection *connection, unsigned int barrier_nr,
+int tl_release(struct drbd_connection *connection,
+ uint64_t o_block_id,
+ uint64_t y_block_id,
+ unsigned int barrier_nr,
unsigned int set_size)
{
+ struct drbd_resource *resource = connection->resource;
+ const int idx = connection->peer_node_id;
struct drbd_request *r;
- struct drbd_request *req = NULL, *tmp = NULL;
+ struct drbd_request *req = NULL;
+ struct drbd_request *req_y = NULL;
int expect_epoch = 0;
int expect_size = 0;
- spin_lock_irq(&connection->resource->req_lock);
-
+ rcu_read_lock();
/* find oldest not yet barrier-acked write request,
* count writes in its epoch. */
- list_for_each_entry(r, &connection->transfer_log, tl_requests) {
- const unsigned s = r->rq_state;
+ r = READ_ONCE(connection->req_not_net_done);
+ if (r == NULL) {
+ drbd_err(connection, "BarrierAck #%u received, but req_not_net_done = NULL\n",
+ barrier_nr);
+ goto bail;
+ }
+ smp_rmb(); /* paired with smp_wmb() in set_cache_ptr_if_null() */
+ list_for_each_entry_from_rcu(r, &resource->transfer_log, tl_requests) {
+ unsigned int local_rq_state, net_rq_state;
+
+ spin_lock_irq(&r->rq_lock);
+ local_rq_state = r->local_rq_state;
+ net_rq_state = r->net_rq_state[idx];
+ spin_unlock_irq(&r->rq_lock);
+
if (!req) {
- if (!(s & RQ_WRITE))
+ if (!(local_rq_state & RQ_WRITE))
continue;
- if (!(s & RQ_NET_MASK))
+ if (!(net_rq_state & RQ_NET_MASK))
continue;
- if (s & RQ_NET_DONE)
+ if (net_rq_state & RQ_NET_DONE)
continue;
req = r;
expect_epoch = req->epoch;
- expect_size ++;
+ expect_size++;
} else {
+ const u16 s = r->net_rq_state[idx];
if (r->epoch != expect_epoch)
break;
- if (!(s & RQ_WRITE))
+ if (!(local_rq_state & RQ_WRITE))
continue;
- /* if (s & RQ_DONE): not expected */
- /* if (!(s & RQ_NET_MASK)): not expected */
+ /* probably a "send_out_of_sync", during Ahead/Behind mode,
+ * while at least one volume already started to resync again.
+ * Or a write that was not replicated during a resync, and
+ * replication has been enabled since it was submitted.
+ */
+ if ((s & RQ_NET_MASK) && !(s & RQ_EXP_BARR_ACK))
+ continue;
+ if (s & RQ_NET_DONE || (s & RQ_NET_MASK) == 0) {
+ drbd_warn(connection, "unexpected state flags: 0x%x during BarrierAck #%u\n",
+ s, barrier_nr);
+ }
expect_size++;
}
+ if (y_block_id && (struct drbd_request *)(unsigned long)y_block_id == r) {
+ req_y = r;
+ break;
+ }
}
/* first some paranoia code */
+ if (o_block_id) {
+ if ((struct drbd_request *)(unsigned long)o_block_id != req) {
+ drbd_err(connection, "BAD! ConfirmedStable: expected %p, found %p\n",
+ (struct drbd_request *)(unsigned long)o_block_id, req);
+ goto bail;
+ }
+ if (!req_y) {
+ drbd_err(connection, "BAD! ConfirmedStable: expected youngest request %p NOT found\n",
+ (struct drbd_req *)(unsigned long)y_block_id);
+ goto bail;
+ }
+ /* A P_CONFIRM_STABLE cannot tell me the to-be-expected barrier nr,
+ * it does not know it yet. But we just confirmed it knew the
+ * expected request, so just use that one. */
+ barrier_nr = expect_epoch;
+ /* Both requests referenced must be in the same epoch. */
+ if (req_y->epoch != expect_epoch) {
+ drbd_err(connection, "BAD! ConfirmedStable: reported requests not in the same epoch (%u != %u)\n",
+ req->epoch, req_y->epoch);
+ goto bail;
+ }
+ }
if (req == NULL) {
drbd_err(connection, "BAD! BarrierAck #%u received, but no epoch in tl!?\n",
barrier_nr);
@@ -205,111 +420,135 @@ void tl_release(struct drbd_connection *connection, unsigned int barrier_nr,
}
if (expect_size != set_size) {
- drbd_err(connection, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
- barrier_nr, set_size, expect_size);
+ if (!o_block_id) {
+ DEFINE_DYNAMIC_DEBUG_METADATA(ddm, "Bad barrier ack dump");
+
+ drbd_err(connection, "BAD! BarrierAck #%u received with n_writes=%u, expected n_writes=%u!\n",
+ barrier_nr, set_size, expect_size);
+
+ if (DYNAMIC_DEBUG_BRANCH(ddm))
+ dump_epoch(resource, connection->peer_node_id, expect_epoch);
+ } else
+ drbd_err(connection, "BAD! ConfirmedStable [%p,%p] received with n_writes=%u, expected n_writes=%u!\n",
+ req, req_y, set_size, expect_size);
goto bail;
}
/* Clean up list of requests processed during current epoch. */
- /* this extra list walk restart is paranoia,
- * to catch requests being barrier-acked "unexpectedly".
- * It usually should find the same req again, or some READ preceding it. */
- list_for_each_entry(req, &connection->transfer_log, tl_requests)
- if (req->epoch == expect_epoch) {
- tmp = req;
- break;
- }
- req = list_prepare_entry(tmp, &connection->transfer_log, tl_requests);
- list_for_each_entry_safe_from(req, r, &connection->transfer_log, tl_requests) {
+ list_for_each_entry_from_rcu(req, &resource->transfer_log, tl_requests) {
struct drbd_peer_device *peer_device;
+
if (req->epoch != expect_epoch)
break;
peer_device = conn_peer_device(connection, req->device->vnr);
- _req_mod(req, BARRIER_ACKED, peer_device);
+ req_mod(req, BARRIER_ACKED, peer_device);
+ if (req == req_y)
+ break;
+ }
+ rcu_read_unlock();
+
+ /* urgently flush out peer acks for P_CONFIRM_STABLE */
+ if (req_y) {
+ drbd_flush_peer_acks(resource);
+ } else if (barrier_nr == connection->send.last_sent_epoch_nr) {
+ clear_bit(BARRIER_ACK_PENDING, &connection->flags);
+ wake_up(&resource->barrier_wait);
}
- spin_unlock_irq(&connection->resource->req_lock);
- return;
+ return 0;
bail:
- spin_unlock_irq(&connection->resource->req_lock);
- conn_request_state(connection, NS(conn, C_PROTOCOL_ERROR), CS_HARD);
+ rcu_read_unlock();
+ return -EPROTO;
}
/**
- * _tl_restart() - Walks the transfer log, and applies an action to all requests
- * @connection: DRBD connection to operate on.
+ * __tl_walk() - Walks the transfer log, and applies an action to all requests
+ * @resource:	DRBD resource to operate on
+ * @connection: DRBD connection to operate on
+ * @from_req: If set, the walk starts from the request that this points to
* @what: The action/event to perform with all request objects
*
- * @what might be one of CONNECTION_LOST_WHILE_PENDING, RESEND, FAIL_FROZEN_DISK_IO,
- * RESTART_FROZEN_DISK_IO.
+ * @what might be one of CONNECTION_LOST, CONNECTION_LOST_WHILE_SUSPENDED,
+ * RESEND, CANCEL_SUSPENDED_IO, COMPLETION_RESUMED.
*/
-/* must hold resource->req_lock */
-void _tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
+void __tl_walk(struct drbd_resource *const resource,
+ struct drbd_connection *const connection,
+ struct drbd_request **from_req,
+ const enum drbd_req_event what)
{
struct drbd_peer_device *peer_device;
- struct drbd_request *req, *r;
+ struct drbd_request *req = NULL;
- list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests) {
- peer_device = conn_peer_device(connection, req->device->vnr);
+ rcu_read_lock();
+ if (from_req)
+ req = READ_ONCE(*from_req);
+ if (!req)
+ req = list_entry_rcu(resource->transfer_log.next, struct drbd_request, tl_requests);
+ smp_rmb(); /* paired with smp_wmb() in set_cache_ptr_if_null() */
+ list_for_each_entry_from_rcu(req, &resource->transfer_log, tl_requests) {
+ /* Skip if the request has already been destroyed. */
+ if (!kref_get_unless_zero(&req->kref))
+ continue;
+
+ peer_device = connection == NULL ? NULL :
+ conn_peer_device(connection, req->device->vnr);
_req_mod(req, what, peer_device);
+ kref_put(&req->kref, drbd_req_destroy);
}
+ rcu_read_unlock();
}
-void tl_restart(struct drbd_connection *connection, enum drbd_req_event what)
+void tl_walk(struct drbd_connection *connection, struct drbd_request **from_req, enum drbd_req_event what)
{
- spin_lock_irq(&connection->resource->req_lock);
- _tl_restart(connection, what);
- spin_unlock_irq(&connection->resource->req_lock);
-}
+ struct drbd_resource *resource = connection->resource;
-/**
- * tl_clear() - Clears all requests and &struct drbd_tl_epoch objects out of the TL
- * @connection: DRBD connection.
- *
- * This is called after the connection to the peer was lost. The storage covered
- * by the requests on the transfer gets marked as our of sync. Called from the
- * receiver thread and the worker thread.
- */
-void tl_clear(struct drbd_connection *connection)
-{
- tl_restart(connection, CONNECTION_LOST_WHILE_PENDING);
+ read_lock_irq(&resource->state_rwlock);
+ __tl_walk(connection->resource, connection, from_req, what);
+ read_unlock_irq(&resource->state_rwlock);
}
/**
* tl_abort_disk_io() - Abort disk I/O for all requests for a certain device in the TL
- * @device: DRBD device.
+ * @device: DRBD device.
*/
void tl_abort_disk_io(struct drbd_device *device)
{
- struct drbd_connection *connection = first_peer_device(device)->connection;
- struct drbd_request *req, *r;
+ struct drbd_resource *resource = device->resource;
+ struct drbd_request *req;
- spin_lock_irq(&connection->resource->req_lock);
- list_for_each_entry_safe(req, r, &connection->transfer_log, tl_requests) {
- if (!(req->rq_state & RQ_LOCAL_PENDING))
+ rcu_read_lock();
+ list_for_each_entry_rcu(req, &resource->transfer_log, tl_requests) {
+ if (!(READ_ONCE(req->local_rq_state) & RQ_LOCAL_PENDING))
continue;
if (req->device != device)
continue;
- _req_mod(req, ABORT_DISK_IO, NULL);
+ /* Skip if the request has already been destroyed. */
+ if (!kref_get_unless_zero(&req->kref))
+ continue;
+
+ req_mod(req, ABORT_DISK_IO, NULL);
+ kref_put(&req->kref, drbd_req_destroy);
}
- spin_unlock_irq(&connection->resource->req_lock);
+ rcu_read_unlock();
}
static int drbd_thread_setup(void *arg)
{
struct drbd_thread *thi = (struct drbd_thread *) arg;
struct drbd_resource *resource = thi->resource;
+ struct drbd_connection *connection = thi->connection;
unsigned long flags;
int retval;
- snprintf(current->comm, sizeof(current->comm), "drbd_%c_%s",
- thi->name[0],
- resource->name);
-
allow_kernel_signal(DRBD_SIGKILL);
allow_kernel_signal(SIGXCPU);
+
+ if (connection)
+ kref_get(&connection->kref);
+ else
+ kref_get(&resource->kref);
restart:
retval = thi->function(thi);
@@ -326,26 +565,33 @@ static int drbd_thread_setup(void *arg)
*/
if (thi->t_state == RESTARTING) {
- drbd_info(resource, "Restarting %s thread\n", thi->name);
+ if (connection)
+ drbd_info(connection, "Restarting %s thread\n", thi->name);
+ else
+ drbd_info(resource, "Restarting %s thread\n", thi->name);
thi->t_state = RUNNING;
spin_unlock_irqrestore(&thi->t_lock, flags);
+ flush_signals(current); /* likely it got a signal to look at t_state... */
goto restart;
}
thi->task = NULL;
thi->t_state = NONE;
smp_mb();
- complete_all(&thi->stop);
- spin_unlock_irqrestore(&thi->t_lock, flags);
- drbd_info(resource, "Terminating %s\n", current->comm);
+ if (connection)
+ drbd_info(connection, "Terminating %s thread\n", thi->name);
+ else
+ drbd_info(resource, "Terminating %s thread\n", thi->name);
- /* Release mod reference taken when thread was started */
+ complete(&thi->stop);
+ spin_unlock_irqrestore(&thi->t_lock, flags);
+
+ if (connection)
+ kref_put(&connection->kref, drbd_destroy_connection);
+ else
+ kref_put(&resource->kref, drbd_destroy_resource);
- if (thi->connection)
- kref_put(&thi->connection->kref, drbd_destroy_connection);
- kref_put(&resource->kref, drbd_destroy_resource);
- module_put(THIS_MODULE);
return retval;
}
@@ -364,6 +610,7 @@ static void drbd_thread_init(struct drbd_resource *resource, struct drbd_thread
int drbd_thread_start(struct drbd_thread *thi)
{
struct drbd_resource *resource = thi->resource;
+ struct drbd_connection *connection = thi->connection;
struct task_struct *nt;
unsigned long flags;
@@ -373,36 +620,29 @@ int drbd_thread_start(struct drbd_thread *thi)
switch (thi->t_state) {
case NONE:
- drbd_info(resource, "Starting %s thread (from %s [%d])\n",
- thi->name, current->comm, current->pid);
-
- /* Get ref on module for thread - this is released when thread exits */
- if (!try_module_get(THIS_MODULE)) {
- drbd_err(resource, "Failed to get module reference in drbd_thread_start\n");
- spin_unlock_irqrestore(&thi->t_lock, flags);
- return false;
- }
-
- kref_get(&resource->kref);
- if (thi->connection)
- kref_get(&thi->connection->kref);
+ if (connection)
+ drbd_info(connection, "Starting %s thread (peer-node-id %d)\n",
+ thi->name, connection->peer_node_id);
+ else
+ drbd_info(resource, "Starting %s thread (node-id %d)\n",
+ thi->name, resource->res_opts.node_id);
init_completion(&thi->stop);
+ D_ASSERT(resource, thi->task == NULL);
thi->reset_cpu_mask = 1;
thi->t_state = RUNNING;
spin_unlock_irqrestore(&thi->t_lock, flags);
flush_signals(current); /* otherw. may get -ERESTARTNOINTR */
nt = kthread_create(drbd_thread_setup, (void *) thi,
- "drbd_%c_%s", thi->name[0], thi->resource->name);
+ "drbd_%c_%s", thi->name[0], resource->name);
if (IS_ERR(nt)) {
- drbd_err(resource, "Couldn't start thread\n");
+ if (connection)
+ drbd_err(connection, "Couldn't start thread: %ld\n", PTR_ERR(nt));
+ else
+ drbd_err(resource, "Couldn't start thread: %ld\n", PTR_ERR(nt));
- if (thi->connection)
- kref_put(&thi->connection->kref, drbd_destroy_connection);
- kref_put(&resource->kref, drbd_destroy_resource);
- module_put(THIS_MODULE);
return false;
}
spin_lock_irqsave(&thi->t_lock, flags);
@@ -413,8 +653,10 @@ int drbd_thread_start(struct drbd_thread *thi)
break;
case EXITING:
thi->t_state = RESTARTING;
- drbd_info(resource, "Restarting %s thread (from %s [%d])\n",
- thi->name, current->comm, current->pid);
+ if (connection)
+ drbd_info(connection, "Restarting %s thread\n", thi->name);
+ else
+ drbd_info(resource, "Restarting %s thread\n", thi->name);
fallthrough;
case RUNNING:
case RESTARTING:
@@ -443,6 +685,12 @@ void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
return;
}
+ if (thi->t_state == EXITING && ns == RESTARTING) {
+ /* Do not abort a stop request, otherwise a waiter might never wake up */
+ spin_unlock_irqrestore(&thi->t_lock, flags);
+ return;
+ }
+
if (thi->t_state != ns) {
if (thi->task == NULL) {
spin_unlock_irqrestore(&thi->t_lock, flags);
@@ -455,7 +703,6 @@ void _drbd_thread_stop(struct drbd_thread *thi, int restart, int wait)
if (thi->task != current)
send_sig(DRBD_SIGKILL, thi->task, 1);
}
-
spin_unlock_irqrestore(&thi->t_lock, flags);
if (wait)
@@ -473,8 +720,7 @@ static void drbd_calc_cpu_mask(cpumask_var_t *cpu_mask)
{
unsigned int *resources_per_cpu, min_index = ~0;
- resources_per_cpu = kcalloc(nr_cpu_ids, sizeof(*resources_per_cpu),
- GFP_KERNEL);
+ resources_per_cpu = kzalloc(nr_cpu_ids * sizeof(*resources_per_cpu), GFP_KERNEL);
if (resources_per_cpu) {
struct drbd_resource *resource;
unsigned int cpu, min = ~0;
@@ -521,6 +767,46 @@ void drbd_thread_current_set_cpu(struct drbd_thread *thi)
#define drbd_calc_cpu_mask(A) ({})
#endif
+static bool drbd_all_neighbor_secondary(struct drbd_device *device, u64 *authoritative_ptr)
+{
+ struct drbd_peer_device *peer_device;
+ bool all_secondary = true;
+ u64 authoritative = 0;
+ int id;
+
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ if (peer_device->repl_state[NOW] >= L_ESTABLISHED &&
+ peer_device->connection->peer_role[NOW] == R_PRIMARY) {
+ all_secondary = false;
+ id = peer_device->node_id;
+ authoritative |= NODE_MASK(id);
+ }
+ }
+ rcu_read_unlock();
+ if (authoritative_ptr)
+ *authoritative_ptr = authoritative;
+ return all_secondary;
+}
+
+/* This function is supposed to have the same semantics as calc_device_stable() in drbd_state.c
+ A primary is stable since it is authoritative.
+ Unstable are neighbors of a primary and resync target nodes.
+ Nodes further away from a primary are stable! */
+bool drbd_device_stable(struct drbd_device *device, u64 *authoritative_ptr)
+{
+ struct drbd_resource *resource = device->resource;
+ bool device_stable = true;
+
+ if (resource->role[NOW] == R_PRIMARY)
+ return true;
+
+ if (!drbd_all_neighbor_secondary(device, authoritative_ptr))
+ return false;
+
+ return device_stable;
+}
+
/*
* drbd_header_size - size of a packet header
*
@@ -532,177 +818,370 @@ unsigned int drbd_header_size(struct drbd_connection *connection)
{
if (connection->agreed_pro_version >= 100) {
BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header100), 8));
- return sizeof(struct p_header100);
+ return sizeof(struct p_header100); /* 16 */
} else {
BUILD_BUG_ON(sizeof(struct p_header80) !=
sizeof(struct p_header95));
BUILD_BUG_ON(!IS_ALIGNED(sizeof(struct p_header80), 8));
- return sizeof(struct p_header80);
+ return sizeof(struct p_header80); /* 8 */
}
}
-static unsigned int prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
+static void prepare_header80(struct p_header80 *h, enum drbd_packet cmd, int size)
{
h->magic = cpu_to_be32(DRBD_MAGIC);
h->command = cpu_to_be16(cmd);
- h->length = cpu_to_be16(size);
- return sizeof(struct p_header80);
+ h->length = cpu_to_be16(size - sizeof(struct p_header80));
}
-static unsigned int prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
+static void prepare_header95(struct p_header95 *h, enum drbd_packet cmd, int size)
{
h->magic = cpu_to_be16(DRBD_MAGIC_BIG);
h->command = cpu_to_be16(cmd);
- h->length = cpu_to_be32(size);
- return sizeof(struct p_header95);
+ h->length = cpu_to_be32(size - sizeof(struct p_header95));
}
-static unsigned int prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
+static void prepare_header100(struct p_header100 *h, enum drbd_packet cmd,
int size, int vnr)
{
h->magic = cpu_to_be32(DRBD_MAGIC_100);
h->volume = cpu_to_be16(vnr);
h->command = cpu_to_be16(cmd);
- h->length = cpu_to_be32(size);
+ h->length = cpu_to_be32(size - sizeof(struct p_header100));
h->pad = 0;
- return sizeof(struct p_header100);
}
-static unsigned int prepare_header(struct drbd_connection *connection, int vnr,
- void *buffer, enum drbd_packet cmd, int size)
+static void prepare_header(struct drbd_connection *connection, int vnr,
+ void *buffer, enum drbd_packet cmd, int size)
{
if (connection->agreed_pro_version >= 100)
- return prepare_header100(buffer, cmd, size, vnr);
+ prepare_header100(buffer, cmd, size, vnr);
else if (connection->agreed_pro_version >= 95 &&
size > DRBD_MAX_SIZE_H80_PACKET)
- return prepare_header95(buffer, cmd, size);
+ prepare_header95(buffer, cmd, size);
else
- return prepare_header80(buffer, cmd, size);
+ prepare_header80(buffer, cmd, size);
+}
+
+static void new_or_recycle_send_buffer_page(struct drbd_send_buffer *sbuf)
+{
+ while (1) {
+ struct page *page;
+ int count = page_count(sbuf->page);
+
+ BUG_ON(count == 0);
+ if (count == 1)
+ goto have_page;
+
+ page = alloc_page(GFP_NOIO | __GFP_NORETRY | __GFP_NOWARN);
+ if (page) {
+ put_page(sbuf->page);
+ sbuf->page = page;
+ goto have_page;
+ }
+
+ schedule_timeout_uninterruptible(HZ / 10);
+ }
+have_page:
+ sbuf->unsent =
+ sbuf->pos = page_address(sbuf->page);
+}
+
+static char * __must_check alloc_send_buffer(struct drbd_connection *connection, int size,
+ enum drbd_stream drbd_stream)
+{
+ struct drbd_send_buffer *sbuf = &connection->send_buffer[drbd_stream];
+ char *page_start = page_address(sbuf->page);
+ int err;
+
+ if (sbuf->pos - page_start + size > PAGE_SIZE) {
+ err = flush_send_buffer(connection, drbd_stream);
+ if (err)
+ return ERR_PTR(err);
+ new_or_recycle_send_buffer_page(sbuf);
+ }
+
+ sbuf->allocated_size = size;
+ sbuf->additional_size = 0;
+
+ return sbuf->pos;
+}
+
+/* If we called alloc_send_buffer(), possibly indirectly via __conn_prepare_command(),
+ * but then decided that we actually don't want to use it.
+ */
+static void cancel_send_buffer(struct drbd_connection *connection,
+ enum drbd_stream drbd_stream)
+{
+ connection->send_buffer[drbd_stream].allocated_size = 0;
+}
+
+/* Only used to shrink the previously allocated size. */
+static void resize_prepared_command(struct drbd_connection *connection,
+ enum drbd_stream drbd_stream,
+ int size)
+{
+ connection->send_buffer[drbd_stream].allocated_size =
+ size + drbd_header_size(connection);
}
-static void *__conn_prepare_command(struct drbd_connection *connection,
- struct drbd_socket *sock)
+static void additional_size_command(struct drbd_connection *connection,
+ enum drbd_stream drbd_stream,
+ int additional_size)
{
- if (!sock->socket)
+ connection->send_buffer[drbd_stream].additional_size = additional_size;
+}
+
+void *__conn_prepare_command(struct drbd_connection *connection, int size,
+ enum drbd_stream drbd_stream)
+{
+ struct drbd_transport *transport = &connection->transport;
+ int header_size;
+ void *p;
+
+ if (connection->cstate[NOW] < C_CONNECTING)
+ return NULL;
+
+ if (!transport->class->ops.stream_ok(transport, drbd_stream))
+ return NULL;
+
+ header_size = drbd_header_size(connection);
+ p = alloc_send_buffer(connection, header_size + size, drbd_stream) + header_size;
+ if (IS_ERR(p))
return NULL;
- return sock->sbuf + drbd_header_size(connection);
+ return p;
}
-void *conn_prepare_command(struct drbd_connection *connection, struct drbd_socket *sock)
+/**
+ * conn_prepare_command() - Allocate a send buffer for a packet/command
+ * @connection: the connection the packet will be sent through
+ * @size: number of bytes to allocate
+ * @drbd_stream: DATA_STREAM or CONTROL_STREAM
+ *
+ * This allocates a buffer with capacity to hold the header and
+ * the requested size. Upon success it returns a pointer to the
+ * first byte behind the header. The caller is expected to
+ * call xxx_send_command() soon.
+ */
+void *conn_prepare_command(struct drbd_connection *connection, int size,
+ enum drbd_stream drbd_stream)
{
void *p;
- mutex_lock(&sock->mutex);
- p = __conn_prepare_command(connection, sock);
+ mutex_lock(&connection->mutex[drbd_stream]);
+ p = __conn_prepare_command(connection, size, drbd_stream);
if (!p)
- mutex_unlock(&sock->mutex);
+ mutex_unlock(&connection->mutex[drbd_stream]);
return p;
}
-void *drbd_prepare_command(struct drbd_peer_device *peer_device, struct drbd_socket *sock)
+/**
+ * drbd_prepare_command() - Allocate a send buffer for a packet/command
+ * @peer_device: the DRBD peer device the packet will be sent to
+ * @size: number of bytes to allocate
+ * @drbd_stream: DATA_STREAM or CONTROL_STREAM
+ *
+ * This allocates a buffer with capacity to hold the header and
+ * the requested size. Upon success it returns a pointer to the
+ * first byte behind the header. The caller is expected to
+ * call xxx_send_command() soon.
+ */
+void *drbd_prepare_command(struct drbd_peer_device *peer_device, int size, enum drbd_stream drbd_stream)
{
- return conn_prepare_command(peer_device->connection, sock);
+ return conn_prepare_command(peer_device->connection, size, drbd_stream);
}
-static int __send_command(struct drbd_connection *connection, int vnr,
- struct drbd_socket *sock, enum drbd_packet cmd,
- unsigned int header_size, void *data,
- unsigned int size)
+static int flush_send_buffer(struct drbd_connection *connection, enum drbd_stream drbd_stream)
{
- int msg_flags;
+ struct drbd_send_buffer *sbuf = &connection->send_buffer[drbd_stream];
+ struct drbd_transport *transport = &connection->transport;
+ struct drbd_transport_ops *tr_ops = &transport->class->ops;
+ unsigned int flags, offset, size;
int err;
- /*
- * Called with @data == NULL and the size of the data blocks in @size
- * for commands that send data blocks. For those commands, omit the
- * MSG_MORE flag: this will increase the likelihood that data blocks
- * which are page aligned on the sender will end up page aligned on the
- * receiver.
- */
- msg_flags = data ? MSG_MORE : 0;
-
- header_size += prepare_header(connection, vnr, sock->sbuf, cmd,
- header_size + size);
- err = drbd_send_all(connection, sock->socket, sock->sbuf, header_size,
- msg_flags);
- if (data && !err)
- err = drbd_send_all(connection, sock->socket, data, size, 0);
- /* DRBD protocol "pings" are latency critical.
- * This is supposed to trigger tcp_push_pending_frames() */
- if (!err && (cmd == P_PING || cmd == P_PING_ACK))
- tcp_sock_set_nodelay(sock->socket->sk);
+ size = sbuf->pos - sbuf->unsent + sbuf->allocated_size;
+ if (size == 0)
+ return 0;
+
+ if (drbd_stream == CONTROL_STREAM) {
+ connection->ctl_packets++;
+ if (check_add_overflow(connection->ctl_bytes, size, &connection->ctl_bytes)) {
+ connection->ctl_bytes = size;
+ connection->ctl_packets = 1;
+ }
+ }
+
+ if (drbd_stream == DATA_STREAM) {
+ rcu_read_lock();
+ connection->transport.ko_count = rcu_dereference(connection->transport.net_conf)->ko_count;
+ rcu_read_unlock();
+ }
+
+ flags = (connection->cstate[NOW] < C_CONNECTING ? MSG_DONTWAIT : 0) |
+ (sbuf->additional_size ? MSG_MORE : 0);
+ offset = sbuf->unsent - (char *)page_address(sbuf->page);
+ err = tr_ops->send_page(transport, drbd_stream, sbuf->page, offset, size, flags);
+ if (err) {
+ change_cstate(connection, C_NETWORK_FAILURE, CS_HARD);
+ } else {
+ sbuf->unsent =
+ sbuf->pos += sbuf->allocated_size; /* send buffer submitted! */
+ }
+
+ sbuf->allocated_size = 0;
return err;
}
-static int __conn_send_command(struct drbd_connection *connection, struct drbd_socket *sock,
- enum drbd_packet cmd, unsigned int header_size,
- void *data, unsigned int size)
+/*
+ * SFLAG_FLUSH makes sure the packet (and everything queued in front
+ * of it) gets sent immediately, independent of whether the stream
+ * is currently corked.
+ *
+ * This is used for P_PING, P_PING_ACK, P_TWOPC_PREPARE, P_TWOPC_ABORT,
+ * P_TWOPC_YES, P_TWOPC_NO, P_TWOPC_RETRY and P_TWOPC_COMMIT.
+ *
+ * This quirk is necessary because the stream is corked while the
+ * worker thread processes work items. When it stops processing
+ * items, it uncorks. That works well to coalesce ack packets etc.
+ * A work item doing two-phase commits needs to override that behavior.
+ */
+#define SFLAG_FLUSH 0x10
+#define DRBD_STREAM_FLAGS (SFLAG_FLUSH)
+
+static inline enum drbd_stream extract_stream(int stream_and_flags)
{
- return __send_command(connection, 0, sock, cmd, header_size, data, size);
+ return stream_and_flags & ~DRBD_STREAM_FLAGS;
}
-int conn_send_command(struct drbd_connection *connection, struct drbd_socket *sock,
- enum drbd_packet cmd, unsigned int header_size,
- void *data, unsigned int size)
+int __send_command(struct drbd_connection *connection, int vnr,
+ enum drbd_packet cmd, int stream_and_flags)
{
+ enum drbd_stream drbd_stream = extract_stream(stream_and_flags);
+ struct drbd_send_buffer *sbuf = &connection->send_buffer[drbd_stream];
+ struct drbd_transport *transport = &connection->transport;
+ struct drbd_transport_ops *tr_ops = &transport->class->ops;
+ /* CORKED + drbd_stream is either DATA_CORKED or CONTROL_CORKED */
+ bool corked = test_bit(CORKED + drbd_stream, &connection->flags);
+ bool flush = stream_and_flags & SFLAG_FLUSH;
int err;
- err = __conn_send_command(connection, sock, cmd, header_size, data, size);
- mutex_unlock(&sock->mutex);
+ if (connection->cstate[NOW] < C_CONNECTING)
+ return -EIO;
+ prepare_header(connection, vnr, sbuf->pos, cmd,
+ sbuf->allocated_size + sbuf->additional_size);
+
+ if (corked && !flush) {
+ sbuf->pos += sbuf->allocated_size;
+ sbuf->allocated_size = 0;
+ err = 0;
+ } else {
+ err = flush_send_buffer(connection, drbd_stream);
+
+ /* DRBD protocol "pings" are latency critical.
+ * This is supposed to trigger tcp_push_pending_frames() */
+ if (!err && flush)
+ tr_ops->hint(transport, drbd_stream, NODELAY);
+
+ }
+
return err;
}
-int drbd_send_command(struct drbd_peer_device *peer_device, struct drbd_socket *sock,
- enum drbd_packet cmd, unsigned int header_size,
- void *data, unsigned int size)
+void drbd_cork(struct drbd_connection *connection, enum drbd_stream stream)
{
+ struct drbd_transport *transport = &connection->transport;
+ struct drbd_transport_ops *tr_ops = &transport->class->ops;
+
+ mutex_lock(&connection->mutex[stream]);
+ set_bit(CORKED + stream, &connection->flags);
+ /* only call into transport, if we expect it to work */
+ if (connection->cstate[NOW] >= C_CONNECTING)
+ tr_ops->hint(transport, stream, CORK);
+ mutex_unlock(&connection->mutex[stream]);
+}
+
+int drbd_uncork(struct drbd_connection *connection, enum drbd_stream stream)
+{
+ struct drbd_transport *transport = &connection->transport;
+ struct drbd_transport_ops *tr_ops = &transport->class->ops;
int err;
- err = __send_command(peer_device->connection, peer_device->device->vnr,
- sock, cmd, header_size, data, size);
- mutex_unlock(&sock->mutex);
+ mutex_lock(&connection->mutex[stream]);
+ err = flush_send_buffer(connection, stream);
+ if (!err) {
+ clear_bit(CORKED + stream, &connection->flags);
+ /* only call into transport, if we expect it to work */
+ if (connection->cstate[NOW] >= C_CONNECTING)
+ tr_ops->hint(transport, stream, UNCORK);
+ }
+ mutex_unlock(&connection->mutex[stream]);
return err;
}
-int drbd_send_ping(struct drbd_connection *connection)
+int send_command(struct drbd_connection *connection, int vnr,
+ enum drbd_packet cmd, int stream_and_flags)
{
- struct drbd_socket *sock;
+ enum drbd_stream drbd_stream = extract_stream(stream_and_flags);
+ int err;
- sock = &connection->meta;
- if (!conn_prepare_command(connection, sock))
- return -EIO;
- return conn_send_command(connection, sock, P_PING, 0, NULL, 0);
+ err = __send_command(connection, vnr, cmd, stream_and_flags);
+ mutex_unlock(&connection->mutex[drbd_stream]);
+ return err;
}
-int drbd_send_ping_ack(struct drbd_connection *connection)
+int drbd_send_command(struct drbd_peer_device *peer_device,
+ enum drbd_packet cmd, enum drbd_stream drbd_stream)
{
- struct drbd_socket *sock;
+ return send_command(peer_device->connection, peer_device->device->vnr,
+ cmd, drbd_stream);
+}
- sock = &connection->meta;
- if (!conn_prepare_command(connection, sock))
+int drbd_send_ping(struct drbd_connection *connection)
+{
+ if (!conn_prepare_command(connection, 0, CONTROL_STREAM))
return -EIO;
- return conn_send_command(connection, sock, P_PING_ACK, 0, NULL, 0);
+ return send_command(connection, -1, P_PING, CONTROL_STREAM | SFLAG_FLUSH);
}
-int drbd_send_sync_param(struct drbd_peer_device *peer_device)
+void drbd_send_ping_ack_wf(struct work_struct *ws)
{
- struct drbd_socket *sock;
- struct p_rs_param_95 *p;
- int size;
- const int apv = peer_device->connection->agreed_pro_version;
- enum drbd_packet cmd;
- struct net_conf *nc;
- struct disk_conf *dc;
-
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
- if (!p)
- return -EIO;
+ struct drbd_connection *connection =
+ container_of(ws, struct drbd_connection, send_ping_ack_work);
+ int err;
- rcu_read_lock();
- nc = rcu_dereference(peer_device->connection->net_conf);
+ err = conn_prepare_command(connection, 0, CONTROL_STREAM) ? 0 : -EIO;
+ if (!err)
+ err = send_command(connection, -1, P_PING_ACK, CONTROL_STREAM | SFLAG_FLUSH);
+ if (err)
+ change_cstate(connection, C_NETWORK_FAILURE, CS_HARD);
+}
+
+int drbd_send_peer_ack(struct drbd_connection *connection, u64 mask, u64 dagtag_sector)
+{
+ struct p_peer_ack *p;
+
+ p = conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM);
+ if (!p)
+ return -EIO;
+ p->mask = cpu_to_be64(mask);
+ p->dagtag = cpu_to_be64(dagtag_sector);
+
+ return send_command(connection, -1, P_PEER_ACK, CONTROL_STREAM);
+}
+
+int drbd_send_sync_param(struct drbd_peer_device *peer_device)
+{
+ struct p_rs_param_95 *p;
+ int size;
+ const int apv = peer_device->connection->agreed_pro_version;
+ enum drbd_packet cmd;
+ struct net_conf *nc;
+ struct peer_device_conf *pdc;
+
+ rcu_read_lock();
+ nc = rcu_dereference(peer_device->connection->transport.net_conf);
size = apv <= 87 ? sizeof(struct p_rs_param)
: apv == 88 ? sizeof(struct p_rs_param)
@@ -711,18 +1190,30 @@ int drbd_send_sync_param(struct drbd_peer_device *peer_device)
: /* apv >= 95 */ sizeof(struct p_rs_param_95);
cmd = apv >= 89 ? P_SYNC_PARAM89 : P_SYNC_PARAM;
+ rcu_read_unlock();
+
+ p = drbd_prepare_command(peer_device, size, DATA_STREAM);
+ if (!p)
+ return -EIO;
/* initialize verify_alg and csums_alg */
- BUILD_BUG_ON(sizeof(p->algs) != 2 * SHARED_SECRET_MAX);
- memset(&p->algs, 0, sizeof(p->algs));
+ memset(p->verify_alg, 0, sizeof(p->verify_alg));
+ memset(p->csums_alg, 0, sizeof(p->csums_alg));
+
+ rcu_read_lock();
+ nc = rcu_dereference(peer_device->connection->transport.net_conf);
if (get_ldev(peer_device->device)) {
- dc = rcu_dereference(peer_device->device->ldev->disk_conf);
- p->resync_rate = cpu_to_be32(dc->resync_rate);
- p->c_plan_ahead = cpu_to_be32(dc->c_plan_ahead);
- p->c_delay_target = cpu_to_be32(dc->c_delay_target);
- p->c_fill_target = cpu_to_be32(dc->c_fill_target);
- p->c_max_rate = cpu_to_be32(dc->c_max_rate);
+ pdc = rcu_dereference(peer_device->conf);
+ /* These values will be ignored by peers running DRBD 9.2+, but
+ * we have to send something, so send the real values. We
+ * cannot omit the entire packet because we must verify that
+ * the algorithms match. */
+ p->resync_rate = cpu_to_be32(pdc->resync_rate);
+ p->c_plan_ahead = cpu_to_be32(pdc->c_plan_ahead);
+ p->c_delay_target = cpu_to_be32(pdc->c_delay_target);
+ p->c_fill_target = cpu_to_be32(pdc->c_fill_target);
+ p->c_max_rate = cpu_to_be32(pdc->c_max_rate);
put_ldev(peer_device->device);
} else {
p->resync_rate = cpu_to_be32(DRBD_RESYNC_RATE_DEF);
@@ -738,36 +1229,37 @@ int drbd_send_sync_param(struct drbd_peer_device *peer_device)
strscpy(p->csums_alg, nc->csums_alg);
rcu_read_unlock();
- return drbd_send_command(peer_device, sock, cmd, size, NULL, 0);
+ return drbd_send_command(peer_device, cmd, DATA_STREAM);
}
int __drbd_send_protocol(struct drbd_connection *connection, enum drbd_packet cmd)
{
- struct drbd_socket *sock;
struct p_protocol *p;
struct net_conf *nc;
size_t integrity_alg_len;
int size, cf;
- sock = &connection->data;
- p = __conn_prepare_command(connection, sock);
- if (!p)
- return -EIO;
-
- rcu_read_lock();
- nc = rcu_dereference(connection->net_conf);
-
- if (nc->tentative && connection->agreed_pro_version < 92) {
- rcu_read_unlock();
+ if (test_bit(CONN_DRY_RUN, &connection->flags) && connection->agreed_pro_version < 92) {
+ clear_bit(CONN_DRY_RUN, &connection->flags);
drbd_err(connection, "--dry-run is not supported by peer");
return -EOPNOTSUPP;
}
size = sizeof(*p);
+ rcu_read_lock();
+ nc = rcu_dereference(connection->transport.net_conf);
if (connection->agreed_pro_version >= 87) {
integrity_alg_len = strlen(nc->integrity_alg) + 1;
size += integrity_alg_len;
}
+ rcu_read_unlock();
+
+ p = __conn_prepare_command(connection, size, DATA_STREAM);
+ if (!p)
+ return -EIO;
+
+ rcu_read_lock();
+ nc = rcu_dereference(connection->transport.net_conf);
p->protocol = cpu_to_be32(nc->wire_protocol);
p->after_sb_0p = cpu_to_be32(nc->after_sb_0p);
@@ -775,9 +1267,9 @@ int __drbd_send_protocol(struct drbd_connection *connection, enum drbd_packet cm
p->after_sb_2p = cpu_to_be32(nc->after_sb_2p);
p->two_primaries = cpu_to_be32(nc->two_primaries);
cf = 0;
- if (nc->discard_my_data)
+ if (test_bit(CONN_DISCARD_MY_DATA, &connection->flags))
cf |= CF_DISCARD_MY_DATA;
- if (nc->tentative)
+ if (test_bit(CONN_DRY_RUN, &connection->flags))
cf |= CF_DRY_RUN;
p->conn_flags = cpu_to_be32(cf);
@@ -785,133 +1277,301 @@ int __drbd_send_protocol(struct drbd_connection *connection, enum drbd_packet cm
strscpy(p->integrity_alg, nc->integrity_alg, integrity_alg_len);
rcu_read_unlock();
- return __conn_send_command(connection, sock, cmd, size, NULL, 0);
-}
-
-int drbd_send_protocol(struct drbd_connection *connection)
-{
- int err;
-
- mutex_lock(&connection->data.mutex);
- err = __drbd_send_protocol(connection, P_PROTOCOL);
- mutex_unlock(&connection->data.mutex);
-
- return err;
+ return __send_command(connection, -1, cmd, DATA_STREAM);
}
static int _drbd_send_uuids(struct drbd_peer_device *peer_device, u64 uuid_flags)
{
struct drbd_device *device = peer_device->device;
- struct drbd_socket *sock;
struct p_uuids *p;
int i;
if (!get_ldev_if_state(device, D_NEGOTIATING))
return 0;
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
+ p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
if (!p) {
put_ldev(device);
return -EIO;
}
+
spin_lock_irq(&device->ldev->md.uuid_lock);
- for (i = UI_CURRENT; i < UI_SIZE; i++)
- p->uuid[i] = cpu_to_be64(device->ldev->md.uuid[i]);
+ p->current_uuid = cpu_to_be64(drbd_current_uuid(device));
+ p->bitmap_uuid = cpu_to_be64(drbd_bitmap_uuid(peer_device));
+ for (i = 0; i < ARRAY_SIZE(p->history_uuids); i++)
+ p->history_uuids[i] = cpu_to_be64(drbd_history_uuid(device, i));
spin_unlock_irq(&device->ldev->md.uuid_lock);
- device->comm_bm_set = drbd_bm_total_weight(device);
- p->uuid[UI_SIZE] = cpu_to_be64(device->comm_bm_set);
+ peer_device->comm_bm_set = drbd_bm_total_weight(peer_device);
+ p->dirty_bits = cpu_to_be64(peer_device->comm_bm_set);
+
+ if (test_bit(DISCARD_MY_DATA, &peer_device->flags))
+ uuid_flags |= UUID_FLAG_DISCARD_MY_DATA;
+ if (test_bit(CRASHED_PRIMARY, &device->flags))
+ uuid_flags |= UUID_FLAG_CRASHED_PRIMARY;
+ if (!drbd_md_test_flag(device->ldev, MDF_CONSISTENT))
+ uuid_flags |= UUID_FLAG_INCONSISTENT;
+
+ /* Silently mask out any "too recent" flags;
+ * we cannot communicate those in old DRBD
+ * protocol versions. */
+ uuid_flags &= UUID_FLAG_MASK_COMPAT_84;
+
+ peer_device->comm_uuid_flags = uuid_flags;
+ p->uuid_flags = cpu_to_be64(uuid_flags);
+
+ put_ldev(device);
+
+ return drbd_send_command(peer_device, P_UUIDS, DATA_STREAM);
+}
+
+static u64 __bitmap_uuid(struct drbd_device *device, int node_id)
+{
+ struct drbd_peer_device *peer_device;
+ struct drbd_peer_md *peer_md = device->ldev->md.peers;
+ u64 bitmap_uuid = peer_md[node_id].bitmap_uuid;
+
+ /* Sending a bitmap_uuid of 0 means that we are in sync with that peer.
+ The recipient of this message might use this assumption to throw away its
+ bitmap to that peer.
+
+ Send -1 instead if we (as resync target from that peer) are not at the same
+ current uuid.
+ This corner case is relevant if we finish resync from an UpToDate peer first,
+ and the second resync (which was paused first) is from an Outdated node.
+ And that second resync gets canceled by the resync target due to the first
+ resync finished successfully.
+
+ Exceptions to the above are when the peer's UUID is not known yet.
+ */
+
rcu_read_lock();
- uuid_flags |= rcu_dereference(peer_device->connection->net_conf)->discard_my_data ? 1 : 0;
+ peer_device = peer_device_by_node_id(device, node_id);
+ if (peer_device) {
+ enum drbd_repl_state repl_state = peer_device->repl_state[NOW];
+ if (bitmap_uuid == 0 &&
+ (repl_state == L_SYNC_TARGET || repl_state == L_PAUSED_SYNC_T) &&
+ peer_device->current_uuid != 0 &&
+ (peer_device->current_uuid & ~UUID_PRIMARY) !=
+ (drbd_current_uuid(device) & ~UUID_PRIMARY))
+ bitmap_uuid = -1;
+ }
rcu_read_unlock();
- uuid_flags |= test_bit(CRASHED_PRIMARY, &device->flags) ? 2 : 0;
- uuid_flags |= device->new_state_tmp.disk == D_INCONSISTENT ? 4 : 0;
- p->uuid[UI_FLAGS] = cpu_to_be64(uuid_flags);
- put_ldev(device);
- return drbd_send_command(peer_device, sock, P_UUIDS, sizeof(*p), NULL, 0);
+ return bitmap_uuid;
}
-int drbd_send_uuids(struct drbd_peer_device *peer_device)
+u64 drbd_collect_local_uuid_flags(struct drbd_peer_device *peer_device, u64 *authoritative_mask)
{
- return _drbd_send_uuids(peer_device, 0);
+ struct drbd_device *device = peer_device->device;
+ u64 uuid_flags = 0;
+
+ if (test_bit(DISCARD_MY_DATA, &peer_device->flags))
+ uuid_flags |= UUID_FLAG_DISCARD_MY_DATA;
+ if (test_bit(CRASHED_PRIMARY, &device->flags))
+ uuid_flags |= UUID_FLAG_CRASHED_PRIMARY;
+ if (!drbd_md_test_flag(device->ldev, MDF_CONSISTENT))
+ uuid_flags |= UUID_FLAG_INCONSISTENT;
+ if (test_bit(RECONNECT, &peer_device->connection->flags))
+ uuid_flags |= UUID_FLAG_RECONNECT;
+ if (test_bit(PRIMARY_LOST_QUORUM, &device->flags))
+ uuid_flags |= UUID_FLAG_PRIMARY_LOST_QUORUM;
+ if (drbd_device_stable(device, authoritative_mask))
+ uuid_flags |= UUID_FLAG_STABLE;
+
+ return uuid_flags;
+}
+
+/* sets UUID_FLAG_SYNC_TARGET on uuid_flags as appropriate (may be NULL) */
+u64 drbd_resolved_uuid(struct drbd_peer_device *peer_device_base, u64 *uuid_flags)
+{
+ struct drbd_device *device = peer_device_base->device;
+ struct drbd_peer_device *peer_device;
+ u64 uuid = drbd_current_uuid(device);
+
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ if (peer_device->node_id == peer_device_base->node_id)
+ continue;
+ if (peer_device->repl_state[NOW] == L_SYNC_TARGET) {
+ uuid = peer_device->current_uuid;
+ if (uuid_flags)
+ *uuid_flags |= UUID_FLAG_SYNC_TARGET;
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ return uuid;
+}
+
+static int _drbd_send_uuids110(struct drbd_peer_device *peer_device, u64 uuid_flags, u64 node_mask)
+{
+ struct drbd_device *device = peer_device->device;
+ const int my_node_id = device->resource->res_opts.node_id;
+ struct drbd_peer_md *peer_md;
+ struct p_uuids110 *p;
+ bool sent_one_unallocated;
+ int i, pos = 0;
+ u64 local_uuid_flags = 0, authoritative_mask, bitmap_uuids_mask = 0;
+ int p_size = sizeof(*p);
+
+ if (!get_ldev_if_state(device, D_NEGOTIATING))
+ return drbd_send_current_uuid(peer_device, device->exposed_data_uuid,
+ drbd_weak_nodes_device(device));
+
+ peer_md = device->ldev->md.peers;
+
+ p_size += (DRBD_PEERS_MAX + HISTORY_UUIDS) * sizeof(p->other_uuids[0]);
+ p = drbd_prepare_command(peer_device, p_size, DATA_STREAM);
+ if (!p) {
+ put_ldev(device);
+ return -EIO;
+ }
+
+ spin_lock_irq(&device->ldev->md.uuid_lock);
+ peer_device->comm_current_uuid = drbd_resolved_uuid(peer_device, &local_uuid_flags);
+ p->current_uuid = cpu_to_be64(peer_device->comm_current_uuid);
+
+ sent_one_unallocated = peer_device->connection->agreed_pro_version < 116;
+ for (i = 0; i < DRBD_NODE_ID_MAX; i++) {
+ u64 val = __bitmap_uuid(device, i);
+ bool send_this = peer_md[i].flags & (MDF_HAVE_BITMAP | MDF_NODE_EXISTS);
+ if (!send_this && !sent_one_unallocated &&
+ i != my_node_id && i != peer_device->node_id && val) {
+ send_this = true;
+ sent_one_unallocated = true;
+ uuid_flags |= (u64)i << UUID_FLAG_UNALLOC_SHIFT;
+ uuid_flags |= UUID_FLAG_HAS_UNALLOC;
+ }
+ if (send_this) {
+ bitmap_uuids_mask |= NODE_MASK(i);
+ p->other_uuids[pos++] = cpu_to_be64(val);
+ }
+ }
+ peer_device->comm_bitmap_uuid = drbd_bitmap_uuid(peer_device);
+
+ for (i = 0; i < HISTORY_UUIDS; i++)
+ p->other_uuids[pos++] = cpu_to_be64(drbd_history_uuid(device, i));
+ spin_unlock_irq(&device->ldev->md.uuid_lock);
+
+ p->bitmap_uuids_mask = cpu_to_be64(bitmap_uuids_mask);
+
+ peer_device->comm_bm_set = drbd_bm_total_weight(peer_device);
+ p->dirty_bits = cpu_to_be64(peer_device->comm_bm_set);
+ local_uuid_flags |= drbd_collect_local_uuid_flags(peer_device, &authoritative_mask);
+ peer_device->comm_uuid_flags = local_uuid_flags;
+ uuid_flags |= local_uuid_flags;
+ if (uuid_flags & UUID_FLAG_STABLE) {
+ p->node_mask = cpu_to_be64(node_mask);
+ } else {
+ D_ASSERT(peer_device, node_mask == 0);
+ p->node_mask = cpu_to_be64(authoritative_mask);
+ }
+
+ p->uuid_flags = cpu_to_be64(uuid_flags);
+
+ put_ldev(device);
+
+ p_size = sizeof(*p) +
+ (hweight64(bitmap_uuids_mask) + HISTORY_UUIDS) * sizeof(p->other_uuids[0]);
+ resize_prepared_command(peer_device->connection, DATA_STREAM, p_size);
+ return drbd_send_command(peer_device, P_UUIDS110, DATA_STREAM);
}
-int drbd_send_uuids_skip_initial_sync(struct drbd_peer_device *peer_device)
+int drbd_send_uuids(struct drbd_peer_device *peer_device, u64 uuid_flags, u64 node_mask)
{
- return _drbd_send_uuids(peer_device, 8);
+ if (peer_device->connection->agreed_pro_version >= 110)
+ return _drbd_send_uuids110(peer_device, uuid_flags, node_mask);
+ else
+ return _drbd_send_uuids(peer_device, uuid_flags);
}
-void drbd_print_uuids(struct drbd_device *device, const char *text)
+void drbd_print_uuids(struct drbd_peer_device *peer_device, const char *text)
{
+ struct drbd_device *device = peer_device->device;
+
if (get_ldev_if_state(device, D_NEGOTIATING)) {
- u64 *uuid = device->ldev->md.uuid;
- drbd_info(device, "%s %016llX:%016llX:%016llX:%016llX\n",
- text,
- (unsigned long long)uuid[UI_CURRENT],
- (unsigned long long)uuid[UI_BITMAP],
- (unsigned long long)uuid[UI_HISTORY_START],
- (unsigned long long)uuid[UI_HISTORY_END]);
+ drbd_info(peer_device, "%s %016llX:%016llX:%016llX:%016llX\n",
+ text,
+ (unsigned long long)drbd_current_uuid(device),
+ (unsigned long long)drbd_bitmap_uuid(peer_device),
+ (unsigned long long)drbd_history_uuid(device, 0),
+ (unsigned long long)drbd_history_uuid(device, 1));
put_ldev(device);
} else {
- drbd_info(device, "%s effective data uuid: %016llX\n",
- text,
- (unsigned long long)device->ed_uuid);
+ drbd_info(peer_device, "%s exposed data uuid: %016llX\n",
+ text,
+ (unsigned long long)device->exposed_data_uuid);
}
}
+int drbd_send_current_uuid(struct drbd_peer_device *peer_device, u64 current_uuid, u64 weak_nodes)
+{
+ struct p_current_uuid *p;
+
+ p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
+ if (!p)
+ return -EIO;
+
+ peer_device->comm_current_uuid = current_uuid;
+ p->uuid = cpu_to_be64(current_uuid);
+ p->weak_nodes = cpu_to_be64(weak_nodes);
+ return drbd_send_command(peer_device, P_CURRENT_UUID, DATA_STREAM);
+}
+
void drbd_gen_and_send_sync_uuid(struct drbd_peer_device *peer_device)
{
struct drbd_device *device = peer_device->device;
- struct drbd_socket *sock;
- struct p_rs_uuid *p;
+ struct p_uuid *p;
u64 uuid;
- D_ASSERT(device, device->state.disk == D_UP_TO_DATE);
+ D_ASSERT(device, device->disk_state[NOW] == D_UP_TO_DATE);
- uuid = device->ldev->md.uuid[UI_BITMAP];
+ down_write(&device->uuid_sem);
+ uuid = drbd_bitmap_uuid(peer_device);
if (uuid && uuid != UUID_JUST_CREATED)
uuid = uuid + UUID_NEW_BM_OFFSET;
else
get_random_bytes(&uuid, sizeof(u64));
- drbd_uuid_set(device, UI_BITMAP, uuid);
- drbd_print_uuids(device, "updated sync UUID");
+ drbd_uuid_set_bitmap(peer_device, uuid);
+ drbd_print_uuids(peer_device, "updated sync UUID");
drbd_md_sync(device);
+ downgrade_write(&device->uuid_sem);
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
+ p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
if (p) {
p->uuid = cpu_to_be64(uuid);
- drbd_send_command(peer_device, sock, P_SYNC_UUID, sizeof(*p), NULL, 0);
+ drbd_send_command(peer_device, P_SYNC_UUID, DATA_STREAM);
}
+ up_read(&device->uuid_sem);
}
-int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enum dds_flags flags)
+int drbd_send_sizes(struct drbd_peer_device *peer_device,
+ uint64_t u_size_diskless, enum dds_flags flags)
{
+ struct drbd_connection *connection = peer_device->connection;
struct drbd_device *device = peer_device->device;
- struct drbd_socket *sock;
struct p_sizes *p;
sector_t d_size, u_size;
int q_order_type;
unsigned int max_bio_size;
unsigned int packet_size;
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
- if (!p)
- return -EIO;
-
packet_size = sizeof(*p);
- if (peer_device->connection->agreed_features & DRBD_FF_WSAME)
+ if (connection->agreed_features & DRBD_FF_WSAME)
packet_size += sizeof(p->qlim[0]);
+ p = drbd_prepare_command(peer_device, packet_size, DATA_STREAM);
+ if (!p)
+ return -EIO;
+
memset(p, 0, packet_size);
if (get_ldev_if_state(device, D_NEGOTIATING)) {
struct block_device *bdev = device->ldev->backing_bdev;
struct request_queue *q = bdev_get_queue(bdev);
- d_size = drbd_get_max_capacity(device->ldev);
+ d_size = drbd_get_max_capacity(device, device->ldev, false);
rcu_read_lock();
u_size = rcu_dereference(device->ldev->disk_conf)->disk_size;
rcu_read_unlock();
@@ -927,6 +1587,10 @@ int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enu
p->qlim->io_min = cpu_to_be32(bdev_io_min(bdev));
p->qlim->io_opt = cpu_to_be32(bdev_io_opt(bdev));
p->qlim->discard_enabled = !!bdev_max_discard_sectors(bdev);
+ p->qlim->write_same_capable = 0;
+ if (connection->agreed_features & DRBD_FF_BM_BLOCK_SHIFT)
+ p->qlim->bm_block_shift_minus_12 =
+ device->bitmap->bm_block_shift - BM_BLOCK_SHIFT_4k;
put_ldev(device);
} else {
struct request_queue *q = device->rq_queue;
@@ -939,128 +1603,307 @@ int drbd_send_sizes(struct drbd_peer_device *peer_device, int trigger_reply, enu
p->qlim->io_min = cpu_to_be32(queue_io_min(q));
p->qlim->io_opt = cpu_to_be32(queue_io_opt(q));
p->qlim->discard_enabled = 0;
+ p->qlim->write_same_capable = 0;
d_size = 0;
- u_size = 0;
+ u_size = u_size_diskless;
q_order_type = QUEUE_ORDERED_NONE;
max_bio_size = DRBD_MAX_BIO_SIZE; /* ... multiple BIOs per peer_request */
}
- if (peer_device->connection->agreed_pro_version <= 94)
+ if (connection->agreed_pro_version <= 94)
max_bio_size = min(max_bio_size, DRBD_MAX_SIZE_H80_PACKET);
- else if (peer_device->connection->agreed_pro_version < 100)
+ else if (connection->agreed_pro_version < 100)
max_bio_size = min(max_bio_size, DRBD_MAX_BIO_SIZE_P95);
+ /* 9.0.4 bumped pro_version to 112 and introduced 2PC resizes */
+ if (connection->agreed_pro_version >= 112)
+ d_size = drbd_partition_data_capacity(device);
+
p->d_size = cpu_to_be64(d_size);
p->u_size = cpu_to_be64(u_size);
- if (trigger_reply)
- p->c_size = 0;
- else
- p->c_size = cpu_to_be64(get_capacity(device->vdisk));
+ /*
+ TODO verify: this may be needed for v8 compatibility still.
+ p->c_size = cpu_to_be64(trigger_reply ? 0 : get_capacity(device->vdisk));
+ */
+ p->c_size = cpu_to_be64(get_capacity(device->vdisk));
p->max_bio_size = cpu_to_be32(max_bio_size);
p->queue_order_type = cpu_to_be16(q_order_type);
p->dds_flags = cpu_to_be16(flags);
- return drbd_send_command(peer_device, sock, P_SIZES, packet_size, NULL, 0);
+ return drbd_send_command(peer_device, P_SIZES, DATA_STREAM);
}
-/**
- * drbd_send_current_state() - Sends the drbd state to the peer
- * @peer_device: DRBD peer device.
- */
int drbd_send_current_state(struct drbd_peer_device *peer_device)
{
- struct drbd_socket *sock;
+ return drbd_send_state(peer_device, drbd_get_peer_device_state(peer_device, NOW));
+}
+
+static int send_state(struct drbd_connection *connection, int vnr, union drbd_state state)
+{
struct p_state *p;
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
+ p = conn_prepare_command(connection, sizeof(*p), DATA_STREAM);
if (!p)
return -EIO;
- p->state = cpu_to_be32(peer_device->device->state.i); /* Within the send mutex */
- return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
+
+ if (connection->agreed_pro_version < 110) {
+ /* D_DETACHING was introduced with drbd-9.0 */
+ if (state.disk > D_DETACHING)
+ state.disk--;
+ if (state.pdsk > D_DETACHING)
+ state.pdsk--;
+ }
+
+ p->state = cpu_to_be32(state.i); /* Within the send mutex */
+ return send_command(connection, vnr, P_STATE, DATA_STREAM);
+}
+
+int conn_send_state(struct drbd_connection *connection, union drbd_state state)
+{
+ BUG_ON(connection->agreed_pro_version < 100);
+ return send_state(connection, -1, state);
}
/**
- * drbd_send_state() - After a state change, sends the new state to the peer
- * @peer_device: DRBD peer device.
- * @state: the state to send, not necessarily the current state.
- *
- * Each state change queues an "after_state_ch" work, which will eventually
- * send the resulting new state to the peer. If more state changes happen
- * between queuing and processing of the after_state_ch work, we still
- * want to send each intermediary state in the order it occurred.
+ * drbd_send_state() - Sends the drbd state to the peer
+ * @peer_device: Peer DRBD device to send the state to.
+ * @state: state to send
*/
int drbd_send_state(struct drbd_peer_device *peer_device, union drbd_state state)
{
- struct drbd_socket *sock;
- struct p_state *p;
-
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
- if (!p)
- return -EIO;
- p->state = cpu_to_be32(state.i); /* Within the send mutex */
- return drbd_send_command(peer_device, sock, P_STATE, sizeof(*p), NULL, 0);
+ peer_device->comm_state = state;
+ return send_state(peer_device->connection, peer_device->device->vnr, state);
}
-int drbd_send_state_req(struct drbd_peer_device *peer_device, union drbd_state mask, union drbd_state val)
+int conn_send_state_req(struct drbd_connection *connection, int vnr, enum drbd_packet cmd,
+ union drbd_state mask, union drbd_state val)
{
- struct drbd_socket *sock;
struct p_req_state *p;
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
+ /* Protocols before version 100 only support one volume and connection.
+ * All state change requests are via P_STATE_CHG_REQ. */
+ if (connection->agreed_pro_version < 100)
+ cmd = P_STATE_CHG_REQ;
+
+ p = conn_prepare_command(connection, sizeof(*p), DATA_STREAM);
if (!p)
return -EIO;
p->mask = cpu_to_be32(mask.i);
p->val = cpu_to_be32(val.i);
- return drbd_send_command(peer_device, sock, P_STATE_CHG_REQ, sizeof(*p), NULL, 0);
+
+ return send_command(connection, vnr, cmd, DATA_STREAM);
}
-int conn_send_state_req(struct drbd_connection *connection, union drbd_state mask, union drbd_state val)
+int conn_send_twopc_request(struct drbd_connection *connection, struct twopc_request *request)
{
- enum drbd_packet cmd;
- struct drbd_socket *sock;
- struct p_req_state *p;
+ struct drbd_resource *resource = connection->resource;
+ struct p_twopc_request *p;
- cmd = connection->agreed_pro_version < 100 ? P_STATE_CHG_REQ : P_CONN_ST_CHG_REQ;
- sock = &connection->data;
- p = conn_prepare_command(connection, sock);
+ dynamic_drbd_dbg(connection, "Sending %s request for state change %u\n",
+ drbd_packet_name(request->cmd),
+ request->tid);
+
+ p = conn_prepare_command(connection, sizeof(*p), DATA_STREAM);
if (!p)
return -EIO;
- p->mask = cpu_to_be32(mask.i);
- p->val = cpu_to_be32(val.i);
- return conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0);
+ p->tid = cpu_to_be32(request->tid);
+ if (connection->agreed_features & DRBD_FF_2PC_V2) {
+ p->flags = cpu_to_be32(TWOPC_HAS_FLAGS | request->flags);
+ p->_pad = 0;
+ p->s8_initiator_node_id = request->initiator_node_id;
+ p->s8_target_node_id = request->target_node_id;
+ } else {
+ p->u32_initiator_node_id = cpu_to_be32(request->initiator_node_id);
+ p->u32_target_node_id = cpu_to_be32(request->target_node_id);
+ }
+ p->nodes_to_reach = cpu_to_be64(request->nodes_to_reach);
+ switch (resource->twopc.type) {
+ case TWOPC_STATE_CHANGE:
+ if (request->cmd == P_TWOPC_PREPARE) {
+ p->_compat_pad = 0;
+ p->mask = cpu_to_be32(resource->twopc.state_change.mask.i);
+ p->val = cpu_to_be32(resource->twopc.state_change.val.i);
+ } else { /* P_TWOPC_COMMIT */
+ p->primary_nodes = cpu_to_be64(resource->twopc.state_change.primary_nodes);
+ if (request->flags & TWOPC_HAS_REACHABLE &&
+ connection->agreed_features & DRBD_FF_2PC_V2) {
+ p->reachable_nodes = cpu_to_be64(
+ resource->twopc.state_change.reachable_nodes);
+ } else {
+ p->mask = cpu_to_be32(resource->twopc.state_change.mask.i);
+ p->val = cpu_to_be32(resource->twopc.state_change.val.i);
+ }
+ }
+ break;
+ case TWOPC_RESIZE:
+ if (request->cmd == P_TWOPC_PREP_RSZ) {
+ p->user_size = cpu_to_be64(resource->twopc.resize.user_size);
+ p->dds_flags = cpu_to_be16(resource->twopc.resize.dds_flags);
+ } else { /* P_TWOPC_COMMIT */
+ p->diskful_primary_nodes =
+ cpu_to_be64(resource->twopc.resize.diskful_primary_nodes);
+ p->exposed_size = cpu_to_be64(resource->twopc.resize.new_size);
+ }
+ }
+ return send_command(connection, request->vnr, request->cmd, DATA_STREAM | SFLAG_FLUSH);
}
-void drbd_send_sr_reply(struct drbd_peer_device *peer_device, enum drbd_state_rv retcode)
+void drbd_send_sr_reply(struct drbd_connection *connection, int vnr, enum drbd_state_rv retcode)
{
- struct drbd_socket *sock;
struct p_req_state_reply *p;
- sock = &peer_device->connection->meta;
- p = drbd_prepare_command(peer_device, sock);
+ p = conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM);
if (p) {
+ enum drbd_packet cmd = P_STATE_CHG_REPLY;
+
+ if (connection->agreed_pro_version >= 100 && vnr < 0)
+ cmd = P_CONN_ST_CHG_REPLY;
+
p->retcode = cpu_to_be32(retcode);
- drbd_send_command(peer_device, sock, P_STATE_CHG_REPLY, sizeof(*p), NULL, 0);
+ send_command(connection, vnr, cmd, CONTROL_STREAM);
}
}
-void conn_send_sr_reply(struct drbd_connection *connection, enum drbd_state_rv retcode)
+void drbd_send_twopc_reply(struct drbd_connection *connection,
+ enum drbd_packet cmd, struct twopc_reply *reply)
{
- struct drbd_socket *sock;
- struct p_req_state_reply *p;
- enum drbd_packet cmd = connection->agreed_pro_version < 100 ? P_STATE_CHG_REPLY : P_CONN_ST_CHG_REPLY;
+ struct p_twopc_reply *p;
- sock = &connection->meta;
- p = conn_prepare_command(connection, sock);
+ p = conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM);
if (p) {
- p->retcode = cpu_to_be32(retcode);
- conn_send_command(connection, sock, cmd, sizeof(*p), NULL, 0);
+ p->tid = cpu_to_be32(reply->tid);
+ p->initiator_node_id = cpu_to_be32(reply->initiator_node_id);
+ p->reachable_nodes = cpu_to_be64(reply->reachable_nodes);
+ switch (connection->resource->twopc.type) {
+ case TWOPC_STATE_CHANGE:
+ p->primary_nodes = cpu_to_be64(reply->primary_nodes);
+ p->weak_nodes = cpu_to_be64(reply->weak_nodes);
+ break;
+ case TWOPC_RESIZE:
+ p->diskful_primary_nodes = cpu_to_be64(reply->diskful_primary_nodes);
+ p->max_possible_size = cpu_to_be64(reply->max_possible_size);
+ break;
+ }
+ send_command(connection, reply->vnr, cmd, CONTROL_STREAM | SFLAG_FLUSH);
+ }
+}
+
+void drbd_send_peers_in_sync(struct drbd_peer_device *peer_device, u64 mask, sector_t sector, int size)
+{
+ struct p_peer_block_desc *p;
+
+ p = drbd_prepare_command(peer_device, sizeof(*p), CONTROL_STREAM);
+ if (p) {
+ p->sector = cpu_to_be64(sector);
+ p->mask = cpu_to_be64(mask);
+ p->size = cpu_to_be32(size);
+ p->pad = 0;
+ drbd_send_command(peer_device, P_PEERS_IN_SYNC, CONTROL_STREAM);
}
}
+int drbd_send_peer_dagtag(struct drbd_connection *connection, struct drbd_connection *lost_peer)
+{
+ struct p_peer_dagtag *p;
+
+ p = conn_prepare_command(connection, sizeof(*p), DATA_STREAM);
+ if (!p)
+ return -EIO;
+
+ p->dagtag = cpu_to_be64(atomic64_read(&lost_peer->last_dagtag_sector));
+ p->node_id = cpu_to_be32(lost_peer->peer_node_id);
+
+ return send_command(connection, -1, P_PEER_DAGTAG, DATA_STREAM);
+}
+
+int drbd_send_flush_requests(struct drbd_connection *connection, u64 flush_sequence)
+{
+ struct p_flush_requests *p;
+
+ p = conn_prepare_command(connection, sizeof(*p), DATA_STREAM);
+ if (!p)
+ return -EIO;
+
+ p->flush_sequence = cpu_to_be64(flush_sequence);
+
+ return send_command(connection, -1, P_FLUSH_REQUESTS, DATA_STREAM);
+}
+
+int drbd_send_flush_forward(struct drbd_connection *connection, u64 flush_sequence,
+ int initiator_node_id)
+{
+ struct p_flush_forward *p;
+
+ p = conn_prepare_command(connection, sizeof(*p), CONTROL_STREAM);
+ if (!p)
+ return -EIO;
+
+ p->flush_sequence = cpu_to_be64(flush_sequence);
+ p->initiator_node_id = cpu_to_be32(initiator_node_id);
+
+ return send_command(connection, -1, P_FLUSH_FORWARD, CONTROL_STREAM);
+}
+
+int drbd_send_flush_requests_ack(struct drbd_connection *connection, u64 flush_sequence,
+ int primary_node_id)
+{
+ struct p_flush_ack *p;
+
+ p = conn_prepare_command(connection, sizeof(*p), DATA_STREAM);
+ if (!p)
+ return -EIO;
+
+ p->flush_sequence = cpu_to_be64(flush_sequence);
+ p->primary_node_id = cpu_to_be32(primary_node_id);
+
+ return send_command(connection, -1, P_FLUSH_REQUESTS_ACK, DATA_STREAM);
+}
+
+int drbd_send_enable_replication_next(struct drbd_peer_device *peer_device)
+{
+ struct p_enable_replication *p;
+ struct peer_device_conf *pdc;
+ bool resync_without_replication;
+
+ set_bit(PEER_REPLICATION_NEXT, &peer_device->flags);
+ if (!(peer_device->connection->agreed_features & DRBD_FF_RESYNC_WITHOUT_REPLICATION))
+ return 0;
+
+ p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
+ if (!p)
+ return -EIO;
+
+ rcu_read_lock();
+ pdc = rcu_dereference(peer_device->conf);
+ resync_without_replication = pdc->resync_without_replication;
+ rcu_read_unlock();
+
+ if (resync_without_replication)
+ clear_bit(PEER_REPLICATION_NEXT, &peer_device->flags);
+
+ p->enable = !resync_without_replication;
+ p->_pad1 = 0;
+ p->_pad2 = 0;
+
+ return drbd_send_command(peer_device, P_ENABLE_REPLICATION_NEXT, DATA_STREAM);
+}
+
+int drbd_send_enable_replication(struct drbd_peer_device *peer_device, bool enable)
+{
+ struct p_enable_replication *p;
+
+ p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
+ if (!p)
+ return -EIO;
+
+ p->enable = enable;
+ p->_pad1 = 0;
+ p->_pad2 = 0;
+
+ return drbd_send_command(peer_device, P_ENABLE_REPLICATION, DATA_STREAM);
+}
+
static void dcbp_set_code(struct p_compressed_bm *p, enum drbd_bitmap_code code)
{
BUG_ON(code & ~0xf);
@@ -1078,24 +1921,28 @@ static void dcbp_set_pad_bits(struct p_compressed_bm *p, int n)
p->encoding = (p->encoding & (~0x7 << 4)) | (n << 4);
}
-static int fill_bitmap_rle_bits(struct drbd_device *device,
- struct p_compressed_bm *p,
- unsigned int size,
- struct bm_xfer_ctx *c)
+/* For compat reasons, encode bitmap as if it was 4k per bit!
+ * Easy: just scale the run length.
+ */
+static int fill_bitmap_rle_bits(struct drbd_peer_device *peer_device,
+ struct p_compressed_bm *p,
+ unsigned int size,
+ struct bm_xfer_ctx *c)
{
struct bitstream bs;
unsigned long plain_bits;
unsigned long tmp;
unsigned long rl;
+ unsigned long rl_4k;
unsigned len;
unsigned toggle;
int bits, use_rle;
/* may we use this feature? */
rcu_read_lock();
- use_rle = rcu_dereference(first_peer_device(device)->connection->net_conf)->use_rle;
+ use_rle = rcu_dereference(peer_device->connection->transport.net_conf)->use_rle;
rcu_read_unlock();
- if (!use_rle || first_peer_device(device)->connection->agreed_pro_version < 90)
+ if (!use_rle || peer_device->connection->agreed_pro_version < 90)
return 0;
if (c->bit_offset >= c->bm_bits)
@@ -1115,11 +1962,16 @@ static int fill_bitmap_rle_bits(struct drbd_device *device,
/* see how much plain bits we can stuff into one packet
* using RLE and VLI. */
do {
- tmp = (toggle == 0) ? _drbd_bm_find_next_zero(device, c->bit_offset)
- : _drbd_bm_find_next(device, c->bit_offset);
- if (tmp == -1UL)
+ tmp = (toggle == 0) ? _drbd_bm_find_next_zero(peer_device, c->bit_offset)
+ : _drbd_bm_find_next(peer_device, c->bit_offset);
+ if (tmp == -1UL) {
tmp = c->bm_bits;
- rl = tmp - c->bit_offset;
+ rl = tmp - c->bit_offset;
+ rl_4k = c->bm_bits_4k - (c->bit_offset << c->scale);
+ } else {
+ rl = tmp - c->bit_offset;
+ rl_4k = rl << c->scale;
+ }
if (toggle == 2) { /* first iteration */
if (rl == 0) {
@@ -1136,16 +1988,16 @@ static int fill_bitmap_rle_bits(struct drbd_device *device,
/* paranoia: catch zero runlength.
* can only happen if bitmap is modified while we scan it. */
if (rl == 0) {
- drbd_err(device, "unexpected zero runlength while encoding bitmap "
+ drbd_err(peer_device, "unexpected zero runlength while encoding bitmap "
"t:%u bo:%lu\n", toggle, c->bit_offset);
return -1;
}
- bits = vli_encode_bits(&bs, rl);
+ bits = vli_encode_bits(&bs, rl_4k);
if (bits == -ENOBUFS) /* buffer full */
break;
if (bits <= 0) {
- drbd_err(device, "error while encoding bitmap: %d\n", bits);
+ drbd_err(peer_device, "error while encoding bitmap: %d\n", bits);
return 0;
}
@@ -1156,7 +2008,7 @@ static int fill_bitmap_rle_bits(struct drbd_device *device,
len = bs.cur.b - p->code + !!bs.cur.bit;
- if (plain_bits < (len << 3)) {
+ if (plain_bits << c->scale < (len << 3)) {
/* incompressible with this method.
* we need to rewind both word and bit position. */
c->bit_offset -= plain_bits;
@@ -1175,33 +2027,69 @@ static int fill_bitmap_rle_bits(struct drbd_device *device,
return len;
}
+/* Repeat extracted bits by "peeling off" words from the end.
+ * scale != 0 implies that repeat >= 2.
+ * Feel free to optimize ...
+ */
+static void repeat_bits(unsigned long *base, unsigned long num, unsigned int scale)
+{
+ unsigned long *src, *dst;
+ unsigned int repeat = 1 << scale;
+ unsigned int n;
+ int sbit, dbit, i;
+
+ for (n = num - 1; n > 0; n--) {
+ src = &base[n];
+ for (i = 0; i < repeat; i++) {
+ dst = &base[n*repeat + i];
+ *dst = 0;
+ for (dbit = 0; dbit < BITS_PER_LONG; dbit++) {
+ sbit = (i * BITS_PER_LONG + dbit) >> scale;
+ if (test_bit(sbit, src))
+ *dst |= 1UL << dbit;
+ }
+ }
+ }
+}
+
/*
* send_bitmap_rle_or_plain
*
* Return 0 when done, 1 when another iteration is needed, and a negative error
* code upon failure.
+ *
+ * For compat reasons, send bitmap as if it was 4k per bit!
+ * Good thing that a "scaled" bitmap will always "compress".
*/
static int
send_bitmap_rle_or_plain(struct drbd_peer_device *peer_device, struct bm_xfer_ctx *c)
{
struct drbd_device *device = peer_device->device;
- struct drbd_socket *sock = &peer_device->connection->data;
unsigned int header_size = drbd_header_size(peer_device->connection);
- struct p_compressed_bm *p = sock->sbuf + header_size;
+ struct p_compressed_bm *pc;
+ char *p;
int len, err;
- len = fill_bitmap_rle_bits(device, p,
- DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*p), c);
- if (len < 0)
+ p = alloc_send_buffer(peer_device->connection, DRBD_SOCKET_BUFFER_SIZE, DATA_STREAM);
+ if (IS_ERR(p))
return -EIO;
+ pc = (struct p_compressed_bm *)(p + header_size);
+
+ len = fill_bitmap_rle_bits(peer_device, pc,
+ DRBD_SOCKET_BUFFER_SIZE - header_size - sizeof(*pc), c);
+ if (len < 0) {
+ cancel_send_buffer(peer_device->connection, DATA_STREAM);
+ return -EIO;
+ }
+
if (len) {
- dcbp_set_code(p, RLE_VLI_Bits);
- err = __send_command(peer_device->connection, device->vnr, sock,
- P_COMPRESSED_BITMAP, sizeof(*p) + len,
- NULL, 0);
+ dcbp_set_code(pc, RLE_VLI_Bits);
+ resize_prepared_command(peer_device->connection, DATA_STREAM, sizeof(*pc) + len);
+ err = __send_command(peer_device->connection, device->vnr,
+ P_COMPRESSED_BITMAP, DATA_STREAM);
c->packets[0]++;
- c->bytes[0] += header_size + sizeof(*p) + len;
+ c->bytes[0] += header_size + sizeof(*pc) + len;
if (c->bit_offset >= c->bm_bits)
len = 0; /* DONE */
@@ -1210,16 +2098,40 @@ send_bitmap_rle_or_plain(struct drbd_peer_device *peer_device, struct bm_xfer_ct
* send a buffer full of plain text bits instead. */
unsigned int data_size;
unsigned long num_words;
- unsigned long *p = sock->sbuf + header_size;
-
+ unsigned long words_left = c->bm_words - c->word_offset;
+ unsigned long *pu = (unsigned long *)pc;
+
+ /* Only send full native bitmap words (actual granularity),
+ * scaled to what they would look like at 4k granularity.
+ * At maximum scale, which is (20 - 12), factor 256,
+ * to transfer at least one word of unscaled bitmap,
+ * we need data_size >= 256 (unsigned long) words,
+ * that is >= 2048 byte. Which we always have.
+ */
data_size = DRBD_SOCKET_BUFFER_SIZE - header_size;
- num_words = min_t(size_t, data_size / sizeof(*p),
- c->bm_words - c->word_offset);
- len = num_words * sizeof(*p);
- if (len)
- drbd_bm_get_lel(device, c->word_offset, num_words, p);
- err = __send_command(peer_device->connection, device->vnr, sock, P_BITMAP,
- len, NULL, 0);
+ data_size = ALIGN_DOWN(data_size, sizeof(*pu) * (1UL << c->scale));
+ num_words = (data_size / sizeof(*pu)) >> c->scale;
+ num_words = min_t(size_t, num_words, words_left);
+
+ len = num_words * sizeof(*pu);
+ if (len) {
+ drbd_bm_get_lel(peer_device, c->word_offset, num_words, pu);
+
+ if (c->scale) {
+ repeat_bits(pu, num_words, c->scale);
+ len <<= c->scale;
+ }
+ } else if (words_left != 0) {
+ drbd_err(peer_device,
+ "failed to scale %lu words by %u while sending bitmap\n",
+ words_left, c->scale);
+ cancel_send_buffer(peer_device->connection, DATA_STREAM);
+ return -ERANGE;
+ }
+
+ resize_prepared_command(peer_device->connection, DATA_STREAM, len);
+ err = __send_command(peer_device->connection, device->vnr, P_BITMAP, DATA_STREAM);
+
c->word_offset += num_words;
c->bit_offset = c->word_offset * BITS_PER_LONG;
@@ -1240,396 +2152,233 @@ send_bitmap_rle_or_plain(struct drbd_peer_device *peer_device, struct bm_xfer_ct
}
/* See the comment at receive_bitmap() */
-static int _drbd_send_bitmap(struct drbd_device *device,
- struct drbd_peer_device *peer_device)
+static bool _drbd_send_bitmap(struct drbd_device *device,
+ struct drbd_peer_device *peer_device)
{
struct bm_xfer_ctx c;
- int err;
-
- if (!expect(device, device->bitmap))
- return false;
+ int res;
if (get_ldev(device)) {
- if (drbd_md_test_flag(device->ldev, MDF_FULL_SYNC)) {
+ if (drbd_md_test_peer_flag(peer_device, MDF_PEER_FULL_SYNC)) {
drbd_info(device, "Writing the whole bitmap, MDF_FullSync was set.\n");
- drbd_bm_set_all(device);
- if (drbd_bm_write(device, peer_device)) {
+ drbd_bm_set_many_bits(peer_device, 0, -1UL);
+ if (drbd_bm_write(device, NULL)) {
/* write_bm did fail! Leave full sync flag set in Meta P_DATA
* but otherwise process as per normal - need to tell other
* side that a full resync is required! */
drbd_err(device, "Failed to write bitmap to disk!\n");
} else {
- drbd_md_clear_flag(device, MDF_FULL_SYNC);
+ drbd_md_clear_peer_flag(peer_device, MDF_PEER_FULL_SYNC);
drbd_md_sync(device);
}
}
+ c = (struct bm_xfer_ctx) {
+ .bm_bits_4k = drbd_bm_bits_4k(device),
+ .bm_bits = drbd_bm_bits(device),
+ .bm_words = drbd_bm_words(device),
+ .scale = device->bitmap->bm_block_shift - BM_BLOCK_SHIFT_4k,
+ };
+
put_ldev(device);
+ } else {
+ return false;
}
- c = (struct bm_xfer_ctx) {
- .bm_bits = drbd_bm_bits(device),
- .bm_words = drbd_bm_words(device),
- };
-
do {
- err = send_bitmap_rle_or_plain(peer_device, &c);
- } while (err > 0);
+ if (get_ldev(device)) {
+ res = send_bitmap_rle_or_plain(peer_device, &c);
+ put_ldev(device);
+ } else {
+ return false;
+ }
+ } while (res > 0);
- return err == 0;
+ return res == 0;
}
int drbd_send_bitmap(struct drbd_device *device, struct drbd_peer_device *peer_device)
{
- struct drbd_socket *sock = &peer_device->connection->data;
+ struct drbd_transport *peer_transport = &peer_device->connection->transport;
int err = -1;
- mutex_lock(&sock->mutex);
- if (sock->socket)
- err = !_drbd_send_bitmap(device, peer_device);
- mutex_unlock(&sock->mutex);
- return err;
-}
-
-void drbd_send_b_ack(struct drbd_connection *connection, u32 barrier_nr, u32 set_size)
-{
- struct drbd_socket *sock;
- struct p_barrier_ack *p;
-
- if (connection->cstate < C_WF_REPORT_PARAMS)
- return;
-
- sock = &connection->meta;
- p = conn_prepare_command(connection, sock);
- if (!p)
- return;
- p->barrier = barrier_nr;
- p->set_size = cpu_to_be32(set_size);
- conn_send_command(connection, sock, P_BARRIER_ACK, sizeof(*p), NULL, 0);
-}
-
-/**
- * _drbd_send_ack() - Sends an ack packet
- * @peer_device: DRBD peer device.
- * @cmd: Packet command code.
- * @sector: sector, needs to be in big endian byte order
- * @blksize: size in byte, needs to be in big endian byte order
- * @block_id: Id, big endian byte order
- */
-static int _drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
- u64 sector, u32 blksize, u64 block_id)
-{
- struct drbd_socket *sock;
- struct p_block_ack *p;
-
- if (peer_device->device->state.conn < C_CONNECTED)
+ if (peer_device->bitmap_index == -1) {
+ drbd_err(peer_device, "No bitmap allocated in drbd_send_bitmap()!\n");
return -EIO;
+ }
- sock = &peer_device->connection->meta;
- p = drbd_prepare_command(peer_device, sock);
- if (!p)
- return -EIO;
- p->sector = sector;
- p->block_id = block_id;
- p->blksize = blksize;
- p->seq_num = cpu_to_be32(atomic_inc_return(&peer_device->device->packet_seq));
- return drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0);
-}
-
-/* dp->sector and dp->block_id already/still in network byte order,
- * data_size is payload size according to dp->head,
- * and may need to be corrected for digest size. */
-void drbd_send_ack_dp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
- struct p_data *dp, int data_size)
-{
- if (peer_device->connection->peer_integrity_tfm)
- data_size -= crypto_shash_digestsize(peer_device->connection->peer_integrity_tfm);
- _drbd_send_ack(peer_device, cmd, dp->sector, cpu_to_be32(data_size),
- dp->block_id);
-}
-
-void drbd_send_ack_rp(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
- struct p_block_req *rp)
-{
- _drbd_send_ack(peer_device, cmd, rp->sector, rp->blksize, rp->block_id);
-}
-
-/**
- * drbd_send_ack() - Sends an ack packet
- * @peer_device: DRBD peer device
- * @cmd: packet command code
- * @peer_req: peer request
- */
-int drbd_send_ack(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
- struct drbd_peer_request *peer_req)
-{
- return _drbd_send_ack(peer_device, cmd,
- cpu_to_be64(peer_req->i.sector),
- cpu_to_be32(peer_req->i.size),
- peer_req->block_id);
-}
+ mutex_lock(&peer_device->connection->mutex[DATA_STREAM]);
+ if (peer_transport->class->ops.stream_ok(peer_transport, DATA_STREAM))
+ err = !_drbd_send_bitmap(device, peer_device);
+ mutex_unlock(&peer_device->connection->mutex[DATA_STREAM]);
-/* This function misuses the block_id field to signal if the blocks
- * are is sync or not. */
-int drbd_send_ack_ex(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
- sector_t sector, int blksize, u64 block_id)
-{
- return _drbd_send_ack(peer_device, cmd,
- cpu_to_be64(sector),
- cpu_to_be32(blksize),
- cpu_to_be64(block_id));
+ return err;
}
int drbd_send_rs_deallocated(struct drbd_peer_device *peer_device,
struct drbd_peer_request *peer_req)
{
- struct drbd_socket *sock;
- struct p_block_desc *p;
+ struct p_block_ack *p_id;
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
- if (!p)
+ if (peer_device->connection->agreed_pro_version < 122) {
+ struct p_block_desc *p;
+
+ p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
+ if (!p)
+ return -EIO;
+ p->sector = cpu_to_be64(peer_req->i.sector);
+ p->blksize = cpu_to_be32(peer_req->i.size);
+ p->pad = 0;
+ return drbd_send_command(peer_device, P_RS_DEALLOCATED, DATA_STREAM);
+ }
+
+ p_id = drbd_prepare_command(peer_device, sizeof(*p_id), DATA_STREAM);
+ if (!p_id)
return -EIO;
- p->sector = cpu_to_be64(peer_req->i.sector);
- p->blksize = cpu_to_be32(peer_req->i.size);
- p->pad = 0;
- return drbd_send_command(peer_device, sock, P_RS_DEALLOCATED, sizeof(*p), NULL, 0);
+ p_id->sector = cpu_to_be64(peer_req->i.sector);
+ p_id->blksize = cpu_to_be32(peer_req->i.size);
+ p_id->block_id = peer_req->block_id;
+ p_id->seq_num = 0;
+ return drbd_send_command(peer_device, P_RS_DEALLOCATED_ID, DATA_STREAM);
}
-int drbd_send_drequest(struct drbd_peer_device *peer_device, int cmd,
+int drbd_send_drequest(struct drbd_peer_device *peer_device,
sector_t sector, int size, u64 block_id)
{
- struct drbd_socket *sock;
struct p_block_req *p;
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
+ p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
if (!p)
return -EIO;
p->sector = cpu_to_be64(sector);
p->block_id = block_id;
p->blksize = cpu_to_be32(size);
- return drbd_send_command(peer_device, sock, cmd, sizeof(*p), NULL, 0);
-}
-
-int drbd_send_drequest_csum(struct drbd_peer_device *peer_device, sector_t sector, int size,
- void *digest, int digest_size, enum drbd_packet cmd)
-{
- struct drbd_socket *sock;
- struct p_block_req *p;
+ p->pad = 0;
+ return drbd_send_command(peer_device, P_DATA_REQUEST, DATA_STREAM);
+}
+
+static void *drbd_prepare_rs_req(struct drbd_peer_device *peer_device, enum drbd_packet cmd, int payload_size,
+ sector_t sector, int blksize, u64 block_id, unsigned int dagtag_node_id, u64 dagtag)
+{
+ void *payload;
+ struct p_block_req_common *req_common;
+
+ if (cmd == P_RS_DAGTAG_REQ || cmd == P_RS_CSUM_DAGTAG_REQ || cmd == P_RS_THIN_DAGTAG_REQ ||
+ cmd == P_OV_DAGTAG_REQ || cmd == P_OV_DAGTAG_REPLY) {
+ struct p_rs_req *p;
+ /* Due to the slightly complicated nested struct definition,
+ * verify that the packet size is as expected. */
+ BUILD_BUG_ON(sizeof(struct p_rs_req) != 32);
+ p = drbd_prepare_command(peer_device, sizeof(*p) + payload_size, DATA_STREAM);
+ if (!p)
+ return NULL;
+ payload = p + 1;
+ req_common = &p->req_common;
+ p->dagtag_node_id = cpu_to_be32(dagtag_node_id);
+ p->dagtag = cpu_to_be64(dagtag);
+ } else {
+ struct p_block_req *p;
+ /* Due to the slightly complicated nested struct definition,
+ * verify that the packet size is as expected. */
+ BUILD_BUG_ON(sizeof(struct p_block_req) != 24);
+ p = drbd_prepare_command(peer_device, sizeof(*p) + payload_size, DATA_STREAM);
+ if (!p)
+ return NULL;
+ payload = p + 1;
+ req_common = &p->req_common;
+ p->pad = 0;
+ }
- /* FIXME: Put the digest into the preallocated socket buffer. */
+ req_common->sector = cpu_to_be64(sector);
+ req_common->block_id = block_id;
+ req_common->blksize = cpu_to_be32(blksize);
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
- if (!p)
- return -EIO;
- p->sector = cpu_to_be64(sector);
- p->block_id = ID_SYNCER /* unused */;
- p->blksize = cpu_to_be32(size);
- return drbd_send_command(peer_device, sock, cmd, sizeof(*p), digest, digest_size);
+ return payload;
}
-int drbd_send_ov_request(struct drbd_peer_device *peer_device, sector_t sector, int size)
+int drbd_send_rs_request(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
+ sector_t sector, int size, u64 block_id,
+ unsigned int dagtag_node_id, u64 dagtag)
{
- struct drbd_socket *sock;
- struct p_block_req *p;
-
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
- if (!p)
+ if (!drbd_prepare_rs_req(peer_device, cmd, 0,
+ sector, size, block_id, dagtag_node_id, dagtag))
return -EIO;
- p->sector = cpu_to_be64(sector);
- p->block_id = ID_SYNCER /* unused */;
- p->blksize = cpu_to_be32(size);
- return drbd_send_command(peer_device, sock, P_OV_REQUEST, sizeof(*p), NULL, 0);
+ return drbd_send_command(peer_device, cmd, DATA_STREAM);
}
-/* called on sndtimeo
- * returns false if we should retry,
- * true if we think connection is dead
- */
-static int we_should_drop_the_connection(struct drbd_connection *connection, struct socket *sock)
+void *drbd_prepare_drequest_csum(struct drbd_peer_request *peer_req, enum drbd_packet cmd,
+ int digest_size, unsigned int dagtag_node_id, u64 dagtag)
{
- int drop_it;
- /* long elapsed = (long)(jiffies - device->last_received); */
-
- drop_it = connection->meta.socket == sock
- || !connection->ack_receiver.task
- || get_t_state(&connection->ack_receiver) != RUNNING
- || connection->cstate < C_WF_REPORT_PARAMS;
-
- if (drop_it)
- return true;
-
- drop_it = !--connection->ko_count;
- if (!drop_it) {
- drbd_err(connection, "[%s/%d] sock_sendmsg time expired, ko = %u\n",
- current->comm, current->pid, connection->ko_count);
- request_ping(connection);
- }
-
- return drop_it; /* && (device->state == R_PRIMARY) */;
+ struct drbd_peer_device *peer_device = peer_req->peer_device;
+ return drbd_prepare_rs_req(peer_device, cmd, digest_size,
+ peer_req->i.sector, peer_req->i.size, peer_req->block_id,
+ dagtag_node_id, dagtag);
}
-static void drbd_update_congested(struct drbd_connection *connection)
-{
- struct sock *sk = connection->data.socket->sk;
- if (sk->sk_wmem_queued > sk->sk_sndbuf * 4 / 5)
- set_bit(NET_CONGESTED, &connection->flags);
-}
-/* The idea of sendpage seems to be to put some kind of reference
- * to the page into the skb, and to hand it over to the NIC. In
- * this process get_page() gets called.
- *
- * As soon as the page was really sent over the network put_page()
- * gets called by some part of the network layer. [ NIC driver? ]
- *
- * [ get_page() / put_page() increment/decrement the count. If count
- * reaches 0 the page will be freed. ]
- *
- * This works nicely with pages from FSs.
- * But this means that in protocol A we might signal IO completion too early!
- *
- * In order not to corrupt data during a resync we must make sure
- * that we do not reuse our own buffer pages (EEs) to early, therefore
- * we have the net_ee list.
- *
- * XFS seems to have problems, still, it submits pages with page_count == 0!
- * As a workaround, we disable sendpage on pages
- * with page_count == 0 or PageSlab.
- */
-static int _drbd_no_send_page(struct drbd_peer_device *peer_device, struct page *page,
- int offset, size_t size, unsigned msg_flags)
+static int __send_bio(struct drbd_peer_device *peer_device, struct bio *bio, unsigned int msg_flags)
{
- struct socket *socket;
- void *addr;
+ struct drbd_connection *connection = peer_device->connection;
+ struct drbd_transport *transport = &connection->transport;
+ struct drbd_transport_ops *tr_ops = &transport->class->ops;
int err;
- socket = peer_device->connection->data.socket;
- addr = kmap(page) + offset;
- err = drbd_send_all(peer_device->connection, socket, addr, size, msg_flags);
- kunmap(page);
- if (!err)
- peer_device->device->send_cnt += size >> 9;
+ err = flush_send_buffer(connection, DATA_STREAM);
+ if (!err) {
+ err = tr_ops->send_bio(transport, bio, msg_flags);
+ if (!err)
+ peer_device->send_cnt += bio->bi_iter.bi_size >> 9;
+ }
+
return err;
}
-static int _drbd_send_page(struct drbd_peer_device *peer_device, struct page *page,
- int offset, size_t size, unsigned msg_flags)
+/* sendmsg(MSG_SPLICE_PAGES) (formerly sendpage()) increases the page ref_count
+ * and hands it to the network stack. After the NIC DMA sends the data, it
+ * decreases that page's ref_count.
+ * We may not do this for protocol A, where we could complete a write operation
+ * before the network stack sends the data.
+ */
+static int
+drbd_send_bio(struct drbd_peer_device *peer_device, struct bio *bio, unsigned int msg_flags)
{
- struct socket *socket = peer_device->connection->data.socket;
- struct msghdr msg = { .msg_flags = msg_flags, };
- struct bio_vec bvec;
- int len = size;
- int err = -EIO;
+ if (drbd_disable_sendpage)
+ msg_flags &= ~MSG_SPLICE_PAGES;
- /* e.g. XFS meta- & log-data is in slab pages, which have a
- * page_count of 0 and/or have PageSlab() set.
- * we cannot use send_page for those, as that does get_page();
- * put_page(); and would cause either a VM_BUG directly, or
- * __page_cache_release a page that would actually still be referenced
- * by someone, leading to some obscure delayed Oops somewhere else. */
- if (!drbd_disable_sendpage && sendpages_ok(page, len, offset))
- msg.msg_flags |= MSG_NOSIGNAL | MSG_SPLICE_PAGES;
+ /* e.g. XFS meta- & log-data is in slab pages have !sendpage_ok(page) */
+ if (msg_flags & MSG_SPLICE_PAGES) {
+ struct bvec_iter iter;
+ struct bio_vec bvec;
- drbd_update_congested(peer_device->connection);
- do {
- int sent;
-
- bvec_set_page(&bvec, page, len, offset);
- iov_iter_bvec(&msg.msg_iter, ITER_SOURCE, &bvec, 1, len);
+ bio_for_each_segment(bvec, bio, iter) {
+ struct page *page = bvec.bv_page;
- sent = sock_sendmsg(socket, &msg);
- if (sent <= 0) {
- if (sent == -EAGAIN) {
- if (we_should_drop_the_connection(peer_device->connection, socket))
- break;
- continue;
+ if (!sendpage_ok(page)) {
+ msg_flags &= ~MSG_SPLICE_PAGES;
+ break;
}
- drbd_warn(peer_device->device, "%s: size=%d len=%d sent=%d\n",
- __func__, (int)size, len, sent);
- if (sent < 0)
- err = sent;
- break;
}
- len -= sent;
- offset += sent;
- } while (len > 0 /* THINK && device->cstate >= C_CONNECTED*/);
- clear_bit(NET_CONGESTED, &peer_device->connection->flags);
-
- if (len == 0) {
- err = 0;
- peer_device->device->send_cnt += size >> 9;
}
- return err;
-}
-
-static int _drbd_send_bio(struct drbd_peer_device *peer_device, struct bio *bio)
-{
- struct bio_vec bvec;
- struct bvec_iter iter;
- /* hint all but last page with MSG_MORE */
- bio_for_each_segment(bvec, bio, iter) {
- int err;
-
- err = _drbd_no_send_page(peer_device, bvec.bv_page,
- bvec.bv_offset, bvec.bv_len,
- bio_iter_last(bvec, iter)
- ? 0 : MSG_MORE);
- if (err)
- return err;
- }
- return 0;
+ return __send_bio(peer_device, bio, msg_flags);
}
-static int _drbd_send_zc_bio(struct drbd_peer_device *peer_device, struct bio *bio)
+static int drbd_send_ee(struct drbd_peer_device *peer_device, struct drbd_peer_request *peer_req)
{
- struct bio_vec bvec;
- struct bvec_iter iter;
+ struct bio *bio;
+ int err = 0;
- /* hint all but last page with MSG_MORE */
- bio_for_each_segment(bvec, bio, iter) {
- int err;
-
- err = _drbd_send_page(peer_device, bvec.bv_page,
- bvec.bv_offset, bvec.bv_len,
- bio_iter_last(bvec, iter) ? 0 : MSG_MORE);
+ bio_list_for_each(bio, &peer_req->bios) {
+ err = __send_bio(peer_device, bio,
+ peer_req->flags & EE_RELEASE_TO_MEMPOOL ? 0 : MSG_SPLICE_PAGES);
if (err)
- return err;
+ break;
}
- return 0;
-}
-static int _drbd_send_zc_ee(struct drbd_peer_device *peer_device,
- struct drbd_peer_request *peer_req)
-{
- bool use_sendpage = !(peer_req->flags & EE_RELEASE_TO_MEMPOOL);
- struct page *page = peer_req->pages;
- unsigned len = peer_req->i.size;
- int err;
-
- /* hint all but last page with MSG_MORE */
- page_chain_for_each(page) {
- unsigned l = min_t(unsigned, len, PAGE_SIZE);
-
- if (likely(use_sendpage))
- err = _drbd_send_page(peer_device, page, 0, l,
- page_chain_next(page) ? MSG_MORE : 0);
- else
- err = _drbd_no_send_page(peer_device, page, 0, l,
- page_chain_next(page) ? MSG_MORE : 0);
-
- if (err)
- return err;
- len -= l;
- }
- return 0;
+ return err;
}
-static u32 bio_flags_to_wire(struct drbd_connection *connection,
- struct bio *bio)
+/* see also wire_flags_to_bio() */
+static u32 bio_flags_to_wire(struct drbd_connection *connection, struct bio *bio)
{
if (connection->agreed_pro_version >= 95)
return (bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0) |
@@ -1637,12 +2386,13 @@ static u32 bio_flags_to_wire(struct drbd_connection *connection,
(bio->bi_opf & REQ_PREFLUSH ? DP_FLUSH : 0) |
(bio_op(bio) == REQ_OP_DISCARD ? DP_DISCARD : 0) |
(bio_op(bio) == REQ_OP_WRITE_ZEROES ?
- ((connection->agreed_features & DRBD_FF_WZEROES) ?
- (DP_ZEROES |(!(bio->bi_opf & REQ_NOUNMAP) ? DP_DISCARD : 0))
- : DP_DISCARD)
- : 0);
- else
- return bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0;
+ ((connection->agreed_features & DRBD_FF_WZEROES) ?
+ (DP_ZEROES | (!(bio->bi_opf & REQ_NOUNMAP) ? DP_DISCARD : 0))
+ : DP_DISCARD)
+ : 0);
+
+ /* else: we used to communicate one bit only in older DRBD */
+ return bio->bi_opf & REQ_SYNC ? DP_RW_SYNC : 0;
}
/* Used to send write or TRIM aka REQ_OP_DISCARD requests
@@ -1651,53 +2401,62 @@ static u32 bio_flags_to_wire(struct drbd_connection *connection,
int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request *req)
{
struct drbd_device *device = peer_device->device;
- struct drbd_socket *sock;
+ struct drbd_connection *connection = peer_device->connection;
+ char *const before = connection->scratch_buffer.d.before;
+ char *const after = connection->scratch_buffer.d.after;
+ struct p_trim *trim = NULL;
struct p_data *p;
- void *digest_out;
+ void *digest_out = NULL;
unsigned int dp_flags = 0;
- int digest_size;
+ int digest_size = 0;
int err;
+ const unsigned s = req->net_rq_state[peer_device->node_id];
+ const enum req_op op = bio_op(req->master_bio);
+
+ if (op == REQ_OP_DISCARD || op == REQ_OP_WRITE_ZEROES) {
+ trim = drbd_prepare_command(peer_device, sizeof(*trim), DATA_STREAM);
+ if (!trim)
+ return -EIO;
+ p = &trim->p_data;
+ trim->size = cpu_to_be32(req->i.size);
+ } else {
+ if (connection->integrity_tfm)
+ digest_size = crypto_shash_digestsize(connection->integrity_tfm);
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
- digest_size = peer_device->connection->integrity_tfm ?
- crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;
+ p = drbd_prepare_command(peer_device, sizeof(*p) + digest_size, DATA_STREAM);
+ if (!p)
+ return -EIO;
+ digest_out = p + 1;
+ }
- if (!p)
- return -EIO;
p->sector = cpu_to_be64(req->i.sector);
p->block_id = (unsigned long)req;
- p->seq_num = cpu_to_be32(atomic_inc_return(&device->packet_seq));
- dp_flags = bio_flags_to_wire(peer_device->connection, req->master_bio);
- if (device->state.conn >= C_SYNC_SOURCE &&
- device->state.conn <= C_PAUSED_SYNC_T)
+ p->seq_num = cpu_to_be32(atomic_inc_return(&peer_device->packet_seq));
+ dp_flags = bio_flags_to_wire(connection, req->master_bio);
+ if (peer_device->repl_state[NOW] >= L_SYNC_SOURCE && peer_device->repl_state[NOW] <= L_PAUSED_SYNC_T)
dp_flags |= DP_MAY_SET_IN_SYNC;
- if (peer_device->connection->agreed_pro_version >= 100) {
- if (req->rq_state & RQ_EXP_RECEIVE_ACK)
+ if (connection->agreed_pro_version >= 100) {
+ if (s & RQ_EXP_RECEIVE_ACK)
dp_flags |= DP_SEND_RECEIVE_ACK;
- /* During resync, request an explicit write ack,
- * even in protocol != C */
- if (req->rq_state & RQ_EXP_WRITE_ACK
- || (dp_flags & DP_MAY_SET_IN_SYNC))
+ if (s & RQ_EXP_WRITE_ACK || dp_flags & DP_MAY_SET_IN_SYNC)
dp_flags |= DP_SEND_WRITE_ACK;
}
p->dp_flags = cpu_to_be32(dp_flags);
- if (dp_flags & (DP_DISCARD|DP_ZEROES)) {
- enum drbd_packet cmd = (dp_flags & DP_ZEROES) ? P_ZEROES : P_TRIM;
- struct p_trim *t = (struct p_trim*)p;
- t->size = cpu_to_be32(req->i.size);
- err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*t), NULL, 0);
+ if (trim) {
+ err = __send_command(connection, device->vnr,
+ (dp_flags & DP_ZEROES) ? P_ZEROES : P_TRIM, DATA_STREAM);
goto out;
}
- digest_out = p + 1;
- /* our digest is still only over the payload.
- * TRIM does not carry any payload. */
- if (digest_size)
- drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest_out);
- err = __send_command(peer_device->connection, device->vnr, sock, P_DATA,
- sizeof(*p) + digest_size, NULL, req->i.size);
+ if (digest_size && digest_out) {
+ WARN_ON(digest_size > sizeof(connection->scratch_buffer.d.before));
+ drbd_csum_bio(connection->integrity_tfm, req->master_bio, before);
+ memcpy(digest_out, before, digest_size);
+ }
+
+ additional_size_command(connection, DATA_STREAM, req->i.size);
+ err = __send_command(connection, device->vnr, P_DATA, DATA_STREAM);
if (!err) {
/* For protocol A, we have to memcpy the payload into
* socket buffers, as we may complete right away
@@ -1710,50 +2469,43 @@ int drbd_send_dblock(struct drbd_peer_device *peer_device, struct drbd_request *
* out ok after sending on this side, but does not fit on the
* receiving side, we sure have detected corruption elsewhere.
*/
- if (!(req->rq_state & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK)) || digest_size)
- err = _drbd_send_bio(peer_device, req->master_bio);
- else
- err = _drbd_send_zc_bio(peer_device, req->master_bio);
+ bool proto_b_or_c = (s & (RQ_EXP_RECEIVE_ACK | RQ_EXP_WRITE_ACK));
+ int msg_flags = proto_b_or_c && !digest_size ? MSG_SPLICE_PAGES : 0;
+
+ err = drbd_send_bio(peer_device, req->master_bio, msg_flags);
/* double check digest, sometimes buffers have been modified in flight. */
- if (digest_size > 0 && digest_size <= 64) {
- /* 64 byte, 512 bit, is the largest digest size
- * currently supported in kernel crypto. */
- unsigned char digest[64];
- drbd_csum_bio(peer_device->connection->integrity_tfm, req->master_bio, digest);
- if (memcmp(p + 1, digest, digest_size)) {
+ if (digest_size > 0) {
+ drbd_csum_bio(connection->integrity_tfm, req->master_bio, after);
+ if (memcmp(before, after, digest_size)) {
drbd_warn(device,
"Digest mismatch, buffer modified by upper layers during write: %llus +%u\n",
(unsigned long long)req->i.sector, req->i.size);
}
- } /* else if (digest_size > 64) {
- ... Be noisy about digest too large ...
- } */
+ }
}
out:
- mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
+ mutex_unlock(&connection->mutex[DATA_STREAM]);
return err;
}
/* answer packet, used to send data back for read requests:
* Peer -> (diskless) R_PRIMARY (P_DATA_REPLY)
- * C_SYNC_SOURCE -> C_SYNC_TARGET (P_RS_DATA_REPLY)
+ * L_SYNC_SOURCE -> L_SYNC_TARGET (P_RS_DATA_REPLY)
*/
int drbd_send_block(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
struct drbd_peer_request *peer_req)
{
- struct drbd_device *device = peer_device->device;
- struct drbd_socket *sock;
+ struct drbd_connection *connection = peer_device->connection;
struct p_data *p;
int err;
int digest_size;
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
+ digest_size = connection->integrity_tfm ?
+ crypto_shash_digestsize(connection->integrity_tfm) : 0;
- digest_size = peer_device->connection->integrity_tfm ?
- crypto_shash_digestsize(peer_device->connection->integrity_tfm) : 0;
+ p = drbd_prepare_command(peer_device, sizeof(*p) + digest_size, DATA_STREAM);
if (!p)
return -EIO;
@@ -1761,314 +2513,721 @@ int drbd_send_block(struct drbd_peer_device *peer_device, enum drbd_packet cmd,
p->block_id = peer_req->block_id;
p->seq_num = 0; /* unused */
p->dp_flags = 0;
+
+ /* Older peers expect block_id for P_RS_DATA_REPLY to be ID_SYNCER. */
+ if (connection->agreed_pro_version < 122 && cmd == P_RS_DATA_REPLY)
+ p->block_id = ID_SYNCER;
+
if (digest_size)
- drbd_csum_ee(peer_device->connection->integrity_tfm, peer_req, p + 1);
- err = __send_command(peer_device->connection, device->vnr, sock, cmd, sizeof(*p) + digest_size, NULL, peer_req->i.size);
+ drbd_csum_bios(connection->integrity_tfm, &peer_req->bios, p + 1);
+ additional_size_command(connection, DATA_STREAM, peer_req->i.size);
+ err = __send_command(connection,
+ peer_device->device->vnr, cmd, DATA_STREAM);
if (!err)
- err = _drbd_send_zc_ee(peer_device, peer_req);
- mutex_unlock(&sock->mutex); /* locked by drbd_prepare_command() */
+ err = drbd_send_ee(peer_device, peer_req);
+ mutex_unlock(&connection->mutex[DATA_STREAM]);
return err;
}
-int drbd_send_out_of_sync(struct drbd_peer_device *peer_device, struct drbd_request *req)
+int drbd_send_out_of_sync(struct drbd_peer_device *peer_device, sector_t sector, unsigned int size)
{
- struct drbd_socket *sock;
struct p_block_desc *p;
- sock = &peer_device->connection->data;
- p = drbd_prepare_command(peer_device, sock);
+ p = drbd_prepare_command(peer_device, sizeof(*p), DATA_STREAM);
if (!p)
return -EIO;
- p->sector = cpu_to_be64(req->i.sector);
- p->blksize = cpu_to_be32(req->i.size);
- return drbd_send_command(peer_device, sock, P_OUT_OF_SYNC, sizeof(*p), NULL, 0);
+ p->sector = cpu_to_be64(sector);
+ p->blksize = cpu_to_be32(size);
+ return drbd_send_command(peer_device, P_OUT_OF_SYNC, DATA_STREAM);
}
-/*
- drbd_send distinguishes two cases:
+int drbd_send_dagtag(struct drbd_connection *connection, u64 dagtag)
+{
+ struct p_dagtag *p;
- Packets sent via the data socket "sock"
- and packets sent via the meta data socket "msock"
+ if (connection->agreed_pro_version < 110)
+ return 0;
- sock msock
- -----------------+-------------------------+------------------------------
- timeout conf.timeout / 2 conf.timeout / 2
- timeout action send a ping via msock Abort communication
- and close all sockets
-*/
+ p = conn_prepare_command(connection, sizeof(*p), DATA_STREAM);
+ if (!p)
+ return -EIO;
+ p->dagtag = cpu_to_be64(dagtag);
+ return send_command(connection, -1, P_DAGTAG, DATA_STREAM);
+}
-/*
- * you must have down()ed the appropriate [m]sock_mutex elsewhere!
- */
-int drbd_send(struct drbd_connection *connection, struct socket *sock,
- void *buf, size_t size, unsigned msg_flags)
+/* primary_peer_present_and_not_two_primaries_allowed() */
+static bool primary_peer_present(struct drbd_resource *resource)
{
- struct kvec iov = {.iov_base = buf, .iov_len = size};
- struct msghdr msg = {.msg_flags = msg_flags | MSG_NOSIGNAL};
- int rv, sent = 0;
+ struct drbd_connection *connection;
+ struct net_conf *nc;
+ bool two_primaries, rv = false;
- if (!sock)
- return -EBADR;
+ rcu_read_lock();
+ for_each_connection_rcu(connection, resource) {
+ nc = rcu_dereference(connection->transport.net_conf);
+ two_primaries = nc ? nc->two_primaries : false;
- /* THINK if (signal_pending) return ... ? */
+ if (connection->peer_role[NOW] == R_PRIMARY && !two_primaries) {
+ rv = true;
+ break;
+ }
+ }
+ rcu_read_unlock();
- iov_iter_kvec(&msg.msg_iter, ITER_SOURCE, &iov, 1, size);
+ return rv;
+}
- if (sock == connection->data.socket) {
- rcu_read_lock();
- connection->ko_count = rcu_dereference(connection->net_conf)->ko_count;
- rcu_read_unlock();
- drbd_update_congested(connection);
- }
- do {
- rv = sock_sendmsg(sock, &msg);
- if (rv == -EAGAIN) {
- if (we_should_drop_the_connection(connection, sock))
+static bool any_disk_is_uptodate(struct drbd_device *device)
+{
+ bool ret = false;
+
+ rcu_read_lock();
+ if (device->disk_state[NOW] == D_UP_TO_DATE)
+ ret = true;
+ else {
+ struct drbd_peer_device *peer_device;
+
+ for_each_peer_device_rcu(peer_device, device) {
+ if (peer_device->disk_state[NOW] == D_UP_TO_DATE) {
+ ret = true;
break;
- else
- continue;
- }
- if (rv == -EINTR) {
- flush_signals(current);
- rv = 0;
+ }
}
- if (rv < 0)
- break;
- sent += rv;
- } while (sent < size);
-
- if (sock == connection->data.socket)
- clear_bit(NET_CONGESTED, &connection->flags);
-
- if (rv <= 0) {
- if (rv != -EAGAIN) {
- drbd_err(connection, "%s_sendmsg returned %d\n",
- sock == connection->meta.socket ? "msock" : "sock",
- rv);
- conn_request_state(connection, NS(conn, C_BROKEN_PIPE), CS_HARD);
- } else
- conn_request_state(connection, NS(conn, C_TIMEOUT), CS_HARD);
}
+ rcu_read_unlock();
- return sent;
+ return ret;
}
-/*
- * drbd_send_all - Send an entire buffer
- *
- * Returns 0 upon success and a negative error value otherwise.
+/* If we are trying to (re-)establish some connection,
+ * it may be useful to re-try the conditions in drbd_open().
+ * But if we have no connection at all (yet/anymore),
+ * or are disconnected and not trying to (re-)establish,
+ * or are established already, retrying won't help at all.
+ * Asking the same peer(s) the same question
+ * is unlikely to change their answer.
+ * Almost always triggered by udev (and the configured probes) while bringing
+ * the resource "up", just after "new-minor", even before "attach" or any
+ * "peers"/"paths" are configured.
*/
-int drbd_send_all(struct drbd_connection *connection, struct socket *sock, void *buffer,
- size_t size, unsigned msg_flags)
+static bool connection_state_may_improve_soon(struct drbd_resource *resource)
{
- int err;
-
- err = drbd_send(connection, sock, buffer, size, msg_flags);
- if (err < 0)
- return err;
- if (err != size)
- return -EIO;
- return 0;
+ struct drbd_connection *connection;
+ bool ret = false;
+ rcu_read_lock();
+ for_each_connection_rcu(connection, resource) {
+ enum drbd_conn_state cstate = connection->cstate[NOW];
+ if (C_DISCONNECTING < cstate && cstate < C_CONNECTED) {
+ ret = true;
+ break;
+ }
+ }
+ rcu_read_unlock();
+ return ret;
}
-static int drbd_open(struct gendisk *disk, blk_mode_t mode)
+/* TASK_COMM_LEN reserves one '\0', and each sizeof("...") term includes a '\0':
+ * that's room enough for ':' and ' ' separators and the EOS.
+ */
+union comm_pid_tag_buf {
+ char comm[TASK_COMM_LEN];
+ char buf[TASK_COMM_LEN + sizeof("2147483647") + sizeof("auto-promote")];
+};
+
+static void snprintf_current_comm_pid_tag(union comm_pid_tag_buf *s, const char *tag)
{
- struct drbd_device *device = disk->private_data;
- unsigned long flags;
- int rv = 0;
+ int len;
- mutex_lock(&drbd_main_mutex);
- spin_lock_irqsave(&device->resource->req_lock, flags);
- /* to have a stable device->state.role
- * and no race with updating open_cnt */
+ get_task_comm(s->comm, current);
+ len = strlen(s->buf);
+ snprintf(s->buf + len, sizeof(s->buf)-len, ":%d %s", task_pid_nr(current), tag);
+}
- if (device->state.role != R_PRIMARY) {
- if (mode & BLK_OPEN_WRITE)
- rv = -EROFS;
- else if (!drbd_allow_oos)
- rv = -EMEDIUMTYPE;
- }
+static int try_to_promote(struct drbd_device *device, long timeout, bool ndelay)
+{
+ struct drbd_resource *resource = device->resource;
+ int rv;
- if (!rv)
- device->open_cnt++;
- spin_unlock_irqrestore(&device->resource->req_lock, flags);
- mutex_unlock(&drbd_main_mutex);
+ do {
+ union comm_pid_tag_buf tag;
+ unsigned long start = jiffies;
+ long t;
+ snprintf_current_comm_pid_tag(&tag, "auto-promote");
+ rv = drbd_set_role(resource, R_PRIMARY, false, tag.buf, NULL);
+ timeout -= jiffies - start;
+
+ if (ndelay || rv >= SS_SUCCESS || timeout <= 0) {
+ break;
+ } else if (rv == SS_CW_FAILED_BY_PEER) {
+ /* Probably udev has it open read-only on one of the peers;
+ since commit cbcbb50a65 from 2017 it waits on the peer;
+ retry only if the timeout permits */
+ if (jiffies - start < HZ / 10) {
+ t = schedule_timeout_interruptible(HZ / 10);
+ if (t)
+ break;
+ timeout -= HZ / 10;
+ }
+ } else if (rv == SS_TWO_PRIMARIES) {
+ /* Wait till the peer demoted itself */
+ t = wait_event_interruptible_timeout(resource->state_wait,
+ resource->role[NOW] == R_PRIMARY ||
+ (!primary_peer_present(resource) && any_disk_is_uptodate(device)),
+ timeout);
+ if (t <= 0)
+ break;
+ timeout -= t;
+ } else if (rv == SS_NO_UP_TO_DATE_DISK && connection_state_may_improve_soon(resource)) {
+ /* Wait until we get a connection established */
+ t = wait_event_interruptible_timeout(resource->state_wait,
+ any_disk_is_uptodate(device), timeout);
+ if (t <= 0)
+ break;
+ timeout -= t;
+ } else {
+ break;
+ }
+ } while (timeout > 0);
return rv;
}
-static void drbd_release(struct gendisk *gd)
+static int ro_open_cond(struct drbd_device *device)
{
- struct drbd_device *device = gd->private_data;
+ struct drbd_resource *resource = device->resource;
- mutex_lock(&drbd_main_mutex);
- device->open_cnt--;
- mutex_unlock(&drbd_main_mutex);
+ if (!device->have_quorum[NOW])
+ return -ENODATA;
+ else if (resource->role[NOW] != R_PRIMARY &&
+ primary_peer_present(resource) && !drbd_allow_oos)
+ return -EMEDIUMTYPE;
+ else if (any_disk_is_uptodate(device))
+ return 0;
+ else if (connection_state_may_improve_soon(resource))
+ return -EAGAIN;
+ else
+ return -ENODATA;
}
-/* need to hold resource->req_lock */
-void drbd_queue_unplug(struct drbd_device *device)
+enum ioc_rv {
+ IOC_SLEEP = 0,
+ IOC_OK = 1,
+ IOC_ABORT = 2,
+};
+
+/* If we are in the middle of a cluster wide state change, we don't want
+ * to change (open_cnt == 0), as that then could cause a failure to commit
+ * some already promised peer auto-promote locally.
+ * So we wait until the pending remote_state_change is finalized,
+ * or give up when the timeout is reached.
+ *
+ * But we don't want to fail an open on a Primary just because it happens
+ * during some unrelated remote state change.
+ * If we are already Primary, or already have an open count != 0,
+ * we don't need to wait, it won't change anything.
+ */
+static enum ioc_rv inc_open_count(struct drbd_device *device, blk_mode_t mode)
{
- if (device->state.pdsk >= D_INCONSISTENT && device->state.conn >= C_CONNECTED) {
- D_ASSERT(device, device->state.role == R_PRIMARY);
- if (test_and_clear_bit(UNPLUG_REMOTE, &device->flags)) {
- drbd_queue_work_if_unqueued(
- &first_peer_device(device)->connection->sender_work,
- &device->unplug_work);
- }
+ struct drbd_resource *resource = device->resource;
+ enum ioc_rv r;
+
+ if (test_bit(DOWN_IN_PROGRESS, &resource->flags))
+ return IOC_ABORT;
+
+ read_lock_irq(&resource->state_rwlock);
+ if (test_bit(UNREGISTERED, &device->flags))
+ r = IOC_ABORT;
+ else if (resource->remote_state_change &&
+ resource->role[NOW] != R_PRIMARY &&
+ (device->open_cnt == 0 || mode & BLK_OPEN_WRITE)) {
+ if (mode & BLK_OPEN_NDELAY)
+ r = IOC_ABORT;
+ else
+ r = IOC_SLEEP;
+ } else {
+ r = IOC_OK;
+ device->open_cnt++;
+ if (mode & BLK_OPEN_WRITE)
+ device->writable = true;
}
-}
+ read_unlock_irq(&resource->state_rwlock);
-static void drbd_set_defaults(struct drbd_device *device)
-{
- /* Beware! The actual layout differs
- * between big endian and little endian */
- device->state = (union drbd_dev_state) {
- { .role = R_SECONDARY,
- .peer = R_UNKNOWN,
- .conn = C_STANDALONE,
- .disk = D_DISKLESS,
- .pdsk = D_UNKNOWN,
- } };
+ return r;
}
-void drbd_init_set_defaults(struct drbd_device *device)
+static void __prune_or_free_openers(struct drbd_device *device, pid_t pid)
{
- /* the memset(,0,) did most of this.
- * note: only assignments, no allocation in here */
+ struct opener *pos, *tmp;
- drbd_set_defaults(device);
+ list_for_each_entry_safe(pos, tmp, &device->openers, list) {
+ // if pid == 0, i.e., counts were 0, delete all entries, else the matching one
+ if (pid == 0 || pid == pos->pid) {
+ dynamic_drbd_dbg(device, "%sopeners del: %s(%d)\n", pid == 0 ? "" : "all ",
+ pos->comm, pos->pid);
+ list_del(&pos->list);
+ kfree(pos);
- atomic_set(&device->ap_bio_cnt, 0);
- atomic_set(&device->ap_actlog_cnt, 0);
- atomic_set(&device->ap_pending_cnt, 0);
- atomic_set(&device->rs_pending_cnt, 0);
- atomic_set(&device->unacked_cnt, 0);
- atomic_set(&device->local_cnt, 0);
- atomic_set(&device->pp_in_use_by_net, 0);
- atomic_set(&device->rs_sect_in, 0);
- atomic_set(&device->rs_sect_ev, 0);
- atomic_set(&device->ap_in_flight, 0);
- atomic_set(&device->md_io.in_use, 0);
+ /* in case we remove a real process, stop here, there might be multiple openers with the same pid */
+ /* this assumes that the oldest opener with the same pid releases first. "as good as it gets" */
+ if (pid != 0)
+ break;
+ }
+ }
+}
- mutex_init(&device->own_state_mutex);
- device->state_mutex = &device->own_state_mutex;
+static void free_openers(struct drbd_device *device)
+{
+ __prune_or_free_openers(device, 0);
+}
- spin_lock_init(&device->al_lock);
- spin_lock_init(&device->peer_seq_lock);
-
- INIT_LIST_HEAD(&device->active_ee);
- INIT_LIST_HEAD(&device->sync_ee);
- INIT_LIST_HEAD(&device->done_ee);
- INIT_LIST_HEAD(&device->read_ee);
- INIT_LIST_HEAD(&device->resync_reads);
- INIT_LIST_HEAD(&device->resync_work.list);
- INIT_LIST_HEAD(&device->unplug_work.list);
- INIT_LIST_HEAD(&device->bm_io_work.w.list);
- INIT_LIST_HEAD(&device->pending_master_completion[0]);
- INIT_LIST_HEAD(&device->pending_master_completion[1]);
- INIT_LIST_HEAD(&device->pending_completion[0]);
- INIT_LIST_HEAD(&device->pending_completion[1]);
+static void prune_or_free_openers(struct drbd_device *device, pid_t pid)
+{
+ spin_lock(&device->openers_lock);
+ __prune_or_free_openers(device, pid);
+ spin_unlock(&device->openers_lock);
+}
- device->resync_work.cb = w_resync_timer;
- device->unplug_work.cb = w_send_write_hint;
- device->bm_io_work.w.cb = w_bitmap_io;
+static void add_opener(struct drbd_device *device, bool did_auto_promote)
+{
+ struct opener *opener, *tmp;
+ ktime_t now = ktime_get_real();
+ int len = 0;
- timer_setup(&device->resync_timer, resync_timer_fn, 0);
- timer_setup(&device->md_sync_timer, md_sync_timer_fn, 0);
- timer_setup(&device->start_resync_timer, start_resync_timer_fn, 0);
- timer_setup(&device->request_timer, request_timer_fn, 0);
+ if (did_auto_promote) {
+ struct drbd_resource *resource = device->resource;
- init_waitqueue_head(&device->misc_wait);
- init_waitqueue_head(&device->state_wait);
- init_waitqueue_head(&device->ee_wait);
- init_waitqueue_head(&device->al_wait);
- init_waitqueue_head(&device->seq_wait);
+ resource->auto_promoted_by.minor = device->minor;
+ resource->auto_promoted_by.pid = task_pid_nr(current);
+ resource->auto_promoted_by.opened = now;
+ get_task_comm(resource->auto_promoted_by.comm, current);
+ }
+ opener = kmalloc_obj(*opener, GFP_NOIO);
+ if (!opener)
+ return;
+ get_task_comm(opener->comm, current);
+ opener->pid = task_pid_nr(current);
+ opener->opened = now;
+
+ spin_lock(&device->openers_lock);
+ list_for_each_entry(tmp, &device->openers, list)
+ if (++len > 100) { /* 100 ought to be enough for everybody */
+ dynamic_drbd_dbg(device, "openers: list full, do not add new opener\n");
+ kfree(opener);
+ goto out;
+ }
- device->resync_wenr = LC_FREE;
- device->peer_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
- device->local_max_bio_size = DRBD_MAX_BIO_SIZE_SAFE;
+ list_add(&opener->list, &device->openers);
+ dynamic_drbd_dbg(device, "openers add: %s(%d)\n", opener->comm, opener->pid);
+out:
+ spin_unlock(&device->openers_lock);
}
-void drbd_set_my_capacity(struct drbd_device *device, sector_t size)
+static int drbd_open(struct gendisk *gd, blk_mode_t mode)
{
- char ppb[10];
+ struct drbd_device *device = gd->private_data;
+ struct drbd_resource *resource = device->resource;
+ long timeout = resource->res_opts.auto_promote_timeout * HZ / 10;
+ enum drbd_state_rv rv = SS_UNKNOWN_ERROR;
+ bool was_writable;
+ enum ioc_rv r;
+ int err = 0;
+
+ /* Fail read-only open from systemd-udev (version <= 238) */
+ if (!(mode & BLK_OPEN_WRITE) && !drbd_allow_oos) {
+ char comm[TASK_COMM_LEN];
+ get_task_comm(comm, current);
+ if (!strcmp("systemd-udevd", comm))
+ return -EACCES;
+ }
- set_capacity_and_notify(device->vdisk, size);
+ /* Fail read-write open early,
+ * in case someone explicitly set us read-only (blockdev --setro) */
+ if (bdev_read_only(gd->part0) && (mode & BLK_OPEN_WRITE))
+ return -EACCES;
- drbd_info(device, "size = %s (%llu KB)\n",
- ppsize(ppb, size>>1), (unsigned long long)size>>1);
-}
+ if (resource->fail_io[NOW])
+ return -ENOTRECOVERABLE;
-void drbd_device_cleanup(struct drbd_device *device)
-{
- int i;
- if (first_peer_device(device)->connection->receiver.t_state != NONE)
- drbd_err(device, "ASSERT FAILED: receiver t_state == %d expected 0.\n",
- first_peer_device(device)->connection->receiver.t_state);
-
- device->al_writ_cnt =
- device->bm_writ_cnt =
- device->read_cnt =
- device->recv_cnt =
- device->send_cnt =
- device->writ_cnt =
- device->p_size =
- device->rs_start =
- device->rs_total =
- device->rs_failed = 0;
- device->rs_last_events = 0;
- device->rs_last_sect_ev = 0;
- for (i = 0; i < DRBD_SYNC_MARKS; i++) {
- device->rs_mark_left[i] = 0;
- device->rs_mark_time[i] = 0;
- }
- D_ASSERT(device, first_peer_device(device)->connection->net_conf == NULL);
-
- set_capacity_and_notify(device->vdisk, 0);
- if (device->bitmap) {
- /* maybe never allocated. */
- drbd_bm_resize(device, 0, 1);
- drbd_bm_cleanup(device);
+ kref_get(&device->kref);
+
+ mutex_lock(&resource->open_release);
+ was_writable = device->writable;
+
+ timeout = wait_event_interruptible_timeout(resource->twopc_wait,
+ (r = inc_open_count(device, mode)),
+ timeout);
+
+ if (r == IOC_ABORT || (r == IOC_SLEEP && timeout <= 0)) {
+ mutex_unlock(&resource->open_release);
+
+ kref_put(&device->kref, drbd_destroy_device);
+ return -EAGAIN;
}
- drbd_backing_dev_free(device, device->ldev);
- device->ldev = NULL;
+ if (resource->res_opts.auto_promote) {
+ /* Allow opening in read-only mode on an unconnected secondary.
+ This avoids split brain when the drbd volume gets opened
+ temporarily by udev while it scans for PV signatures. */
+
+ if (mode & BLK_OPEN_WRITE) {
+ if (resource->role[NOW] == R_SECONDARY) {
+ rv = try_to_promote(device, timeout, (mode & BLK_OPEN_NDELAY));
+ if (rv < SS_SUCCESS)
+ drbd_info(resource, "Auto-promote failed: %s (%d)\n",
+ drbd_set_st_err_str(rv), rv);
+ }
+ } else if ((mode & BLK_OPEN_NDELAY) == 0) {
+ /* Double check peers
+ *
+ * Some services may try to first open ro, and only if that
+ * works open rw. An attempt to failover immediately after
+ * primary crash, before DRBD has noticed that the primary peer
+ * is gone, would result in open failure, thus failure to take
+ * over services. */
+ err = ro_open_cond(device);
+ if (err == -EMEDIUMTYPE) {
+ drbd_check_peers(resource);
+ err = -EAGAIN;
+ }
+ if (err == -EAGAIN) {
+ wait_event_interruptible_timeout(resource->state_wait,
+ ro_open_cond(device) != -EAGAIN,
+ resource->res_opts.auto_promote_timeout * HZ / 10);
+ }
+ }
+ } else if (resource->role[NOW] != R_PRIMARY &&
+ !(mode & BLK_OPEN_WRITE) && !drbd_allow_oos) {
+ err = -EMEDIUMTYPE;
+ goto out;
+ }
- clear_bit(AL_SUSPENDED, &device->flags);
+ if (test_bit(UNREGISTERED, &device->flags)) {
+ err = -ENODEV;
+ } else if (mode & BLK_OPEN_WRITE) {
+ if (resource->role[NOW] != R_PRIMARY)
+ err = rv == SS_INTERRUPTED ? -ERESTARTSYS : -EROFS;
+ } else /* READ access only */ {
+ err = ro_open_cond(device);
+ }
+out:
+ /* still keep mutex, but release ASAP */
+ if (!err) {
+ add_opener(device, rv >= SS_SUCCESS);
+ /* Only interested in first open and last close. */
+ if (device->open_cnt == 1) {
+ struct device_info info;
+
+ device_to_info(&info, device);
+ mutex_lock(¬ification_mutex);
+ notify_device_state(NULL, 0, device, &info, NOTIFY_CHANGE);
+ mutex_unlock(¬ification_mutex);
+ }
+ } else
+ device->writable = was_writable;
- D_ASSERT(device, list_empty(&device->active_ee));
- D_ASSERT(device, list_empty(&device->sync_ee));
- D_ASSERT(device, list_empty(&device->done_ee));
- D_ASSERT(device, list_empty(&device->read_ee));
- D_ASSERT(device, list_empty(&device->resync_reads));
- D_ASSERT(device, list_empty(&first_peer_device(device)->connection->sender_work.q));
- D_ASSERT(device, list_empty(&device->resync_work.list));
- D_ASSERT(device, list_empty(&device->unplug_work.list));
+ mutex_unlock(&resource->open_release);
+ if (err) {
+ drbd_release(gd);
+ if (err == -EAGAIN && !(mode & BLK_OPEN_NDELAY))
+ err = -EMEDIUMTYPE;
+ }
- drbd_set_defaults(device);
+ return err;
}
+void drbd_open_counts(struct drbd_resource *resource, int *rw_count_ptr, int *ro_count_ptr)
+{
+ struct drbd_device *device;
+ int vnr, rw_count = 0, ro_count = 0;
+
+ rcu_read_lock();
+ idr_for_each_entry(&resource->devices, device, vnr) {
+ if (device->writable)
+ rw_count += device->open_cnt;
+ else
+ ro_count += device->open_cnt;
+ }
+ rcu_read_unlock();
+ *rw_count_ptr = rw_count;
+ *ro_count_ptr = ro_count;
+}
-static void drbd_destroy_mempools(void)
+static void wait_for_peer_disk_updates(struct drbd_resource *resource)
+{
+ struct drbd_peer_device *peer_device;
+ struct drbd_device *device;
+ int vnr;
+
+restart:
+ rcu_read_lock();
+ idr_for_each_entry(&resource->devices, device, vnr) {
+ for_each_peer_device_rcu(peer_device, device) {
+ if (test_bit(GOT_NEG_ACK, &peer_device->flags)) {
+ clear_bit(GOT_NEG_ACK, &peer_device->flags);
+ rcu_read_unlock();
+ wait_event(resource->state_wait, peer_device->disk_state[NOW] < D_UP_TO_DATE);
+ goto restart;
+ }
+ }
+ }
+ rcu_read_unlock();
+}
+
+static void drbd_fsync_device(struct drbd_device *device)
+{
+ struct drbd_resource *resource = device->resource;
+
+ sync_blockdev(device->vdisk->part0);
+ /* Prevent writes occurring after demotion, at least
+ * the writes already submitted in this context. This
+ * covers the case where DRBD auto-demotes on release,
+ * which is important because it often occurs
+ * immediately after a write. */
+ wait_event(device->misc_wait, !atomic_read(&device->ap_bio_cnt[WRITE]));
+
+ if (start_new_tl_epoch(resource)) {
+ struct drbd_connection *connection;
+ u64 im;
+
+ for_each_connection_ref(connection, im, resource)
+ drbd_flush_workqueue(&connection->sender_work);
+ }
+ wait_event(resource->barrier_wait, !barrier_pending(resource));
+	/* After waiting for pending barriers, we have received any possible
+	   NEG_ACKs and observe them in wait_for_peer_disk_updates() */
+ wait_for_peer_disk_updates(resource);
+
+ /* In case switching from R_PRIMARY to R_SECONDARY works
+ out, there is no rw opener at this point. Thus, no new
+ writes can come in. -> Flushing queued peer acks is
+ necessary and sufficient.
+ The cluster wide role change required packets to be
+ received by the sender. -> We can be sure that the
+ peer_acks queued on a sender's TODO list go out before
+ we send the two phase commit packet.
+ */
+ drbd_flush_peer_acks(resource);
+}
+
+static void drbd_release(struct gendisk *gd)
+{
+ struct drbd_device *device = gd->private_data;
+ struct drbd_resource *resource = device->resource;
+ int open_rw_cnt, open_ro_cnt;
+
+ mutex_lock(&resource->open_release);
+	/* The last one to close already called sync_blockdev(); generic
+	 * bdev_release() respectively blkdev_put_whole() takes care of that.
+ * We still want our side effects of drbd_fsync_device():
+ * wait until all peers confirmed they have all the data, regardless of
+ * replication protocol, even if that is asynchronous.
+ * Still, do it before decreasing the open_cnt, just in case, so we
+ * won't confuse drbd_reject_write_early() or other code paths that may
+ * check for open_cnt != 0 when they see write requests.
+ */
+ if (device->writable && device->open_cnt == 1) {
+ drbd_fsync_device(device);
+ device->writable = false;
+ }
+ device->open_cnt--;
+ drbd_open_counts(resource, &open_rw_cnt, &open_ro_cnt);
+
+ if (open_ro_cnt == 0)
+ wake_up_all(&resource->state_wait);
+
+ if (test_bit(UNREGISTERED, &device->flags) && device->open_cnt == 0 &&
+ !test_and_set_bit(DESTROYING_DEV, &device->flags))
+ call_rcu(&device->rcu, drbd_reclaim_device);
+
+ if (resource->res_opts.auto_promote &&
+ open_rw_cnt == 0 &&
+ resource->role[NOW] == R_PRIMARY &&
+ !test_bit(EXPLICIT_PRIMARY, &resource->flags)) {
+ union comm_pid_tag_buf tag;
+ sigset_t mask, oldmask;
+ int rv;
+
+ snprintf_current_comm_pid_tag(&tag, "auto-demote");
+
+ /*
+ * Auto-demote is triggered by the last opener releasing the
+ * DRBD device. However, it is an implicit action, so it should
+ * not be affected by the state of the process. In particular,
+ * it should ignore any pending signals. It may be the case
+ * that the process is releasing DRBD because it is being
+ * terminated using a signal.
+ */
+ sigfillset(&mask);
+ sigprocmask(SIG_BLOCK, &mask, &oldmask);
+
+ rv = drbd_set_role(resource, R_SECONDARY, false, tag.buf, NULL);
+ if (rv < SS_SUCCESS)
+ drbd_warn(resource, "Auto-demote failed: %s (%d)\n",
+ drbd_set_st_err_str(rv), rv);
+
+ sigprocmask(SIG_SETMASK, &oldmask, NULL);
+ }
+
+ if (open_ro_cnt == 0 && open_rw_cnt == 0 && resource->fail_io[NOW]) {
+ unsigned long irq_flags;
+
+ begin_state_change(resource, &irq_flags, CS_VERBOSE);
+ resource->fail_io[NEW] = false;
+ end_state_change(resource, &irq_flags, "release");
+ }
+
+ /* if the open count is 0, we free the whole list, otherwise we remove the specific pid */
+ prune_or_free_openers(device, (device->open_cnt == 0) ? 0 : task_pid_nr(current));
+ if (open_rw_cnt == 0 && open_ro_cnt == 0 && resource->auto_promoted_by.pid != 0)
+ memset(&resource->auto_promoted_by, 0, sizeof(resource->auto_promoted_by));
+ if (device->open_cnt == 0) {
+ struct device_info info;
+
+ device_to_info(&info, device);
+ mutex_lock(¬ification_mutex);
+ notify_device_state(NULL, 0, device, &info, NOTIFY_CHANGE);
+ mutex_unlock(¬ification_mutex);
+ }
+ mutex_unlock(&resource->open_release);
+
+ kref_put(&device->kref, drbd_destroy_device); /* might destroy the resource as well */
+}
+
+static void drbd_remove_all_paths(struct drbd_connection *connection)
+{
+ struct drbd_resource *resource = connection->resource;
+ struct drbd_transport *transport = &connection->transport;
+ struct drbd_path *path, *tmp;
+
+ lockdep_assert_held(&resource->conf_update);
+
+ list_for_each_entry(path, &transport->paths, list)
+ set_bit(TR_UNREGISTERED, &path->flags);
+
+ /* Ensure flag visible before list manipulation. */
+ smp_wmb();
+
+ list_for_each_entry_safe(path, tmp, &transport->paths, list) {
+ /* Exclusive with reading state, in particular remember_state_change() */
+ write_lock_irq(&resource->state_rwlock);
+ list_del_rcu(&path->list);
+ write_unlock_irq(&resource->state_rwlock);
+
+ transport->class->ops.remove_path(path);
+ notify_path(connection, path, NOTIFY_DESTROY);
+ call_rcu(&path->rcu, drbd_reclaim_path);
+ }
+}
+
+/** __drbd_net_exit is called when a network namespace is removed.
+ *
+ * For DRBD this means it needs to remove any sockets assigned to that namespace,
+ * i.e. it needs to disconnect some connections. It also needs to remove those
+ * paths associated with the to be removed namespace, so the connection can be
+ * reconfigured from a new namespace.
+ */
+static void __net_exit __drbd_net_exit(struct net *net)
+{
+ struct drbd_resource *resource;
+ struct drbd_connection *connection, *n;
+ enum drbd_state_rv rv;
+ LIST_HEAD(connections_wait_list);
+
+ /* Disconnect and removal of paths works in 3 steps:
+ * 1. Find all connections associated with the namespace, add it to a separate list.
+ * 2. Iterate over all connections in the new list and start the disconnect.
+	 * 3. Iterate again over all connections, waiting for them to disconnect and remove the path configuration. */
+
+ /* Step 1 */
+ rcu_read_lock();
+ for_each_resource_rcu(resource, &drbd_resources) {
+ for_each_connection_rcu(connection, resource) {
+ /* We don't have to worry about any races here:
+ * For a connection to be "missed", it would need to be configured
+ * from the namespace to be removed. Since netlink does keep the
+			 * namespace alive for the duration of its connection, we can
+ * assume the namespace assignment can no longer be changed. */
+ if (net_eq(net, drbd_net_assigned_to_connection(connection))) {
+ drbd_info(connection, "Disconnect because network namespace is exiting\n");
+
+ kref_get(&connection->kref);
+
+ list_add(&connection->remove_net_list, &connections_wait_list);
+ }
+ }
+ }
+ rcu_read_unlock();
+
+ /* Step 2 */
+ list_for_each_entry(connection, &connections_wait_list, remove_net_list) {
+ /* We just start the disconnect here. We have to use force=true here,
+ * otherwise the disconnect might fail waiting for some openers to disappear.
+ *
+ * Actually waiting for the disconnect is relegated to step 3, so we disconnect
+ * in parallel. */
+ rv = change_cstate(connection, C_DISCONNECTING, CS_HARD);
+ if (rv < SS_SUCCESS && rv != SS_ALREADY_STANDALONE)
+ drbd_err(connection, "Failed to disconnect: %s\n", drbd_set_st_err_str(rv));
+ }
+
+ /* Step 3 */
+ list_for_each_entry_safe(connection, n, &connections_wait_list, remove_net_list) {
+ list_del_init(&connection->remove_net_list);
+
+ /* Wait here for StandAlone: a path can only be removed if it's not established */
+ wait_event(connection->resource->state_wait, connection->cstate[NOW] == C_STANDALONE);
+
+ mutex_lock(&connection->resource->adm_mutex);
+ mutex_lock(&connection->resource->conf_update);
+ drbd_remove_all_paths(connection);
+ mutex_unlock(&connection->resource->conf_update);
+ mutex_unlock(&connection->resource->adm_mutex);
+
+ kref_put(&connection->kref, drbd_destroy_connection);
+ }
+}
+
+void drbd_queue_unplug(struct drbd_device *device)
+{
+ struct drbd_resource *resource = device->resource;
+ struct drbd_connection *connection;
+ u64 dagtag_sector;
+
+ dagtag_sector = resource->dagtag_sector;
+
+ rcu_read_lock();
+ for_each_connection_rcu(connection, resource) {
+ /* use the "next" slot */
+ unsigned int i = !connection->todo.unplug_slot;
+ connection->todo.unplug_dagtag_sector[i] = dagtag_sector;
+ wake_up(&connection->sender_work.q_wait);
+ }
+ rcu_read_unlock();
+}
+
+static void drbd_set_defaults(struct drbd_device *device)
{
- /* D_ASSERT(device, atomic_read(&drbd_pp_vacant)==0); */
+ device->disk_state[NOW] = D_DISKLESS;
+}
+static void drbd_destroy_mempools(void)
+{
bioset_exit(&drbd_io_bio_set);
bioset_exit(&drbd_md_io_bio_set);
mempool_exit(&drbd_buffer_page_pool);
mempool_exit(&drbd_md_io_page_pool);
mempool_exit(&drbd_ee_mempool);
mempool_exit(&drbd_request_mempool);
- kmem_cache_destroy(drbd_ee_cache);
- kmem_cache_destroy(drbd_request_cache);
- kmem_cache_destroy(drbd_bm_ext_cache);
- kmem_cache_destroy(drbd_al_ext_cache);
+ if (drbd_ee_cache)
+ kmem_cache_destroy(drbd_ee_cache);
+ if (drbd_request_cache)
+ kmem_cache_destroy(drbd_request_cache);
+ if (drbd_al_ext_cache)
+ kmem_cache_destroy(drbd_al_ext_cache);
drbd_ee_cache = NULL;
drbd_request_cache = NULL;
- drbd_bm_ext_cache = NULL;
drbd_al_ext_cache = NULL;
return;
@@ -2090,11 +3249,6 @@ static int drbd_create_mempools(void)
if (drbd_ee_cache == NULL)
goto Enomem;
- drbd_bm_ext_cache = kmem_cache_create(
- "drbd_bm", sizeof(struct bm_extent), 0, 0, NULL);
- if (drbd_bm_ext_cache == NULL)
- goto Enomem;
-
drbd_al_ext_cache = kmem_cache_create(
"drbd_al", sizeof(struct lc_element), 0, 0, NULL);
if (drbd_al_ext_cache == NULL)
@@ -2113,7 +3267,6 @@ static int drbd_create_mempools(void)
ret = mempool_init_page_pool(&drbd_md_io_page_pool, DRBD_MIN_POOL_PAGES, 0);
if (ret)
goto Enomem;
-
ret = mempool_init_page_pool(&drbd_buffer_page_pool, number, 0);
if (ret)
goto Enomem;
@@ -2134,70 +3287,77 @@ static int drbd_create_mempools(void)
return -ENOMEM;
}
-static void drbd_release_all_peer_reqs(struct drbd_device *device)
+static void free_peer_device(struct drbd_peer_device *peer_device)
{
- int rr;
+ if (test_and_clear_bit(HOLDING_UUID_READ_LOCK, &peer_device->flags))
+ up_read_non_owner(&peer_device->device->uuid_sem);
- rr = drbd_free_peer_reqs(device, &device->active_ee);
- if (rr)
- drbd_err(device, "%d EEs in active list found!\n", rr);
+ kfree(peer_device->rs_plan_s);
+ kfree(peer_device->conf);
+ kfree(peer_device);
+}
- rr = drbd_free_peer_reqs(device, &device->sync_ee);
- if (rr)
- drbd_err(device, "%d EEs in sync list found!\n", rr);
+static void drbd_device_finalize_work_fn(struct work_struct *work)
+{
+ struct drbd_device *device = container_of(work, struct drbd_device, finalize_work);
+ struct drbd_resource *resource = device->resource;
- rr = drbd_free_peer_reqs(device, &device->read_ee);
- if (rr)
- drbd_err(device, "%d EEs in read list found!\n", rr);
+ /* ldev_safe: no other contexts can access */
+ drbd_bm_free(device);
- rr = drbd_free_peer_reqs(device, &device->done_ee);
- if (rr)
- drbd_err(device, "%d EEs in done list found!\n", rr);
+ put_disk(device->vdisk);
+
+ kfree(device);
+
+ kref_put(&resource->kref, drbd_destroy_resource);
}
-/* caution. no locking. */
+/* may not sleep, called from call_rcu. */
void drbd_destroy_device(struct kref *kref)
{
struct drbd_device *device = container_of(kref, struct drbd_device, kref);
- struct drbd_resource *resource = device->resource;
- struct drbd_peer_device *peer_device, *tmp_peer_device;
-
- timer_shutdown_sync(&device->request_timer);
-
- /* paranoia asserts */
- D_ASSERT(device, device->open_cnt == 0);
- /* end paranoia asserts */
+ struct drbd_peer_device *peer_device, *tmp;
/* cleanup stuff that may have been allocated during
* device (re-)configuration or state changes */
- drbd_backing_dev_free(device, device->ldev);
- device->ldev = NULL;
+#ifdef CONFIG_DRBD_COMPAT_84
+ if (device->resource->res_opts.drbd8_compat_mode)
+ atomic_dec(&nr_drbd8_devices);
+#endif
- drbd_release_all_peer_reqs(device);
+ free_openers(device);
lc_destroy(device->act_log);
- lc_destroy(device->resync);
-
- kfree(device->p_uuid);
- /* device->p_uuid = NULL; */
+ for_each_peer_device_safe(peer_device, tmp, device) {
+ kref_put(&peer_device->connection->kref, drbd_destroy_connection);
+ free_peer_device(peer_device);
+ }
- if (device->bitmap) /* should no longer be there. */
- drbd_bm_cleanup(device);
__free_page(device->md_io.page);
- put_disk(device->vdisk);
- kfree(device->rs_plan_s);
- /* not for_each_connection(connection, resource):
- * those may have been cleaned up and disassociated already.
- */
- for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
- kref_put(&peer_device->connection->kref, drbd_destroy_connection);
- kfree(peer_device);
- }
- if (device->submit.wq)
- destroy_workqueue(device->submit.wq);
- kfree(device);
+ INIT_WORK(&device->finalize_work, drbd_device_finalize_work_fn);
+ schedule_work(&device->finalize_work);
+}
+
+void drbd_destroy_resource(struct kref *kref)
+{
+ struct drbd_resource *resource = container_of(kref, struct drbd_resource, kref);
+
+ idr_destroy(&resource->devices);
+ free_cpumask_var(resource->cpu_mask);
+ kfree(resource->name);
+ kfree(resource);
+ module_put(THIS_MODULE);
+}
+
+void drbd_reclaim_resource(struct rcu_head *rp)
+{
+ struct drbd_resource *resource = container_of(rp, struct drbd_resource, rcu);
+
+ drbd_thread_stop_nowait(&resource->worker);
+
+ mempool_free(resource->peer_ack_req, &drbd_request_mempool);
kref_put(&resource->kref, drbd_destroy_resource);
}
@@ -2222,96 +3382,88 @@ static void do_retry(struct work_struct *ws)
list_splice_init(&retry->writes, &writes);
spin_unlock_irq(&retry->lock);
- list_for_each_entry_safe(req, tmp, &writes, tl_requests) {
+ list_for_each_entry_safe(req, tmp, &writes, list) {
struct drbd_device *device = req->device;
+ struct drbd_resource *resource = device->resource;
struct bio *bio = req->master_bio;
+ unsigned long start_jif = req->start_jif;
bool expected;
+ ktime_get_accounting_assign(ktime_t start_kt, req->start_kt);
+
+ /* No locking when accessing local_rq_state & net_rq_state, since
+ * this request is not active at the moment. */
expected =
expect(device, atomic_read(&req->completion_ref) == 0) &&
- expect(device, req->rq_state & RQ_POSTPONED) &&
- expect(device, (req->rq_state & RQ_LOCAL_PENDING) == 0 ||
- (req->rq_state & RQ_LOCAL_ABORTED) != 0);
+ expect(device, req->local_rq_state & RQ_POSTPONED) &&
+ expect(device, (req->local_rq_state & RQ_LOCAL_PENDING) == 0 ||
+ (req->local_rq_state & RQ_LOCAL_ABORTED) != 0);
if (!expected)
drbd_err(device, "req=%p completion_ref=%d rq_state=%x\n",
req, atomic_read(&req->completion_ref),
- req->rq_state);
+ req->local_rq_state);
- /* We still need to put one kref associated with the
+ /* We still need to put one done reference associated with the
* "completion_ref" going zero in the code path that queued it
* here. The request object may still be referenced by a
* frozen local req->private_bio, in case we force-detached.
*/
- kref_put(&req->kref, drbd_req_destroy);
+ read_lock_irq(&resource->state_rwlock);
+ drbd_put_ref_tl_walk(req, 1, 0);
+ read_unlock_irq(&resource->state_rwlock);
/* A single suspended or otherwise blocking device may stall
- * all others as well. Fortunately, this code path is to
- * recover from a situation that "should not happen":
- * concurrent writes in multi-primary setup.
- * In a "normal" lifecycle, this workqueue is supposed to be
- * destroyed without ever doing anything.
- * If it turns out to be an issue anyways, we can do per
+ * all others as well. This code path is to recover from a
+ * situation that "should not happen": concurrent writes in
+ * multi-primary setup. It is also used for retrying failed
+ * reads. If it turns out to be an issue, we can do per
* resource (replication group) or per device (minor) retry
* workqueues instead.
*/
/* We are not just doing submit_bio_noacct(),
* as we want to keep the start_time information. */
- inc_ap_bio(device);
- __drbd_make_request(device, bio);
+ __drbd_make_request(device, bio, start_kt, start_jif);
}
}
-/* called via drbd_req_put_completion_ref(),
- * holds resource->req_lock */
+/* called via drbd_req_put_completion_ref() */
void drbd_restart_request(struct drbd_request *req)
{
+ struct drbd_device *device = req->device;
+ struct drbd_resource *resource = device->resource;
+ bool susp = drbd_suspended(device);
unsigned long flags;
+
spin_lock_irqsave(&retry.lock, flags);
- list_move_tail(&req->tl_requests, &retry.writes);
+ list_move_tail(&req->list, susp ? &resource->suspended_reqs : &retry.writes);
spin_unlock_irqrestore(&retry.lock, flags);
/* Drop the extra reference that would otherwise
* have been dropped by complete_master_bio.
* do_retry() needs to grab a new one. */
- dec_ap_bio(req->device);
+ dec_ap_bio(req->device, bio_data_dir(req->master_bio));
- queue_work(retry.wq, &retry.worker);
+ if (!susp)
+ queue_work(retry.wq, &retry.worker);
}
-void drbd_destroy_resource(struct kref *kref)
+void drbd_restart_suspended_reqs(struct drbd_resource *resource)
{
- struct drbd_resource *resource =
- container_of(kref, struct drbd_resource, kref);
-
- idr_destroy(&resource->devices);
- free_cpumask_var(resource->cpu_mask);
- kfree(resource->name);
- kfree(resource);
-}
+ unsigned long flags;
-void drbd_free_resource(struct drbd_resource *resource)
-{
- struct drbd_connection *connection, *tmp;
+ spin_lock_irqsave(&retry.lock, flags);
+ list_splice_init(&resource->suspended_reqs, &retry.writes);
+ spin_unlock_irqrestore(&retry.lock, flags);
- for_each_connection_safe(connection, tmp, resource) {
- list_del(&connection->connections);
- drbd_debugfs_connection_cleanup(connection);
- kref_put(&connection->kref, drbd_destroy_connection);
- }
- drbd_debugfs_resource_cleanup(resource);
- kref_put(&resource->kref, drbd_destroy_resource);
+ queue_work(retry.wq, &retry.worker);
}
static void drbd_cleanup(void)
{
- unsigned int i;
- struct drbd_device *device;
- struct drbd_resource *resource, *tmp;
-
/* first remove proc,
- * drbdsetup uses it's presence to detect
+ * drbdsetup uses its presence to detect
* whether DRBD is loaded.
* If we would get stuck in proc removal,
* but have netlink already deregistered,
@@ -2325,19 +3477,13 @@ static void drbd_cleanup(void)
destroy_workqueue(retry.wq);
drbd_genl_unregister();
-
- idr_for_each_entry(&drbd_devices, device, i)
- drbd_delete_device(device);
-
- /* not _rcu since, no other updater anymore. Genl already unregistered */
- for_each_resource_safe(resource, tmp, &drbd_resources) {
- list_del(&resource->resources);
- drbd_free_resource(resource);
- }
-
drbd_debugfs_cleanup();
+ unregister_pernet_device(&drbd_pernet_ops);
+
drbd_destroy_mempools();
+ if (ping_ack_sender)
+ destroy_workqueue(ping_ack_sender);
unregister_blkdev(DRBD_MAJOR, "drbd");
idr_destroy(&drbd_devices);
@@ -2366,6 +3512,16 @@ static int w_complete(struct drbd_work *w, int cancel)
return 0;
}
+void drbd_queue_work(struct drbd_work_queue *q, struct drbd_work *w)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&q->q_lock, flags);
+ list_add_tail(&w->list, &q->q);
+ spin_unlock_irqrestore(&q->q_lock, flags);
+ wake_up(&q->q_wait);
+}
+
void drbd_flush_workqueue(struct drbd_work_queue *work_queue)
{
struct completion_work completion_work;
@@ -2376,6 +3532,23 @@ void drbd_flush_workqueue(struct drbd_work_queue *work_queue)
wait_for_completion(&completion_work.done);
}
+void drbd_flush_workqueue_interruptible(struct drbd_device *device)
+{
+ struct completion_work completion_work;
+ int err;
+
+ completion_work.w.cb = w_complete;
+ init_completion(&completion_work.done);
+ drbd_queue_work(&device->resource->work, &completion_work.w);
+ err = wait_for_completion_interruptible(&completion_work.done);
+ if (err == -ERESTARTSYS) {
+ set_bit(ABORT_MDIO, &device->flags);
+ wake_up_all(&device->misc_wait);
+ wait_for_completion(&completion_work.done);
+ clear_bit(ABORT_MDIO, &device->flags);
+ }
+}
+
struct drbd_resource *drbd_find_resource(const char *name)
{
struct drbd_resource *resource;
@@ -2396,51 +3569,58 @@ struct drbd_resource *drbd_find_resource(const char *name)
return resource;
}
-struct drbd_connection *conn_get_by_addrs(void *my_addr, int my_addr_len,
- void *peer_addr, int peer_addr_len)
+static void drbd_put_send_buffers(struct drbd_connection *connection)
{
- struct drbd_resource *resource;
- struct drbd_connection *connection;
+ unsigned int i;
- rcu_read_lock();
- for_each_resource_rcu(resource, &drbd_resources) {
- for_each_connection_rcu(connection, resource) {
- if (connection->my_addr_len == my_addr_len &&
- connection->peer_addr_len == peer_addr_len &&
- !memcmp(&connection->my_addr, my_addr, my_addr_len) &&
- !memcmp(&connection->peer_addr, peer_addr, peer_addr_len)) {
- kref_get(&connection->kref);
- goto found;
- }
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++) {
+ if (connection->send_buffer[i].page) {
+ put_page(connection->send_buffer[i].page);
+ connection->send_buffer[i].page = NULL;
}
}
- connection = NULL;
-found:
- rcu_read_unlock();
- return connection;
}
-static int drbd_alloc_socket(struct drbd_socket *socket)
+static int drbd_alloc_send_buffers(struct drbd_connection *connection)
{
- socket->rbuf = (void *) __get_free_page(GFP_KERNEL);
- if (!socket->rbuf)
- return -ENOMEM;
- socket->sbuf = (void *) __get_free_page(GFP_KERNEL);
- if (!socket->sbuf)
- return -ENOMEM;
+ unsigned int i;
+
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++) {
+ struct page *page;
+
+ page = alloc_page(GFP_KERNEL);
+ if (!page) {
+ drbd_put_send_buffers(connection);
+ return -ENOMEM;
+ }
+ connection->send_buffer[i].page = page;
+ connection->send_buffer[i].unsent =
+ connection->send_buffer[i].pos = page_address(page);
+ }
+
return 0;
}
-static void drbd_free_socket(struct drbd_socket *socket)
+void drbd_flush_peer_acks(struct drbd_resource *resource)
{
- free_page((unsigned long) socket->sbuf);
- free_page((unsigned long) socket->rbuf);
+ spin_lock_irq(&resource->peer_ack_lock);
+ if (resource->peer_ack_req) {
+ resource->last_peer_acked_dagtag = resource->peer_ack_req->dagtag_sector;
+ drbd_queue_peer_ack(resource, resource->peer_ack_req);
+ resource->peer_ack_req = NULL;
+ }
+ spin_unlock_irq(&resource->peer_ack_lock);
}
-void conn_free_crypto(struct drbd_connection *connection)
+static void peer_ack_timer_fn(struct timer_list *t)
{
- drbd_free_sock(connection);
+ struct drbd_resource *resource = timer_container_of(resource, t, peer_ack_timer);
+
+ drbd_flush_peer_acks(resource);
+}
+void conn_free_crypto(struct drbd_connection *connection)
+{
crypto_free_shash(connection->csums_tfm);
crypto_free_shash(connection->verify_tfm);
crypto_free_shash(connection->cram_hmac_tfm);
@@ -2458,11 +3638,25 @@ void conn_free_crypto(struct drbd_connection *connection)
connection->int_dig_vv = NULL;
}
-int set_resource_options(struct drbd_resource *resource, struct res_opts *res_opts)
+static void wake_all_device_misc(struct drbd_resource *resource)
+{
+ struct drbd_device *device;
+ int vnr;
+ rcu_read_lock();
+ idr_for_each_entry(&resource->devices, device, vnr)
+ wake_up(&device->misc_wait);
+ rcu_read_unlock();
+}
+
+int set_resource_options(struct drbd_resource *resource, struct res_opts *res_opts, const char *tag)
{
struct drbd_connection *connection;
cpumask_var_t new_cpu_mask;
int err;
+ bool wake_device_misc = false;
+ bool force_state_recalc = false;
+ unsigned long irq_flags;
+ struct res_opts *old_opts = &resource->res_opts;
if (!zalloc_cpumask_var(&new_cpu_mask, GFP_KERNEL))
return -ENOMEM;
@@ -2491,26 +3685,47 @@ int set_resource_options(struct drbd_resource *resource, struct res_opts *res_op
goto fail;
}
}
+ if (res_opts->nr_requests < DRBD_NR_REQUESTS_MIN)
+ res_opts->nr_requests = DRBD_NR_REQUESTS_MIN;
+
+ if (old_opts->quorum != res_opts->quorum ||
+ old_opts->on_no_quorum != res_opts->on_no_quorum)
+ force_state_recalc = true;
+
+ if (resource->res_opts.nr_requests < res_opts->nr_requests)
+ wake_device_misc = true;
+
resource->res_opts = *res_opts;
if (cpumask_empty(new_cpu_mask))
drbd_calc_cpu_mask(&new_cpu_mask);
if (!cpumask_equal(resource->cpu_mask, new_cpu_mask)) {
cpumask_copy(resource->cpu_mask, new_cpu_mask);
+ resource->worker.reset_cpu_mask = 1;
+ rcu_read_lock();
for_each_connection_rcu(connection, resource) {
connection->receiver.reset_cpu_mask = 1;
- connection->ack_receiver.reset_cpu_mask = 1;
- connection->worker.reset_cpu_mask = 1;
+ connection->sender.reset_cpu_mask = 1;
}
+ rcu_read_unlock();
}
err = 0;
+ if (force_state_recalc) {
+ begin_state_change(resource, &irq_flags, CS_VERBOSE | CS_FORCE_RECALC);
+ end_state_change(resource, &irq_flags, tag);
+ }
+
+ if (wake_device_misc)
+ wake_all_device_misc(resource);
+
fail:
free_cpumask_var(new_cpu_mask);
return err;
}
-struct drbd_resource *drbd_create_resource(const char *name)
+struct drbd_resource *drbd_create_resource(const char *name,
+ struct res_opts *res_opts)
{
struct drbd_resource *resource;
@@ -2525,12 +3740,52 @@ struct drbd_resource *drbd_create_resource(const char *name)
kref_init(&resource->kref);
idr_init(&resource->devices);
INIT_LIST_HEAD(&resource->connections);
- resource->write_ordering = WO_BDEV_FLUSH;
- list_add_tail_rcu(&resource->resources, &drbd_resources);
+ spin_lock_init(&resource->tl_update_lock);
+ INIT_LIST_HEAD(&resource->transfer_log);
+ spin_lock_init(&resource->peer_ack_lock);
+ INIT_LIST_HEAD(&resource->peer_ack_req_list);
+ INIT_LIST_HEAD(&resource->peer_ack_list);
+ INIT_LIST_HEAD(&resource->peer_ack_work.list);
+ resource->peer_ack_work.cb = w_queue_peer_ack;
+ timer_setup(&resource->peer_ack_timer, peer_ack_timer_fn, 0);
+ spin_lock_init(&resource->initiator_flush_lock);
+ sema_init(&resource->state_sem, 1);
+ resource->role[NOW] = R_SECONDARY;
+ resource->max_node_id = res_opts->drbd8_compat_mode ? 1 : res_opts->node_id;
+ resource->twopc_reply.initiator_node_id = -1;
mutex_init(&resource->conf_update);
mutex_init(&resource->adm_mutex);
- spin_lock_init(&resource->req_lock);
+ mutex_init(&resource->open_release);
+ rwlock_init(&resource->state_rwlock);
+ INIT_LIST_HEAD(&resource->listeners);
+ spin_lock_init(&resource->listeners_lock);
+ init_waitqueue_head(&resource->state_wait);
+ init_waitqueue_head(&resource->twopc_wait);
+ init_waitqueue_head(&resource->barrier_wait);
+ timer_setup(&resource->twopc_timer, twopc_timer_fn, 0);
+ INIT_WORK(&resource->twopc_work, nested_twopc_work);
+ drbd_init_workqueue(&resource->work);
+ drbd_thread_init(resource, &resource->worker, drbd_worker, "worker");
+ spin_lock_init(&resource->current_tle_lock);
drbd_debugfs_resource_add(resource);
+ resource->cached_min_aggreed_protocol_version = drbd_protocol_version_min;
+ /* members is a bit mask of the "seen" nodes in this resource.
+ * In drbd8 compatibility mode, we only have one peer, so we can
+ * set this to 1. */
+ resource->members = res_opts->drbd8_compat_mode ? 1 : NODE_MASK(res_opts->node_id);
+ INIT_WORK(&resource->empty_twopc, drbd_empty_twopc_work_fn);
+ INIT_LIST_HEAD(&resource->suspended_reqs);
+
+ ratelimit_state_init(&resource->ratelimit[D_RL_R_GENERIC], 5*HZ, 10);
+
+
+ if (set_resource_options(resource, res_opts, "create-resource"))
+ goto fail_free_name;
+
+ drbd_thread_start(&resource->worker);
+
+ list_add_tail_rcu(&resource->resources, &drbd_resources);
+
return resource;
fail_free_name:
@@ -2542,128 +3797,291 @@ struct drbd_resource *drbd_create_resource(const char *name)
}
/* caller must be under adm_mutex */
-struct drbd_connection *conn_create(const char *name, struct res_opts *res_opts)
+struct drbd_connection *drbd_create_connection(struct drbd_resource *resource,
+ struct drbd_transport_class *tc)
{
- struct drbd_resource *resource;
struct drbd_connection *connection;
+ int size;
- connection = kzalloc_obj(struct drbd_connection);
+ size = sizeof(*connection) - sizeof(connection->transport) + tc->instance_size;
+ connection = kzalloc(size, GFP_KERNEL);
if (!connection)
return NULL;
- if (drbd_alloc_socket(&connection->data))
- goto fail;
- if (drbd_alloc_socket(&connection->meta))
+ ratelimit_state_init(&connection->ratelimit[D_RL_C_GENERIC], 5*HZ, /* no burst */ 1);
+
+ if (drbd_alloc_send_buffers(connection))
goto fail;
connection->current_epoch = kzalloc_obj(struct drbd_epoch);
if (!connection->current_epoch)
goto fail;
- INIT_LIST_HEAD(&connection->transfer_log);
-
INIT_LIST_HEAD(&connection->current_epoch->list);
connection->epochs = 1;
spin_lock_init(&connection->epoch_lock);
+ INIT_LIST_HEAD(&connection->todo.work_list);
+ connection->todo.req = NULL;
+
+ atomic_set(&connection->ap_in_flight, 0);
+ atomic_set(&connection->rs_in_flight, 0);
connection->send.seen_any_write_yet = false;
connection->send.current_epoch_nr = 0;
connection->send.current_epoch_writes = 0;
+ connection->send.current_dagtag_sector =
+ resource->dagtag_sector - ((BIO_MAX_VECS << PAGE_SHIFT) >> SECTOR_SHIFT) - 1;
- resource = drbd_create_resource(name);
- if (!resource)
- goto fail;
-
- connection->cstate = C_STANDALONE;
- mutex_init(&connection->cstate_mutex);
- init_waitqueue_head(&connection->ping_wait);
+ connection->cstate[NOW] = C_STANDALONE;
+ connection->peer_role[NOW] = R_UNKNOWN;
idr_init(&connection->peer_devices);
drbd_init_workqueue(&connection->sender_work);
- mutex_init(&connection->data.mutex);
- mutex_init(&connection->meta.mutex);
+ mutex_init(&connection->mutex[DATA_STREAM]);
+ mutex_init(&connection->mutex[CONTROL_STREAM]);
+
+ INIT_LIST_HEAD(&connection->connect_timer_work.list);
+ timer_setup(&connection->connect_timer, connect_timer_fn, 0);
drbd_thread_init(resource, &connection->receiver, drbd_receiver, "receiver");
connection->receiver.connection = connection;
- drbd_thread_init(resource, &connection->worker, drbd_worker, "worker");
- connection->worker.connection = connection;
- drbd_thread_init(resource, &connection->ack_receiver, drbd_ack_receiver, "ack_recv");
- connection->ack_receiver.connection = connection;
+ drbd_thread_init(resource, &connection->sender, drbd_sender, "sender");
+ connection->sender.connection = connection;
+ spin_lock_init(&connection->primary_flush_lock);
+ spin_lock_init(&connection->flush_ack_lock);
+ spin_lock_init(&connection->peer_reqs_lock);
+ spin_lock_init(&connection->send_oos_lock);
+ INIT_LIST_HEAD(&connection->peer_requests);
+ INIT_LIST_HEAD(&connection->peer_reads);
+ INIT_LIST_HEAD(&connection->send_oos);
+ INIT_LIST_HEAD(&connection->connections);
+ INIT_LIST_HEAD(&connection->done_ee);
+ INIT_LIST_HEAD(&connection->dagtag_wait_ee);
+ INIT_LIST_HEAD(&connection->remove_net_list);
+ init_waitqueue_head(&connection->ee_wait);
kref_init(&connection->kref);
- connection->resource = resource;
+ INIT_WORK(&connection->peer_ack_work, drbd_send_peer_ack_wf);
+ INIT_LIST_HEAD(&connection->send_oos_work.list);
+ connection->send_oos_work.cb = drbd_send_out_of_sync_wf;
+ INIT_LIST_HEAD(&connection->flush_ack_work.list);
+ connection->flush_ack_work.cb = drbd_flush_ack_wf;
+ INIT_WORK(&connection->send_acks_work, drbd_send_acks_wf);
+ INIT_WORK(&connection->send_ping_ack_work, drbd_send_ping_ack_wf);
+ INIT_WORK(&connection->send_ping_work, drbd_send_ping_wf);
+
+ INIT_LIST_HEAD(&connection->send_dagtag_work.list);
+ connection->send_dagtag_work.cb = w_send_dagtag;
- if (set_resource_options(resource, res_opts))
- goto fail_resource;
+ spin_lock_init(&connection->advance_cache_ptr_lock);
kref_get(&resource->kref);
- list_add_tail_rcu(&connection->connections, &resource->connections);
- drbd_debugfs_connection_add(connection);
+ connection->resource = resource;
+ connection->after_reconciliation.lost_node_id = -1;
+
+ connection->reassemble_buffer.buffer = connection->reassemble_buffer_bytes.bytes;
+
+ INIT_LIST_HEAD(&connection->transport.paths);
+ connection->transport.log_prefix = resource->name;
+ if (tc->ops.init(&connection->transport))
+ goto fail;
+
return connection;
-fail_resource:
- list_del(&resource->resources);
- drbd_free_resource(resource);
fail:
+ drbd_put_send_buffers(connection);
kfree(connection->current_epoch);
- drbd_free_socket(&connection->meta);
- drbd_free_socket(&connection->data);
kfree(connection);
+
return NULL;
}
+/**
+ * drbd_transport_shutdown() - Free the transport specific members (e.g., sockets) of a connection
+ * @connection: The connection to shut down
+ * @op: The operation. Only close the connection or destroy the whole transport
+ *
+ * Must be called with conf_update held.
+ */
+void drbd_transport_shutdown(struct drbd_connection *connection, enum drbd_tr_free_op op)
+{
+	struct drbd_transport *transport = &connection->transport;
+
+	lockdep_assert_held(&connection->resource->conf_update);
+
+	/* Take both stream mutexes so no sender can race with the transport
+	 * teardown below. */
+	mutex_lock(&connection->mutex[DATA_STREAM]);
+	mutex_lock(&connection->mutex[CONTROL_STREAM]);
+
+	/* Ignore send errors, if any: we are shutting down. */
+	flush_send_buffer(connection, DATA_STREAM);
+	flush_send_buffer(connection, CONTROL_STREAM);
+
+	/* Holding conf_update ensures that paths list is not modified concurrently. */
+	transport->class->ops.free(transport, op);
+	if (op == DESTROY_TRANSPORT) {
+		drbd_remove_all_paths(connection);
+
+		/* Wait for the delayed drbd_reclaim_path() calls. */
+		rcu_barrier();
+		drbd_put_transport_class(transport->class);
+	}
+
+	mutex_unlock(&connection->mutex[CONTROL_STREAM]);
+	mutex_unlock(&connection->mutex[DATA_STREAM]);
+}
+
+/* kref release function for a drbd_path: drop the back reference the
+ * path holds on its connection (reached via path->transport), then free
+ * the path itself. */
+void drbd_destroy_path(struct kref *kref)
+{
+	struct drbd_path *path = container_of(kref, struct drbd_path, kref);
+	struct drbd_connection *connection =
+		container_of(path->transport, struct drbd_connection, transport);
+
+	kref_put(&connection->kref, drbd_destroy_connection);
+	kfree(path);
+}
+
void drbd_destroy_connection(struct kref *kref)
{
struct drbd_connection *connection = container_of(kref, struct drbd_connection, kref);
struct drbd_resource *resource = connection->resource;
+ struct drbd_peer_device *peer_device;
+ int vnr;
if (atomic_read(&connection->current_epoch->epoch_size) != 0)
drbd_err(connection, "epoch_size:%d\n", atomic_read(&connection->current_epoch->epoch_size));
kfree(connection->current_epoch);
+ idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
+ struct drbd_device *device = peer_device->device;
+ free_peer_device(peer_device);
+ kref_put(&device->kref, drbd_destroy_device);
+ }
idr_destroy(&connection->peer_devices);
- drbd_free_socket(&connection->meta);
- drbd_free_socket(&connection->data);
- kfree(connection->int_dig_in);
- kfree(connection->int_dig_vv);
+ kfree(connection->transport.net_conf);
kfree(connection);
kref_put(&resource->kref, drbd_destroy_resource);
}
+/**
+ * create_peer_device() - allocate and initialize a peer device object
+ * @device: the DRBD device this peer device belongs to
+ * @connection: the connection towards the peer
+ *
+ * Returns the new peer device, or NULL on allocation failure.  The
+ * object is not linked anywhere yet; the caller is responsible for
+ * adding it to the device's peer_devices list and the connection's idr.
+ */
+struct drbd_peer_device *create_peer_device(struct drbd_device *device, struct drbd_connection *connection)
+{
+	struct drbd_peer_device *peer_device;
+	int err;
+
+	peer_device = kzalloc_obj(struct drbd_peer_device);
+	if (!peer_device)
+		return NULL;
+
+	peer_device->connection = connection;
+	peer_device->device = device;
+	peer_device->disk_state[NOW] = D_UNKNOWN;
+	peer_device->repl_state[NOW] = L_OFF;
+	peer_device->replication[NOW] = true;
+	peer_device->peer_replication[NOW] = true;
+	spin_lock_init(&peer_device->peer_seq_lock);
+
+	ratelimit_state_init(&peer_device->ratelimit[D_RL_PD_GENERIC], 5*HZ, /* no burst */ 1);
+
+	err = drbd_create_peer_device_default_config(peer_device);
+	if (err) {
+		kfree(peer_device);
+		return NULL;
+	}
+
+	timer_setup(&peer_device->start_resync_timer, start_resync_timer_fn, 0);
+
+	INIT_LIST_HEAD(&peer_device->resync_work.list);
+	peer_device->resync_work.cb = w_resync_timer;
+	timer_setup(&peer_device->resync_timer, resync_timer_fn, 0);
+
+	INIT_LIST_HEAD(&peer_device->propagate_uuids_work.list);
+	peer_device->propagate_uuids_work.cb = w_send_uuids;
+
+	atomic_set(&peer_device->ap_pending_cnt, 0);
+	atomic_set(&peer_device->unacked_cnt, 0);
+	atomic_set(&peer_device->rs_pending_cnt, 0);
+
+	INIT_LIST_HEAD(&peer_device->resync_requests);
+
+	atomic_set(&peer_device->rs_sect_in, 0);
+
+	/* No on-disk bitmap slot assigned yet. */
+	peer_device->bitmap_index = -1;
+	peer_device->resync_finished_pdsk = D_UNKNOWN;
+
+	/* Sane defaults until the peer reports its real queue limits. */
+	peer_device->q_limits.physical_block_size = SECTOR_SIZE;
+	peer_device->q_limits.logical_block_size = SECTOR_SIZE;
+	peer_device->q_limits.alignment_offset = 0;
+	peer_device->q_limits.io_min = SECTOR_SIZE;
+	peer_device->q_limits.io_opt = PAGE_SIZE;
+	peer_device->q_limits.max_bio_size = DRBD_MAX_BIO_SIZE;
+
+	return peer_device;
+}
+
+/* Work callback: final teardown of the local backing device ("ldev").
+ * Frees the bitmap, the activity log and the backing device itself,
+ * then clears GOING_DISKLESS, wakes waiters and drops the device
+ * reference held by the queued work. */
+static void drbd_ldev_destroy(struct work_struct *ws)
+{
+	struct drbd_device *device = container_of(ws, struct drbd_device, ldev_destroy_work);
+
+	/* ldev_safe: destroying the bitmap */
+	drbd_bm_free(device);
+	lc_destroy(device->act_log);
+	device->act_log = NULL;
+	/* ldev_safe: destroying ldev */
+	drbd_backing_dev_free(device, device->ldev);
+	/* ldev_safe: final teardown, no other user possible */
+	device->ldev = NULL;
+
+	clear_bit(GOING_DISKLESS, &device->flags);
+	wake_up(&device->misc_wait);
+	kref_put(&device->kref, drbd_destroy_device);
+}
+
+/* Allocate the ordered workqueue and work lists used to submit requests
+ * that conflict with other in-flight requests.  Returns 0 on success or
+ * -ENOMEM. */
+static int init_conflict_submitter(struct drbd_device *device)
+{
+	/* Short name so that it is recognizable from the first 15 characters. */
+	device->submit_conflict.wq =
+		alloc_ordered_workqueue("drbd%u_sc", WQ_MEM_RECLAIM, device->minor);
+	if (!device->submit_conflict.wq)
+		return -ENOMEM;
+	INIT_WORK(&device->submit_conflict.worker, drbd_do_submit_conflict);
+	INIT_LIST_HEAD(&device->submit_conflict.resync_writes);
+	INIT_LIST_HEAD(&device->submit_conflict.resync_reads);
+	INIT_LIST_HEAD(&device->submit_conflict.writes);
+	INIT_LIST_HEAD(&device->submit_conflict.peer_writes);
+	spin_lock_init(&device->submit_conflict.lock);
+	return 0;
+}
+
static int init_submitter(struct drbd_device *device)
{
- /* opencoded create_singlethread_workqueue(),
- * to be able to say "drbd%d", ..., minor */
device->submit.wq =
alloc_ordered_workqueue("drbd%u_submit", WQ_MEM_RECLAIM, device->minor);
if (!device->submit.wq)
return -ENOMEM;
-
INIT_WORK(&device->submit.worker, do_submit);
INIT_LIST_HEAD(&device->submit.writes);
+ INIT_LIST_HEAD(&device->submit.peer_writes);
+ spin_lock_init(&device->submit.lock);
return 0;
}
-enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsigned int minor)
+enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsigned int minor,
+ struct device_conf *device_conf, struct drbd_device **p_device)
{
struct drbd_resource *resource = adm_ctx->resource;
- struct drbd_connection *connection, *n;
+ struct drbd_connection *connection;
struct drbd_device *device;
struct drbd_peer_device *peer_device, *tmp_peer_device;
struct gendisk *disk;
+ LIST_HEAD(peer_devices);
+ LIST_HEAD(tmp);
int id;
int vnr = adm_ctx->volume;
enum drbd_ret_code err = ERR_NOMEM;
- struct queue_limits lim = {
- /*
- * Setting the max_hw_sectors to an odd value of 8kibyte here.
- * This triggers a max_bio_size message upon first attach or
- * connect.
- */
- .max_hw_sectors = DRBD_MAX_BIO_SIZE_SAFE >> 8,
- };
+ bool locked = false;
+
+ lockdep_assert_held(&resource->conf_update);
device = minor_to_device(minor);
if (device)
@@ -2675,24 +4093,65 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
return ERR_NOMEM;
kref_init(&device->kref);
+ ratelimit_state_init(&device->ratelimit[D_RL_D_GENERIC], 5*HZ, /* no burst */ 1);
+ ratelimit_state_init(&device->ratelimit[D_RL_D_METADATA], 5*HZ, 10);
+ ratelimit_state_init(&device->ratelimit[D_RL_D_BACKEND], 5*HZ, 10);
+
kref_get(&resource->kref);
device->resource = resource;
device->minor = minor;
device->vnr = vnr;
+ device->device_conf = *device_conf;
+
+ drbd_set_defaults(device);
+
+ atomic_set(&device->ap_bio_cnt[READ], 0);
+ atomic_set(&device->ap_bio_cnt[WRITE], 0);
+ atomic_set(&device->ap_actlog_cnt, 0);
+ atomic_set(&device->wait_for_actlog, 0);
+ atomic_set(&device->wait_for_actlog_ecnt, 0);
+ atomic_set(&device->local_cnt, 0);
+ atomic_set(&device->rs_sect_ev, 0);
+ atomic_set(&device->md_io.in_use, 0);
+
+#ifdef CONFIG_DRBD_TIMING_STATS
+ spin_lock_init(&device->timing_lock);
+#endif
+ spin_lock_init(&device->al_lock);
+
+ spin_lock_init(&device->pending_completion_lock);
+ INIT_LIST_HEAD(&device->pending_master_completion[0]);
+ INIT_LIST_HEAD(&device->pending_master_completion[1]);
+ INIT_LIST_HEAD(&device->pending_completion[0]);
+ INIT_LIST_HEAD(&device->pending_completion[1]);
+ INIT_LIST_HEAD(&device->openers);
+ spin_lock_init(&device->openers_lock);
+ spin_lock_init(&device->peer_req_bio_completion_lock);
+
+ atomic_set(&device->pending_bitmap_work.n, 0);
+ spin_lock_init(&device->pending_bitmap_work.q_lock);
+ INIT_LIST_HEAD(&device->pending_bitmap_work.q);
+
+ timer_setup(&device->md_sync_timer, md_sync_timer_fn, 0);
+ timer_setup(&device->request_timer, request_timer_fn, 0);
+
+ init_waitqueue_head(&device->misc_wait);
+ init_waitqueue_head(&device->al_wait);
+ init_waitqueue_head(&device->seq_wait);
- drbd_init_set_defaults(device);
+ init_rwsem(&device->uuid_sem);
- disk = blk_alloc_disk(&lim, NUMA_NO_NODE);
+ disk = blk_alloc_disk(NULL, NUMA_NO_NODE);
if (IS_ERR(disk)) {
err = PTR_ERR(disk);
goto out_no_disk;
}
+ INIT_WORK(&device->ldev_destroy_work, drbd_ldev_destroy);
+
device->vdisk = disk;
device->rq_queue = disk->queue;
- set_disk_ro(disk, true);
-
disk->major = DRBD_MAJOR;
disk->first_minor = minor;
disk->minors = 1;
@@ -2705,12 +4164,39 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
if (!device->md_io.page)
goto out_no_io_page;
- if (drbd_bm_init(device))
- goto out_no_bitmap;
+ /* Just put in some sane default; should never be used. */
+ device->last_bm_block_shift = BM_BLOCK_SHIFT_MIN;
+
+ spin_lock_init(&device->interval_lock);
device->read_requests = RB_ROOT;
- device->write_requests = RB_ROOT;
+ device->requests = RB_ROOT;
+
+ BUG_ON(!mutex_is_locked(&resource->conf_update));
+ for_each_connection(connection, resource) {
+ peer_device = create_peer_device(device, connection);
+ if (!peer_device)
+ goto out_no_peer_device;
+ list_add(&peer_device->peer_devices, &peer_devices);
+ }
+
+ /* Insert the new device into all idrs under state_rwlock write lock
+ to guarantee a consistent object model. idr_preload() doesn't help
+ because it can only guarantee that a single idr_alloc() will
+ succeed. This fails (and will be retried) if no memory is
+ immediately available.
+	   Keep in mind that RCU readers might find the device in the moment
+ we add it to the resources->devices IDR!
+ */
+
+ INIT_LIST_HEAD(&device->peer_devices);
+ spin_lock_init(&device->pending_bmio_lock);
+ INIT_LIST_HEAD(&device->pending_bitmap_io);
- id = idr_alloc(&drbd_devices, device, minor, minor + 1, GFP_KERNEL);
+ locked = true;
+ write_lock_irq(&resource->state_rwlock);
+ spin_lock(&drbd_devices_lock);
+ id = idr_alloc(&drbd_devices, device, minor, minor + 1, GFP_NOWAIT);
+ spin_unlock(&drbd_devices_lock);
if (id < 0) {
if (id == -ENOSPC)
err = ERR_MINOR_OR_VOLUME_EXISTS;
@@ -2718,7 +4204,7 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
}
kref_get(&device->kref);
- id = idr_alloc(&resource->devices, device, vnr, vnr + 1, GFP_KERNEL);
+ id = idr_alloc(&resource->devices, device, vnr, vnr + 1, GFP_NOWAIT);
if (id < 0) {
if (id == -ENOSPC)
err = ERR_MINOR_OR_VOLUME_EXISTS;
@@ -2726,105 +4212,219 @@ enum drbd_ret_code drbd_create_device(struct drbd_config_context *adm_ctx, unsig
}
kref_get(&device->kref);
- INIT_LIST_HEAD(&device->peer_devices);
- INIT_LIST_HEAD(&device->pending_bitmap_io);
- for_each_connection(connection, resource) {
- peer_device = kzalloc_obj(struct drbd_peer_device);
- if (!peer_device)
- goto out_idr_remove_from_resource;
- peer_device->connection = connection;
- peer_device->device = device;
-
- list_add(&peer_device->peer_devices, &device->peer_devices);
+ list_for_each_entry_safe(peer_device, tmp_peer_device, &peer_devices, peer_devices) {
+ connection = peer_device->connection;
+ id = idr_alloc(&connection->peer_devices, peer_device,
+ device->vnr, device->vnr + 1, GFP_NOWAIT);
+ if (id < 0)
+ goto out_remove_peer_device;
+ list_del(&peer_device->peer_devices);
+ list_add_rcu(&peer_device->peer_devices, &device->peer_devices);
+ kref_get(&connection->kref);
kref_get(&device->kref);
+ }
+ write_unlock_irq(&resource->state_rwlock);
+ locked = false;
- id = idr_alloc(&connection->peer_devices, peer_device, vnr, vnr + 1, GFP_KERNEL);
- if (id < 0) {
- if (id == -ENOSPC)
- err = ERR_INVALID_REQUEST;
- goto out_idr_remove_from_resource;
- }
- kref_get(&connection->kref);
- INIT_WORK(&peer_device->send_acks_work, drbd_send_acks_wf);
+ if (init_conflict_submitter(device)) {
+ err = ERR_NOMEM;
+ goto out_remove_peer_device;
}
if (init_submitter(device)) {
err = ERR_NOMEM;
- goto out_idr_remove_from_resource;
+ goto out_remove_peer_device;
}
err = add_disk(disk);
if (err)
- goto out_destroy_workqueue;
+ goto out_destroy_submitter;
+ device->have_quorum[OLD] =
+ device->have_quorum[NEW] =
+ (resource->res_opts.quorum == QOU_OFF);
- /* inherit the connection state */
- device->state.conn = first_connection(resource)->cstate;
- if (device->state.conn == C_WF_REPORT_PARAMS) {
- for_each_peer_device(peer_device, device)
+ for_each_peer_device(peer_device, device) {
+ connection = peer_device->connection;
+ peer_device->node_id = connection->peer_node_id;
+
+ if (connection->cstate[NOW] >= C_CONNECTED)
drbd_connected(peer_device);
}
- /* move to create_peer_device() */
- for_each_peer_device(peer_device, device)
- drbd_debugfs_peer_device_add(peer_device);
+
drbd_debugfs_device_add(device);
+ *p_device = device;
return NO_ERROR;
-out_destroy_workqueue:
+out_destroy_submitter:
destroy_workqueue(device->submit.wq);
-out_idr_remove_from_resource:
- for_each_connection_safe(connection, n, resource) {
- peer_device = idr_remove(&connection->peer_devices, vnr);
- if (peer_device)
- kref_put(&connection->kref, drbd_destroy_connection);
- }
- for_each_peer_device_safe(peer_device, tmp_peer_device, device) {
+ device->submit.wq = NULL;
+out_remove_peer_device:
+ list_splice_init_rcu(&device->peer_devices, &tmp, synchronize_rcu);
+ list_for_each_entry_safe(peer_device, tmp_peer_device, &tmp, peer_devices) {
+ struct drbd_connection *connection = peer_device->connection;
+
+ idr_remove(&connection->peer_devices, device->vnr);
list_del(&peer_device->peer_devices);
kfree(peer_device);
+ kref_put(&connection->kref, drbd_destroy_connection);
}
idr_remove(&resource->devices, vnr);
+
out_idr_remove_minor:
+ spin_lock(&drbd_devices_lock);
idr_remove(&drbd_devices, minor);
- synchronize_rcu();
+ spin_unlock(&drbd_devices_lock);
out_no_minor_idr:
- drbd_bm_cleanup(device);
-out_no_bitmap:
+ if (locked)
+ write_unlock_irq(&resource->state_rwlock);
+ synchronize_rcu();
+
+out_no_peer_device:
+ list_for_each_entry_safe(peer_device, tmp_peer_device, &peer_devices, peer_devices) {
+ list_del(&peer_device->peer_devices);
+ kfree(peer_device);
+ }
+
__free_page(device->md_io.page);
out_no_io_page:
put_disk(disk);
out_no_disk:
kref_put(&resource->kref, drbd_destroy_resource);
+ /* kref debugging wants an extra put, see has_refs() */
kfree(device);
return err;
}
-void drbd_delete_device(struct drbd_device *device)
+/**
+ * drbd_unregister_device() - make a device "invisible"
+ * @device: DRBD device to unregister
+ *
+ * Remove the device from the drbd object model and unregister it in the
+ * kernel. Keep reference counts on device->kref; they are dropped in
+ * drbd_reclaim_device().
+ */
+void drbd_unregister_device(struct drbd_device *device)
 {
 	struct drbd_resource *resource = device->resource;
 	struct drbd_connection *connection;
 	struct drbd_peer_device *peer_device;
-	/* move to free_peer_device() */
-	for_each_peer_device(peer_device, device)
-		drbd_debugfs_peer_device_cleanup(peer_device);
-	drbd_debugfs_device_cleanup(device);
+	/* Remove the device from all idrs under the state lock so that
+	 * lockless readers see a consistent object model. */
+	write_lock_irq(&resource->state_rwlock);
 	for_each_connection(connection, resource) {
 		idr_remove(&connection->peer_devices, device->vnr);
-		kref_put(&device->kref, drbd_destroy_device);
 	}
 	idr_remove(&resource->devices, device->vnr);
-	kref_put(&device->kref, drbd_destroy_device);
-	idr_remove(&drbd_devices, device_to_minor(device));
-	kref_put(&device->kref, drbd_destroy_device);
+	spin_lock(&drbd_devices_lock);
+	idr_remove(&drbd_devices, device->minor);
+	spin_unlock(&drbd_devices_lock);
+	write_unlock_irq(&resource->state_rwlock);
+
+	for_each_peer_device(peer_device, device)
+		drbd_debugfs_peer_device_cleanup(peer_device);
+	drbd_debugfs_device_cleanup(device);
 	del_gendisk(device->vdisk);
-	synchronize_rcu();
-	kref_put(&device->kref, drbd_destroy_device);
+
+	/* Flush and free the submit workqueues; no new work can be queued
+	 * now that the device is gone from the idrs and the disk is removed. */
+	destroy_workqueue(device->submit_conflict.wq);
+	device->submit_conflict.wq = NULL;
+	destroy_workqueue(device->submit.wq);
+	device->submit.wq = NULL;
+	timer_shutdown_sync(&device->request_timer);
+}
+
+/* RCU callback: drop the device references that kept the device alive
+ * while RCU readers could still find it -- one per peer device, plus
+ * three more.  NOTE(review): the three presumably correspond to the
+ * kref_get()s taken for the idrs in drbd_create_device(); confirm the
+ * exact accounting against that function. */
+void drbd_reclaim_device(struct rcu_head *rp)
+{
+	struct drbd_device *device = container_of(rp, struct drbd_device, rcu);
+	struct drbd_peer_device *peer_device;
+	int i;
+
+	for_each_peer_device(peer_device, device) {
+		kref_put(&device->kref, drbd_destroy_device);
+	}
+
+	for (i = 0; i < 3; i++) {
+		kref_put(&device->kref, drbd_destroy_device);
+	}
+}
+
+/* Permanently cancel the connect timer.  A pending timer holds a
+ * connection reference, so drop it if the timer was still armed.
+ * After timer_shutdown_sync() the timer can never be re-armed. */
+static void shutdown_connect_timer(struct drbd_connection *connection)
+{
+	if (timer_shutdown_sync(&connection->connect_timer)) {
+		kref_put(&connection->kref, drbd_destroy_connection);
+	}
+}
+
+/* Like shutdown_connect_timer(), but uses timer_delete_sync(), so the
+ * timer may be armed again later.  Drops the connection reference held
+ * by a pending timer, if there was one. */
+void del_connect_timer(struct drbd_connection *connection)
+{
+	if (timer_delete_sync(&connection->connect_timer)) {
+		kref_put(&connection->kref, drbd_destroy_connection);
+	}
+}
+
+/**
+ * drbd_unregister_connection() - make a connection "invisible"
+ * @connection: DRBD connection to unregister
+ *
+ * Remove the connection from the drbd object model. Keep reference counts on
+ * connection->kref; they are dropped in drbd_reclaim_connection().
+ */
+void drbd_unregister_connection(struct drbd_connection *connection)
+{
+	struct drbd_resource *resource = connection->resource;
+	struct drbd_peer_device *peer_device;
+	int vnr, rr;
+
+	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
+		drbd_debugfs_peer_device_cleanup(peer_device);
+
+	/* Mark the connection unregistered before unlinking it, so RCU
+	 * readers that still find it can tell.  NOTE(review): smp_wmb()
+	 * presumably pairs with a barrier on the reader side of
+	 * connection->flags; confirm against the lookup paths. */
+	write_lock_irq(&resource->state_rwlock);
+	set_bit(C_UNREGISTERED, &connection->flags);
+	smp_wmb();
+	idr_for_each_entry(&connection->peer_devices, peer_device, vnr)
+		list_del_rcu(&peer_device->peer_devices);
+	list_del_rcu(&connection->connections);
+	write_unlock_irq(&resource->state_rwlock);
+
+	drbd_debugfs_connection_cleanup(connection);
+
+	shutdown_connect_timer(connection);
+
+	rr = drbd_free_peer_reqs(connection, &connection->done_ee);
+	if (rr)
+		drbd_err(connection, "%d EEs in done list found!\n", rr);
+
+	drbd_transport_shutdown(connection, DESTROY_TRANSPORT);
+	drbd_put_send_buffers(connection);
+	conn_free_crypto(connection);
+}
+
+/* RCU callback: drop the connection references that kept it alive for
+ * RCU readers -- one per peer device, plus one final reference. */
+void drbd_reclaim_connection(struct rcu_head *rp)
+{
+	struct drbd_connection *connection =
+		container_of(rp, struct drbd_connection, rcu);
+	struct drbd_peer_device *peer_device;
+	int vnr;
+
+	idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
+		kref_put(&connection->kref, drbd_destroy_connection);
+	}
+	kref_put(&connection->kref, drbd_destroy_connection);
+}
+
+/* RCU callback: the path was already unlinked under RCU; re-initialize
+ * its list head and drop the final path reference. */
+void drbd_reclaim_path(struct rcu_head *rp)
+{
+	struct drbd_path *path = container_of(rp, struct drbd_path, rcu);
+
+	INIT_LIST_HEAD(&path->list);
+	kref_put(&path->kref, drbd_destroy_path);
+}
static int __init drbd_init(void)
{
int err;
- if (drbd_minor_count < DRBD_MINOR_COUNT_MIN || drbd_minor_count > DRBD_MINOR_COUNT_MAX) {
+
+ if (drbd_minor_count < DRBD_MINOR_COUNT_MIN
+ || drbd_minor_count > DRBD_MINOR_COUNT_MAX) {
pr_err("invalid minor_count (%d)\n", drbd_minor_count);
#ifdef MODULE
return -EINVAL;
@@ -2840,24 +4440,41 @@ static int __init drbd_init(void)
return err;
}
+ /*
+ * allocate all necessary structs
+ */
drbd_proc = NULL; /* play safe for drbd_cleanup */
idr_init(&drbd_devices);
- mutex_init(&resources_mutex);
INIT_LIST_HEAD(&drbd_resources);
+ err = register_pernet_device(&drbd_pernet_ops);
+ if (err) {
+ pr_err("unable to register net namespace handlers\n");
+ goto fail;
+ }
+
+ drbd_enable_netns();
err = drbd_genl_register();
if (err) {
pr_err("unable to register generic netlink family\n");
goto fail;
}
+ err = -ENOMEM;
+ ping_ack_sender = alloc_workqueue("drbd_pas",
+ WQ_UNBOUND | WQ_MEM_RECLAIM | WQ_HIGHPRI, 0);
+ if (!ping_ack_sender)
+ goto fail;
+
err = drbd_create_mempools();
if (err)
goto fail;
err = -ENOMEM;
- drbd_proc = proc_create_single("drbd", S_IFREG | 0444 , NULL, drbd_seq_show);
+ drbd_proc = proc_create_single("drbd", S_IFREG | 0444, NULL,
+ drbd_seq_show);
+
if (!drbd_proc) {
pr_err("unable to register proc file\n");
goto fail;
@@ -2879,6 +4496,11 @@ static int __init drbd_init(void)
GENL_MAGIC_VERSION, PRO_VERSION_MIN, PRO_VERSION_MAX);
pr_info("%s\n", drbd_buildtag());
pr_info("registered as block device major %d\n", DRBD_MAJOR);
+
+#ifdef CONFIG_DRBD_COMPAT_84
+ atomic_set(&nr_drbd8_devices, 0);
+#endif
+
return 0; /* Success! */
fail:
@@ -2890,493 +4512,1104 @@ static int __init drbd_init(void)
return err;
}
-static void drbd_free_one_sock(struct drbd_socket *ds)
-{
- struct socket *s;
- mutex_lock(&ds->mutex);
- s = ds->socket;
- ds->socket = NULL;
- mutex_unlock(&ds->mutex);
- if (s) {
- /* so debugfs does not need to mutex_lock() */
- synchronize_rcu();
- kernel_sock_shutdown(s, SHUT_RDWR);
- sock_release(s);
- }
-}
-
-void drbd_free_sock(struct drbd_connection *connection)
-{
- if (connection->data.socket)
- drbd_free_one_sock(&connection->data);
- if (connection->meta.socket)
- drbd_free_one_sock(&connection->meta);
-}
-
/* meta data management */
-void conn_md_sync(struct drbd_connection *connection)
+static
+void drbd_md_encode_9(struct drbd_device *device, struct meta_data_on_disk_9 *buffer)
{
- struct drbd_peer_device *peer_device;
- int vnr;
+ int i;
- rcu_read_lock();
- idr_for_each_entry(&connection->peer_devices, peer_device, vnr) {
- struct drbd_device *device = peer_device->device;
-
- kref_get(&device->kref);
- rcu_read_unlock();
- drbd_md_sync(device);
- kref_put(&device->kref, drbd_destroy_device);
- rcu_read_lock();
- }
- rcu_read_unlock();
-}
-
-/* aligned 4kByte */
-struct meta_data_on_disk {
- u64 la_size_sect; /* last agreed size. */
- u64 uuid[UI_SIZE]; /* UUIDs. */
- u64 device_uuid;
- u64 reserved_u64_1;
- u32 flags; /* MDF */
- u32 magic;
- u32 md_size_sect;
- u32 al_offset; /* offset to this block */
- u32 al_nr_extents; /* important for restoring the AL (userspace) */
- /* `-- act_log->nr_elements <-- ldev->dc.al_extents */
- u32 bm_offset; /* offset to the bitmap, from here */
- u32 bm_bytes_per_bit; /* BM_BLOCK_SIZE */
- u32 la_peer_max_bio_size; /* last peer max_bio_size */
-
- /* see al_tr_number_to_on_disk_sector() */
- u32 al_stripes;
- u32 al_stripe_size_4k;
-
- u8 reserved_u8[4096 - (7*8 + 10*4)];
-} __packed;
-
-
-
-void drbd_md_write(struct drbd_device *device, void *b)
-{
- struct meta_data_on_disk *buffer = b;
- sector_t sector;
- int i;
-
- memset(buffer, 0, sizeof(*buffer));
-
- buffer->la_size_sect = cpu_to_be64(get_capacity(device->vdisk));
- for (i = UI_CURRENT; i < UI_SIZE; i++)
- buffer->uuid[i] = cpu_to_be64(device->ldev->md.uuid[i]);
+ buffer->effective_size = cpu_to_be64(device->ldev->md.effective_size);
+ buffer->current_uuid = cpu_to_be64(device->ldev->md.current_uuid);
+ buffer->members = cpu_to_be64(device->ldev->md.members);
buffer->flags = cpu_to_be32(device->ldev->md.flags);
- buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_84_UNCLEAN);
+ buffer->magic = cpu_to_be32(DRBD_MD_MAGIC_09);
buffer->md_size_sect = cpu_to_be32(device->ldev->md.md_size_sect);
buffer->al_offset = cpu_to_be32(device->ldev->md.al_offset);
buffer->al_nr_extents = cpu_to_be32(device->act_log->nr_elements);
- buffer->bm_bytes_per_bit = cpu_to_be32(BM_BLOCK_SIZE);
+ buffer->bm_bytes_per_bit = cpu_to_be32(device->ldev->md.bm_block_size);
buffer->device_uuid = cpu_to_be64(device->ldev->md.device_uuid);
buffer->bm_offset = cpu_to_be32(device->ldev->md.bm_offset);
- buffer->la_peer_max_bio_size = cpu_to_be32(device->peer_max_bio_size);
+ buffer->la_peer_max_bio_size = cpu_to_be32(device->device_conf.max_bio_size);
+ buffer->bm_max_peers = cpu_to_be32(device->ldev->md.max_peers);
+ buffer->node_id = cpu_to_be32(device->ldev->md.node_id);
+ for (i = 0; i < DRBD_NODE_ID_MAX; i++) {
+ struct drbd_peer_md *peer_md = &device->ldev->md.peers[i];
+
+ buffer->peers[i].bitmap_uuid = cpu_to_be64(peer_md->bitmap_uuid);
+ buffer->peers[i].bitmap_dagtag = cpu_to_be64(peer_md->bitmap_dagtag);
+ buffer->peers[i].flags = cpu_to_be32(peer_md->flags & ~MDF_HAVE_BITMAP);
+ buffer->peers[i].bitmap_index = cpu_to_be32(peer_md->bitmap_index);
+ }
+ BUILD_BUG_ON(ARRAY_SIZE(device->ldev->md.history_uuids) != ARRAY_SIZE(buffer->history_uuids));
+ for (i = 0; i < ARRAY_SIZE(buffer->history_uuids); i++)
+ buffer->history_uuids[i] = cpu_to_be64(device->ldev->md.history_uuids[i]);
buffer->al_stripes = cpu_to_be32(device->ldev->md.al_stripes);
buffer->al_stripe_size_4k = cpu_to_be32(device->ldev->md.al_stripe_size_4k);
+ if (device->bitmap == NULL)
+ for (i = 0; i < DRBD_PEERS_MAX; i++)
+ buffer->peers[i].flags |= cpu_to_be32(MDF_PEER_FULL_SYNC);
+}
+
+/* Encode the in-memory meta data into the on-disk format.  A device
+ * still using the drbd 8.4 layout (LEGACY_84_MD set) keeps the old
+ * format; everything else uses the drbd 9 layout. */
+static void drbd_md_encode(struct drbd_device *device, void *buffer)
+{
+	if (test_bit(LEGACY_84_MD, &device->flags))
+		drbd_md_encode_84(device, buffer);
+	else
+		drbd_md_encode_9(device, buffer);
+}
+
+int drbd_md_write(struct drbd_device *device, struct meta_data_on_disk_9 *buffer)
+{
+	sector_t sector;
+	int err;
+
+	/* With DAX active the superblock lives in persistent memory:
+	 * encode it in place and write back the CPU cache instead of
+	 * doing block I/O. */
+	if (drbd_md_dax_active(device->ldev)) {
+		drbd_md_encode(device, drbd_dax_md_addr(device->ldev));
+		arch_wb_cache_pmem(drbd_dax_md_addr(device->ldev),
+				   sizeof(struct meta_data_on_disk_9));
+		return 0;
+	}
+
+	memset(buffer, 0, sizeof(*buffer));
+
+	drbd_md_encode(device, buffer);
+
 	D_ASSERT(device, drbd_md_ss(device->ldev) == device->ldev->md.md_offset);
 	sector = device->ldev->md.md_offset;
-	if (drbd_md_sync_page_io(device, device->ldev, sector, REQ_OP_WRITE)) {
-		/* this was a try anyways ... */
+	err = drbd_md_sync_page_io(device, device->ldev, sector, REQ_OP_WRITE);
+	if (err) {
 		drbd_err(device, "meta data update failed!\n");
-		drbd_chk_io_error(device, 1, DRBD_META_IO_ERROR);
+		drbd_handle_io_error(device, DRBD_META_IO_ERROR);
 	}
+
+	return err;
 }
/**
- * drbd_md_sync() - Writes the meta data super block if the MD_DIRTY flag bit is set
+ * __drbd_md_sync() - Writes the meta data super block (conditionally) if the MD_DIRTY flag bit is set
 * @device: DRBD device.
+ * @maybe: meta data may in fact be "clean", the actual write may be skipped.
+ *
+ * Return: 0 if the superblock was written, negative error code if the
+ * device is detached, the md buffer cannot be claimed, or the write fails.
+ * NOTE(review): when @maybe is true and MD_DIRTY is clear, the write is
+ * skipped and -EIO is returned even though nothing failed - confirm callers
+ * of drbd_md_sync_if_dirty() ignore the return value in that case.
 */
-void drbd_md_sync(struct drbd_device *device)
+static int __drbd_md_sync(struct drbd_device *device, bool maybe)
{
- struct meta_data_on_disk *buffer;
+ struct meta_data_on_disk_9 *buffer;
+ int err = -EIO;
/* Don't accidentally change the DRBD meta data layout. */
- BUILD_BUG_ON(UI_SIZE != 4);
- BUILD_BUG_ON(sizeof(struct meta_data_on_disk) != 4096);
-
- timer_delete(&device->md_sync_timer);
- /* timer may be rearmed by drbd_md_mark_dirty() now. */
- if (!test_and_clear_bit(MD_DIRTY, &device->flags))
- return;
+ BUILD_BUG_ON(DRBD_PEERS_MAX != 32);
+ BUILD_BUG_ON(HISTORY_UUIDS != 32);
+ BUILD_BUG_ON(sizeof(struct meta_data_on_disk_9) != 4096);
- /* We use here D_FAILED and not D_ATTACHING because we try to write
- * metadata even if we detach due to a disk failure! */
- if (!get_ldev_if_state(device, D_FAILED))
- return;
+ if (!get_ldev_if_state(device, D_DETACHING))
+ return -EIO;
buffer = drbd_md_get_buffer(device, __func__);
if (!buffer)
goto out;
+ timer_delete(&device->md_sync_timer);
+ /* timer may be rearmed by drbd_md_mark_dirty() now. */
- /* Update device->ldev->md.la_size_sect,
- * since we updated it on metadata. */
- device->ldev->md.la_size_sect = get_capacity(device->vdisk);
+ if (test_and_clear_bit(MD_DIRTY, &device->flags) || !maybe) {
+ err = drbd_md_write(device, buffer);
+ if (err)
+ /* keep it dirty so a later sync retries the write */
+ set_bit(MD_DIRTY, &device->flags);
+ }
drbd_md_put_buffer(device);
out:
put_ldev(device);
+
+ return err;
+}
+
+/* Write the meta data super block now, regardless of MD_DIRTY. */
+int drbd_md_sync(struct drbd_device *device)
+{
+ return __drbd_md_sync(device, false);
+}
+
+/* Write the meta data super block only if MD_DIRTY is set. */
+int drbd_md_sync_if_dirty(struct drbd_device *device)
+{
+ return __drbd_md_sync(device, true);
+}
+
+/**
+ * drbd_md_mark_dirty() - Mark meta data super block as dirty
+ * @device: DRBD device.
+ *
+ * Call this function if you change anything that should be written to
+ * the meta-data super block. This function sets MD_DIRTY, and starts a
+ * timer that ensures that within five seconds you have to call drbd_md_sync().
+ */
+void drbd_md_mark_dirty(struct drbd_device *device)
+{
+ /* arm the timer only on the clean -> dirty transition */
+ if (!test_and_set_bit(MD_DIRTY, &device->flags))
+ mod_timer(&device->md_sync_timer, jiffies + 5*HZ);
+}
+
+/* Insert @val at the head of the on-disk UUID history, shifting older
+ * entries down (the oldest entry is dropped). Sentinel values, and values
+ * already present as the current UUID, any peer bitmap UUID or a history
+ * entry, are silently ignored.
+ * NOTE(review): callers appear to hold md.uuid_lock - confirm. */
+void _drbd_uuid_push_history(struct drbd_device *device, u64 val)
+{
+ struct drbd_md *md = &device->ldev->md;
+ int node_id, i;
+
+ if (val == UUID_JUST_CREATED || val == 0)
+ return;
+
+ val &= ~UUID_PRIMARY;
+
+ if (val == (md->current_uuid & ~UUID_PRIMARY))
+ return;
+
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ if (node_id == md->node_id)
+ continue;
+ if (val == (md->peers[node_id].bitmap_uuid & ~UUID_PRIMARY))
+ return;
+ }
+
+ for (i = 0; i < ARRAY_SIZE(md->history_uuids); i++) {
+ if (md->history_uuids[i] == val)
+ return;
+ }
+
+ /* shift everything down one slot; loop leaves i == 0 */
+ for (i = ARRAY_SIZE(md->history_uuids) - 1; i > 0; i--)
+ md->history_uuids[i] = md->history_uuids[i - 1];
+ md->history_uuids[i] = val;
+}
+
+/* Remove and return the first (most recently pushed) history UUID,
+ * shifting the remaining entries up and zeroing the freed tail slot.
+ * @peer_device is only used to reach the device. */
+u64 _drbd_uuid_pull_history(struct drbd_peer_device *peer_device)
+{
+ struct drbd_device *device = peer_device->device;
+ struct drbd_md *md = &device->ldev->md;
+ u64 first_history_uuid;
+ int i;
+
+ first_history_uuid = md->history_uuids[0];
+ for (i = 0; i < ARRAY_SIZE(md->history_uuids) - 1; i++)
+ md->history_uuids[i] = md->history_uuids[i + 1];
+ md->history_uuids[i] = 0;
+
+ return first_history_uuid;
+}
+
+/* Set the current UUID, encoding the local role in the UUID_PRIMARY bit,
+ * mark the meta data dirty and expose the new UUID.
+ * NOTE(review): callers hold md.uuid_lock - confirm. */
+static void __drbd_uuid_set_current(struct drbd_device *device, u64 val)
+{
+ drbd_md_mark_dirty(device);
+ if (device->resource->role[NOW] == R_PRIMARY)
+ val |= UUID_PRIMARY;
+ else
+ val &= ~UUID_PRIMARY;
+
+ device->ldev->md.current_uuid = val;
+ drbd_uuid_set_exposed(device, val, false);
+}
+
+/* Set the bitmap UUID for this peer's slot; a non-zero UUID also records
+ * the current dagtag, a zero UUID clears it. Marks the meta data dirty. */
+static void __drbd_uuid_set_bitmap(struct drbd_peer_device *peer_device, u64 val)
+{
+ struct drbd_device *device = peer_device->device;
+ struct drbd_peer_md *peer_md = &device->ldev->md.peers[peer_device->node_id];
+
+ drbd_md_mark_dirty(device);
+ peer_md->bitmap_uuid = val;
+ peer_md->bitmap_dagtag = val ? device->resource->dagtag_sector : 0;
+}
+
+/* Locked wrapper: set the current UUID under md.uuid_lock. */
+void _drbd_uuid_set_current(struct drbd_device *device, u64 val)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
+ __drbd_uuid_set_current(device, val);
+ spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
+}
+
+/* Locked wrapper: set a peer's bitmap UUID while holding both uuid_sem
+ * (write) and md.uuid_lock. */
+void _drbd_uuid_set_bitmap(struct drbd_peer_device *peer_device, u64 val)
+{
+ struct drbd_device *device = peer_device->device;
+ unsigned long flags;
+
+ down_write(&device->uuid_sem);
+ spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
+ __drbd_uuid_set_bitmap(peer_device, val);
+ spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
+ up_write(&device->uuid_sem);
+}
+
+/* Replace a peer's bitmap UUID, pushing the previous (non-zero) value into
+ * the history. Call holding down_write(uuid_sem). */
+void drbd_uuid_set_bitmap(struct drbd_peer_device *peer_device, u64 uuid)
+{
+ struct drbd_device *device = peer_device->device;
+ unsigned long flags;
+ u64 previous_uuid;
+
+ spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
+ previous_uuid = drbd_bitmap_uuid(peer_device);
+ __drbd_uuid_set_bitmap(peer_device, uuid);
+ if (previous_uuid)
+ _drbd_uuid_push_history(device, previous_uuid);
+ spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
+}
+
+/**
+ * drbd_uuid_is_day0() - Check if device is in "day0" UUID state
+ * @device: DRBD device (caller must hold ldev reference)
+ *
+ * Returns true if the current UUID appears to be a "day0" UUID:
+ * a real UUID value was set (e.g. by linstor during create-md),
+ * but no UUID rotation has ever happened (all history and bitmap
+ * UUIDs are still zero).
+ */
+bool drbd_uuid_is_day0(struct drbd_device *device)
+{
+ struct drbd_md *md = &device->ldev->md;
+ int i;
+
+ /* a freshly created or blank current UUID does not count as "day0" */
+ if ((md->current_uuid & ~UUID_PRIMARY) == UUID_JUST_CREATED ||
+ md->current_uuid == 0)
+ return false;
+
+ for (i = 0; i < ARRAY_SIZE(md->history_uuids); i++)
+ if (md->history_uuids[i])
+ return false;
+
+ for (i = 0; i < DRBD_NODE_ID_MAX; i++) {
+ if (i == md->node_id)
+ continue;
+ if (md->peers[i].bitmap_uuid)
+ return false;
+ }
+
+ return true;
+}
+
+/* Copy the current UUID into the bitmap slot of every peer that may have
+ * missed writes: first all known peer devices whose slot is empty or still
+ * equals the current UUID, then all configured-but-absent node ids.
+ * NOTE(review): the (unsigned long *)&u64 casts for slot_mask assume
+ * little-endian or 64-bit unsigned long - confirm for 32-bit BE targets.
+ * Caller holds md.uuid_lock (per the callers visible in this file). */
+static u64 rotate_current_into_bitmap(struct drbd_device *device, u64 weak_nodes, u64 dagtag)
+{
+ struct drbd_peer_md *peer_md = device->ldev->md.peers;
+ struct drbd_peer_device *peer_device;
+ int node_id;
+ u64 bm_uuid, prev_c_uuid;
+ u64 node_mask = 0; /* bit mask of node-ids processed */
+ u64 slot_mask = 0; /* bit mask of on-disk bitmap slots processed */
+ /* return value, bit mask of node-ids for which we
+ * actually set a new bitmap uuid */
+ u64 got_new_bitmap_uuid = 0;
+
+ if (device->ldev->md.current_uuid != UUID_JUST_CREATED)
+ prev_c_uuid = device->ldev->md.current_uuid;
+ else
+ get_random_bytes(&prev_c_uuid, sizeof(u64));
+
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ enum drbd_disk_state pdsk;
+ node_id = peer_device->node_id;
+ node_mask |= NODE_MASK(node_id);
+ if (peer_device->bitmap_index != -1)
+ __set_bit(peer_device->bitmap_index, (unsigned long *)&slot_mask);
+ bm_uuid = peer_md[node_id].bitmap_uuid;
+ /* slot already tracks some other data generation; leave it */
+ if (bm_uuid && bm_uuid != prev_c_uuid)
+ continue;
+
+ pdsk = peer_device->disk_state[NOW];
+
+ /* Create a new current UUID for a peer that is diskless but usually has a backing disk.
+ * Do not create a new current UUID for a CONNECTED intentional diskless peer.
+ * Create one for an intentional diskless peer that is currently away. */
+ if (pdsk == D_DISKLESS && !(peer_md[node_id].flags & MDF_HAVE_BITMAP))
+ continue;
+
+ if ((pdsk <= D_UNKNOWN && pdsk != D_NEGOTIATING) ||
+ (NODE_MASK(node_id) & weak_nodes)) {
+ peer_md[node_id].bitmap_uuid = prev_c_uuid;
+ peer_md[node_id].bitmap_dagtag = dagtag;
+ drbd_md_mark_dirty(device);
+ got_new_bitmap_uuid |= NODE_MASK(node_id);
+ }
+ }
+ /* second pass: configured node ids without a peer_device object */
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ int slot_nr;
+ if (node_id == device->ldev->md.node_id)
+ continue;
+ if (node_mask & NODE_MASK(node_id))
+ continue;
+ slot_nr = peer_md[node_id].bitmap_index;
+ if (slot_nr != -1) {
+ if (test_bit(slot_nr, (unsigned long *)&slot_mask))
+ continue;
+ __set_bit(slot_nr, (unsigned long *)&slot_mask);
+ }
+ bm_uuid = peer_md[node_id].bitmap_uuid;
+ if (bm_uuid && bm_uuid != prev_c_uuid)
+ continue;
+ if (slot_nr == -1) {
+ slot_nr = find_first_zero_bit((unsigned long *)&slot_mask, sizeof(slot_mask) * BITS_PER_BYTE);
+ __set_bit(slot_nr, (unsigned long *)&slot_mask);
+ }
+ peer_md[node_id].bitmap_uuid = prev_c_uuid;
+ peer_md[node_id].bitmap_dagtag = dagtag;
+ drbd_md_mark_dirty(device);
+ /* count, but only if that bitmap index exists. */
+ if (slot_nr < device->ldev->md.max_peers)
+ got_new_bitmap_uuid |= NODE_MASK(node_id);
+ }
+ rcu_read_unlock();
+
+ return got_new_bitmap_uuid;
+}
+
+/* Node mask of established peers that are still D_INCONSISTENT,
+ * i.e. candidates for the initial full resync. */
+static u64 initial_resync_nodes(struct drbd_device *device)
+{
+ struct drbd_peer_device *peer_device;
+ u64 nodes = 0;
+
+ for_each_peer_device(peer_device, device) {
+ if (peer_device->disk_state[NOW] == D_INCONSISTENT &&
+ peer_device->repl_state[NOW] == L_ESTABLISHED)
+ nodes |= NODE_MASK(peer_device->node_id);
+ }
+
+ return nodes;
+}
+
+/* Return the mask of "weak" nodes: the complement of the nodes known to
+ * hold good data (the local node if D_UP_TO_DATE, plus peers that are not
+ * failed, unknown or outdated). */
+u64 drbd_weak_nodes_device(struct drbd_device *device)
+{
+ struct drbd_peer_device *peer_device;
+ u64 not_weak = 0;
+
+ if (device->disk_state[NOW] == D_UP_TO_DATE)
+ not_weak = NODE_MASK(device->resource->res_opts.node_id);
+
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ enum drbd_disk_state pdsk = peer_device->disk_state[NOW];
+ if (!(pdsk <= D_FAILED || pdsk == D_UNKNOWN || pdsk == D_OUTDATED))
+ not_weak |= NODE_MASK(peer_device->node_id);
+
+ }
+ rcu_read_unlock();
+
+ return ~not_weak;
+}
+
+
+/* Rotate the current UUID into peer bitmap slots, generate a fresh random
+ * current UUID, and force it to stable storage. On md write failure the
+ * old current UUID is restored. Returns true if a new UUID is in place.
+ * Caller holds down_write(uuid_sem). */
+static bool __new_current_uuid_prepare(struct drbd_device *device, bool forced)
+{
+ u64 got_new_bitmap_uuid, val, old_current_uuid;
+ bool day0;
+ int err;
+
+ spin_lock_irq(&device->ldev->md.uuid_lock);
+ day0 = drbd_uuid_is_day0(device);
+ got_new_bitmap_uuid = rotate_current_into_bitmap(device,
+ forced ? initial_resync_nodes(device) : 0,
+ device->resource->dagtag_sector);
+
+ /* nothing rotated and not day0: keep the current UUID as is */
+ if (!got_new_bitmap_uuid && !day0) {
+ spin_unlock_irq(&device->ldev->md.uuid_lock);
+ return false;
+ }
+
+ old_current_uuid = device->ldev->md.current_uuid;
+ get_random_bytes(&val, sizeof(u64));
+ __drbd_uuid_set_current(device, val);
+ spin_unlock_irq(&device->ldev->md.uuid_lock);
+
+ /* get it to stable storage _now_ */
+ err = drbd_md_sync(device);
+ if (err) {
+ _drbd_uuid_set_current(device, old_current_uuid);
+ return false;
+ }
+
+ return true;
+}
+
+/* Log the freshly generated current UUID and the weak-node mask. */
+static void __new_current_uuid_info(struct drbd_device *device, u64 weak_nodes)
+{
+ drbd_info(device, "new current UUID: %016llX weak: %016llX\n",
+ device->ldev->md.current_uuid, weak_nodes);
+}
+
+/* Send the updated UUID set to every established peer; @forced suppresses
+ * the UUID_FLAG_NEW_DATAGEN flag. */
+static void __new_current_uuid_send(struct drbd_device *device, u64 weak_nodes, bool forced)
+{
+ struct drbd_peer_device *peer_device;
+ u64 im;
+
+ for_each_peer_device_ref(peer_device, im, device) {
+ if (peer_device->repl_state[NOW] >= L_ESTABLISHED)
+ drbd_send_uuids(peer_device, forced ? 0 : UUID_FLAG_NEW_DATAGEN, weak_nodes);
+ }
+}
+
+/* Create a new current UUID under uuid_sem, then downgrade to a read lock
+ * while logging and sending it to the peers. */
+static void __drbd_uuid_new_current_send(struct drbd_device *device, bool forced)
+{
+ u64 weak_nodes;
+
+ down_write(&device->uuid_sem);
+ if (!__new_current_uuid_prepare(device, forced)) {
+ up_write(&device->uuid_sem);
+ return;
+ }
+ downgrade_write(&device->uuid_sem);
+ weak_nodes = drbd_weak_nodes_device(device);
+ __new_current_uuid_info(device, weak_nodes);
+ __new_current_uuid_send(device, weak_nodes, forced);
+ up_read(&device->uuid_sem);
+}
+
+/* Like __drbd_uuid_new_current_send(), but the caller already holds
+ * down_write(uuid_sem); only logs, does not send to peers. */
+static void __drbd_uuid_new_current_holding_uuid_sem(struct drbd_device *device)
+{
+ u64 weak_nodes;
+
+ if (!__new_current_uuid_prepare(device, false))
+ return;
+ weak_nodes = drbd_weak_nodes_device(device);
+ __new_current_uuid_info(device, weak_nodes);
+}
+
+/* True if @peer_device has at least one empty bitmap slot that a new
+ * current UUID would be recorded into; slots of peers that do not want a
+ * bitmap, or the local node while intentionally diskless, do not count. */
+static bool peer_can_fill_a_bitmap_slot(struct drbd_peer_device *peer_device)
+{
+ struct drbd_device *device = peer_device->device;
+ const bool intentional_diskless = device->device_conf.intentional_diskless;
+ const int my_node_id = device->resource->res_opts.node_id;
+ int node_id;
+
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ if (node_id == peer_device->node_id)
+ continue;
+ if (peer_device->bitmap_uuids[node_id] == 0) {
+ struct drbd_peer_device *p2;
+ p2 = peer_device_by_node_id(peer_device->device, node_id);
+ if (p2 && !want_bitmap(p2))
+ continue;
+
+ if (node_id == my_node_id && intentional_diskless)
+ continue;
+
+ return true;
+ }
+ }
+
+ return false;
+}
+
+/* True if some DRBD 9 (protocol >= 110) up-to-date peer still has an empty
+ * bitmap slot, i.e. would benefit from us generating a new current UUID. */
+static bool diskfull_peers_need_new_cur_uuid(struct drbd_device *device)
+{
+ struct drbd_peer_device *peer_device;
+ bool rv = false;
+
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ if (peer_device->connection->agreed_pro_version < 110)
+ continue;
+
+ /* Only an up-to-date peer persists a new current uuid! */
+ if (peer_device->disk_state[NOW] < D_UP_TO_DATE)
+ continue;
+ if (peer_can_fill_a_bitmap_slot(peer_device)) {
+ rv = true;
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ return rv;
+}
+
+/* True if some lost/unreachable peer (disk state between D_INCONSISTENT and
+ * D_UNKNOWN) is still on the UUID we currently expose and is not a sync
+ * target - such a peer should learn about a new data generation. */
+static bool a_lost_peer_is_on_same_cur_uuid(struct drbd_device *device)
+{
+ struct drbd_peer_device *peer_device;
+ bool rv = false;
+
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ enum drbd_disk_state pdsk = peer_device->disk_state[NOW];
+
+ if (pdsk >= D_INCONSISTENT && pdsk <= D_UNKNOWN &&
+ (device->exposed_data_uuid & ~UUID_PRIMARY) ==
+ (peer_device->current_uuid & ~UUID_PRIMARY) &&
+ !(peer_device->uuid_flags & UUID_FLAG_SYNC_TARGET)) {
+ rv = true;
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ return rv;
+}
+
+/**
+ * drbd_uuid_new_current() - Creates a new current UUID
+ * @device: DRBD device.
+ * @forced: Force UUID creation
+ *
+ * Creates a new current UUID, and rotates the old current UUID into
+ * the bitmap slot. Causes an incremental resync upon next connect.
+ *
+ * Without a local disk, a random UUID is generated, exposed, and sent to
+ * all established peers, which persist it on our behalf.
+ */
+void drbd_uuid_new_current(struct drbd_device *device, bool forced)
+{
+ if (get_ldev_if_state(device, D_UP_TO_DATE)) {
+ __drbd_uuid_new_current_send(device, forced);
+ put_ldev(device);
+ } else if (diskfull_peers_need_new_cur_uuid(device) ||
+ a_lost_peer_is_on_same_cur_uuid(device)) {
+ struct drbd_peer_device *peer_device;
+ /* The peers will store the new current UUID... */
+ u64 current_uuid, weak_nodes;
+ get_random_bytes(&current_uuid, sizeof(u64));
+ if (device->resource->role[NOW] == R_PRIMARY)
+ current_uuid |= UUID_PRIMARY;
+ else
+ current_uuid &= ~UUID_PRIMARY;
+
+ down_write(&device->uuid_sem);
+ drbd_uuid_set_exposed(device, current_uuid, false);
+ downgrade_write(&device->uuid_sem);
+ drbd_info(device, "sending new current UUID: %016llX\n", current_uuid);
+
+ weak_nodes = drbd_weak_nodes_device(device);
+ for_each_peer_device(peer_device, device) {
+ if (peer_device->repl_state[NOW] >= L_ESTABLISHED) {
+ drbd_send_current_uuid(peer_device, current_uuid, weak_nodes);
+ peer_device->current_uuid = current_uuid;
+ }
+ }
+ up_read(&device->uuid_sem);
+ }
+}
+
+/* User-requested new data generation: clear all peer bitmap UUIDs (rotating
+ * them into the history) and create a new current UUID, all under uuid_sem. */
+void drbd_uuid_new_current_by_user(struct drbd_device *device)
+{
+ struct drbd_peer_device *peer_device;
+
+ down_write(&device->uuid_sem);
+ for_each_peer_device(peer_device, device)
+ drbd_uuid_set_bitmap(peer_device, 0); /* Rotate UI_BITMAP to History 1, etc... */
+
+ if (get_ldev(device)) {
+ __drbd_uuid_new_current_holding_uuid_sem(device);
+ put_ldev(device);
+ }
+ up_write(&device->uuid_sem);
+}
+
+/* Queue propagate_uuids_work for every established peer in @nodes whose
+ * work item is not already queued. */
+static void drbd_propagate_uuids(struct drbd_device *device, u64 nodes)
+{
+ struct drbd_peer_device *peer_device;
+
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ if (!(nodes & NODE_MASK(peer_device->node_id)))
+ continue;
+
+ if (peer_device->repl_state[NOW] < L_ESTABLISHED)
+ continue;
+
+ if (list_empty(&peer_device->propagate_uuids_work.list))
+ drbd_queue_work(&peer_device->connection->sender_work,
+ &peer_device->propagate_uuids_work);
+ }
+ rcu_read_unlock();
+}
+
+/* A peer announced a new current UUID @val. If we are currently a sync/
+ * verify target towards any peer we only remember it there; otherwise we
+ * adopt it as our own current UUID, update matching bitmap slots, and
+ * propagate it to the peers collected in @recipients. */
+void drbd_uuid_received_new_current(struct drbd_peer_device *from_pd, u64 val, u64 weak_nodes)
+{
+ struct drbd_device *device = from_pd->device;
+ u64 dagtag = atomic64_read(&from_pd->connection->last_dagtag_sector);
+ struct drbd_peer_device *peer_device;
+ u64 recipients = 0;
+ bool set_current = true;
+
+ down_write(&device->uuid_sem);
+ spin_lock_irq(&device->ldev->md.uuid_lock);
+
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ if (peer_device->repl_state[NOW] == L_SYNC_TARGET ||
+ peer_device->repl_state[NOW] == L_BEHIND ||
+ peer_device->repl_state[NOW] == L_PAUSED_SYNC_T) {
+ peer_device->current_uuid = val;
+ set_current = false;
+ }
+ if (peer_device->repl_state[NOW] == L_WF_BITMAP_S ||
+ peer_device->repl_state[NOW] == L_SYNC_SOURCE ||
+ peer_device->repl_state[NOW] == L_PAUSED_SYNC_S ||
+ peer_device->repl_state[NOW] == L_ESTABLISHED)
+ recipients |= NODE_MASK(peer_device->node_id);
+
+ if (peer_device->disk_state[NOW] == D_DISKLESS)
+ recipients |= NODE_MASK(peer_device->node_id);
+ }
+ rcu_read_unlock();
+
+ if (set_current) {
+ u64 old_current = device->ldev->md.current_uuid;
+ u64 upd;
+
+ if (device->disk_state[NOW] == D_UP_TO_DATE)
+ recipients |= rotate_current_into_bitmap(device, weak_nodes, dagtag);
+
+ upd = ~weak_nodes; /* These nodes are connected to the primary */
+ upd &= __test_bitmap_slots(device); /* of those, I have a bitmap for */
+ __set_bitmap_slots(device, val, upd);
+ /* Setting bitmap to the (new) current-UUID, means, at this moment
+ we know that we are at the same data as this not connected peer. */
+
+ __drbd_uuid_set_current(device, val);
+
+ /* Even when the old current UUID was not used as any bitmap
+ * UUID, we still add it to the history. This is relevant, in
+ * particular, when we afterwards perform a sync handshake with
+ * a peer which is not one of the "weak_nodes", but hasn't
+ * received the new current UUID. If we do not add the current
+ * UUID to the history, we will end up with a spurious
+ * unrelated data or split-brain decision. */
+ _drbd_uuid_push_history(device, old_current);
+ }
+
+ spin_unlock_irq(&device->ldev->md.uuid_lock);
+ downgrade_write(&device->uuid_sem);
+ if (set_current)
+ drbd_propagate_uuids(device, recipients);
+ up_read(&device->uuid_sem);
+}
+
+/* Set the bitmap UUID of every node in @do_nodes (that has a bitmap and is
+ * not us) to @bitmap_uuid; displaced values go into the history. Returns
+ * the mask of node ids actually modified.
+ * NOTE(review): forward reference - drbd_uuid_received_new_current() above
+ * calls this; a prototype earlier in the file is assumed - confirm. */
+static u64 __set_bitmap_slots(struct drbd_device *device, u64 bitmap_uuid, u64 do_nodes)
+{
+ struct drbd_peer_md *peer_md = device->ldev->md.peers;
+ u64 modified = 0;
+ int node_id;
+
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ if (node_id == device->ldev->md.node_id)
+ continue;
+ if (!(do_nodes & NODE_MASK(node_id)))
+ continue;
+ if (!(peer_md[node_id].flags & MDF_HAVE_BITMAP))
+ continue;
+ if (peer_md[node_id].bitmap_uuid != bitmap_uuid) {
+ u64 previous_bitmap_uuid = peer_md[node_id].bitmap_uuid;
+ /* drbd_info(device, "XXX bitmap[node_id=%d] = %llX\n", node_id, bitmap_uuid); */
+ peer_md[node_id].bitmap_uuid = bitmap_uuid;
+ peer_md[node_id].bitmap_dagtag =
+ bitmap_uuid ? device->resource->dagtag_sector : 0;
+ _drbd_uuid_push_history(device, previous_bitmap_uuid);
+ drbd_md_mark_dirty(device);
+ modified |= NODE_MASK(node_id);
+ }
+ }
+
+ return modified;
+}
+
+/* Node mask of all peers with a non-zero bitmap UUID in our meta data. */
+static u64 __test_bitmap_slots(struct drbd_device *device)
+{
+ struct drbd_peer_md *peer_md = device->ldev->md.peers;
+ int node_id;
+ u64 rv = 0;
+
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ if (peer_md[node_id].bitmap_uuid)
+ rv |= NODE_MASK(node_id);
+ }
+
+ return rv;
+}
+
+/* __test_bitmap_slots_of_peer() operates on the view of the world I know
+ the SyncSource had. It might be that in the mean time some peers sent more
+ recent UUIDs to me. Remove all peers that are on the same UUID as I am
+ now from the set of nodes.
+ A slot value of -1 means "no slot"; 0 means "empty" - both are skipped. */
+static u64 __test_bitmap_slots_of_peer(struct drbd_peer_device *peer_device)
+{
+ u64 set_bitmap_slots = 0;
+ int node_id;
+
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ u64 bitmap_uuid = peer_device->bitmap_uuids[node_id];
+
+ if (bitmap_uuid != 0 && bitmap_uuid != -1)
+ set_bitmap_slots |= NODE_MASK(node_id);
+ }
+
+ return set_bitmap_slots;
+}
+
+/* Node mask of peers with a usable disk whose current UUID (ignoring the
+ * UUID_PRIMARY bit) matches @current_uuid. */
+static u64
+peers_with_current_uuid(struct drbd_device *device, u64 current_uuid)
+{
+ struct drbd_peer_device *peer_device;
+ u64 nodes = 0;
+
+ current_uuid &= ~UUID_PRIMARY;
+ rcu_read_lock();
+ for_each_peer_device_rcu(peer_device, device) {
+ enum drbd_disk_state peer_disk_state = peer_device->disk_state[NOW];
+ if (peer_disk_state < D_INCONSISTENT || peer_disk_state == D_UNKNOWN)
+ continue;
+ if (current_uuid == (peer_device->current_uuid & ~UUID_PRIMARY))
+ nodes |= NODE_MASK(peer_device->node_id);
+ }
+ rcu_read_unlock();
+
+ return nodes;
+}
+
+/* Snapshot the current UUID as the resync start point, remember a crashed
+ * primary sync source, and rotate the current UUID into the bitmap slots.
+ * NOTE(review): rotate_current_into_bitmap() is called here without taking
+ * md.uuid_lock, unlike the other call sites - confirm the caller holds it. */
+void drbd_uuid_resync_starting(struct drbd_peer_device *peer_device)
+{
+ struct drbd_device *device = peer_device->device;
+
+ peer_device->rs_start_uuid = drbd_current_uuid(device);
+ if (peer_device->uuid_flags & UUID_FLAG_CRASHED_PRIMARY)
+ set_bit(SYNC_SRC_CRASHED_PRI, &peer_device->flags);
+ rotate_current_into_bitmap(device, 0, device->resource->dagtag_sector);
+}
+
+/* Resync from @peer_device completed: inherit its UUID history and bitmap
+ * UUIDs, adopt its current UUID as ours, and return the mask of nodes the
+ * sync source had newer data for than we now have. */
+u64 drbd_uuid_resync_finished(struct drbd_peer_device *peer_device)
+{
+ struct drbd_device *device = peer_device->device;
+ unsigned long flags;
+ int i;
+ u64 ss_nz_bm; /* sync_source has non zero bitmap for. expressed as nodemask */
+ u64 pwcu; /* peers with current uuid */
+ u64 newer;
+
+ spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
+ /* Inherit history from the sync source */
+ for (i = 0; i < ARRAY_SIZE(peer_device->history_uuids); i++)
+ _drbd_uuid_push_history(device, peer_device->history_uuids[i] & ~UUID_PRIMARY);
+
+ /* Inherit history in bitmap UUIDs from the sync source */
+ for (i = 0; i < DRBD_PEERS_MAX; i++)
+ if (peer_device->bitmap_uuids[i] != -1)
+ _drbd_uuid_push_history(device,
+ peer_device->bitmap_uuids[i] & ~UUID_PRIMARY);
+
+ ss_nz_bm = __test_bitmap_slots_of_peer(peer_device);
+ pwcu = peers_with_current_uuid(device, peer_device->current_uuid);
+
+ newer = __set_bitmap_slots(device, peer_device->rs_start_uuid, ss_nz_bm & ~pwcu);
+ __set_bitmap_slots(device, 0, ~ss_nz_bm | pwcu);
+ _drbd_uuid_push_history(device, drbd_current_uuid(device));
+ __drbd_uuid_set_current(device, peer_device->current_uuid);
+ spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
+
+ return newer;
+}
+
+/* Update the UUID of the data generation exposed to upper layers, encoding
+ * the local role in the UUID_PRIMARY bit. Returns false (no change) if the
+ * value already matches or is the just-created sentinel; optionally logs. */
+bool drbd_uuid_set_exposed(struct drbd_device *device, u64 val, bool log)
+{
+ if ((device->exposed_data_uuid & ~UUID_PRIMARY) == (val & ~UUID_PRIMARY) ||
+ val == UUID_JUST_CREATED)
+ return false;
+
+ if (device->resource->role[NOW] == R_PRIMARY)
+ val |= UUID_PRIMARY;
+ else
+ val &= ~UUID_PRIMARY;
+
+ device->exposed_data_uuid = val;
+
+ if (log)
+ drbd_info(device, "Setting exposed data uuid: %016llX\n", (unsigned long long)val);
+
+ return true;
+}
+
+/* Resolve a node id to the configured connection name, or "" if unknown. */
+static const char *name_of_node_id(struct drbd_resource *resource, int node_id)
+{
+ /* Caller needs to hold rcu_read_lock */
+ struct drbd_connection *connection = drbd_connection_by_node_id(resource, node_id);
+
+ return connection ? rcu_dereference(connection->transport.net_conf)->name : "";
}
-static int check_activity_log_stripe_size(struct drbd_device *device,
- struct meta_data_on_disk *on_disk,
- struct drbd_md *in_core)
+/* Clear the whole bitmap slot for @node_id (if it has any bits set).
+ * Called with md.uuid_lock held; temporarily drops it around the bitmap
+ * operations, which may sleep. */
+static void forget_bitmap(struct drbd_device *device, int node_id)
{
- u32 al_stripes = be32_to_cpu(on_disk->al_stripes);
- u32 al_stripe_size_4k = be32_to_cpu(on_disk->al_stripe_size_4k);
- u64 al_size_4k;
+ int bitmap_index = device->ldev->md.peers[node_id].bitmap_index;
+ const char *name;
- /* both not set: default to old fixed size activity log */
- if (al_stripes == 0 && al_stripe_size_4k == 0) {
- al_stripes = 1;
- al_stripe_size_4k = MD_32kB_SECT/8;
- }
+ if (_drbd_bm_total_weight(device, bitmap_index) == 0)
+ return;
- /* some paranoia plausibility checks */
+ spin_unlock_irq(&device->ldev->md.uuid_lock);
+ rcu_read_lock();
+ name = name_of_node_id(device->resource, node_id);
+ drbd_info(device, "clearing bitmap UUID and content (%lu bits) for node %d (%s)(slot %d)\n",
+ _drbd_bm_total_weight(device, bitmap_index), node_id, name, bitmap_index);
+ rcu_read_unlock();
+ drbd_suspend_io(device, WRITE_ONLY);
+ drbd_bm_lock(device, "forget_bitmap()", BM_LOCK_TEST | BM_LOCK_SET);
+ _drbd_bm_clear_many_bits(device, bitmap_index, 0, -1UL);
+ drbd_bm_unlock(device);
+ drbd_resume_io(device);
+ drbd_md_mark_dirty(device);
+ spin_lock_irq(&device->ldev->md.uuid_lock);
+}
- /* we need both values to be set */
- if (al_stripes == 0 || al_stripe_size_4k == 0)
- goto err;
+/* Copy bitmap slot @from_id into slot @to_id, including bitmap UUID and
+ * dagtag; the displaced UUID goes into the history. Called with
+ * md.uuid_lock held; temporarily drops it for the bitmap copy. */
+static void copy_bitmap(struct drbd_device *device, int from_id, int to_id)
+{
+ struct drbd_peer_device *peer_device = peer_device_by_node_id(device, to_id);
+ struct drbd_peer_md *peer_md = device->ldev->md.peers;
+ u64 previous_bitmap_uuid = peer_md[to_id].bitmap_uuid;
+ int from_index = peer_md[from_id].bitmap_index;
+ int to_index = peer_md[to_id].bitmap_index;
+ const char *from_name, *to_name;
- al_size_4k = (u64)al_stripes * al_stripe_size_4k;
+ peer_md[to_id].bitmap_uuid = peer_md[from_id].bitmap_uuid;
+ peer_md[to_id].bitmap_dagtag = peer_md[from_id].bitmap_dagtag;
+ _drbd_uuid_push_history(device, previous_bitmap_uuid);
- /* Upper limit of activity log area, to avoid potential overflow
- * problems in al_tr_number_to_on_disk_sector(). As right now, more
- * than 72 * 4k blocks total only increases the amount of history,
- * limiting this arbitrarily to 16 GB is not a real limitation ;-) */
- if (al_size_4k > (16 * 1024 * 1024/4))
- goto err;
+ /* Pretending that the updated UUID was sent is a hack.
+ Unfortunately necessary to not interrupt the handshake */
+ if (peer_device && peer_device->comm_bitmap_uuid == previous_bitmap_uuid)
+ peer_device->comm_bitmap_uuid = peer_md[from_id].bitmap_uuid;
- /* Lower limit: we need at least 8 transaction slots (32kB)
- * to not break existing setups */
- if (al_size_4k < MD_32kB_SECT/8)
- goto err;
+ spin_unlock_irq(&device->ldev->md.uuid_lock);
+ rcu_read_lock();
+ from_name = name_of_node_id(device->resource, from_id);
+ to_name = name_of_node_id(device->resource, to_id);
+ drbd_info(device, "Node %d (%s) synced up to node %d (%s). copying bitmap slot %d to %d.\n",
+ to_id, to_name, from_id, from_name, from_index, to_index);
+ rcu_read_unlock();
+ drbd_suspend_io(device, WRITE_ONLY);
+ drbd_bm_lock(device, "copy_bitmap()", BM_LOCK_ALL);
+ drbd_bm_copy_slot(device, from_index, to_index);
+ drbd_bm_unlock(device);
+ drbd_resume_io(device);
+ drbd_md_mark_dirty(device);
+ spin_lock_irq(&device->ldev->md.uuid_lock);
+}
- in_core->al_stripe_size_4k = al_stripe_size_4k;
- in_core->al_stripes = al_stripes;
- in_core->al_size_4k = al_size_4k;
+/* Find the node id whose bitmap UUID matches @bm_uuid (ignoring the
+ * UUID_PRIMARY bit). Nodes for which we actually hold a bitmap
+ * (MDF_HAVE_BITMAP) are preferred; returns -1 if no match. */
+static int find_node_id_by_bitmap_uuid(struct drbd_device *device, u64 bm_uuid)
+{
+ struct drbd_peer_md *peer_md = device->ldev->md.peers;
+ int node_id;
- return 0;
-err:
- drbd_err(device, "invalid activity log striping: al_stripes=%u, al_stripe_size_4k=%u\n",
- al_stripes, al_stripe_size_4k);
- return -EINVAL;
-}
-
-static int check_offsets_and_sizes(struct drbd_device *device, struct drbd_backing_dev *bdev)
-{
- sector_t capacity = drbd_get_capacity(bdev->md_bdev);
- struct drbd_md *in_core = &bdev->md;
- s32 on_disk_al_sect;
- s32 on_disk_bm_sect;
-
- /* The on-disk size of the activity log, calculated from offsets, and
- * the size of the activity log calculated from the stripe settings,
- * should match.
- * Though we could relax this a bit: it is ok, if the striped activity log
- * fits in the available on-disk activity log size.
- * Right now, that would break how resize is implemented.
- * TODO: make drbd_determine_dev_size() (and the drbdmeta tool) aware
- * of possible unused padding space in the on disk layout. */
- if (in_core->al_offset < 0) {
- if (in_core->bm_offset > in_core->al_offset)
- goto err;
- on_disk_al_sect = -in_core->al_offset;
- on_disk_bm_sect = in_core->al_offset - in_core->bm_offset;
- } else {
- if (in_core->al_offset != MD_4kB_SECT)
- goto err;
- if (in_core->bm_offset < in_core->al_offset + in_core->al_size_4k * MD_4kB_SECT)
- goto err;
+ bm_uuid &= ~UUID_PRIMARY;
- on_disk_al_sect = in_core->bm_offset - MD_4kB_SECT;
- on_disk_bm_sect = in_core->md_size_sect - in_core->bm_offset;
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ if ((peer_md[node_id].bitmap_uuid & ~UUID_PRIMARY) == bm_uuid &&
+ peer_md[node_id].flags & MDF_HAVE_BITMAP)
+ return node_id;
}
- /* old fixed size meta data is exactly that: fixed. */
- if (in_core->meta_dev_idx >= 0) {
- if (in_core->md_size_sect != MD_128MB_SECT
- || in_core->al_offset != MD_4kB_SECT
- || in_core->bm_offset != MD_4kB_SECT + MD_32kB_SECT
- || in_core->al_stripes != 1
- || in_core->al_stripe_size_4k != MD_32kB_SECT/8)
- goto err;
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ if ((peer_md[node_id].bitmap_uuid & ~UUID_PRIMARY) == bm_uuid)
+ return node_id;
}
- if (capacity < in_core->md_size_sect)
- goto err;
- if (capacity - in_core->md_size_sect < drbd_md_first_sector(bdev))
- goto err;
-
- /* should be aligned, and at least 32k */
- if ((on_disk_al_sect & 7) || (on_disk_al_sect < MD_32kB_SECT))
- goto err;
-
- /* should fit (for now: exactly) into the available on-disk space;
- * overflow prevention is in check_activity_log_stripe_size() above. */
- if (on_disk_al_sect != in_core->al_size_4k * MD_4kB_SECT)
- goto err;
-
- /* again, should be aligned */
- if (in_core->bm_offset & 7)
- goto err;
+ return -1;
+}
- /* FIXME check for device grow with flex external meta data? */
+/* True if the connection towards @node_id currently exists and is in
+ * C_CONNECTED state. */
+static bool node_connected(struct drbd_resource *resource, int node_id)
+{
+ struct drbd_connection *connection;
+ bool r = false;
- /* can the available bitmap space cover the last agreed device size? */
- if (on_disk_bm_sect < (in_core->la_size_sect+7)/MD_4kB_SECT/8/512)
- goto err;
+ rcu_read_lock();
+ connection = drbd_connection_by_node_id(resource, node_id);
+ if (connection)
+ r = connection->cstate[NOW] == C_CONNECTED;
+ rcu_read_unlock();
- return 0;
+ return r;
+}
-err:
- drbd_err(device, "meta data offsets don't make sense: idx=%d "
- "al_s=%u, al_sz4k=%u, al_offset=%d, bm_offset=%d, "
- "md_size_sect=%u, la_size=%llu, md_capacity=%llu\n",
- in_core->meta_dev_idx,
- in_core->al_stripes, in_core->al_stripe_size_4k,
- in_core->al_offset, in_core->bm_offset, in_core->md_size_sect,
- (unsigned long long)in_core->la_size_sect,
- (unsigned long long)capacity);
+static bool detect_copy_ops_on_peer(struct drbd_peer_device *peer_device)
+{
+ struct drbd_device *device = peer_device->device;
+ struct drbd_peer_md *peer_md = device->ldev->md.peers;
+ struct drbd_resource *resource = device->resource;
+ int node_id1, node_id2, from_id;
+ u64 peer_bm_uuid;
+ bool modified = false;
- return -EINVAL;
-}
+ for (node_id1 = 0; node_id1 < DRBD_NODE_ID_MAX; node_id1++) {
+ if (device->ldev->md.peers[node_id1].bitmap_index == -1)
+ continue;
+ if (node_connected(resource, node_id1))
+ continue;
-/**
- * drbd_md_read() - Reads in the meta data super block
- * @device: DRBD device.
- * @bdev: Device from which the meta data should be read in.
- *
- * Return NO_ERROR on success, and an enum drbd_ret_code in case
- * something goes wrong.
- *
- * Called exactly once during drbd_adm_attach(), while still being D_DISKLESS,
- * even before @bdev is assigned to @device->ldev.
- */
-int drbd_md_read(struct drbd_device *device, struct drbd_backing_dev *bdev)
-{
- struct meta_data_on_disk *buffer;
- u32 magic, flags;
- int i, rv = NO_ERROR;
+ peer_bm_uuid = peer_device->bitmap_uuids[node_id1];
+ if (peer_bm_uuid == 0 || peer_bm_uuid == -1ULL)
+ continue;
- if (device->state.disk != D_DISKLESS)
- return ERR_DISK_CONFIGURED;
+ peer_bm_uuid &= ~UUID_PRIMARY;
+ for (node_id2 = node_id1 + 1; node_id2 < DRBD_NODE_ID_MAX; node_id2++) {
+ if (device->ldev->md.peers[node_id2].bitmap_index == -1)
+ continue;
- buffer = drbd_md_get_buffer(device, __func__);
- if (!buffer)
- return ERR_NOMEM;
+ if (node_connected(resource, node_id2))
+ continue;
- /* First, figure out where our meta data superblock is located,
- * and read it. */
- bdev->md.meta_dev_idx = bdev->disk_conf->meta_dev_idx;
- bdev->md.md_offset = drbd_md_ss(bdev);
- /* Even for (flexible or indexed) external meta data,
- * initially restrict us to the 4k superblock for now.
- * Affects the paranoia out-of-range access check in drbd_md_sync_page_io(). */
- bdev->md.md_size_sect = 8;
-
- if (drbd_md_sync_page_io(device, bdev, bdev->md.md_offset,
- REQ_OP_READ)) {
- /* NOTE: can't do normal error processing here as this is
- called BEFORE disk is attached */
- drbd_err(device, "Error while reading metadata.\n");
- rv = ERR_IO_MD_DISK;
- goto err;
- }
-
- magic = be32_to_cpu(buffer->magic);
- flags = be32_to_cpu(buffer->flags);
- if (magic == DRBD_MD_MAGIC_84_UNCLEAN ||
- (magic == DRBD_MD_MAGIC_08 && !(flags & MDF_AL_CLEAN))) {
- /* btw: that's Activity Log clean, not "all" clean. */
- drbd_err(device, "Found unclean meta data. Did you \"drbdadm apply-al\"?\n");
- rv = ERR_MD_UNCLEAN;
- goto err;
- }
-
- rv = ERR_MD_INVALID;
- if (magic != DRBD_MD_MAGIC_08) {
- if (magic == DRBD_MD_MAGIC_07)
- drbd_err(device, "Found old (0.7) meta data magic. Did you \"drbdadm create-md\"?\n");
- else
- drbd_err(device, "Meta data magic not found. Did you \"drbdadm create-md\"?\n");
- goto err;
+ if (peer_bm_uuid == (peer_device->bitmap_uuids[node_id2] & ~UUID_PRIMARY))
+ goto found;
+ }
}
+ return false;
- if (be32_to_cpu(buffer->bm_bytes_per_bit) != BM_BLOCK_SIZE) {
- drbd_err(device, "unexpected bm_bytes_per_bit: %u (expected %u)\n",
- be32_to_cpu(buffer->bm_bytes_per_bit), BM_BLOCK_SIZE);
- goto err;
+found:
+ from_id = find_node_id_by_bitmap_uuid(device, peer_bm_uuid);
+ if (from_id == -1) {
+ if (peer_md[node_id1].bitmap_uuid == 0 && peer_md[node_id2].bitmap_uuid == 0)
+ return false;
+ drbd_err(peer_device, "unexpected\n");
+ drbd_err(peer_device, "In UUIDs from node %d found equal UUID (%llX) for nodes %d %d\n",
+ peer_device->node_id, peer_bm_uuid, node_id1, node_id2);
+ drbd_err(peer_device, "I have %llX for node_id=%d\n",
+ peer_md[node_id1].bitmap_uuid, node_id1);
+ drbd_err(peer_device, "I have %llX for node_id=%d\n",
+ peer_md[node_id2].bitmap_uuid, node_id2);
+ return false;
}
+ if (!(peer_md[from_id].flags & MDF_HAVE_BITMAP))
+ return false;
- /* convert to in_core endian */
- bdev->md.la_size_sect = be64_to_cpu(buffer->la_size_sect);
- for (i = UI_CURRENT; i < UI_SIZE; i++)
- bdev->md.uuid[i] = be64_to_cpu(buffer->uuid[i]);
- bdev->md.flags = be32_to_cpu(buffer->flags);
- bdev->md.device_uuid = be64_to_cpu(buffer->device_uuid);
-
- bdev->md.md_size_sect = be32_to_cpu(buffer->md_size_sect);
- bdev->md.al_offset = be32_to_cpu(buffer->al_offset);
- bdev->md.bm_offset = be32_to_cpu(buffer->bm_offset);
-
- if (check_activity_log_stripe_size(device, buffer, &bdev->md))
- goto err;
- if (check_offsets_and_sizes(device, bdev))
- goto err;
+ if (from_id != node_id1 &&
+ peer_md[node_id1].bitmap_uuid != peer_bm_uuid) {
+ copy_bitmap(device, from_id, node_id1);
+ modified = true;
- if (be32_to_cpu(buffer->bm_offset) != bdev->md.bm_offset) {
- drbd_err(device, "unexpected bm_offset: %d (expected %d)\n",
- be32_to_cpu(buffer->bm_offset), bdev->md.bm_offset);
- goto err;
- }
- if (be32_to_cpu(buffer->md_size_sect) != bdev->md.md_size_sect) {
- drbd_err(device, "unexpected md_size: %u (expected %u)\n",
- be32_to_cpu(buffer->md_size_sect), bdev->md.md_size_sect);
- goto err;
}
-
- rv = NO_ERROR;
-
- spin_lock_irq(&device->resource->req_lock);
- if (device->state.conn < C_CONNECTED) {
- unsigned int peer;
- peer = be32_to_cpu(buffer->la_peer_max_bio_size);
- peer = max(peer, DRBD_MAX_BIO_SIZE_SAFE);
- device->peer_max_bio_size = peer;
+ if (from_id != node_id2 &&
+ peer_md[node_id2].bitmap_uuid != peer_bm_uuid) {
+ copy_bitmap(device, from_id, node_id2);
+ modified = true;
}
- spin_unlock_irq(&device->resource->req_lock);
- err:
- drbd_md_put_buffer(device);
-
- return rv;
+ return modified;
}
-/**
- * drbd_md_mark_dirty() - Mark meta data super block as dirty
- * @device: DRBD device.
- *
- * Call this function if you change anything that should be written to
- * the meta-data super block. This function sets MD_DIRTY, and starts a
- * timer that ensures that within five seconds you have to call drbd_md_sync().
- */
-void drbd_md_mark_dirty(struct drbd_device *device)
/*
 * drbd_uuid_detect_finished_resyncs() - evaluate UUIDs received from a peer
 * @peer_device: Peer whose UUID information was just updated.
 *
 * Compare the peer's view of the data generation UUIDs with our local
 * per-peer metadata to detect resyncs that finished while we were not
 * watching, then clear or copy the affected bitmap slots.  If any slot was
 * modified, the bitmap is written out and the updated UUIDs are propagated
 * to the peers.
 */
void drbd_uuid_detect_finished_resyncs(struct drbd_peer_device *peer_device)
{
	u64 peer_current_uuid = peer_device->current_uuid & ~UUID_PRIMARY;
	struct drbd_device *device = peer_device->device;
	struct drbd_peer_md *peer_md = device->ldev->md.peers;
	const int my_node_id = device->resource->res_opts.node_id;
	bool write_bm = false;
	bool filled = false;
	bool current_equal;
	int node_id;

	/* "current_equal": the peer's current UUID matches ours and neither
	 * side is marked as a sync target (locally or in the communicated
	 * flags). */
	current_equal = peer_current_uuid == (drbd_resolved_uuid(peer_device, NULL) & ~UUID_PRIMARY) &&
		!(peer_device->uuid_flags & UUID_FLAG_SYNC_TARGET) &&
		!(peer_device->comm_uuid_flags & UUID_FLAG_SYNC_TARGET);

	spin_lock_irq(&device->ldev->md.uuid_lock);

	if (peer_device->repl_state[NOW] == L_OFF && current_equal) {
		/* Replication towards this peer is off, but the data is
		 * identical: check whether one side missed the end of a
		 * resync (a bitmap UUID is still set on exactly one side). */
		u64 bm_to_peer = peer_device->comm_bitmap_uuid & ~UUID_PRIMARY;
		u64 bm_towards_me = peer_device->bitmap_uuids[my_node_id] & ~UUID_PRIMARY;

		if (bm_towards_me != 0 && bm_to_peer == 0 &&
		    bm_towards_me != peer_current_uuid) {
			/* The peer still tracks a bitmap towards us. */
			if (peer_device->comm_bm_set == 0 && peer_device->dirty_bits == 0) {
				drbd_info(peer_device, "Peer missed end of resync, 0 to sync\n");
				/* Peers speaking protocol >= 124 resolve
				 * this case themselves. */
				if (peer_device->connection->agreed_pro_version < 124)
					set_bit(RS_PEER_MISSED_END, &peer_device->flags);
			} else {
				drbd_info(peer_device, "Peer missed end of resync\n");
				set_bit(RS_PEER_MISSED_END, &peer_device->flags);
			}
		}
		if (bm_towards_me == 0 && bm_to_peer != 0 &&
		    bm_to_peer != peer_current_uuid) {
			/* We still track a bitmap towards the peer. */
			if (peer_device->comm_bm_set == 0 && peer_device->dirty_bits == 0) {
				int peer_node_id = peer_device->node_id;
				u64 previous = peer_md[peer_node_id].bitmap_uuid;

				drbd_info(peer_device,
					  "Missed end of resync as sync-source, no bits to sync\n");
				/* Nothing left to sync: retire the bitmap
				 * UUID into the history right away. */
				peer_md[peer_node_id].bitmap_uuid = 0;
				_drbd_uuid_push_history(device, previous);
				peer_device->comm_bitmap_uuid = 0;
				drbd_md_mark_dirty(device);
				if (peer_device->connection->agreed_pro_version < 124)
					set_bit(RS_SOURCE_MISSED_END, &peer_device->flags);
			} else {
				drbd_info(peer_device, "Missed end of resync as sync-source\n");
				set_bit(RS_SOURCE_MISSED_END, &peer_device->flags);
			}
		}
		spin_unlock_irq(&device->ldev->md.uuid_lock);
		return;
	}

	/* Reconcile the bitmap slots for all other nodes with the bitmap
	 * UUIDs the peer reported for those nodes. */
	for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
		struct drbd_peer_device *pd2;

		if (node_id == device->ldev->md.node_id)
			continue;

		/* Ignore slots of nodes we know nothing about. */
		if (!(peer_md[node_id].flags & MDF_HAVE_BITMAP) && !(peer_md[node_id].flags & MDF_NODE_EXISTS))
			continue;

		/* Leave slots alone while that node is in a replication
		 * state beyond L_ESTABLISHED (resync/verify in progress). */
		pd2 = peer_device_by_node_id(device, node_id);
		if (pd2 && pd2 != peer_device && pd2->repl_state[NOW] > L_ESTABLISHED)
			continue;

		if (peer_device->bitmap_uuids[node_id] == 0 && peer_md[node_id].bitmap_uuid != 0) {
			int from_node_id;

			if (current_equal) {
				/* The peer no longer tracks a bitmap towards
				 * that node; since our data equals the
				 * peer's, retire ours as well. */
				u64 previous_bitmap_uuid = peer_md[node_id].bitmap_uuid;
				peer_md[node_id].bitmap_uuid = 0;
				_drbd_uuid_push_history(device, previous_bitmap_uuid);
				if (node_id == peer_device->node_id)
					drbd_print_uuids(peer_device, "updated UUIDs");
				else if (peer_md[node_id].flags & MDF_HAVE_BITMAP)
					forget_bitmap(device, node_id);
				else
					drbd_info(device, "Clearing bitmap UUID for node %d\n",
						  node_id);
				drbd_md_mark_dirty(device);
				write_bm = true;
			}

			/* If another slot tracks the peer's current UUID with
			 * a newer dagtag, take over its bitmap content. */
			from_node_id = find_node_id_by_bitmap_uuid(device, peer_current_uuid);
			if (from_node_id != -1 && node_id != from_node_id &&
			    dagtag_newer(peer_md[from_node_id].bitmap_dagtag,
					 peer_md[node_id].bitmap_dagtag)) {
				if (peer_md[node_id].flags & MDF_HAVE_BITMAP &&
				    peer_md[from_node_id].flags & MDF_HAVE_BITMAP)
					copy_bitmap(device, from_node_id, node_id);
				else
					drbd_info(device, "Node %d synced up to node %d.\n",
						  node_id, from_node_id);
				drbd_md_mark_dirty(device);
				filled = true;
			}
		}
	}

	write_bm |= detect_copy_ops_on_peer(peer_device);
	spin_unlock_irq(&device->ldev->md.uuid_lock);

	if (write_bm || filled) {
		/* Slots changed: tell the (other) peers and persist the
		 * bitmap.  Writes are suspended for the bulk write. */
		u64 to_nodes = filled ? -1 : ~NODE_MASK(peer_device->node_id);
		drbd_propagate_uuids(device, to_nodes);
		drbd_suspend_io(device, WRITE_ONLY);
		drbd_bm_lock(device, "detect_finished_resyncs()", BM_LOCK_BULK);
		drbd_bm_write(device, NULL);
		drbd_bm_unlock(device);
		drbd_resume_io(device);
	}
}
-void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local)
+int drbd_bmio_set_all_n_write(struct drbd_device *device,
+ struct drbd_peer_device *peer_device)
{
- unsigned long flags;
- spin_lock_irqsave(&device->ldev->md.uuid_lock, flags);
- if (device->ldev->md.uuid[UI_BITMAP] == 0 && val == 0) {
- spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
- return;
- }
-
- if (val == 0) {
- drbd_uuid_move_history(device);
- device->ldev->md.uuid[UI_HISTORY_START] = device->ldev->md.uuid[UI_BITMAP];
- device->ldev->md.uuid[UI_BITMAP] = 0;
- } else {
- unsigned long long bm_uuid = device->ldev->md.uuid[UI_BITMAP];
- if (bm_uuid)
- drbd_warn(device, "bm UUID was already set: %llX\n", bm_uuid);
-
- device->ldev->md.uuid[UI_BITMAP] = val & ~((u64)1);
- }
- spin_unlock_irqrestore(&device->ldev->md.uuid_lock, flags);
-
- drbd_md_mark_dirty(device);
+ drbd_bm_set_all(device);
+ return drbd_bm_write(device, NULL);
}
/**
@@ -3384,22 +5617,21 @@ void drbd_uuid_set_bm(struct drbd_device *device, u64 val) __must_hold(local)
* @device: DRBD device.
* @peer_device: Peer DRBD device.
*
- * Sets all bits in the bitmap and writes the whole bitmap to stable storage.
+ * Sets all bits in the bitmap towards one peer and writes the whole bitmap to stable storage.
*/
int drbd_bmio_set_n_write(struct drbd_device *device,
- struct drbd_peer_device *peer_device) __must_hold(local)
-
+ struct drbd_peer_device *peer_device)
{
int rv = -EIO;
- drbd_md_set_flag(device, MDF_FULL_SYNC);
+ drbd_md_set_peer_flag(peer_device, MDF_PEER_FULL_SYNC);
drbd_md_sync(device);
- drbd_bm_set_all(device);
+ drbd_bm_set_many_bits(peer_device, 0, -1UL);
- rv = drbd_bm_write(device, peer_device);
+ rv = drbd_bm_write(device, NULL);
if (!rv) {
- drbd_md_clear_flag(device, MDF_FULL_SYNC);
+ drbd_md_clear_peer_flag(peer_device, MDF_PEER_FULL_SYNC);
drbd_md_sync(device);
}
@@ -3407,67 +5639,109 @@ int drbd_bmio_set_n_write(struct drbd_device *device,
}
/**
- * drbd_bmio_clear_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
+ * drbd_bmio_set_allocated_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
+ * @device: DRBD device.
+ * @peer_device: parameter ignored
+ *
+ * Sets all bits in all allocated bitmap slots and writes it to stable storage.
+ */
+int drbd_bmio_set_allocated_n_write(struct drbd_device *device,
+ struct drbd_peer_device *peer_device)
+{
+ const int my_node_id = device->resource->res_opts.node_id;
+ struct drbd_md *md = &device->ldev->md;
+ int rv = -EIO;
+ int node_id, bitmap_index;
+
+ for (node_id = 0; node_id < DRBD_NODE_ID_MAX; node_id++) {
+ if (node_id == my_node_id)
+ continue;
+ bitmap_index = md->peers[node_id].bitmap_index;
+ if (bitmap_index == -1)
+ continue;
+ _drbd_bm_set_many_bits(device, bitmap_index, 0, -1UL);
+ }
+ rv = drbd_bm_write(device, NULL);
+
+ return rv;
+}
+
+/**
+ * drbd_bmio_clear_all_n_write() - io_fn for drbd_queue_bitmap_io() or drbd_bitmap_io()
* @device: DRBD device.
* @peer_device: Peer DRBD device.
*
* Clears all bits in the bitmap and writes the whole bitmap to stable storage.
*/
-int drbd_bmio_clear_n_write(struct drbd_device *device,
- struct drbd_peer_device *peer_device) __must_hold(local)
-
+int drbd_bmio_clear_all_n_write(struct drbd_device *device,
+ struct drbd_peer_device *peer_device)
{
drbd_resume_al(device);
drbd_bm_clear_all(device);
- return drbd_bm_write(device, peer_device);
+ return drbd_bm_write(device, NULL);
+}
+
+int drbd_bmio_clear_one_peer(struct drbd_device *device,
+ struct drbd_peer_device *peer_device)
+{
+ drbd_bm_clear_many_bits(peer_device, 0, -1UL);
+ return drbd_bm_write(device, NULL);
}
/* Worker callback: perform one bitmap IO operation queued by
 * drbd_queue_bitmap_io().  Takes the bitmap lock (or only the peer's
 * bitmap slot lock, per BM_LOCK_SINGLE_SLOT) around io_fn, invokes the
 * completion callback, then frees the work item. */
static int w_bitmap_io(struct drbd_work *w, int unused)
{
	struct bm_io_work *work =
		container_of(w, struct bm_io_work, w);
	struct drbd_device *device = work->device;
	int rv = -EIO;	/* reported when the local disk is already gone */

	if (get_ldev(device)) {
		if (work->flags & BM_LOCK_SINGLE_SLOT)
			drbd_bm_slot_lock(work->peer_device, work->why, work->flags);
		else
			drbd_bm_lock(device, work->why, work->flags);
		rv = work->io_fn(device, work->peer_device);
		if (work->flags & BM_LOCK_SINGLE_SLOT)
			drbd_bm_slot_unlock(work->peer_device);
		else
			drbd_bm_unlock(device);
		put_ldev(device);
	}

	if (work->done)
		work->done(device, work->peer_device, rv);

	/* Last pending bitmap work item: wake up waiters on misc_wait. */
	if (atomic_dec_and_test(&device->pending_bitmap_work.n))
		wake_up(&device->misc_wait);
	kfree(work);	/* allocated in drbd_queue_bitmap_io() */

	return 0;
}
/* Splice all pending bitmap work items of @device onto the resource's
 * work queue and wake the worker.
 * Lock order: pending_bitmap_work.q_lock (irq-safe, outer) nests around
 * the resource's work.q_lock (inner). */
void drbd_queue_pending_bitmap_work(struct drbd_device *device)
{
	unsigned long flags;

	spin_lock_irqsave(&device->pending_bitmap_work.q_lock, flags);
	spin_lock(&device->resource->work.q_lock);
	list_splice_tail_init(&device->pending_bitmap_work.q, &device->resource->work.q);
	spin_unlock(&device->resource->work.q_lock);
	spin_unlock_irqrestore(&device->pending_bitmap_work.q_lock, flags);
	wake_up(&device->resource->work.q_wait);
}
+
/**
* drbd_queue_bitmap_io() - Queues an IO operation on the whole bitmap
* @device: DRBD device.
* @io_fn: IO callback to be called when bitmap IO is possible
* @done: callback to be called after the bitmap IO was performed
* @why: Descriptive text of the reason for doing the IO
- * @flags: Bitmap flags
+ * @flags: Bitmap operation flags
* @peer_device: Peer DRBD device.
*
* While IO on the bitmap happens we freeze application IO thus we ensure
* that drbd_set_out_of_sync() can not be called. This function MAY ONLY be
- * called from worker context. It MUST NOT be used while a previous such
+ * called from sender context. It MUST NOT be used while a previous such
* work is still pending!
*
* Its worker function encloses the call of io_fn() by get_ldev() and
@@ -3475,35 +5749,63 @@ static int w_bitmap_io(struct drbd_work *w, int unused)
*/
void drbd_queue_bitmap_io(struct drbd_device *device,
			  int (*io_fn)(struct drbd_device *, struct drbd_peer_device *),
			  void (*done)(struct drbd_device *, struct drbd_peer_device *, int),
			  char *why, enum bm_flag flags,
			  struct drbd_peer_device *peer_device)
{
	struct bm_io_work *bm_io_work;

	/* Must run in the resource worker context. */
	D_ASSERT(device, current == device->resource->worker.task);

	bm_io_work = kmalloc_obj(*bm_io_work, GFP_NOIO);
	if (!bm_io_work) {
		/* Cannot queue the operation: report the failure directly
		 * to the completion callback. */
		if (done)
			done(device, peer_device, -ENOMEM);
		return;
	}
	bm_io_work->w.cb = w_bitmap_io;
	bm_io_work->device = device;
	bm_io_work->peer_device = peer_device;
	bm_io_work->io_fn = io_fn;
	bm_io_work->done = done;
	bm_io_work->why = why;
	bm_io_work->flags = flags;

	/*
	 * Whole-bitmap operations can only take place when there is no
	 * concurrent application I/O.  We ensure exclusion between the two
	 * types of I/O  with the following mechanism:
	 *
	 *  - device->ap_bio_cnt keeps track of the number of application I/O
	 *    requests in progress.
	 *
	 *  - A non-empty device->pending_bitmap_work list indicates that
	 *    whole-bitmap I/O operations are pending, and no new application
	 *    I/O should be started.  We make sure that the list doesn't appear
	 *    empty system wide before trying to queue the whole-bitmap I/O.
	 *
	 *  - In dec_ap_bio(), we decrement device->ap_bio_cnt.  If it reaches
	 *    zero and the device->pending_bitmap_work list is non-empty, we
	 *    queue the whole-bitmap operations.
	 *
	 *  - In inc_ap_bio(), we increment device->ap_bio_cnt before checking
	 *    if the device->pending_bitmap_work list is non-empty.  If
	 *    device->pending_bitmap_work is non-empty, we immediately call
	 *    dec_ap_bio().
	 *
	 * This ensures that whenever there is pending whole-bitmap I/O, we
	 * realize in dec_ap_bio().
	 *
	 */

	/* no one should accidentally schedule the next bitmap IO
	 * when it is only half-queued yet */
	atomic_inc(&device->ap_bio_cnt[WRITE]);
	atomic_inc(&device->pending_bitmap_work.n);
	spin_lock_irq(&device->pending_bitmap_work.q_lock);
	list_add_tail(&bm_io_work->w.list, &device->pending_bitmap_work.q);
	spin_unlock_irq(&device->pending_bitmap_work.q_lock);
	dec_ap_bio(device, WRITE);	/* may move to actual work queue */
}
/**
@@ -3511,11 +5813,11 @@ void drbd_queue_bitmap_io(struct drbd_device *device,
* @device: DRBD device.
* @io_fn: IO callback to be called when bitmap IO is possible
* @why: Descriptive text of the reason for doing the IO
- * @flags: Bitmap flags
+ * @flags: Bitmap operation flags
* @peer_device: Peer DRBD device.
*
* freezes application IO while that the actual IO operations runs. This
- * functions MAY NOT be called from worker context.
+ * functions MAY NOT be called from sender context.
*/
int drbd_bitmap_io(struct drbd_device *device,
int (*io_fn)(struct drbd_device *, struct drbd_peer_device *),
@@ -3523,17 +5825,28 @@ int drbd_bitmap_io(struct drbd_device *device,
struct drbd_peer_device *peer_device)
{
/* Only suspend io, if some operation is supposed to be locked out */
- const bool do_suspend_io = flags & (BM_DONT_CLEAR|BM_DONT_SET|BM_DONT_TEST);
+ const bool do_suspend_io = flags & (BM_LOCK_CLEAR|BM_LOCK_SET|BM_LOCK_TEST);
int rv;
- D_ASSERT(device, current != first_peer_device(device)->connection->worker.task);
+ D_ASSERT(device, current != device->resource->worker.task);
+
+ if (!device->bitmap)
+ return 0;
if (do_suspend_io)
- drbd_suspend_io(device);
+ drbd_suspend_io(device, WRITE_ONLY);
+
+ if (flags & BM_LOCK_SINGLE_SLOT)
+ drbd_bm_slot_lock(peer_device, why, flags);
+ else
+ drbd_bm_lock(device, why, flags);
- drbd_bm_lock(device, why, flags);
rv = io_fn(device, peer_device);
- drbd_bm_unlock(device);
+
+ if (flags & BM_LOCK_SINGLE_SLOT)
+ drbd_bm_slot_unlock(peer_device);
+ else
+ drbd_bm_unlock(device);
if (do_suspend_io)
drbd_resume_io(device);
@@ -3541,142 +5854,52 @@ int drbd_bitmap_io(struct drbd_device *device,
return rv;
}
-void drbd_md_set_flag(struct drbd_device *device, int flag) __must_hold(local)
+void drbd_md_set_peer_flag(struct drbd_peer_device *peer_device,
+ enum mdf_peer_flag flag)
{
- if ((device->ldev->md.flags & flag) != flag) {
+ struct drbd_device *device = peer_device->device;
+ struct drbd_md *md = &device->ldev->md;
+
+ if (!(md->peers[peer_device->node_id].flags & flag)) {
drbd_md_mark_dirty(device);
- device->ldev->md.flags |= flag;
+ md->peers[peer_device->node_id].flags |= flag;
}
}
-void drbd_md_clear_flag(struct drbd_device *device, int flag) __must_hold(local)
+void drbd_md_clear_peer_flag(struct drbd_peer_device *peer_device,
+ enum mdf_peer_flag flag)
{
- if ((device->ldev->md.flags & flag) != 0) {
+ struct drbd_device *device = peer_device->device;
+ struct drbd_md *md = &device->ldev->md;
+
+ if (md->peers[peer_device->node_id].flags & flag) {
drbd_md_mark_dirty(device);
- device->ldev->md.flags &= ~flag;
+ md->peers[peer_device->node_id].flags &= ~flag;
}
}
-int drbd_md_test_flag(struct drbd_backing_dev *bdev, int flag)
+
+int drbd_md_test_flag(struct drbd_backing_dev *bdev, enum mdf_flag flag)
{
return (bdev->md.flags & flag) != 0;
}
-static void md_sync_timer_fn(struct timer_list *t)
+bool drbd_md_test_peer_flag(struct drbd_peer_device *peer_device, enum mdf_peer_flag flag)
{
- struct drbd_device *device = timer_container_of(device, t,
- md_sync_timer);
- drbd_device_post_work(device, MD_SYNC);
-}
+ struct drbd_md *md = &peer_device->device->ldev->md;
-const char *cmdname(enum drbd_packet cmd)
-{
- /* THINK may need to become several global tables
- * when we want to support more than
- * one PRO_VERSION */
- static const char *cmdnames[] = {
-
- [P_DATA] = "Data",
- [P_DATA_REPLY] = "DataReply",
- [P_RS_DATA_REPLY] = "RSDataReply",
- [P_BARRIER] = "Barrier",
- [P_BITMAP] = "ReportBitMap",
- [P_BECOME_SYNC_TARGET] = "BecomeSyncTarget",
- [P_BECOME_SYNC_SOURCE] = "BecomeSyncSource",
- [P_UNPLUG_REMOTE] = "UnplugRemote",
- [P_DATA_REQUEST] = "DataRequest",
- [P_RS_DATA_REQUEST] = "RSDataRequest",
- [P_SYNC_PARAM] = "SyncParam",
- [P_PROTOCOL] = "ReportProtocol",
- [P_UUIDS] = "ReportUUIDs",
- [P_SIZES] = "ReportSizes",
- [P_STATE] = "ReportState",
- [P_SYNC_UUID] = "ReportSyncUUID",
- [P_AUTH_CHALLENGE] = "AuthChallenge",
- [P_AUTH_RESPONSE] = "AuthResponse",
- [P_STATE_CHG_REQ] = "StateChgRequest",
- [P_PING] = "Ping",
- [P_PING_ACK] = "PingAck",
- [P_RECV_ACK] = "RecvAck",
- [P_WRITE_ACK] = "WriteAck",
- [P_RS_WRITE_ACK] = "RSWriteAck",
- [P_SUPERSEDED] = "Superseded",
- [P_NEG_ACK] = "NegAck",
- [P_NEG_DREPLY] = "NegDReply",
- [P_NEG_RS_DREPLY] = "NegRSDReply",
- [P_BARRIER_ACK] = "BarrierAck",
- [P_STATE_CHG_REPLY] = "StateChgReply",
- [P_OV_REQUEST] = "OVRequest",
- [P_OV_REPLY] = "OVReply",
- [P_OV_RESULT] = "OVResult",
- [P_CSUM_RS_REQUEST] = "CsumRSRequest",
- [P_RS_IS_IN_SYNC] = "CsumRSIsInSync",
- [P_SYNC_PARAM89] = "SyncParam89",
- [P_COMPRESSED_BITMAP] = "CBitmap",
- [P_DELAY_PROBE] = "DelayProbe",
- [P_OUT_OF_SYNC] = "OutOfSync",
- [P_RS_CANCEL] = "RSCancel",
- [P_CONN_ST_CHG_REQ] = "conn_st_chg_req",
- [P_CONN_ST_CHG_REPLY] = "conn_st_chg_reply",
- [P_PROTOCOL_UPDATE] = "protocol_update",
- [P_TRIM] = "Trim",
- [P_RS_THIN_REQ] = "rs_thin_req",
- [P_RS_DEALLOCATED] = "rs_deallocated",
- [P_WSAME] = "WriteSame",
- [P_ZEROES] = "Zeroes",
-
- /* enum drbd_packet, but not commands - obsoleted flags:
- * P_MAY_IGNORE
- * P_MAX_OPT_CMD
- */
- };
+ if (peer_device->bitmap_index == -1)
+ return false;
- /* too big for the array: 0xfffX */
- if (cmd == P_INITIAL_META)
- return "InitialMeta";
- if (cmd == P_INITIAL_DATA)
- return "InitialData";
- if (cmd == P_CONNECTION_FEATURES)
- return "ConnectionFeatures";
- if (cmd >= ARRAY_SIZE(cmdnames))
- return "Unknown";
- return cmdnames[cmd];
+ return md->peers[peer_device->node_id].flags & flag;
}
-/**
- * drbd_wait_misc - wait for a request to make progress
- * @device: device associated with the request
- * @i: the struct drbd_interval embedded in struct drbd_request or
- * struct drbd_peer_request
- */
-int drbd_wait_misc(struct drbd_device *device, struct drbd_interval *i)
/* Metadata sync timer expired (armed when the metadata was marked
 * dirty): defer the actual writeout by posting MD_SYNC device work. */
static void md_sync_timer_fn(struct timer_list *t)
{
	/* timer_container_of() resolves the drbd_device embedding @t. */
	struct drbd_device *device = timer_container_of(device, t, md_sync_timer);
	drbd_device_post_work(device, MD_SYNC);
}
+
void lock_all_resources(void)
{
struct drbd_resource *resource;
@@ -3685,7 +5908,7 @@ void lock_all_resources(void)
mutex_lock(&resources_mutex);
local_irq_disable();
for_each_resource(resource, &drbd_resources)
- spin_lock_nested(&resource->req_lock, i++);
+ read_lock(&resource->state_rwlock);
}
void unlock_all_resources(void)
@@ -3693,11 +5916,141 @@ void unlock_all_resources(void)
struct drbd_resource *resource;
for_each_resource(resource, &drbd_resources)
- spin_unlock(&resource->req_lock);
+ read_unlock(&resource->state_rwlock);
local_irq_enable();
mutex_unlock(&resources_mutex);
}
+long twopc_timeout(struct drbd_resource *resource)
+{
+ return resource->res_opts.twopc_timeout * HZ/10;
+}
+
+u64 directly_connected_nodes(struct drbd_resource *resource, enum which_state which)
+{
+ u64 directly_connected = 0;
+ struct drbd_connection *connection;
+
+ rcu_read_lock();
+ for_each_connection_rcu(connection, resource) {
+ if (connection->cstate[which] < C_CONNECTED)
+ continue;
+ directly_connected |= NODE_MASK(connection->peer_node_id);
+ }
+ rcu_read_unlock();
+
+ return directly_connected;
+}
+
/* Convert the number of sectors reserved for the on-disk bitmap into
 * the maximum data capacity (in sectors) that bitmap can cover, given
 * that the area is shared evenly between max_peers bitmap slots. */
static sector_t bm_sect_to_max_capacity(const struct drbd_md *md, sector_t bm_sect)
{
	/* we do our meta data IO in 4k units */
	u64 bm_bytes = ALIGN_DOWN(bm_sect << SECTOR_SHIFT, 4096);
	/* One slot per peer shares the bitmap area. */
	u64 bm_bytes_per_peer = div_u64(bm_bytes, md->max_peers);
	u64 bm_bits_per_peer = bm_bytes_per_peer * BITS_PER_BYTE;
	/* Each bit covers one bitmap block of 2^bm_block_shift bytes,
	 * i.e. 2^(bm_block_shift - SECTOR_SHIFT) sectors. */
	return bm_bits_per_peer << (md->bm_block_shift - SECTOR_SHIFT);
}
+
+
+/**
+ * drbd_get_max_capacity() - Returns the capacity for user-data on the local backing device
+ * @device: The DRBD device.
+ * @bdev: Meta data block device.
+ * @warn: Whether to warn when size is clipped.
+ *
+ * This function returns the capacity for user-data on the local backing
+ * device. In the case of internal meta-data, this is the backing disk size
+ * reduced by the meta-data size. In the case of external meta-data, this is
+ * the size of the backing disk.
+ */
sector_t drbd_get_max_capacity(
	struct drbd_device *device, struct drbd_backing_dev *bdev, bool warn)
{
	unsigned int bm_max_peers = bdev->md.max_peers;
	unsigned int bm_block_size = bdev->md.bm_block_size;
	sector_t backing_bdev_capacity = drbd_get_capacity(bdev->backing_bdev);
	sector_t bm_sect;
	sector_t backing_capacity_remaining;
	sector_t metadata_limit;
	sector_t max_capacity;

	switch (bdev->md.meta_dev_idx) {
	case DRBD_MD_INDEX_INTERNAL:
	case DRBD_MD_INDEX_FLEX_INT:
		/* Internal meta data: the bitmap lies between bm_offset and
		 * al_offset; user data ends where the meta data begins. */
		bm_sect = bdev->md.al_offset - bdev->md.bm_offset;
		backing_capacity_remaining = drbd_md_first_sector(bdev);
		break;
	case DRBD_MD_INDEX_FLEX_EXT:
		/* Flexible external meta data: the bitmap extends up to the
		 * end of the meta data area; the whole backing disk holds
		 * user data. */
		bm_sect = bdev->md.md_size_sect - bdev->md.bm_offset;
		backing_capacity_remaining = backing_bdev_capacity;
		break;
	default:
		/* Indexed external meta data: fixed-size bitmap area. */
		bm_sect = DRBD_BM_SECTORS_INDEXED;
		backing_capacity_remaining = backing_bdev_capacity;
	}

	metadata_limit = bm_sect_to_max_capacity(&bdev->md, bm_sect);

	dynamic_drbd_dbg(device,
			 "Backing device capacity: %llus, remaining: %llus, bitmap sectors: %llus\n",
			 (unsigned long long) backing_bdev_capacity,
			 (unsigned long long) backing_capacity_remaining,
			 (unsigned long long) bm_sect);
	dynamic_drbd_dbg(device,
			 "Max peers: %u, bytes_per_bit: %u, metadata limit: %llus, hard limit: %llus\n",
			 bm_max_peers, bm_block_size,
			 (unsigned long long) metadata_limit,
			 (unsigned long long) DRBD_MAX_SECTORS);

	/* Clip first against DRBD's hard limit, then against what the
	 * bitmap can cover; warn (if requested) about each clipping. */
	max_capacity = backing_capacity_remaining;
	if (max_capacity > DRBD_MAX_SECTORS) {
		if (warn)
			drbd_warn(device, "Device size clipped from %llus to %llus due to DRBD limitations\n",
				  (unsigned long long) max_capacity,
				  (unsigned long long) DRBD_MAX_SECTORS);
		max_capacity = DRBD_MAX_SECTORS;
	}
	if (max_capacity > metadata_limit) {
		if (warn)
			drbd_warn(device, "Device size clipped from %llus to %llus due to metadata size\n",
				  (unsigned long long) max_capacity,
				  (unsigned long long) metadata_limit);
		max_capacity = metadata_limit;
	}
	return max_capacity;
}
+
+/* this is about cluster partitions, not block device partitions */
sector_t drbd_partition_data_capacity(struct drbd_device *device)
{
	struct drbd_peer_device *peer_device;
	sector_t capacity = (sector_t)(-1);	/* sentinel: no size known yet */

	/* Minimum over the sizes reported by all peers that sent sizes. */
	rcu_read_lock();
	for_each_peer_device_rcu(peer_device, device) {
		if (test_bit(HAVE_SIZES, &peer_device->flags)) {
			dynamic_drbd_dbg(peer_device, "d_size: %llus\n",
					 (unsigned long long)peer_device->d_size);
			capacity = min_not_zero(capacity, peer_device->d_size);
		}
	}
	rcu_read_unlock();

	if (get_ldev_if_state(device, D_ATTACHING)) {
		/* In case we somehow end up here while attaching, but before
		 * we even assigned the ldev, pretend to still be diskless.
		 */
		if (device->ldev != NULL) {
			sector_t local_capacity = drbd_local_max_size(device);

			/* Also bounded by our own backing device. */
			capacity = min_not_zero(capacity, local_capacity);
		}
		put_ldev(device);
	}

	/* 0 means "no capacity known at all". */
	return capacity != (sector_t)(-1) ? capacity : 0;
}
+
#ifdef CONFIG_DRBD_FAULT_INJECTION
/* Fault insertion support including random number generator shamelessly
* stolen from kernel/rcutorture.c */
@@ -3741,6 +6094,7 @@ _drbd_fault_str(unsigned int type) {
[DRBD_FAULT_BM_ALLOC] = "BM allocation",
[DRBD_FAULT_AL_EE] = "EE allocation",
[DRBD_FAULT_RECEIVE] = "receive data corruption",
+ [DRBD_FAULT_BIO_TOO_SMALL] = "BIO too small",
};
return (type < DRBD_FAULT_MAX) ? _faults[type] : "**Unknown**";
@@ -3753,14 +6107,13 @@ _drbd_insert_fault(struct drbd_device *device, unsigned int type)
unsigned int ret = (
(drbd_fault_devs == 0 ||
- ((1 << device_to_minor(device)) & drbd_fault_devs) != 0) &&
+ ((1 << device->minor) & drbd_fault_devs) != 0) &&
(((_drbd_fault_random(&rrs) % 100) + 1) <= drbd_fault_rate));
if (ret) {
drbd_fault_count++;
- if (drbd_ratelimit())
- drbd_warn(device, "***Simulating %s failure\n",
+ drbd_warn_ratelimit(device, "***Simulating %s failure\n",
_drbd_fault_str(type));
}
@@ -3771,7 +6124,6 @@ _drbd_insert_fault(struct drbd_device *device, unsigned int type)
module_init(drbd_init)
module_exit(drbd_cleanup)
-EXPORT_SYMBOL(drbd_conn_str);
-EXPORT_SYMBOL(drbd_role_str);
-EXPORT_SYMBOL(drbd_disk_str);
-EXPORT_SYMBOL(drbd_set_st_err_str);
+/* For transport layer */
+EXPORT_SYMBOL(drbd_destroy_connection);
+EXPORT_SYMBOL(drbd_destroy_path);
--
2.53.0
© 2016 - 2026 Red Hat, Inc.