Add a separate module implementing DRBD's transport abstraction
over InfiniBand/RDMA using the kernel's rdma_cm and IB verbs APIs.
The implementation uses send/receive semantics rather than RDMA WRITE
or READ, keeping the model compatible with the existing TCP transport.
The RDMA transport multiplexes DRBD's data and control streams over a
single RDMA connection using immediate data to tag and sequence
messages per stream.
Co-developed-by: Philipp Reisner <philipp.reisner@linbit.com>
Signed-off-by: Philipp Reisner <philipp.reisner@linbit.com>
Co-developed-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Signed-off-by: Lars Ellenberg <lars.ellenberg@linbit.com>
Co-developed-by: Joel Colledge <joel.colledge@linbit.com>
Signed-off-by: Joel Colledge <joel.colledge@linbit.com>
Co-developed-by: Christoph Böhmwalder <christoph.boehmwalder@linbit.com>
Signed-off-by: Christoph Böhmwalder <christoph.boehmwalder@linbit.com>
---
drivers/block/drbd/Kconfig | 10 +
drivers/block/drbd/Makefile | 1 +
drivers/block/drbd/drbd_transport_rdma.c | 3524 ++++++++++++++++++++++
3 files changed, 3535 insertions(+)
create mode 100644 drivers/block/drbd/drbd_transport_rdma.c
diff --git a/drivers/block/drbd/Kconfig b/drivers/block/drbd/Kconfig
index f69e50be190e..203cfa2bf228 100644
--- a/drivers/block/drbd/Kconfig
+++ b/drivers/block/drbd/Kconfig
@@ -83,3 +83,13 @@ config BLK_DEV_DRBD_TCP
for DRBD replication over TCP/IP networks.
If unsure, say Y.
+
+config BLK_DEV_DRBD_RDMA
+ tristate "DRBD RDMA transport"
+ depends on BLK_DEV_DRBD && INFINIBAND && INFINIBAND_ADDR_TRANS
+ help
+
+ RDMA transport support for DRBD. This enables DRBD replication
+ over RDMA-capable networks for lower latency and higher throughput.
+
+ If unsure, say N.
diff --git a/drivers/block/drbd/Makefile b/drivers/block/drbd/Makefile
index 35f1c60d4142..d47d311f76ea 100644
--- a/drivers/block/drbd/Makefile
+++ b/drivers/block/drbd/Makefile
@@ -10,3 +10,4 @@ drbd-$(CONFIG_DEBUG_FS) += drbd_debugfs.o
obj-$(CONFIG_BLK_DEV_DRBD) += drbd.o
obj-$(CONFIG_BLK_DEV_DRBD_TCP) += drbd_transport_tcp.o
+obj-$(CONFIG_BLK_DEV_DRBD_RDMA) += drbd_transport_rdma.o
diff --git a/drivers/block/drbd/drbd_transport_rdma.c b/drivers/block/drbd/drbd_transport_rdma.c
new file mode 100644
index 000000000000..21790a769d63
--- /dev/null
+++ b/drivers/block/drbd/drbd_transport_rdma.c
@@ -0,0 +1,3524 @@
+// SPDX-License-Identifier: GPL-2.0-only
+/*
+ drbd_transport_rdma.c
+
+ This file is part of DRBD.
+
+ Copyright (C) 2014-2021, LINBIT HA-Solutions GmbH.
+*/
+
+#undef pr_fmt
+#define pr_fmt(fmt) "drbd_rdma: " fmt
+
+#ifndef SENDER_COMPACTS_BVECS
+/* My benchmarking shows a limit of 30 MB/s
+ * with the current implementation of this idea.
+ * cpu bound, perf top shows mainly get_page/put_page.
+ * Without this, using the plain send_page,
+ * I achieve > 400 MB/s on the same system.
+ * => disable for now, improve later.
+ */
+#define SENDER_COMPACTS_BVECS 0
+#endif
+
+#include <linux/module.h>
+#include <linux/sched/signal.h>
+#include <linux/bio.h>
+#include <rdma/ib_verbs.h>
+#include <rdma/rdma_cm.h>
+#include <rdma/ib_cm.h>
+#include <linux/interrupt.h>
+#include <linux/drbd_genl_api.h>
+#include "drbd_protocol.h"
+#include "drbd_transport.h"
+#include "linux/drbd_config.h" /* for REL_VERSION */
+
+/* Nearly all data transfer uses the send/receive semantics. No need to
+ actually use RDMA WRITE / READ.
+
+ Only for DRBD's remote read (P_DATA_REQUEST and P_DATA_REPLY) a
+ RDMA WRITE would make a lot of sense:
+ Right now the recv_dless_read() function in DRBD is one of the few
+ remaining callers of recv(,,CALLER_BUFFER). This in turn needs a
+ memcpy().
+
+ The block_id field (64 bit) could be re-labelled to be the RKEY for
+ an RDMA WRITE. The P_DATA_REPLY packet will then only deliver the
+ news that the RDMA WRITE was executed...
+
+
+ Flow Control
+ ============
+
+ If the receiving machine can not keep up with the data rate it needs to
+ slow down the sending machine. In order to do so we keep track of the
+ number of rx_descs the peer has posted (peer_rx_descs).
+
+ If one player posts new rx_descs it tells the peer about it with a
+ dtr_flow_control packet. Those packet get never delivered to the
+ DRBD above us.
+*/
+
+MODULE_AUTHOR("Roland Kammerer <roland.kammerer@linbit.com>");
+MODULE_AUTHOR("Philipp Reisner <philipp.reisner@linbit.com>");
+MODULE_AUTHOR("Lars Ellenberg <lars.ellenberg@linbit.com>");
+MODULE_DESCRIPTION("RDMA transport layer for DRBD");
+MODULE_LICENSE("GPL");
+MODULE_VERSION(REL_VERSION);
+
+int allocation_size;
+/* module_param(allocation_size, int, 0664);
+ MODULE_PARM_DESC(allocation_size, "Allocation size for receive buffers (page size of peer)");
+
+ That needs to be implemented in dtr_create_rx_desc() and in dtr_recv() and dtr_recv_pages() */
+
+/* If no recvbuf_size or sendbuf_size is configured use 1M plus two pages for the DATA_STREAM */
+/* Actually it is not a buffer, but the number of tx_descs or rx_descs we allow,
+ very comparable to the socket sendbuf and recvbuf sizes */
+#define RDMA_DEF_BUFFER_SIZE (DRBD_MAX_BIO_SIZE + 2 * PAGE_SIZE)
+
+/* If we can send less than 8 packets, we consider the transport as congested. */
+#define DESCS_LOW_LEVEL 8
+
+/* Assuming that a singe 4k write should be at the highest scatterd over 8
+ pages. I.e. has no parts smaller than 512 bytes.
+ Arbitrary assumption. It seems that Mellanox hardware can do up to 29
+ ppc64 page size might be 64k */
+#if (PAGE_SIZE / 512) > 28
+# define DTR_MAX_TX_SGES 28
+#else
+# define DTR_MAX_TX_SGES (PAGE_SIZE / 512)
+#endif
+
+#define DTR_MAGIC ((u32)0x5257494E)
+
+struct dtr_flow_control {
+ uint32_t magic;
+ uint32_t new_rx_descs[2];
+ uint32_t send_from_stream;
+} __packed;
+
+/* These numbers are sent within the immediate data value to identify
+ if the packet is a data, and control or a (transport private) flow_control
+ message */
+enum dtr_stream_nr {
+ ST_DATA = DATA_STREAM,
+ ST_CONTROL = CONTROL_STREAM,
+ ST_FLOW_CTRL
+};
+
+/* IB_WR_SEND_WITH_IMM and IB_WR_RDMA_WRITE_WITH_IMM
+
+ both transfer user data and a 32bit value with is delivered at the receiving
+ to the event handler of the completion queue. I.e. that can be used to queue
+ the incoming messages to different streams.
+
+ dtr_imm:
+ In order to support folding the data and the control stream into one RDMA
+ connection we use the stream field of dtr_imm: DATA_STREAM, CONTROL_STREAM
+ and FLOW_CONTROL.
+ To be able to order the messages on the receiving side before delivering them
+ to the upper layers we use a sequence number.
+
+ */
+#define SEQUENCE_BITS 30
+union dtr_immediate {
+ struct {
+#if defined(__LITTLE_ENDIAN_BITFIELD)
+ unsigned int sequence:SEQUENCE_BITS;
+ unsigned int stream:2;
+#elif defined(__BIG_ENDIAN_BITFIELD)
+ unsigned int stream:2;
+ unsigned int sequence:SEQUENCE_BITS;
+#else
+# error "this endianness is not supported"
+#endif
+ };
+ unsigned int i;
+};
+
+
+enum dtr_state_bits {
+ DSB_CONNECT_REQ,
+ DSB_CONNECTING,
+ DSB_CONNECTED,
+ DSB_ERROR,
+};
+
+#define DSM_CONNECT_REQ (1 << DSB_CONNECT_REQ)
+#define DSM_CONNECTING (1 << DSB_CONNECTING)
+#define DSM_CONNECTED (1 << DSB_CONNECTED)
+#define DSM_ERROR (1 << DSB_ERROR)
+
+enum dtr_alloc_rdma_res_causes {
+ IB_ALLOC_PD,
+ IB_ALLOC_CQ_RX,
+ IB_ALLOC_CQ_TX,
+ RDMA_CREATE_QP,
+ IB_GET_DMA_MR
+};
+
+struct dtr_rx_desc {
+ struct page *page;
+ struct list_head list;
+ int size;
+ unsigned int sequence;
+ struct dtr_cm *cm;
+ struct ib_cqe cqe;
+ struct ib_sge sge;
+};
+
+struct dtr_tx_desc {
+ union {
+ struct page *page;
+ void *data;
+ struct bio *bio;
+ };
+ enum {
+ SEND_PAGE,
+ SEND_MSG,
+ SEND_BIO,
+ } type;
+ int nr_sges;
+ union dtr_immediate imm;
+ struct ib_cqe cqe;
+ struct ib_sge sge[]; /* must be last! */
+};
+
+struct dtr_flow {
+ struct dtr_path *path;
+
+ atomic_t tx_descs_posted;
+ int tx_descs_max; /* derived from net_conf->sndbuf_size. Do not change after alloc. */
+ atomic_t peer_rx_descs; /* peer's receive window in number of rx descs */
+
+ atomic_t rx_descs_posted;
+ int rx_descs_max; /* derived from net_conf->rcvbuf_size. Do not change after alloc. */
+
+ atomic_t rx_descs_allocated;
+ int rx_descs_want_posted;
+ atomic_t rx_descs_known_to_peer;
+};
+
+enum connect_state_enum {
+ PCS_INACTIVE,
+ PCS_REQUEST_ABORT,
+ PCS_FINISHING = PCS_REQUEST_ABORT,
+ PCS_CONNECTING,
+};
+
+struct dtr_connect_state {
+ struct delayed_work retry_connect_work;
+ atomic_t active_state; /* trying to establish a connection*/
+ atomic_t passive_state; /* listening for a connection */
+ wait_queue_head_t wq;
+ bool active; /* active = established by connect ; !active = established by accept */
+};
+
+struct dtr_path {
+ struct drbd_path path;
+
+ struct dtr_connect_state cs;
+
+ struct dtr_cm *cm; /* RCU'd and kref in cm */
+
+ struct dtr_flow flow[2];
+ spinlock_t send_flow_control_lock;
+ struct tasklet_struct flow_control_tasklet;
+ struct work_struct refill_rx_descs_work;
+};
+
+struct dtr_stream {
+ wait_queue_head_t send_wq;
+ wait_queue_head_t recv_wq;
+
+ /* for recv() to keep track of the current rx_desc:
+ * - whenever the bytes_left of the current rx_desc == 0, we know that all data
+ * is consumed, and get a new rx_desc from the completion queue, and set
+ * current rx_desc accodingly.
+ */
+ struct {
+ struct dtr_rx_desc *desc;
+ void *pos;
+ int bytes_left;
+ } current_rx;
+
+ unsigned long unread; /* unread received; unit: bytes */
+ struct list_head rx_descs;
+ spinlock_t rx_descs_lock;
+
+ long send_timeout;
+ long recv_timeout;
+
+ unsigned int tx_sequence;
+ unsigned int rx_sequence;
+ struct dtr_transport *rdma_transport;
+};
+
+struct dtr_transport {
+ struct drbd_transport transport;
+ struct dtr_stream stream[2];
+ int rx_allocation_size;
+ int sges_max;
+ bool active; /* connect() returned no error. I.e. C_CONNECTING or C_CONNECTED */
+
+ /* per transport rate limit state for diagnostic messages.
+ * maybe: one for debug, one for warning, one for error?
+ * maybe: move into generic drbd_transport an tr_{warn,err,debug}().
+ */
+ struct ratelimit_state rate_limit;
+
+ struct timer_list control_timer;
+ atomic_t first_path_connect_err;
+ struct completion connected;
+
+ struct tasklet_struct control_tasklet;
+};
+
+struct dtr_cm {
+ struct kref kref;
+ struct rdma_cm_id *id;
+ struct dtr_path *path;
+
+ struct ib_cq *recv_cq;
+ struct ib_cq *send_cq;
+ struct ib_pd *pd;
+
+ unsigned long state; /* DSB bits / DSM masks */
+ wait_queue_head_t state_wq;
+ unsigned long last_sent_jif;
+ atomic_t tx_descs_posted;
+ struct timer_list tx_timeout;
+
+ struct work_struct tx_timeout_work;
+ struct work_struct connect_work;
+ struct work_struct establish_work;
+ struct work_struct disconnect_work;
+
+ struct list_head error_rx_descs;
+ spinlock_t error_rx_descs_lock;
+ struct work_struct end_rx_work;
+ struct work_struct end_tx_work;
+
+ struct dtr_transport *rdma_transport;
+ struct rcu_head rcu;
+};
+
+struct dtr_listener {
+ struct drbd_listener listener;
+
+ struct dtr_cm cm;
+};
+
+static int dtr_init(struct drbd_transport *transport);
+static void dtr_free(struct drbd_transport *transport, enum drbd_tr_free_op);
+static int dtr_prepare_connect(struct drbd_transport *transport);
+static int dtr_connect(struct drbd_transport *transport);
+static void dtr_finish_connect(struct drbd_transport *transport);
+static int dtr_recv(struct drbd_transport *transport, enum drbd_stream stream, void **buf, size_t size, int flags);
+static void dtr_stats(struct drbd_transport *transport, struct drbd_transport_stats *stats);
+static int dtr_net_conf_change(struct drbd_transport *transport, struct net_conf *new_net_conf);
+static void dtr_set_rcvtimeo(struct drbd_transport *transport, enum drbd_stream stream, long timeout);
+static long dtr_get_rcvtimeo(struct drbd_transport *transport, enum drbd_stream stream);
+static int dtr_send_page(struct drbd_transport *transport, enum drbd_stream stream, struct page *page,
+ int offset, size_t size, unsigned msg_flags);
+static int dtr_send_zc_bio(struct drbd_transport *, struct bio *bio);
+static int dtr_recv_pages(struct drbd_transport *transport, struct drbd_page_chain_head *chain, size_t size);
+static bool dtr_stream_ok(struct drbd_transport *transport, enum drbd_stream stream);
+static bool dtr_hint(struct drbd_transport *transport, enum drbd_stream stream, enum drbd_tr_hints hint);
+static void dtr_debugfs_show(struct drbd_transport *, struct seq_file *m);
+static int dtr_add_path(struct drbd_path *path);
+static bool dtr_may_remove_path(struct drbd_path *path);
+static void dtr_remove_path(struct drbd_path *path);
+
+static int dtr_create_cm_id(struct dtr_cm *cm_context, struct net *net);
+static bool dtr_path_ok(struct dtr_path *path);
+static bool dtr_transport_ok(struct drbd_transport *transport);
+static int __dtr_post_tx_desc(struct dtr_cm *, struct dtr_tx_desc *);
+static int dtr_post_tx_desc(struct dtr_transport *, struct dtr_tx_desc *);
+static int dtr_repost_tx_desc(struct dtr_cm *old_cm, struct dtr_tx_desc *tx_desc);
+static int dtr_repost_rx_desc(struct dtr_cm *cm, struct dtr_rx_desc *rx_desc);
+static bool dtr_receive_rx_desc(struct dtr_transport *, enum drbd_stream,
+ struct dtr_rx_desc **);
+static void dtr_recycle_rx_desc(struct drbd_transport *transport,
+ enum drbd_stream stream,
+ struct dtr_rx_desc **pp_rx_desc,
+ gfp_t gfp_mask);
+static void dtr_refill_rx_desc(struct dtr_transport *rdma_transport,
+ enum drbd_stream stream);
+static void dtr_free_tx_desc(struct dtr_cm *cm, struct dtr_tx_desc *tx_desc);
+static void dtr_free_rx_desc(struct dtr_rx_desc *rx_desc);
+static void dtr_cma_disconnect_work_fn(struct work_struct *work);
+static void dtr_disconnect_path(struct dtr_path *path);
+static void __dtr_disconnect_path(struct dtr_path *path);
+static int dtr_init_flow(struct dtr_path *path, enum drbd_stream stream);
+static int dtr_cm_alloc_rdma_res(struct dtr_cm *cm);
+static void __dtr_refill_rx_desc(struct dtr_path *path, enum drbd_stream stream);
+static int dtr_send_flow_control_msg(struct dtr_path *path, gfp_t gfp_mask);
+static struct dtr_cm *dtr_path_get_cm_connected(struct dtr_path *path);
+static void dtr_destroy_cm(struct kref *kref);
+static void dtr_destroy_cm_keep_id(struct kref *kref);
+static int dtr_activate_path(struct dtr_path *path);
+static void dtr_end_tx_work_fn(struct work_struct *work);
+static void dtr_end_rx_work_fn(struct work_struct *work);
+static void dtr_cma_retry_connect(struct dtr_path *path, struct dtr_cm *failed_cm);
+static void dtr_tx_timeout_fn(struct timer_list *t);
+static void dtr_control_timer_fn(struct timer_list *t);
+static void dtr_tx_timeout_work_fn(struct work_struct *work);
+static void dtr_cma_connect_work_fn(struct work_struct *work);
+static struct dtr_rx_desc *dtr_next_rx_desc(struct dtr_stream *rdma_stream);
+static void dtr_control_tasklet_fn(struct tasklet_struct *t);
+static int dtr_init_listener(struct drbd_transport *transport, const struct sockaddr *addr,
+ struct net *net, struct drbd_listener *drbd_listener);
+static void dtr_destroy_listener(struct drbd_listener *generic_listener);
+
+
+static struct drbd_transport_class rdma_transport_class = {
+ .name = "rdma",
+ .instance_size = sizeof(struct dtr_transport),
+ .path_instance_size = sizeof(struct dtr_path),
+ .listener_instance_size = sizeof(struct dtr_listener),
+ .ops = (struct drbd_transport_ops) {
+ .init = dtr_init,
+ .free = dtr_free,
+ .init_listener = dtr_init_listener,
+ .release_listener = dtr_destroy_listener,
+ .prepare_connect = dtr_prepare_connect,
+ .connect = dtr_connect,
+ .finish_connect = dtr_finish_connect,
+ .recv = dtr_recv,
+ .stats = dtr_stats,
+ .net_conf_change = dtr_net_conf_change,
+ .set_rcvtimeo = dtr_set_rcvtimeo,
+ .get_rcvtimeo = dtr_get_rcvtimeo,
+ .send_page = dtr_send_page,
+ .send_zc_bio = dtr_send_zc_bio,
+ .recv_pages = dtr_recv_pages,
+ .stream_ok = dtr_stream_ok,
+ .hint = dtr_hint,
+ .debugfs_show = dtr_debugfs_show,
+ .add_path = dtr_add_path,
+ .may_remove_path = dtr_may_remove_path,
+ .remove_path = dtr_remove_path,
+ },
+ .module = THIS_MODULE,
+ .list = LIST_HEAD_INIT(rdma_transport_class.list),
+};
+
+static struct rdma_conn_param dtr_conn_param = {
+ .responder_resources = 1,
+ .initiator_depth = 1,
+ .retry_count = 10,
+ .rnr_retry_count = 7,
+};
+
+static u32 dtr_cm_to_lkey(struct dtr_cm *cm)
+{
+ return cm->pd->local_dma_lkey;
+}
+
+static void dtr_re_init_stream(struct dtr_stream *rdma_stream)
+{
+ struct drbd_transport *transport = &rdma_stream->rdma_transport->transport;
+
+ rdma_stream->current_rx.pos = NULL;
+ rdma_stream->current_rx.bytes_left = 0;
+
+ rdma_stream->tx_sequence = 1;
+ rdma_stream->rx_sequence = 1;
+ rdma_stream->unread = 0;
+
+ TR_ASSERT(transport, list_empty(&rdma_stream->rx_descs));
+ TR_ASSERT(transport, rdma_stream->current_rx.desc == NULL);
+}
+
+static void dtr_init_stream(struct dtr_stream *rdma_stream,
+ struct drbd_transport *transport)
+{
+ rdma_stream->current_rx.desc = NULL;
+
+ rdma_stream->recv_timeout = MAX_SCHEDULE_TIMEOUT;
+ rdma_stream->send_timeout = MAX_SCHEDULE_TIMEOUT;
+
+ init_waitqueue_head(&rdma_stream->recv_wq);
+ init_waitqueue_head(&rdma_stream->send_wq);
+ rdma_stream->rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+
+ INIT_LIST_HEAD(&rdma_stream->rx_descs);
+ spin_lock_init(&rdma_stream->rx_descs_lock);
+
+ dtr_re_init_stream(rdma_stream);
+}
+
+static int dtr_init(struct drbd_transport *transport)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ int i;
+
+ transport->class = &rdma_transport_class;
+
+ rdma_transport->rx_allocation_size = allocation_size;
+ rdma_transport->active = false;
+ rdma_transport->sges_max = DTR_MAX_TX_SGES;
+
+ ratelimit_state_init(&rdma_transport->rate_limit, 5*HZ, 4);
+ timer_setup(&rdma_transport->control_timer, dtr_control_timer_fn, 0);
+
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++)
+ dtr_init_stream(&rdma_transport->stream[i], transport);
+
+ tasklet_setup(&rdma_transport->control_tasklet, dtr_control_tasklet_fn);
+
+ return 0;
+}
+
+static void dtr_free(struct drbd_transport *transport, enum drbd_tr_free_op free_op)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct drbd_path *drbd_path;
+ int i;
+
+ rdma_transport->active = false;
+
+ list_for_each_entry(drbd_path, &transport->paths, list) {
+ struct dtr_path *path = container_of(drbd_path, struct dtr_path, path);
+
+ __dtr_disconnect_path(path);
+ }
+
+ /* Free the rx_descs that where received and not consumed. */
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++) {
+ struct dtr_stream *rdma_stream = &rdma_transport->stream[i];
+ struct dtr_rx_desc *rx_desc, *tmp;
+ LIST_HEAD(rx_descs);
+
+ dtr_free_rx_desc(rdma_stream->current_rx.desc);
+ rdma_stream->current_rx.desc = NULL;
+
+ spin_lock_irq(&rdma_stream->rx_descs_lock);
+ list_splice_init(&rdma_stream->rx_descs, &rx_descs);
+ spin_unlock_irq(&rdma_stream->rx_descs_lock);
+
+ list_for_each_entry_safe(rx_desc, tmp, &rx_descs, list)
+ dtr_free_rx_desc(rx_desc);
+ }
+
+ list_for_each_entry(drbd_path, &transport->paths, list) {
+ struct dtr_path *path = container_of(drbd_path, struct dtr_path, path);
+ struct dtr_cm *cm;
+
+ cm = xchg(&path->cm, NULL); // RCU xchg
+ if (cm)
+ kref_put(&cm->kref, dtr_destroy_cm);
+ }
+
+ timer_delete_sync(&rdma_transport->control_timer);
+
+ if (free_op == DESTROY_TRANSPORT) {
+ list_for_each_entry(drbd_path, &transport->paths, list) {
+ struct dtr_path *path = container_of(drbd_path, struct dtr_path, path);
+
+ cancel_work_sync(&path->refill_rx_descs_work);
+ flush_delayed_work(&path->cs.retry_connect_work);
+ }
+
+ /* The transport object itself is embedded into a conneciton.
+ Do not free it here! The function should better be called
+ uninit. */
+ }
+}
+
+static void dtr_control_timer_fn(struct timer_list *t)
+{
+ struct dtr_transport *rdma_transport = timer_container_of(rdma_transport, t, control_timer);
+ struct drbd_transport *transport = &rdma_transport->transport;
+
+ drbd_control_event(transport, TIMEOUT);
+}
+
+static bool atomic_inc_if_below(atomic_t *v, int limit)
+{
+ int old, cur;
+
+ cur = atomic_read(v);
+ do {
+ old = cur;
+ if (old >= limit)
+ return false;
+
+ cur = atomic_cmpxchg(v, old, old + 1);
+ } while (cur != old);
+
+ return true;
+}
+
+static int dtr_send(struct dtr_path *path, void *buf, size_t size, gfp_t gfp_mask)
+{
+ struct ib_device *device;
+ struct dtr_tx_desc *tx_desc;
+ struct dtr_cm *cm;
+ void *send_buffer;
+ int err = -ECONNRESET;
+
+ // pr_info("%s: dtr_send() size = %d data[0]:%lx\n", rdma_stream->name, (int)size, *(unsigned long*)buf);
+
+ cm = dtr_path_get_cm_connected(path);
+ if (!cm)
+ goto out;
+
+ err = -ENOMEM;
+ tx_desc = kzalloc(sizeof(*tx_desc) + sizeof(struct ib_sge), gfp_mask);
+ if (!tx_desc)
+ goto out_put;
+
+ send_buffer = kmalloc(size, gfp_mask);
+ if (!send_buffer) {
+ kfree(tx_desc);
+ goto out_put;
+ }
+ memcpy(send_buffer, buf, size);
+
+ device = cm->id->device;
+ tx_desc->type = SEND_MSG;
+ tx_desc->data = send_buffer;
+ tx_desc->nr_sges = 1;
+ tx_desc->sge[0].addr = ib_dma_map_single(device, send_buffer, size, DMA_TO_DEVICE);
+ err = ib_dma_mapping_error(device, tx_desc->sge[0].addr);
+ if (err) {
+ kfree(tx_desc);
+ kfree(send_buffer);
+ goto out_put;
+ }
+
+ tx_desc->sge[0].lkey = dtr_cm_to_lkey(cm);
+ tx_desc->sge[0].length = size;
+ tx_desc->imm = (union dtr_immediate)
+ { .stream = ST_FLOW_CTRL, .sequence = 0 };
+
+ err = __dtr_post_tx_desc(cm, tx_desc);
+ if (err)
+ dtr_free_tx_desc(cm, tx_desc);
+
+out_put:
+ kref_put(&cm->kref, dtr_destroy_cm);
+out:
+ return err;
+}
+
+
+static int dtr_recv_pages(struct drbd_transport *transport, struct drbd_page_chain_head *chain, size_t size)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct dtr_stream *rdma_stream = &rdma_transport->stream[DATA_STREAM];
+ struct page *page, *head = NULL, *tail = NULL;
+ int i = 0;
+
+ if (!dtr_transport_ok(transport))
+ return -ECONNRESET;
+
+ // pr_info("%s: in recv_pages, size: %zu\n", rdma_stream->name, size);
+ TR_ASSERT(transport, rdma_stream->current_rx.bytes_left == 0);
+ dtr_recycle_rx_desc(transport, DATA_STREAM, &rdma_stream->current_rx.desc, GFP_NOIO);
+ dtr_refill_rx_desc(rdma_transport, DATA_STREAM);
+
+ while (size) {
+ struct dtr_rx_desc *rx_desc = NULL;
+ long t;
+
+ t = wait_event_interruptible_timeout(rdma_stream->recv_wq,
+ dtr_receive_rx_desc(rdma_transport, DATA_STREAM, &rx_desc),
+ rdma_stream->recv_timeout);
+
+ if (t <= 0) {
+ /*
+ * Cannot give back pages that may still be in use!
+ * (More reason why we only have one rx_desc per page,
+ * and don't get_page() in dtr_create_rx_desc).
+ */
+ drbd_free_pages(transport, head);
+ return t == 0 ? -EAGAIN : -EINTR;
+ }
+
+ page = rx_desc->page;
+ /* put_page() if we would get_page() in
+ * dtr_create_rx_desc(). but we don't. We return the page
+ * chain to the user, which is supposed to give it back to
+ * drbd_free_pages() eventually. */
+ rx_desc->page = NULL;
+ size -= rx_desc->size;
+
+ /* If the sender did dtr_send_page every bvec of a bio with
+ * unaligned bvecs (as xfs often creates), rx_desc->size and
+ * offset may well be not the PAGE_SIZE and 0 we hope for.
+ */
+ if (tail) {
+ /* See also dtr_create_rx_desc().
+ * For PAGE_SIZE > 4k, we may create several RR per page.
+ * We cannot link a page to itself, though.
+ *
+ * Adding to size would be easy enough.
+ * But what do we do about possible holes?
+ * FIXME
+ */
+ BUG_ON(page == tail);
+
+ set_page_chain_next(tail, page);
+ tail = page;
+ } else
+ head = tail = page;
+
+ set_page_chain_offset(page, 0);
+ set_page_chain_size(page, rx_desc->size);
+
+ atomic_dec(&rx_desc->cm->path->flow[DATA_STREAM].rx_descs_allocated);
+ dtr_free_rx_desc(rx_desc);
+
+ i++;
+ dtr_refill_rx_desc(rdma_transport, DATA_STREAM);
+ }
+
+ // pr_info("%s: rcvd %d pages\n", rdma_stream->name, i);
+ chain->head = head;
+ chain->nr_pages = i;
+ return 0;
+}
+
+static int _dtr_recv(struct drbd_transport *transport, enum drbd_stream stream,
+ void **buf, size_t size, int flags)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct dtr_stream *rdma_stream = &rdma_transport->stream[stream];
+ struct dtr_rx_desc *rx_desc = NULL;
+ void *buffer;
+
+ if (flags & GROW_BUFFER) {
+ /* Since transport_rdma always returns the full, requested amount
+ of data, DRBD should never call with GROW_BUFFER! */
+ tr_err(transport, "Called with GROW_BUFFER\n");
+ return -EINVAL;
+ } else if (rdma_stream->current_rx.bytes_left == 0) {
+ long t;
+
+ dtr_recycle_rx_desc(transport, stream, &rdma_stream->current_rx.desc, GFP_NOIO);
+ if (flags & MSG_DONTWAIT) {
+ t = dtr_receive_rx_desc(rdma_transport, stream, &rx_desc);
+ } else {
+ t = wait_event_interruptible_timeout(rdma_stream->recv_wq,
+ dtr_receive_rx_desc(rdma_transport, stream, &rx_desc),
+ rdma_stream->recv_timeout);
+ }
+
+ if (t <= 0)
+ return t == 0 ? -EAGAIN : -EINTR;
+
+ // pr_info("%s: got a new page with size: %d\n", rdma_stream->name, rx_desc->size);
+ buffer = page_address(rx_desc->page);
+ rdma_stream->current_rx.desc = rx_desc;
+ rdma_stream->current_rx.pos = buffer + size;
+ rdma_stream->current_rx.bytes_left = rx_desc->size - size;
+ if (rdma_stream->current_rx.bytes_left < 0)
+ tr_warn(transport,
+ "new, requesting more (%zu) than available (%d)\n", size, rx_desc->size);
+
+ if (flags & CALLER_BUFFER)
+ memcpy(*buf, buffer, size);
+ else
+ *buf = buffer;
+
+ // pr_info("%s: recv completely new fine, returning size on\n", rdma_stream->name);
+ // pr_info("%s: rx_count: %d\n", rdma_stream->name, rdma_stream->rx_descs_posted);
+
+ return size;
+ } else { /* return next part */
+ // pr_info("recv next part on %s\n", rdma_stream->name);
+ buffer = rdma_stream->current_rx.pos;
+ rdma_stream->current_rx.pos += size;
+
+ if (rdma_stream->current_rx.bytes_left < size) {
+ tr_err(transport,
+ "requested more than left! bytes_left = %d, size = %zu\n",
+ rdma_stream->current_rx.bytes_left, size);
+ rdma_stream->current_rx.bytes_left = 0; /* 0 left == get new entry */
+ } else {
+ rdma_stream->current_rx.bytes_left -= size;
+ // pr_info("%s: old_rx left: %d\n", rdma_stream->name, rdma_stream->current_rx.bytes_left);
+ }
+
+ if (flags & CALLER_BUFFER)
+ memcpy(*buf, buffer, size);
+ else
+ *buf = buffer;
+
+ // pr_info("%s: recv next part fine, returning size\n", rdma_stream->name);
+ return size;
+ }
+
+ return 0;
+}
+
+static int dtr_recv(struct drbd_transport *transport, enum drbd_stream stream, void **buf, size_t size, int flags)
+{
+ struct dtr_transport *rdma_transport;
+ int err;
+
+ if (!transport)
+ return -ECONNRESET;
+
+ rdma_transport = container_of(transport, struct dtr_transport, transport);
+
+ if (!dtr_transport_ok(transport))
+ return -ECONNRESET;
+
+ err = _dtr_recv(transport, stream, buf, size, flags);
+
+ dtr_refill_rx_desc(rdma_transport, stream);
+ return err;
+}
+
+static void dtr_stats(struct drbd_transport *transport, struct drbd_transport_stats *stats)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct dtr_path *path;
+ int sb_size = 0, sb_used = 0;
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(path, &transport->paths, path.list) {
+ struct dtr_flow *flow = &path->flow[DATA_STREAM];
+
+ sb_size += flow->tx_descs_max;
+ sb_used += atomic_read(&flow->tx_descs_posted);
+ }
+ rcu_read_unlock();
+
+ /* these are used by the sender, guess we should them get right */
+ stats->send_buffer_size = sb_size * DRBD_SOCKET_BUFFER_SIZE;
+ stats->send_buffer_used = sb_used * DRBD_SOCKET_BUFFER_SIZE;
+
+ /* these two for debugfs */
+ stats->unread_received = rdma_transport->stream[DATA_STREAM].unread;
+ stats->unacked_send = stats->send_buffer_used;
+
+}
+
+/* The following functions (at least)
+ dtr_path_established_work_fn(),
+ dtr_cma_accept_work_fn(), dtr_cma_accept(),
+ dtr_cma_retry_connect_work_fn(),
+ dtr_cma_retry_connect(),
+ dtr_cma_connect_fail_work_fn(), dtr_cma_connect(),
+ dtr_cma_disconnect_work_fn(), dtr_cma_disconnect(),
+ dtr_cma_event_handler()
+
+ are called from worker context or are callbacks from rdma_cm's context.
+
+ We need to make sure the path does not go away in the meantime.
+ */
+
+static int dtr_path_prepare(struct dtr_path *path, struct dtr_cm *cm, bool active)
+{
+ struct dtr_cm *cm2;
+ int i, err;
+
+ cm2 = cmpxchg(&path->cm, NULL, cm); // RCU xchg
+ if (cm2) {
+ /*
+ * The caller needs to hold a ref on cm. dtr_path_prepare()
+ * gifts that reference to the path. If setting the pointer in
+ * the path fails, we have to put one ref of cm.
+ */
+ kref_put(&cm->kref, dtr_destroy_cm);
+ return -ENOENT;
+ }
+
+ path->cs.active = active;
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++)
+ dtr_init_flow(path, i);
+
+ err = dtr_cm_alloc_rdma_res(cm);
+
+ return err;
+}
+
+static struct dtr_cm *__dtr_path_get_cm(struct dtr_path *path)
+{
+ struct dtr_cm *cm;
+
+ cm = rcu_dereference(path->cm);
+ if (cm && !kref_get_unless_zero(&cm->kref))
+ cm = NULL;
+ return cm;
+}
+
+static struct dtr_cm *dtr_path_get_cm(struct dtr_path *path)
+{
+ struct dtr_cm *cm;
+
+ rcu_read_lock();
+ cm = __dtr_path_get_cm(path);
+ rcu_read_unlock();
+ return cm;
+}
+
+static struct dtr_cm *dtr_path_get_cm_connected(struct dtr_path *path)
+{
+ struct dtr_cm *cm;
+
+ cm = dtr_path_get_cm(path);
+ if (cm && cm->state != DSM_CONNECTED) {
+ kref_put(&cm->kref, dtr_destroy_cm);
+ cm = NULL;
+ }
+ return cm;
+}
+
+static void dtr_path_established_work_fn(struct work_struct *work)
+{
+ struct dtr_cm *cm = container_of(work, struct dtr_cm, establish_work);
+ struct dtr_path *path = cm->path;
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct dtr_connect_state *cs = &path->cs;
+ int i, p, err;
+
+
+ err = cm != path->cm;
+ kref_put(&cm->kref, dtr_destroy_cm);
+ if (err)
+ return;
+
+ p = atomic_cmpxchg(&cs->passive_state, PCS_CONNECTING, PCS_FINISHING);
+ if (p < PCS_CONNECTING)
+ goto out;
+
+ path->cm->state = DSM_CONNECTED;
+
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++)
+ __dtr_refill_rx_desc(path, i);
+ err = dtr_send_flow_control_msg(path, GFP_NOIO);
+ if (err > 0)
+ err = 0;
+ if (err)
+ tr_err(transport, "sending first flow_control_msg() failed\n");
+
+ schedule_timeout(HZ / 4);
+ if (!dtr_path_ok(path)) {
+ if (path->cs.active)
+ dtr_cma_retry_connect(path, path->cm);
+ return;
+ }
+
+ p = atomic_cmpxchg(&rdma_transport->first_path_connect_err, 1, err);
+ if (p == 1) {
+ if (cs->active)
+ set_bit(RESOLVE_CONFLICTS, &transport->flags);
+ else
+ clear_bit(RESOLVE_CONFLICTS, &transport->flags);
+ complete(&rdma_transport->connected);
+ }
+
+ set_bit(TR_ESTABLISHED, &path->path.flags);
+ drbd_path_event(transport, &path->path);
+
+out:
+ atomic_set(&cs->active_state, PCS_INACTIVE);
+ p = atomic_xchg(&cs->passive_state, PCS_INACTIVE);
+ if (p > PCS_INACTIVE)
+ drbd_put_listener(&path->path);
+
+ wake_up(&cs->wq);
+}
+
+static struct dtr_cm *dtr_alloc_cm(struct dtr_path *path)
+{
+ struct dtr_cm *cm;
+
+ cm = kzalloc_obj(*cm);
+ if (!cm)
+ return NULL;
+
+ kref_init(&cm->kref);
+ INIT_WORK(&cm->connect_work, dtr_cma_connect_work_fn);
+ INIT_WORK(&cm->establish_work, dtr_path_established_work_fn);
+ INIT_WORK(&cm->disconnect_work, dtr_cma_disconnect_work_fn);
+ INIT_WORK(&cm->end_rx_work, dtr_end_rx_work_fn);
+ INIT_WORK(&cm->end_tx_work, dtr_end_tx_work_fn);
+ INIT_WORK(&cm->tx_timeout_work, dtr_tx_timeout_work_fn);
+ INIT_LIST_HEAD(&cm->error_rx_descs);
+ spin_lock_init(&cm->error_rx_descs_lock);
+ timer_setup(&cm->tx_timeout, dtr_tx_timeout_fn, 0);
+
+ kref_get(&path->path.kref);
+ cm->path = path;
+ cm->rdma_transport = container_of(path->path.transport, struct dtr_transport, transport);
+
+ /*
+ * We need this module in core as long as a dtr_tx_desc, a dtr_rx_desc
+ * or a dtr_cm object exists because they might have a callback
+ * registered in the RDMA code that will call back into this module. The
+ * rx and tx descs have a reference to the dtr_cm object, so taking an
+ * extra reference to the module for each dtr_cm object is sufficient.
+ */
+ __module_get(THIS_MODULE);
+
+ return cm;
+}
+
+static int dtr_cma_accept(struct dtr_listener *listener, struct rdma_cm_id *new_cm_id, struct dtr_cm **ret_cm)
+{
+ struct sockaddr_storage *peer_addr;
+ struct dtr_connect_state *cs;
+ struct dtr_path *path;
+ struct drbd_path *drbd_path;
+ struct dtr_cm *cm;
+ int err;
+
+ *ret_cm = NULL;
+ peer_addr = &new_cm_id->route.addr.dst_addr;
+
+ spin_lock(&listener->listener.waiters_lock);
+ drbd_path = drbd_find_path_by_addr(&listener->listener, peer_addr);
+ if (drbd_path)
+ kref_get(&drbd_path->kref);
+ spin_unlock(&listener->listener.waiters_lock);
+
+ if (!drbd_path) {
+ struct sockaddr_in6 *from_sin6;
+ struct sockaddr_in *from_sin;
+
+ switch (peer_addr->ss_family) {
+ case AF_INET6:
+ from_sin6 = (struct sockaddr_in6 *)peer_addr;
+ pr_warn("Closing unexpected connection from "
+ "%pI6\n", &from_sin6->sin6_addr);
+ break;
+ case AF_INET:
+ from_sin = (struct sockaddr_in *)peer_addr;
+ pr_warn("Closing unexpected connection from "
+ "%pI4\n", &from_sin->sin_addr);
+ break;
+ default:
+ pr_warn("Closing unexpected connection family = %d\n",
+ peer_addr->ss_family);
+ }
+
+ rdma_reject(new_cm_id, NULL, 0, IB_CM_REJ_CONSUMER_DEFINED);
+ return -EAGAIN;
+ }
+
+ path = container_of(drbd_path, struct dtr_path, path);
+ cs = &path->cs;
+ if (atomic_read(&cs->passive_state) < PCS_CONNECTING)
+ goto reject;
+
+ cm = dtr_alloc_cm(path);
+ if (!cm) {
+ pr_err("rejecting connecting since -ENOMEM for cm\n");
+ goto reject;
+ }
+
+ cm->state = DSM_CONNECT_REQ;
+ init_waitqueue_head(&cm->state_wq);
+ new_cm_id->context = cm;
+ cm->id = new_cm_id;
+ *ret_cm = cm;
+
+ /* Expecting RDMA_CM_EVENT_ESTABLISHED, after rdma_accept(). Get
+ the ref before dtr_path_prepare(), since that exposes the cm
+ to the path, and the path might get destroyed, and with that
+ going to put the cm */
+ kref_get(&cm->kref);
+
+ /* Gifting the initial kref to the path->cm pointer */
+ err = dtr_path_prepare(path, cm, false);
+ if (err) {
+ /* Returning the cm via ret_cm and an error causes the caller to put one ref */
+ goto reject;
+ }
+ kref_put(&drbd_path->kref, drbd_destroy_path);
+
+ err = rdma_accept(new_cm_id, &dtr_conn_param);
+ if (err)
+ kref_put(&cm->kref, dtr_destroy_cm);
+
+ return err;
+
+reject:
+ rdma_reject(new_cm_id, NULL, 0, IB_CM_REJ_CONSUMER_DEFINED);
+ kref_put(&drbd_path->kref, drbd_destroy_path);
+ return -EAGAIN;
+}
+
+static int dtr_start_try_connect(struct dtr_connect_state *cs)
+{
+ struct dtr_path *path = container_of(cs, struct dtr_path, cs);
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_cm *cm;
+ int err = -ENOMEM;
+
+ cm = dtr_alloc_cm(path);
+ if (!cm)
+ goto out;
+
+ err = dtr_create_cm_id(cm, path->path.net);
+ if (err) {
+ tr_err(transport, "rdma_create_id() failed %d\n", err);
+ goto out;
+ }
+
+ /* Holding the initial reference on cm, expecting RDMA_CM_EVENT_ADDR_RESOLVED */
+ err = rdma_resolve_addr(cm->id, NULL,
+ (struct sockaddr *)&path->path.peer_addr,
+ 2000);
+ if (err) {
+ tr_err(transport, "rdma_resolve_addr error %d\n", err);
+ goto out;
+ }
+
+ return 0;
+out:
+ if (cm)
+ kref_put(&cm->kref, dtr_destroy_cm);
+ return err;
+}
+
+static void dtr_cma_retry_connect_work_fn(struct work_struct *work)
+{
+ struct dtr_connect_state *cs = container_of(work, struct dtr_connect_state, retry_connect_work.work);
+ enum connect_state_enum p;
+ int err;
+
+ p = atomic_cmpxchg(&cs->active_state, PCS_REQUEST_ABORT, PCS_INACTIVE);
+ if (p != PCS_CONNECTING) {
+ wake_up(&cs->wq);
+ return;
+ }
+
+ err = dtr_start_try_connect(cs);
+ if (err) {
+ struct dtr_path *path = container_of(cs, struct dtr_path, cs);
+ struct drbd_transport *transport = path->path.transport;
+
+ tr_err(transport, "dtr_start_try_connect failed %d\n", err);
+ schedule_delayed_work(&cs->retry_connect_work, HZ);
+ }
+}
+
+static void dtr_remove_cm_from_path(struct dtr_path *path, struct dtr_cm *failed_cm)
+{
+ struct dtr_cm *cm;
+
+ cm = cmpxchg(&path->cm, failed_cm, NULL); // RCU &path->cm
+ if (cm == failed_cm && cm->id && cm->id->qp) {
+ struct drbd_transport *transport = path->path.transport;
+ struct ib_qp_attr attr = { .qp_state = IB_QPS_ERR };
+ int err;
+
+ err = ib_modify_qp(cm->id->qp, &attr, IB_QP_STATE);
+ if (err)
+ tr_err(transport, "ib_modify_qp failed %d\n", err);
+
+ kref_put(&cm->kref, dtr_destroy_cm);
+ }
+}
+
+static void dtr_cma_retry_connect(struct dtr_path *path, struct dtr_cm *failed_cm)
+{
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_connect_state *cs = &path->cs;
+ long connect_int = 10 * HZ;
+ struct net_conf *nc;
+ int a;
+
+ dtr_remove_cm_from_path(path, failed_cm);
+
+ a = atomic_read(&cs->active_state);
+ if (a == PCS_INACTIVE) {
+ return;
+ } else if (a == PCS_CONNECTING) {
+ rcu_read_lock();
+ nc = rcu_dereference(transport->net_conf);
+ if (nc)
+ connect_int = nc->connect_int * HZ;
+ rcu_read_unlock();
+ } else {
+ connect_int = 1;
+ }
+ schedule_delayed_work(&cs->retry_connect_work, connect_int);
+}
+
+static void dtr_cma_connect_work_fn(struct work_struct *work)
+{
+ struct dtr_cm *cm = container_of(work, struct dtr_cm, connect_work);
+ struct dtr_path *path = cm->path;
+ struct drbd_transport *transport = path->path.transport;
+ enum connect_state_enum p;
+ int err;
+
+ p = atomic_cmpxchg(&path->cs.active_state, PCS_REQUEST_ABORT, PCS_INACTIVE);
+ if (p != PCS_CONNECTING) {
+ wake_up(&path->cs.wq);
+ kref_put(&cm->kref, dtr_destroy_cm); /* for work */
+ return;
+ }
+
+ kref_get(&cm->kref); /* for the path->cm pointer */
+ err = dtr_path_prepare(path, cm, true);
+ if (err) {
+ tr_err(transport, "dtr_path_prepare() = %d\n", err);
+ goto out;
+ }
+
+ kref_get(&cm->kref); /* Expecting RDMA_CM_EVENT_ESTABLISHED */
+ set_bit(DSB_CONNECTING, &cm->state);
+ err = rdma_connect(cm->id, &dtr_conn_param);
+ if (err) {
+ if (test_and_clear_bit(DSB_CONNECTING, &cm->state))
+ kref_put(&cm->kref, dtr_destroy_cm); /* no _EVENT_ESTABLISHED */
+ tr_err(transport, "rdma_connect error %d\n", err);
+ goto out;
+ }
+
+ kref_put(&cm->kref, dtr_destroy_cm); /* for work */
+ return;
+out:
+ kref_put(&cm->kref, dtr_destroy_cm); /* for work */
+ dtr_cma_retry_connect(path, cm);
+}
+
+static void dtr_cma_disconnect_work_fn(struct work_struct *work)
+{
+ struct dtr_cm *cm = container_of(work, struct dtr_cm, disconnect_work);
+ struct dtr_path *path = cm->path;
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct drbd_path *drbd_path = &path->path;
+ bool destroyed;
+ int err;
+
+ err = cm != path->cm;
+ kref_put(&cm->kref, dtr_destroy_cm);
+ if (err)
+ return;
+
+ destroyed = test_bit(TR_UNREGISTERED, &drbd_path->flags) || rdma_transport->active == false;
+ if (test_and_clear_bit(TR_ESTABLISHED, &drbd_path->flags) && !destroyed)
+ drbd_path_event(transport, drbd_path);
+
+ if (!dtr_transport_ok(transport))
+ drbd_control_event(transport, CLOSED_BY_PEER);
+
+ if (destroyed)
+ return;
+
+ /* in dtr_disconnect_path() -> __dtr_uninit_path() we free the previous
+ cm. That causes the reference on the path to be dropped.
+ In dtr_activate_path() -> dtr_start_try_connect() we allocate a new
+ cm, that holds a reference on the path again.
+
+ Bridge the gap with a reference here!
+ */
+
+ kref_get(&path->path.kref);
+ dtr_disconnect_path(path);
+
+ /* dtr_disconnect_path() may take time, recheck here... */
+ if (test_bit(TR_UNREGISTERED, &drbd_path->flags) || rdma_transport->active == false)
+ goto abort;
+
+ if (!dtr_transport_ok(transport)) {
+ /* If there is no other connected path mark the connection as
+ no longer active. Do not try to re-establish this path!! */
+ rdma_transport->active = false;
+ goto abort;
+ }
+
+ err = dtr_activate_path(path);
+ if (err)
+ tr_err(transport, "dtr_activate_path() = %d\n", err);
+abort:
+ kref_put(&path->path.kref, drbd_destroy_path);
+}
+
+static void dtr_cma_disconnect(struct dtr_cm *cm)
+{
+ kref_get(&cm->kref);
+ schedule_work(&cm->disconnect_work);
+}
+
+static int dtr_cma_event_handler(struct rdma_cm_id *cm_id, struct rdma_cm_event *event)
+{
+ int err;
+ /* context comes from rdma_create_id() */
+ struct dtr_cm *cm = cm_id->context;
+ struct dtr_listener *listener;
+ bool connecting;
+
+ if (!cm) {
+ pr_err("id %p event %d, but no context!\n", cm_id, event->event);
+ return 0;
+ }
+
+ switch (event->event) {
+ case RDMA_CM_EVENT_ADDR_RESOLVED:
+ // pr_info("%s: RDMA_CM_EVENT_ADDR_RESOLVED\n", cm->name);
+ kref_get(&cm->kref); /* Expecting RDMA_CM_EVENT_ROUTE_RESOLVED */
+ err = rdma_resolve_route(cm_id, 2000);
+ if (err) {
+ kref_put(&cm->kref, dtr_destroy_cm);
+ pr_err("rdma_resolve_route error %d\n", err);
+ }
+ break;
+
+ case RDMA_CM_EVENT_ROUTE_RESOLVED:
+ // pr_info("%s: RDMA_CM_EVENT_ROUTE_RESOLVED\n", cm->name);
+
+ kref_get(&cm->kref);
+ schedule_work(&cm->connect_work);
+ break;
+
+ case RDMA_CM_EVENT_CONNECT_REQUEST:
+ // pr_info("%s: RDMA_CM_EVENT_CONNECT_REQUEST\n", cm->name);
+ /* for listener */
+
+ listener = container_of(cm, struct dtr_listener, cm);
+ err = dtr_cma_accept(listener, cm_id, &cm);
+
+ /* I found this a bit confusing. When a new connection comes in, the callback
+ gets called with a new rdma_cm_id. The new rdma_cm_id inherits its context
+ pointer from the listening rdma_cm_id. The new context gets created in
+ dtr_cma_accept() and is put into &cm here.
+ cm now contains the accepted connection (no longer the listener); */
+ if (err) {
+ if (!cm)
+ return 1; /* caller destroy the cm_id */
+ break; /* drop the last ref of cm at function exit */
+ }
+ return 0; /* do not touch kref of the new connection */
+
+ case RDMA_CM_EVENT_CONNECT_RESPONSE:
+ // pr_info("%s: RDMA_CM_EVENT_CONNECT_RESPONSE\n", cm->name);
+ /*cm->path->cm = cm;
+ dtr_path_established(cm->path); */
+ break;
+
+ case RDMA_CM_EVENT_ESTABLISHED:
+ // pr_info("%s: RDMA_CM_EVENT_ESTABLISHED\n", cm->name);
+ /* cm->state = DSM_CONNECTED; is set later in the work item */
+ /* This is called for active and passive connections */
+
+ connecting = test_and_clear_bit(DSB_CONNECTING, &cm->state) ||
+ test_and_clear_bit(DSB_CONNECT_REQ, &cm->state);
+ kref_get(&cm->kref); /* connected -> expect a disconnect in the future */
+ kref_get(&cm->kref); /* for the work */
+ schedule_work(&cm->establish_work);
+
+ if (!connecting)
+ return 0; /* keep ref; __dtr_disconnect_path() won */
+ break;
+
+ case RDMA_CM_EVENT_ADDR_ERROR:
+ // pr_info("%s: RDMA_CM_EVENT_ADDR_ERROR\n", cm->name);
+ case RDMA_CM_EVENT_ROUTE_ERROR:
+ // pr_info("%s: RDMA_CM_EVENT_ROUTE_ERROR\n", cm->name);
+ set_bit(DSB_ERROR, &cm->state);
+
+ dtr_cma_retry_connect(cm->path, cm);
+ break;
+
+ case RDMA_CM_EVENT_CONNECT_ERROR:
+ // pr_info("%s: RDMA_CM_EVENT_CONNECT_ERROR\n", cm->name);
+ case RDMA_CM_EVENT_UNREACHABLE:
+ // pr_info("%s: RDMA_CM_EVENT_UNREACHABLE\n", cm->name);
+ case RDMA_CM_EVENT_REJECTED:
+ // pr_info("%s: RDMA_CM_EVENT_REJECTED\n", cm->name);
+ // pr_info("event = %d, status = %d\n", event->event, event->status);
+ set_bit(DSB_ERROR, &cm->state);
+
+ dtr_cma_retry_connect(cm->path, cm);
+ connecting = test_and_clear_bit(DSB_CONNECTING, &cm->state) ||
+ test_and_clear_bit(DSB_CONNECT_REQ, &cm->state);
+ if (!connecting)
+ return 0; /* keep ref; __dtr_disconnect_path() won */
+ break;
+
+ case RDMA_CM_EVENT_DISCONNECTED:
+ // pr_info("%s: RDMA_CM_EVENT_DISCONNECTED\n", cm->name);
+ if (!test_and_clear_bit(DSB_CONNECTED, &cm->state))
+ return 0; /* keep ref on cm; probably a tx_timeout */
+
+ dtr_cma_disconnect(cm);
+ break;
+
+ case RDMA_CM_EVENT_DEVICE_REMOVAL:
+ // pr_info("%s: RDMA_CM_EVENT_DEVICE_REMOVAL\n", cm->name);
+ return 0;
+
+ case RDMA_CM_EVENT_TIMEWAIT_EXIT:
+ return 0;
+
+ default:
+ pr_warn("id %p context %p unexpected event %d!\n",
+ cm_id, cm, event->event);
+ return 0;
+ }
+ wake_up(&cm->state_wq);
+
+ /* by returning 1 we instruct the caller to destroy the cm_id. We
+ are not allowed to free it within the callback, since that deadlocks! */
+ return kref_put(&cm->kref, dtr_destroy_cm_keep_id);
+}
+
+static int dtr_create_cm_id(struct dtr_cm *cm, struct net *net)
+{
+ struct rdma_cm_id *id;
+
+ cm->state = 0;
+ init_waitqueue_head(&cm->state_wq);
+
+ id = rdma_create_id(net, dtr_cma_event_handler, cm, RDMA_PS_TCP, IB_QPT_RC);
+ if (IS_ERR(id)) {
+ cm->id = NULL;
+ set_bit(DSB_ERROR, &cm->state);
+ return PTR_ERR(id);
+ }
+
+ cm->id = id;
+ return 0;
+}
+
+/* Number of rx_descs the peer does not know */
+static int dtr_new_rx_descs(struct dtr_flow *flow)
+{
+ int posted, known;
+
+ posted = atomic_read(&flow->rx_descs_posted);
+ smp_rmb(); /* smp_wmb() is in dtr_rx_cqe_done() */
+ known = atomic_read(&flow->rx_descs_known_to_peer);
+
+ /* If the two decrements in dtr_rx_cqe_done() execute in
+ * parallel our result might be one too low, that does not matter.
+ * Only make sure to never return a -1 because that would matter! */
+ return max(posted - known, 0);
+}
+
+static struct dtr_rx_desc *dtr_next_rx_desc(struct dtr_stream *rdma_stream)
+{
+ struct dtr_rx_desc *rx_desc;
+
+ spin_lock_irq(&rdma_stream->rx_descs_lock);
+ rx_desc = list_first_entry_or_null(&rdma_stream->rx_descs, struct dtr_rx_desc, list);
+ if (rx_desc) {
+ if (rx_desc->sequence == rdma_stream->rx_sequence) {
+ list_del(&rx_desc->list);
+ rdma_stream->rx_sequence =
+ (rdma_stream->rx_sequence + 1) & ((1UL << SEQUENCE_BITS) - 1);
+ rdma_stream->unread -= rx_desc->size;
+ } else {
+ rx_desc = NULL;
+ }
+ }
+ spin_unlock_irq(&rdma_stream->rx_descs_lock);
+
+ return rx_desc;
+}
+
+static bool dtr_receive_rx_desc(struct dtr_transport *rdma_transport,
+ enum drbd_stream stream,
+ struct dtr_rx_desc **ptr_rx_desc)
+{
+ struct dtr_stream *rdma_stream = &rdma_transport->stream[stream];
+ struct dtr_rx_desc *rx_desc;
+
+ rx_desc = dtr_next_rx_desc(rdma_stream);
+
+ if (rx_desc) {
+ struct dtr_cm *cm = rx_desc->cm;
+ struct dtr_transport *rdma_transport =
+ container_of(cm->path->path.transport, struct dtr_transport, transport);
+
+ INIT_LIST_HEAD(&rx_desc->list);
+ ib_dma_sync_single_for_cpu(cm->id->device, rx_desc->sge.addr,
+ rdma_transport->rx_allocation_size, DMA_FROM_DEVICE);
+ *ptr_rx_desc = rx_desc;
+ return true;
+ } else {
+ /* The waiting thread gets woken up if a packet arrived, or if there is no
+ new packet but we need to tell the peer about space in our receive window */
+ struct dtr_path *path;
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(path, &rdma_transport->transport.paths, path.list) {
+ struct dtr_flow *flow = &path->flow[stream];
+
+ if (atomic_read(&flow->rx_descs_known_to_peer) <
+ atomic_read(&flow->rx_descs_posted) / 8)
+ dtr_send_flow_control_msg(path, GFP_ATOMIC);
+ }
+ rcu_read_unlock();
+ }
+
+ return false;
+}
+
+static int dtr_send_flow_control_msg(struct dtr_path *path, gfp_t gfp_mask)
+{
+ struct dtr_flow_control msg;
+ struct dtr_flow *flow;
+ enum drbd_stream i;
+ int err, n[2], send_from_stream = -1, rx_descs = 0;
+
+ msg.magic = cpu_to_be32(DTR_MAGIC);
+
+ spin_lock_bh(&path->send_flow_control_lock);
+ /* dtr_send_flow_control_msg() is called from the receiver thread and
+ areceiver, asender (multiple threads).
+ determining the number of new tx_descs and subtracting this number
+ from rx_descs_known_to_peer has to be atomic!
+ */
+ for (i = DATA_STREAM; i <= CONTROL_STREAM; i++) {
+ flow = &path->flow[i];
+
+ n[i] = dtr_new_rx_descs(flow);
+ atomic_add(n[i], &flow->rx_descs_known_to_peer);
+ rx_descs += n[i];
+
+ msg.new_rx_descs[i] = cpu_to_be32(n[i]);
+ if (send_from_stream == -1 &&
+ atomic_read(&flow->tx_descs_posted) < flow->tx_descs_max &&
+ atomic_dec_if_positive(&flow->peer_rx_descs) >= 0)
+ send_from_stream = i;
+ }
+ spin_unlock_bh(&path->send_flow_control_lock);
+
+ if (send_from_stream == -1) {
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+
+ if (__ratelimit(&rdma_transport->rate_limit))
+ tr_err(transport, "Not sending flow_control msg, no receive window!\n");
+ err = -ENOBUFS;
+ goto out_undo;
+ }
+
+ flow = &path->flow[send_from_stream];
+ if (rx_descs == 0 || !atomic_inc_if_below(&flow->tx_descs_posted, flow->tx_descs_max)) {
+ atomic_inc(&flow->peer_rx_descs);
+ return 0;
+ }
+
+ msg.send_from_stream = cpu_to_be32(send_from_stream);
+ err = dtr_send(path, &msg, sizeof(msg), gfp_mask);
+ if (err) {
+ atomic_inc(&flow->peer_rx_descs);
+ atomic_dec(&flow->tx_descs_posted);
+out_undo:
+ for (i = DATA_STREAM; i <= CONTROL_STREAM; i++) {
+ flow = &path->flow[i];
+ atomic_sub(n[i], &flow->rx_descs_known_to_peer);
+ }
+ }
+ return err;
+}
+
+static void dtr_flow_control(struct dtr_flow *flow, gfp_t gfp_mask)
+{
+ int n, known_to_peer = atomic_read(&flow->rx_descs_known_to_peer);
+ int tx_descs_max = flow->tx_descs_max;
+
+ n = dtr_new_rx_descs(flow);
+ if (n > tx_descs_max / 8 || known_to_peer < tx_descs_max / 8)
+ dtr_send_flow_control_msg(flow->path, gfp_mask);
+}
+
+static int dtr_got_flow_control_msg(struct dtr_path *path,
+ struct dtr_flow_control *msg)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(path->path.transport, struct dtr_transport, transport);
+ struct dtr_flow *flow;
+ int i, n;
+
+ for (i = CONTROL_STREAM; i >= DATA_STREAM; i--) {
+ uint32_t new_rx_descs = be32_to_cpu(msg->new_rx_descs[i]);
+ flow = &path->flow[i];
+
+ n = atomic_add_return(new_rx_descs, &flow->peer_rx_descs);
+ wake_up_interruptible(&rdma_transport->stream[i].send_wq);
+ }
+
+ /* rdma_stream is the data_stream here... */
+ if (n >= DESCS_LOW_LEVEL) {
+ int tx_descs_posted = atomic_read(&flow->tx_descs_posted);
+ if (flow->tx_descs_max - tx_descs_posted >= DESCS_LOW_LEVEL)
+ clear_bit(NET_CONGESTED, &rdma_transport->transport.flags);
+ }
+
+ return be32_to_cpu(msg->send_from_stream);
+}
+
+static void dtr_flow_control_tasklet_fn(struct tasklet_struct *t)
+{
+ struct dtr_path *path = from_tasklet(path, t, flow_control_tasklet);
+
+ dtr_send_flow_control_msg(path, GFP_ATOMIC);
+}
+
+static void dtr_maybe_trigger_flow_control_msg(struct dtr_path *path, int send_from_stream)
+{
+ struct dtr_flow *flow;
+ int n;
+
+ flow = &path->flow[send_from_stream];
+ n = atomic_dec_return(&flow->rx_descs_known_to_peer);
+ /* If we get a lot of flow control messages in, but no data on this
+ * path, we need to tell the peer that we recycled all these buffers
+ */
+ if (n < atomic_read(&flow->rx_descs_posted) / 8)
+ tasklet_schedule(&path->flow_control_tasklet);
+}
+
+static void dtr_tx_timeout_work_fn(struct work_struct *work)
+{
+ struct dtr_cm *cm = container_of(work, struct dtr_cm, tx_timeout_work);
+ struct drbd_transport *transport;
+ struct dtr_path *path = cm->path;
+
+ if (!test_and_clear_bit(DSB_CONNECTED, &cm->state) || !path)
+ goto out;
+
+ transport = path->path.transport;
+ tr_warn(transport, "%pI4 - %pI4: tx timeout\n",
+ &((struct sockaddr_in *)&path->path.my_addr)->sin_addr,
+ &((struct sockaddr_in *)&path->path.peer_addr)->sin_addr);
+
+ dtr_remove_cm_from_path(path, cm);
+
+ /* It is not sure that a RDMA_CM_EVENT_DISCONNECTED will be delivered.
+ * Dropping ref for that here. In case it is delivered we will not drop
+ * the ref in dtr_cma_event_handler() due to clearing DSB_CONNECTED
+ * from cm->state */
+ kref_put(&cm->kref, dtr_destroy_cm);
+
+ clear_bit(TR_ESTABLISHED, &path->path.flags);
+ drbd_path_event(transport, &path->path);
+
+ if (!dtr_transport_ok(transport)) {
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+
+ drbd_control_event(transport, CLOSED_BY_PEER);
+ rdma_transport->active = false;
+ } else {
+ dtr_activate_path(path);
+ }
+
+out:
+ kref_put(&cm->kref, dtr_destroy_cm); /* for work (armed timer) */
+}
+
+static void dtr_tx_timeout_fn(struct timer_list *t)
+{
+ struct dtr_cm *cm = timer_container_of(cm, t, tx_timeout);
+
+ /* cm->kref for armed timer becomes a ref for the work */
+ schedule_work(&cm->tx_timeout_work);
+}
+
+static bool higher_in_sequence(unsigned int higher, unsigned int base)
+{
+ /*
+ SEQUENCE Arithmetic: By looking at the most signifficant bit of
+ the reduced word size we find out if the difference is positive.
+ The difference is necessary to deal with the overflow in the
+ sequence number space.
+ */
+ unsigned int diff = higher - base;
+
+ return !(diff & (1 << (SEQUENCE_BITS - 1)));
+}
+
+static void __dtr_order_rx_descs(struct dtr_stream *rdma_stream,
+ struct dtr_rx_desc *rx_desc)
+{
+ struct dtr_rx_desc *pos;
+ unsigned int seq = rx_desc->sequence;
+
+ list_for_each_entry_reverse(pos, &rdma_stream->rx_descs, list) {
+ if (higher_in_sequence(seq, pos->sequence)) { /* think: seq > pos->sequence */
+ list_add(&rx_desc->list, &pos->list);
+ return;
+ }
+ }
+ list_add(&rx_desc->list, &rdma_stream->rx_descs);
+}
+
+static void dtr_order_rx_descs(struct dtr_stream *rdma_stream,
+ struct dtr_rx_desc *rx_desc)
+{
+ unsigned long flags;
+
+ spin_lock_irqsave(&rdma_stream->rx_descs_lock, flags);
+ __dtr_order_rx_descs(rdma_stream, rx_desc);
+ rdma_stream->unread += rx_desc->size;
+ spin_unlock_irqrestore(&rdma_stream->rx_descs_lock, flags);
+}
+
+static void dtr_dec_rx_descs(struct dtr_cm *cm)
+{
+ struct dtr_flow *flow = cm->path->flow;
+ struct dtr_transport *rdma_transport = cm->rdma_transport;
+
+ /* When we get the posted rx_descs back, we do not know if they
+ * where accoutend for the data stream or the control stream...
+ */
+ if (atomic_dec_if_positive(&flow[DATA_STREAM].rx_descs_posted) >= 0)
+ return;
+
+ if (atomic_dec_if_positive(&flow[CONTROL_STREAM].rx_descs_posted) >= 0)
+ return;
+
+ if (__ratelimit(&rdma_transport->rate_limit)) {
+ struct drbd_transport *transport = &rdma_transport->transport;
+
+ tr_warn(transport, "rx_descs_posted underflow avoided\n");
+ }
+}
+
+static void dtr_control_data_ready(struct dtr_stream *rdma_stream, struct dtr_rx_desc *rx_desc)
+{
+ struct dtr_transport *rdma_transport = rdma_stream->rdma_transport;
+ struct drbd_transport *transport = &rdma_transport->transport;
+ struct drbd_const_buffer buffer;
+ struct dtr_cm *cm = rx_desc->cm;
+ struct dtr_path *path = cm->path;
+ struct dtr_flow *flow = &path->flow[CONTROL_STREAM];
+
+ if (atomic_read(&flow->rx_descs_known_to_peer) < atomic_read(&flow->rx_descs_posted) / 8)
+ dtr_send_flow_control_msg(path, GFP_ATOMIC);
+
+ ib_dma_sync_single_for_cpu(cm->id->device, rx_desc->sge.addr,
+ rdma_transport->rx_allocation_size, DMA_FROM_DEVICE);
+
+ buffer.buffer = page_address(rx_desc->page);
+ buffer.avail = rx_desc->size;
+ drbd_control_data_ready(transport, &buffer);
+
+ dtr_recycle_rx_desc(transport, CONTROL_STREAM, &rx_desc, GFP_ATOMIC);
+}
+
+static void __dtr_order_rx_descs_front(struct dtr_stream *rdma_stream,
+ struct dtr_rx_desc *rx_desc)
+{
+ struct dtr_rx_desc *pos;
+ unsigned int seq = rx_desc->sequence;
+
+ list_for_each_entry(pos, &rdma_stream->rx_descs, list) {
+ if (higher_in_sequence(seq, pos->sequence)) { /* think: seq > pos->sequence */
+ list_add(&rx_desc->list, &pos->list);
+ return;
+ }
+ }
+ list_add(&rx_desc->list, &rdma_stream->rx_descs);
+}
+
+static void dtr_control_tasklet_fn(struct tasklet_struct *t)
+{
+ struct dtr_transport *rdma_transport =
+ from_tasklet(rdma_transport, t, control_tasklet);
+ struct dtr_stream *rdma_stream = &rdma_transport->stream[CONTROL_STREAM];
+ struct dtr_rx_desc *rx_desc, *tmp;
+ LIST_HEAD(rx_descs);
+
+ spin_lock_irq(&rdma_stream->rx_descs_lock);
+ list_splice_init(&rdma_stream->rx_descs, &rx_descs);
+ spin_unlock_irq(&rdma_stream->rx_descs_lock);
+
+ list_for_each_entry_safe(rx_desc, tmp, &rx_descs, list) {
+ if (rx_desc->sequence != rdma_stream->rx_sequence)
+ goto abort;
+ list_del(&rx_desc->list);
+ rdma_stream->rx_sequence =
+ (rdma_stream->rx_sequence + 1) & ((1UL << SEQUENCE_BITS) - 1);
+ rdma_stream->unread -= rx_desc->size;
+ dtr_control_data_ready(rdma_stream, rx_desc);
+ }
+ return;
+
+abort:
+ spin_lock_irq(&rdma_stream->rx_descs_lock);
+ list_for_each_entry_safe(rx_desc, tmp, &rx_descs, list) {
+ list_del(&rx_desc->list);
+ __dtr_order_rx_descs_front(rdma_stream, rx_desc);
+ }
+ spin_unlock_irq(&rdma_stream->rx_descs_lock);
+
+ tasklet_schedule(&rdma_transport->control_tasklet);
+}
+
+static void dtr_rx_cqe_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct dtr_rx_desc *rx_desc = container_of(wc->wr_cqe, struct dtr_rx_desc, cqe);
+ struct dtr_cm *cm = rx_desc->cm;
+ struct dtr_path *path = cm->path;
+ struct dtr_transport *rdma_transport =
+ container_of(path->path.transport, struct dtr_transport, transport);
+ union dtr_immediate immediate;
+ int err;
+
+ if (wc->status != IB_WC_SUCCESS || !(wc->opcode & IB_WC_RECV)) {
+ struct drbd_transport *transport = &rdma_transport->transport;
+ unsigned long irq_flags;
+
+ switch (wc->status) {
+ case IB_WC_WR_FLUSH_ERR:
+ /* "Work Request Flushed Error: A Work Request was in
+ * process or outstanding when the QP transitioned into
+ * the Error State."
+ *
+ * Which is not entirely unexpected...
+ */
+ break;
+
+ default:
+ if (__ratelimit(&rdma_transport->rate_limit)) {
+ tr_warn(transport,
+ "wc.status = %d (%s), wc.opcode = %d (%s)\n",
+ wc->status, wc->status == IB_WC_SUCCESS ? "ok" : "bad",
+ wc->opcode, wc->opcode & IB_WC_RECV ? "ok" : "bad");
+
+ tr_warn(transport,
+ "wc.vendor_err = %d, wc.byte_len = %d wc.imm_data = %d\n",
+ wc->vendor_err, wc->byte_len, wc->ex.imm_data);
+ }
+ }
+
+ /* dtr_free_rx_desc() will call drbd_free_page(), and that function
+ * should not be called from softirq context.
+ */
+ spin_lock_irqsave(&cm->error_rx_descs_lock, irq_flags);
+ list_add_tail(&rx_desc->list, &cm->error_rx_descs);
+ spin_unlock_irqrestore(&cm->error_rx_descs_lock, irq_flags);
+ dtr_dec_rx_descs(cm);
+ set_bit(DSB_ERROR, &cm->state);
+
+ kref_get(&cm->kref);
+ if (!schedule_work(&cm->end_rx_work))
+ kref_put(&cm->kref, dtr_destroy_cm);
+
+ return;
+ }
+
+ rx_desc->size = wc->byte_len;
+ immediate.i = be32_to_cpu(wc->ex.imm_data);
+ if (immediate.stream == ST_FLOW_CTRL) {
+ int send_from_stream;
+
+ ib_dma_sync_single_for_cpu(cm->id->device, rx_desc->sge.addr,
+ rdma_transport->rx_allocation_size, DMA_FROM_DEVICE);
+ send_from_stream = dtr_got_flow_control_msg(path, page_address(rx_desc->page));
+ err = dtr_repost_rx_desc(cm, rx_desc);
+ if (err)
+ tr_err(&rdma_transport->transport, "dtr_repost_rx_desc() failed %d", err);
+ dtr_maybe_trigger_flow_control_msg(path, send_from_stream);
+ } else {
+ struct dtr_flow *flow = &path->flow[immediate.stream];
+ struct dtr_stream *rdma_stream = &rdma_transport->stream[immediate.stream];
+
+ atomic_dec(&flow->rx_descs_posted);
+ smp_wmb(); /* smp_rmb() is in dtr_new_rx_descs() */
+ atomic_dec(&flow->rx_descs_known_to_peer);
+
+ if (immediate.stream == ST_CONTROL)
+ mod_timer(&rdma_transport->control_timer, jiffies + rdma_stream->recv_timeout);
+
+ rx_desc->sequence = immediate.sequence;
+ dtr_order_rx_descs(rdma_stream, rx_desc);
+
+ if (immediate.stream == ST_CONTROL)
+ tasklet_schedule(&rdma_transport->control_tasklet);
+ else
+ wake_up_interruptible(&rdma_stream->recv_wq);
+ }
+
+ if (dtr_path_ok(path)) {
+ struct dtr_flow *flow = &path->flow[DATA_STREAM];
+
+ if (atomic_read(&flow->rx_descs_posted) < flow->rx_descs_want_posted / 2)
+ schedule_work(&path->refill_rx_descs_work);
+ }
+}
+
+static void dtr_free_tx_desc(struct dtr_cm *cm, struct dtr_tx_desc *tx_desc)
+{
+ struct ib_device *device = cm->id->device;
+ struct bio_vec bvec;
+ struct bvec_iter iter;
+ int i, nr_sges;
+
+ switch (tx_desc->type) {
+ case SEND_PAGE:
+ ib_dma_unmap_page(device, tx_desc->sge[0].addr, tx_desc->sge[0].length, DMA_TO_DEVICE);
+ put_page(tx_desc->page);
+ break;
+ case SEND_MSG:
+ ib_dma_unmap_single(device, tx_desc->sge[0].addr, tx_desc->sge[0].length, DMA_TO_DEVICE);
+ kfree(tx_desc->data);
+ break;
+ case SEND_BIO:
+ nr_sges = tx_desc->nr_sges;
+ for (i = 0; i < nr_sges; i++)
+ ib_dma_unmap_page(device, tx_desc->sge[i].addr, tx_desc->sge[i].length,
+ DMA_TO_DEVICE);
+ bio_for_each_segment(bvec, tx_desc->bio, iter) {
+ put_page(bvec.bv_page);
+ }
+ break;
+ }
+ kfree(tx_desc);
+}
+
+static void dtr_tx_cqe_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+ struct dtr_tx_desc *tx_desc = container_of(wc->wr_cqe, struct dtr_tx_desc, cqe);
+ struct dtr_cm *cm = cq->cq_context;
+ struct dtr_path *path = cm->path;
+ struct dtr_transport *rdma_transport =
+ container_of(path->path.transport, struct dtr_transport, transport);
+ struct dtr_flow *flow;
+ struct dtr_stream *rdma_stream;
+ enum dtr_stream_nr stream_nr = tx_desc->imm.stream;
+ int err;
+
+ if (stream_nr != ST_FLOW_CTRL) {
+ flow = &path->flow[stream_nr];
+ rdma_stream = &rdma_transport->stream[stream_nr];
+ } else {
+ struct dtr_flow_control *msg = (struct dtr_flow_control *)tx_desc->data;
+ enum dtr_stream_nr send_from_stream = be32_to_cpu(msg->send_from_stream);
+
+ flow = &path->flow[send_from_stream];
+ rdma_stream = &rdma_transport->stream[send_from_stream];
+ }
+
+ if (wc->status != IB_WC_SUCCESS || wc->opcode != IB_WC_SEND) {
+ struct drbd_transport *transport = &rdma_transport->transport;
+
+ if (wc->status == IB_WC_RNR_RETRY_EXC_ERR) {
+ tr_err(transport, "tx_event: wc.status = IB_WC_RNR_RETRY_EXC_ERR\n");
+ tr_info(transport, "peer_rx_descs = %d", atomic_read(&flow->peer_rx_descs));
+ } else if (wc->status != IB_WC_WR_FLUSH_ERR) {
+ tr_err(transport, "tx_event: wc.status != IB_WC_SUCCESS %d\n", wc->status);
+ tr_err(transport, "wc.vendor_err = %d, wc.byte_len = %d wc.imm_data = %d\n",
+ wc->vendor_err, wc->byte_len, wc->ex.imm_data);
+ }
+
+ atomic_inc(&flow->peer_rx_descs);
+ set_bit(DSB_ERROR, &cm->state);
+
+ if (stream_nr != ST_FLOW_CTRL) {
+ err = dtr_repost_tx_desc(cm, tx_desc);
+ if (!err)
+ tx_desc = NULL; /* it is in the air again! Fly! */
+ else if (__ratelimit(&rdma_transport->rate_limit)) {
+ tr_warn(transport, "repost of tx_desc failed! %d\n", err);
+ drbd_control_event(transport, CLOSED_BY_PEER);
+ }
+ }
+ }
+
+ atomic_dec(&flow->tx_descs_posted);
+ wake_up_interruptible(&rdma_stream->send_wq);
+
+ if (tx_desc)
+ dtr_free_tx_desc(cm, tx_desc);
+ if (atomic_dec_and_test(&cm->tx_descs_posted)) {
+ bool was_active = timer_delete(&cm->tx_timeout);
+
+ if (was_active)
+ kref_put(&cm->kref, dtr_destroy_cm);
+
+ if (cm->state == DSM_CONNECTED)
+ kref_put(&cm->kref, dtr_destroy_cm); /* this is _not_ the last ref */
+ else
+ schedule_work(&cm->end_tx_work); /* the last ref might be put in this work */
+ }
+}
+
+static int dtr_create_qp(struct dtr_cm *cm, int rx_descs_max, int tx_descs_max)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(cm->path->path.transport, struct dtr_transport, transport);
+ int err;
+
+ struct ib_qp_init_attr init_attr = {
+ .cap.max_send_wr = tx_descs_max,
+ .cap.max_recv_wr = rx_descs_max,
+ .cap.max_recv_sge = 1, /* We only receive into single pages */
+ .cap.max_send_sge = rdma_transport->sges_max,
+ .qp_type = IB_QPT_RC,
+ .send_cq = cm->send_cq,
+ .recv_cq = cm->recv_cq,
+ .sq_sig_type = IB_SIGNAL_REQ_WR
+ };
+
+ err = rdma_create_qp(cm->id, cm->pd, &init_attr);
+
+ return err;
+}
+
+static int dtr_post_rx_desc(struct dtr_cm *cm, struct dtr_rx_desc *rx_desc)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(cm->path->path.transport, struct dtr_transport, transport);
+ struct ib_recv_wr recv_wr;
+ const struct ib_recv_wr *recv_wr_failed;
+ int err = -EIO;
+
+ recv_wr.next = NULL;
+ rx_desc->cqe.done = dtr_rx_cqe_done;
+ recv_wr.wr_cqe = &rx_desc->cqe;
+ recv_wr.sg_list = &rx_desc->sge;
+ recv_wr.num_sge = 1;
+
+ ib_dma_sync_single_for_device(cm->id->device,
+ rx_desc->sge.addr, rdma_transport->rx_allocation_size, DMA_FROM_DEVICE);
+
+ err = ib_post_recv(cm->id->qp, &recv_wr, &recv_wr_failed);
+ if (err)
+ tr_err(&rdma_transport->transport, "ib_post_recv error %d\n", err);
+
+ return err;
+}
+
+static void dtr_free_rx_desc(struct dtr_rx_desc *rx_desc)
+{
+ struct dtr_transport *rdma_transport;
+ struct dtr_path *path;
+ struct ib_device *device;
+ struct dtr_cm *cm;
+ int alloc_size;
+
+ if (!rx_desc)
+ return; /* Allow call with NULL */
+
+ cm = rx_desc->cm;
+ device = cm->id->device;
+ path = cm->path;
+ rdma_transport = container_of(path->path.transport, struct dtr_transport, transport);
+ alloc_size = rdma_transport->rx_allocation_size;
+ ib_dma_unmap_single(device, rx_desc->sge.addr, alloc_size, DMA_FROM_DEVICE);
+ kref_put(&cm->kref, dtr_destroy_cm);
+
+ if (rx_desc->page) {
+ struct drbd_transport *transport = &rdma_transport->transport;
+
+ /* put_page(), if we had more than one rx_desc per page,
+ * but see comments in dtr_create_rx_desc */
+ drbd_free_pages(transport, rx_desc->page);
+ }
+ kfree(rx_desc);
+}
+
+static int dtr_create_rx_desc(struct dtr_flow *flow, gfp_t gfp_mask, bool connected_only)
+{
+ struct dtr_path *path = flow->path;
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct dtr_rx_desc *rx_desc;
+ struct page *page;
+ int err, alloc_size = rdma_transport->rx_allocation_size;
+ int nr_pages = alloc_size / PAGE_SIZE;
+ struct dtr_cm *cm;
+
+ rx_desc = kzalloc_obj(*rx_desc, gfp_mask);
+ if (!rx_desc)
+ return -ENOMEM;
+
+ /* As of now, this MUST NEVER return a highmem page!
+ * Which means no other user may ever have requested and then given
+ * back a highmem page!
+ */
+ page = drbd_alloc_pages(transport, nr_pages, gfp_mask);
+ if (!page) {
+ kfree(rx_desc);
+ return -ENOMEM;
+ }
+ BUG_ON(PageHighMem(page));
+
+ err = -ECONNRESET;
+ cm = dtr_path_get_cm(path);
+ if (!cm)
+ goto out;
+ if (connected_only && cm->state != DSM_CONNECTED)
+ goto out_put;
+
+ rx_desc->cm = cm;
+ rx_desc->page = page;
+ rx_desc->size = 0;
+ rx_desc->sge.lkey = dtr_cm_to_lkey(cm);
+ rx_desc->sge.addr = ib_dma_map_single(cm->id->device, page_address(page), alloc_size,
+ DMA_FROM_DEVICE);
+ err = ib_dma_mapping_error(cm->id->device, rx_desc->sge.addr);
+ if (err) {
+ tr_err(transport, "ib_dma_map_single() failed %d\n", err);
+ goto out_put;
+ }
+ rx_desc->sge.length = alloc_size;
+
+ atomic_inc(&flow->rx_descs_allocated);
+ atomic_inc(&flow->rx_descs_posted);
+ err = dtr_post_rx_desc(cm, rx_desc);
+ if (err) {
+ tr_err(transport, "dtr_post_rx_desc() returned %d\n", err);
+ atomic_dec(&flow->rx_descs_posted);
+ atomic_dec(&flow->rx_descs_allocated);
+ dtr_free_rx_desc(rx_desc);
+ }
+ return err;
+
+out_put:
+ kref_put(&cm->kref, dtr_destroy_cm);
+out:
+ kfree(rx_desc);
+ drbd_free_pages(transport, page);
+ return err;
+}
+
+static void dtr_refill_rx_descs_work_fn(struct work_struct *work)
+{
+ struct dtr_path *path = container_of(work, struct dtr_path, refill_rx_descs_work);
+ int i;
+
+ if (!dtr_path_ok(path))
+ return;
+
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++) {
+ struct dtr_flow *flow = &path->flow[i];
+
+ if (atomic_read(&flow->rx_descs_posted) < flow->rx_descs_want_posted / 2)
+ __dtr_refill_rx_desc(path, i);
+ dtr_flow_control(flow, GFP_NOIO);
+ }
+}
+
+static void __dtr_refill_rx_desc(struct dtr_path *path, enum drbd_stream stream)
+{
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_flow *flow = &path->flow[stream];
+ int descs_want_posted, descs_max;
+
+ descs_max = flow->rx_descs_max;
+ descs_want_posted = flow->rx_descs_want_posted;
+
+ while (atomic_read(&flow->rx_descs_posted) < descs_want_posted &&
+ atomic_read(&flow->rx_descs_allocated) < descs_max) {
+ int err;
+
+ err = dtr_create_rx_desc(flow, (GFP_NOIO & ~__GFP_RECLAIM) | __GFP_NOWARN, true);
+ /*
+ * drbd_alloc_pages() goes over the configured max_buffers, but throttles the
+ * caller with sleeping 100ms for each of those excess pages. By calling
+ * without __GFP_RECLAIM we request to get a -ENOMEM instead of sleeping.
+ * We simply stop refilling then.
+ */
+ if (err == -ENOMEM) {
+ break;
+ } else if (err) {
+ tr_err(transport, "dtr_create_rx_desc() = %d\n", err);
+ break;
+ }
+ }
+}
+
+static void dtr_refill_rx_desc(struct dtr_transport *rdma_transport,
+ enum drbd_stream stream)
+{
+ struct drbd_transport *transport = &rdma_transport->transport;
+ struct drbd_path *drbd_path;
+
+ for_each_path_ref(drbd_path, transport) {
+ struct dtr_path *path = container_of(drbd_path, struct dtr_path, path);
+
+ schedule_work(&path->refill_rx_descs_work);
+ }
+}
+
+static int dtr_repost_rx_desc(struct dtr_cm *cm, struct dtr_rx_desc *rx_desc)
+{
+ int err;
+
+ rx_desc->size = 0;
+ rx_desc->sge.lkey = dtr_cm_to_lkey(cm);
+ /* rx_desc->sge.addr = rx_desc->dma_addr;
+ rx_desc->sge.length = rx_desc->alloc_size; */
+
+ err = dtr_post_rx_desc(cm, rx_desc);
+ return err;
+}
+
+static void dtr_recycle_rx_desc(struct drbd_transport *transport,
+ enum drbd_stream stream,
+ struct dtr_rx_desc **pp_rx_desc,
+ gfp_t gfp_mask)
+{
+ struct dtr_rx_desc *rx_desc = *pp_rx_desc;
+ struct dtr_cm *cm;
+ struct dtr_path *path;
+ struct dtr_flow *flow;
+ int err;
+
+ if (!rx_desc)
+ return;
+
+ cm = rx_desc->cm;
+ path = cm->path;
+ flow = &path->flow[stream];
+
+ err = dtr_repost_rx_desc(cm, rx_desc);
+
+ if (err) {
+ dtr_free_rx_desc(rx_desc);
+ } else {
+ atomic_inc(&flow->rx_descs_posted);
+ dtr_flow_control(flow, gfp_mask);
+ }
+
+ *pp_rx_desc = NULL;
+}
+
+static int __dtr_post_tx_desc(struct dtr_cm *cm, struct dtr_tx_desc *tx_desc)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(cm->path->path.transport, struct dtr_transport, transport);
+ struct drbd_transport *transport = &rdma_transport->transport;
+ struct ib_send_wr send_wr;
+ const struct ib_send_wr *send_wr_failed;
+ struct ib_device *device = cm->id->device;
+ unsigned long timeout;
+ struct net_conf *nc;
+ int i, err = -EIO;
+ bool was_active;
+
+ send_wr.next = NULL;
+ tx_desc->cqe.done = dtr_tx_cqe_done;
+ send_wr.wr_cqe = &tx_desc->cqe;
+ send_wr.sg_list = tx_desc->sge;
+ send_wr.num_sge = tx_desc->nr_sges;
+ send_wr.ex.imm_data = cpu_to_be32(tx_desc->imm.i);
+ send_wr.opcode = IB_WR_SEND_WITH_IMM;
+ send_wr.send_flags = IB_SEND_SIGNALED;
+
+ rcu_read_lock();
+ nc = rcu_dereference(transport->net_conf);
+ timeout = nc->ping_timeo;
+ rcu_read_unlock();
+
+ for (i = 0; i < tx_desc->nr_sges; i++)
+ ib_dma_sync_single_for_device(device, tx_desc->sge[i].addr,
+ tx_desc->sge[i].length, DMA_TO_DEVICE);
+
+ if (atomic_inc_return(&cm->tx_descs_posted) == 1)
+ kref_get(&cm->kref); /* keep one extra ref as long as one tx is posted */
+
+ kref_get(&cm->kref);
+ was_active = mod_timer(&cm->tx_timeout, jiffies + timeout * HZ / 20);
+ if (was_active)
+ kref_put(&cm->kref, dtr_destroy_cm);
+
+ err = ib_post_send(cm->id->qp, &send_wr, &send_wr_failed);
+ if (err) {
+ tr_err(&rdma_transport->transport, "ib_post_send() failed %d\n", err);
+ was_active = timer_delete(&cm->tx_timeout);
+ if (!was_active)
+ was_active = cancel_work_sync(&cm->tx_timeout_work);
+ if (was_active)
+ kref_put(&cm->kref, dtr_destroy_cm);
+ if (atomic_dec_and_test(&cm->tx_descs_posted))
+ kref_put(&cm->kref, dtr_destroy_cm);
+ }
+
+ return err;
+}
+
+static struct dtr_cm *dtr_select_and_get_cm_for_tx(struct dtr_transport *rdma_transport,
+ enum drbd_stream stream)
+{
+ struct drbd_transport *transport = &rdma_transport->transport;
+ struct dtr_path *path, *candidate = NULL;
+ unsigned long last_sent_jif = -1UL;
+ struct dtr_cm *cm;
+
+ /* Within in 16 jiffy use one path, in case we switch to an other one,
+ use that that was used longest ago */
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(path, &transport->paths, path.list) {
+ struct dtr_flow *flow = &path->flow[stream];
+ unsigned long ls;
+
+ cm = rcu_dereference(path->cm);
+ if (!cm || cm->state != DSM_CONNECTED)
+ continue;
+
+ /* Normal packets are not allowed to consume all of the peer's rx_descs,
+ the last one is reserved for flow-control messages. */
+ if (atomic_read(&flow->tx_descs_posted) >= flow->tx_descs_max ||
+ atomic_read(&flow->peer_rx_descs) <= 1)
+ continue;
+
+ ls = cm->last_sent_jif;
+ if ((ls & ~0xfUL) == (jiffies & ~0xfUL) && kref_get_unless_zero(&cm->kref)) {
+ rcu_read_unlock();
+ return cm;
+ }
+ if (ls < last_sent_jif) {
+ last_sent_jif = ls;
+ candidate = path;
+ }
+ }
+
+ if (candidate) {
+ cm = __dtr_path_get_cm(candidate);
+ cm->last_sent_jif = jiffies;
+ } else {
+ cm = NULL;
+ }
+ rcu_read_unlock();
+
+ return cm;
+}
+
+static int dtr_remap_tx_desc(struct dtr_cm *old_cm, struct dtr_cm *cm,
+ struct dtr_tx_desc *tx_desc)
+{
+ struct ib_device *device = old_cm->id->device;
+ int i, nr_sges, err;
+ dma_addr_t a = 0;
+
+ switch (tx_desc->type) {
+ case SEND_PAGE:
+ ib_dma_unmap_page(device, tx_desc->sge[0].addr, tx_desc->sge[0].length, DMA_TO_DEVICE);
+ break;
+ case SEND_MSG:
+ ib_dma_unmap_single(device, tx_desc->sge[0].addr, tx_desc->sge[0].length, DMA_TO_DEVICE);
+ break;
+ case SEND_BIO:
+ nr_sges = tx_desc->nr_sges;
+ for (i = 0; i < nr_sges; i++)
+ ib_dma_unmap_page(device, tx_desc->sge[i].addr, tx_desc->sge[i].length,
+ DMA_TO_DEVICE);
+ break;
+ }
+
+ device = cm->id->device;
+ switch (tx_desc->type) {
+ case SEND_PAGE:
+ a = ib_dma_map_page(device, tx_desc->page, tx_desc->sge[0].addr & ~PAGE_MASK,
+ tx_desc->sge[0].length, DMA_TO_DEVICE);
+ break;
+ case SEND_MSG:
+ a = ib_dma_map_single(device, tx_desc->data, tx_desc->sge[0].length, DMA_TO_DEVICE);
+ break;
+ case SEND_BIO:
+#if SENDER_COMPACTS_BVECS
+ #error implement me
+#endif
+ break;
+ }
+ err = ib_dma_mapping_error(device, a);
+
+ tx_desc->sge[0].addr = a;
+ tx_desc->sge[0].lkey = dtr_cm_to_lkey(cm);
+
+ return err;
+}
+
+
+static int dtr_repost_tx_desc(struct dtr_cm *old_cm, struct dtr_tx_desc *tx_desc)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(old_cm->path->path.transport, struct dtr_transport, transport);
+ enum drbd_stream stream = tx_desc->imm.stream;
+ struct dtr_cm *cm;
+ struct dtr_flow *flow;
+ int err;
+
+ do {
+ cm = dtr_select_and_get_cm_for_tx(rdma_transport, stream);
+ if (!cm)
+ return -ECONNRESET;
+
+ err = dtr_remap_tx_desc(old_cm, cm, tx_desc);
+ if (err) {
+ tr_err(&rdma_transport->transport, "dtr_remap_tx_desc failed: %d\n", err);
+ kref_put(&cm->kref, dtr_destroy_cm);
+ continue;
+ }
+
+ flow = &cm->path->flow[stream];
+ if (atomic_dec_if_positive(&flow->peer_rx_descs) < 0) {
+ kref_put(&cm->kref, dtr_destroy_cm);
+ continue;
+ }
+ if (!atomic_inc_if_below(&flow->tx_descs_posted, flow->tx_descs_max)) {
+ atomic_inc(&flow->peer_rx_descs);
+ kref_put(&cm->kref, dtr_destroy_cm);
+ continue;
+ }
+
+ err = __dtr_post_tx_desc(cm, tx_desc);
+ if (err) {
+ atomic_inc(&flow->peer_rx_descs);
+ atomic_dec(&flow->tx_descs_posted);
+ }
+ kref_put(&cm->kref, dtr_destroy_cm);
+ } while (err);
+
+ return err;
+}
+
+static int dtr_post_tx_desc(struct dtr_transport *rdma_transport,
+ struct dtr_tx_desc *tx_desc)
+{
+ enum drbd_stream stream = tx_desc->imm.stream;
+ struct dtr_stream *rdma_stream = &rdma_transport->stream[stream];
+ struct ib_device *device;
+ struct dtr_flow *flow;
+ struct dtr_cm *cm;
+ int offset, err;
+ long t;
+
+retry:
+ t = wait_event_interruptible_timeout(rdma_stream->send_wq,
+ (cm = dtr_select_and_get_cm_for_tx(rdma_transport, stream)),
+ rdma_stream->send_timeout);
+
+ if (t == 0) {
+ struct dtr_transport *rdma_transport = rdma_stream->rdma_transport;
+
+ if (drbd_stream_send_timed_out(&rdma_transport->transport, stream))
+ return -EAGAIN;
+ goto retry;
+ } else if (t < 0)
+ return -EINTR;
+
+ flow = &cm->path->flow[stream];
+ if (atomic_dec_if_positive(&flow->peer_rx_descs) < 0) {
+ kref_put(&cm->kref, dtr_destroy_cm);
+ goto retry;
+ }
+ if (!atomic_inc_if_below(&flow->tx_descs_posted, flow->tx_descs_max)) {
+ atomic_inc(&flow->peer_rx_descs);
+ kref_put(&cm->kref, dtr_destroy_cm);
+ goto retry;
+ }
+
+ device = cm->id->device;
+ switch (tx_desc->type) {
+ case SEND_PAGE:
+ offset = tx_desc->sge[0].lkey;
+ tx_desc->sge[0].addr = ib_dma_map_page(device, tx_desc->page, offset,
+ tx_desc->sge[0].length, DMA_TO_DEVICE);
+ err = ib_dma_mapping_error(device, tx_desc->sge[0].addr);
+ if (err) {
+ atomic_inc(&flow->peer_rx_descs);
+ atomic_dec(&flow->tx_descs_posted);
+ goto out;
+ }
+
+ tx_desc->sge[0].lkey = dtr_cm_to_lkey(cm);
+ break;
+ case SEND_MSG:
+ case SEND_BIO:
+ BUG();
+ }
+
+ err = __dtr_post_tx_desc(cm, tx_desc);
+ if (err) {
+ atomic_inc(&flow->peer_rx_descs);
+ atomic_dec(&flow->tx_descs_posted);
+ ib_dma_unmap_page(device, tx_desc->sge[0].addr, tx_desc->sge[0].length, DMA_TO_DEVICE);
+ }
+
+
+out:
+ // pr_info("%s: Created send_wr (%p, %p): nr_sges=%u, first seg: lkey=%x, addr=%llx, length=%d\n", rdma_stream->name, tx_desc->page, tx_desc, tx_desc->nr_sges, tx_desc->sge[0].lkey, tx_desc->sge[0].addr, tx_desc->sge[0].length);
+ kref_put(&cm->kref, dtr_destroy_cm);
+ return err;
+}
+
+static int dtr_init_flow(struct dtr_path *path, enum drbd_stream stream)
+{
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ unsigned int alloc_size = rdma_transport->rx_allocation_size;
+ unsigned int rcvbuf_size = RDMA_DEF_BUFFER_SIZE;
+ unsigned int sndbuf_size = RDMA_DEF_BUFFER_SIZE;
+ struct dtr_flow *flow = &path->flow[stream];
+ struct net_conf *nc;
+ int err = 0;
+
+ rcu_read_lock();
+ nc = rcu_dereference(transport->net_conf);
+ if (!nc) {
+ rcu_read_unlock();
+ tr_err(transport, "need net_conf\n");
+ err = -EINVAL;
+ goto out;
+ }
+
+ if (nc->rcvbuf_size)
+ rcvbuf_size = nc->rcvbuf_size;
+ if (nc->sndbuf_size)
+ sndbuf_size = nc->sndbuf_size;
+
+ if (stream == CONTROL_STREAM) {
+ rcvbuf_size = nc->rdma_ctrl_rcvbuf_size ?: max(rcvbuf_size / 64, alloc_size * 8);
+ sndbuf_size = nc->rdma_ctrl_sndbuf_size ?: max(sndbuf_size / 64, alloc_size * 8);
+ }
+
+ if (rcvbuf_size / DRBD_SOCKET_BUFFER_SIZE > nc->max_buffers) {
+ tr_err(transport, "Set max-buffers at least to %d, (right now it is %d).\n",
+ rcvbuf_size / DRBD_SOCKET_BUFFER_SIZE, nc->max_buffers);
+ tr_err(transport, "This is due to rcvbuf-size = %d.\n", rcvbuf_size);
+ rcu_read_unlock();
+ err = -EINVAL;
+ goto out;
+ }
+
+ rcu_read_unlock();
+
+ flow->path = path;
+ flow->tx_descs_max = sndbuf_size / DRBD_SOCKET_BUFFER_SIZE;
+ flow->rx_descs_max = rcvbuf_size / DRBD_SOCKET_BUFFER_SIZE;
+
+ atomic_set(&flow->tx_descs_posted, 0);
+ atomic_set(&flow->peer_rx_descs, stream == CONTROL_STREAM ? 1 : 0);
+ atomic_set(&flow->rx_descs_known_to_peer, stream == CONTROL_STREAM ? 1 : 0);
+
+ atomic_set(&flow->rx_descs_posted, 0);
+ atomic_set(&flow->rx_descs_allocated, 0);
+
+ flow->rx_descs_want_posted = flow->rx_descs_max / 2;
+
+ out:
+ return err;
+}
+
+static int _dtr_cm_alloc_rdma_res(struct dtr_cm *cm,
+ enum dtr_alloc_rdma_res_causes *cause)
+{
+ int err, i, rx_descs_max = 0, tx_descs_max = 0;
+ struct dtr_path *path = cm->path;
+
+ /* Each path might be the sole path, therefore it must be able to
+ support both streams */
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++) {
+ rx_descs_max += path->flow[i].rx_descs_max;
+ tx_descs_max += path->flow[i].tx_descs_max;
+ }
+
+ /* alloc protection domain (PD) */
+ /* in 4.9 ib_alloc_pd got the ability to specify flags as second param */
+ /* so far we don't use flags, but if we start using them, we have to be
+ * aware that the compat layer removes this parameter for old kernels */
+ cm->pd = ib_alloc_pd(cm->id->device, 0);
+ if (IS_ERR(cm->pd)) {
+ *cause = IB_ALLOC_PD;
+ err = PTR_ERR(cm->pd);
+ goto pd_failed;
+ }
+
+ /* allocate recv completion queue (CQ) */
+ cm->recv_cq = ib_alloc_cq_any(cm->id->device, cm, rx_descs_max, IB_POLL_SOFTIRQ);
+ if (IS_ERR(cm->recv_cq)) {
+ *cause = IB_ALLOC_CQ_RX;
+ err = PTR_ERR(cm->recv_cq);
+ goto recv_cq_failed;
+ }
+
+ /* allocate send completion queue (CQ) */
+ cm->send_cq = ib_alloc_cq_any(cm->id->device, cm, tx_descs_max, IB_POLL_SOFTIRQ);
+ if (IS_ERR(cm->send_cq)) {
+ *cause = IB_ALLOC_CQ_TX;
+ err = PTR_ERR(cm->send_cq);
+ goto send_cq_failed;
+ }
+
+ /* create a queue pair (QP) */
+ err = dtr_create_qp(cm, rx_descs_max, tx_descs_max);
+ if (err) {
+ *cause = RDMA_CREATE_QP;
+ goto createqp_failed;
+ }
+
+ /* some RDMA transports need at least one rx desc for establishing a connection */
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++)
+ dtr_create_rx_desc(&path->flow[i], GFP_NOIO, false);
+
+ return 0;
+
+createqp_failed:
+ ib_free_cq(cm->send_cq);
+ cm->send_cq = NULL;
+send_cq_failed:
+ ib_free_cq(cm->recv_cq);
+ cm->recv_cq = NULL;
+recv_cq_failed:
+ ib_dealloc_pd(cm->pd);
+ cm->pd = NULL;
+pd_failed:
+ return err;
+}
+
+
+static int dtr_cm_alloc_rdma_res(struct dtr_cm *cm)
+{
+ struct dtr_path *path = cm->path;
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ enum dtr_alloc_rdma_res_causes cause;
+ struct ib_device_attr dev_attr;
+ struct ib_udata uhw = {.outlen = 0, .inlen = 0};
+ struct ib_device *device = cm->id->device;
+ int rx_descs_max = 0, tx_descs_max = 0;
+ bool reduced = false;
+ int i, hca_max, err, dev_sge;
+
+ static const char * const err_txt[] = {
+ [IB_ALLOC_PD] = "ib_alloc_pd()",
+ [IB_ALLOC_CQ_RX] = "ib_alloc_cq_any() rx",
+ [IB_ALLOC_CQ_TX] = "ib_alloc_cq_any() tx",
+ [RDMA_CREATE_QP] = "rdma_create_qp()",
+ [IB_GET_DMA_MR] = "ib_get_dma_mr()",
+ };
+
+ err = device->ops.query_device(device, &dev_attr, &uhw);
+ if (err) {
+ tr_err(transport, "ib_query_device: %d\n", err);
+ return err;
+ }
+
+ dev_sge = min(dev_attr.max_send_sge, dev_attr.max_recv_sge);
+ if (rdma_transport->sges_max > dev_sge)
+ rdma_transport->sges_max = dev_sge;
+
+ hca_max = min(dev_attr.max_qp_wr, dev_attr.max_cqe);
+
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++) {
+ rx_descs_max += path->flow[i].rx_descs_max;
+ tx_descs_max += path->flow[i].tx_descs_max;
+ }
+
+ if (tx_descs_max > hca_max || rx_descs_max > hca_max) {
+ int rx_correction = 0, tx_correction = 0;
+ reduced = true;
+
+ if (tx_descs_max > hca_max)
+ tx_correction = hca_max - tx_descs_max;
+
+ if (rx_descs_max > hca_max)
+ rx_correction = hca_max - rx_descs_max;
+
+ path->flow[DATA_STREAM].rx_descs_max -= rx_correction;
+ path->flow[DATA_STREAM].tx_descs_max -= tx_correction;
+
+ rx_descs_max -= rx_correction;
+ tx_descs_max -= tx_correction;
+ }
+
+ for (;;) {
+ err = _dtr_cm_alloc_rdma_res(cm, &cause);
+
+ if (err == 0 || cause != RDMA_CREATE_QP || err != -ENOMEM)
+ break;
+
+ reduced = true;
+ if (path->flow[DATA_STREAM].rx_descs_max <= 64)
+ break;
+ path->flow[DATA_STREAM].rx_descs_max -= 64;
+ if (path->flow[DATA_STREAM].tx_descs_max <= 64)
+ break;
+ path->flow[DATA_STREAM].tx_descs_max -= 64;
+ if (path->flow[CONTROL_STREAM].rx_descs_max > 8)
+ path->flow[CONTROL_STREAM].rx_descs_max -= 1;
+ if (path->flow[CONTROL_STREAM].tx_descs_max > 8)
+ path->flow[CONTROL_STREAM].tx_descs_max -= 1;
+ }
+
+ if (err) {
+ tr_err(transport, "%s failed with err = %d\n", err_txt[cause], err);
+ } else if (reduced) {
+ /* ib_create_qp() may return -ENOMEM if max_send_wr or max_recv_wr are
+ too big. Unfortunately there is no way to find the working maxima.
+ http://www.rdmamojo.com/2012/12/21/ibv_create_qp/
+ Suggests "Trial end error" to find the maximal number. */
+
+ tr_warn(transport, "Needed to adjust buffer sizes for HCA\n");
+ tr_warn(transport, "rcvbuf = %d sndbuf = %d \n",
+ path->flow[DATA_STREAM].rx_descs_max * DRBD_SOCKET_BUFFER_SIZE,
+ path->flow[DATA_STREAM].tx_descs_max * DRBD_SOCKET_BUFFER_SIZE);
+ tr_warn(transport, "It is recommended to apply this change to the configuration\n");
+ }
+
+ return err;
+}
+
+static void dtr_end_rx_work_fn(struct work_struct *work)
+{
+ struct dtr_cm *cm = container_of(work, struct dtr_cm, end_rx_work);
+ struct dtr_rx_desc *rx_desc, *tmp;
+ unsigned long irq_flags;
+ LIST_HEAD(rx_descs);
+
+ spin_lock_irqsave(&cm->error_rx_descs_lock, irq_flags);
+ list_splice_init(&cm->error_rx_descs, &rx_descs);
+ spin_unlock_irqrestore(&cm->error_rx_descs_lock, irq_flags);
+ list_for_each_entry_safe(rx_desc, tmp, &rx_descs, list)
+ dtr_free_rx_desc(rx_desc);
+ kref_put(&cm->kref, dtr_destroy_cm);
+}
+
+static void dtr_end_tx_work_fn(struct work_struct *work)
+{
+ struct dtr_cm *cm = container_of(work, struct dtr_cm, end_tx_work);
+
+ kref_put(&cm->kref, dtr_destroy_cm);
+}
+
+static void __dtr_disconnect_path(struct dtr_path *path)
+{
+ struct ib_qp_attr attr = { .qp_state = IB_QPS_ERR };
+ struct drbd_transport *transport;
+ enum connect_state_enum a, p;
+ bool was_scheduled;
+ struct dtr_cm *cm;
+ long t;
+ int err;
+
+ if (!path)
+ return;
+
+ transport = path->path.transport;
+
+ a = atomic_cmpxchg(&path->cs.active_state, PCS_CONNECTING, PCS_REQUEST_ABORT);
+ p = atomic_cmpxchg(&path->cs.passive_state, PCS_CONNECTING, PCS_INACTIVE);
+
+ switch (p) {
+ case PCS_CONNECTING:
+ drbd_put_listener(&path->path);
+ break;
+ case PCS_FINISHING:
+ t = wait_event_timeout(path->cs.wq,
+ atomic_read(&path->cs.passive_state) == PCS_INACTIVE,
+ HZ * 60);
+ if (t == 0)
+ tr_warn(transport, "passive_state still %d\n", atomic_read(&path->cs.passive_state));
+ fallthrough;
+ case PCS_INACTIVE:
+ break;
+ }
+
+ switch (a) {
+ case PCS_CONNECTING:
+ was_scheduled = flush_delayed_work(&path->cs.retry_connect_work);
+ if (!was_scheduled) {
+ atomic_set(&path->cs.active_state, PCS_INACTIVE);
+ break;
+ }
+ fallthrough;
+ case PCS_REQUEST_ABORT:
+ t = wait_event_timeout(path->cs.wq,
+ atomic_read(&path->cs.active_state) == PCS_INACTIVE,
+ HZ * 60);
+ if (t == 0)
+ tr_warn(transport, "active_state still %d\n", atomic_read(&path->cs.active_state));
+ fallthrough;
+ case PCS_INACTIVE:
+ break;
+ }
+
+ cm = dtr_path_get_cm(path);
+ if (!cm)
+ return;
+
+ err = rdma_disconnect(cm->id);
+ if (err) {
+ tr_warn(transport, "failed to disconnect, id %p context %p err %d\n",
+ cm->id, cm->id->context, err);
+ /* We are ignoring errors here on purpose */
+ goto out;
+ }
+
+ /* There might be a signal pending here. Not incorruptible! */
+ wait_event_timeout(cm->state_wq,
+ !test_bit(DSB_CONNECTED, &cm->state),
+ HZ);
+
+ if (test_bit(DSB_CONNECTED, &cm->state))
+ tr_warn(transport, "WARN: not properly disconnected, state = %lu\n",
+ cm->state);
+
+ out:
+ /* between dtr_alloc_cm() and dtr_cm_alloc_rdma_res() cm->id->qp is NULL */
+ if (cm->id->qp) {
+ /* With putting the QP into error state, it has to hand back
+ all posted rx_descs */
+ err = ib_modify_qp(cm->id->qp, &attr, IB_QP_STATE);
+ if (err)
+ tr_err(transport, "ib_modify_qp failed %d\n", err);
+ }
+
+ /*
+ * We are expecting one of RDMA_CM_EVENT_ESTABLISHED, _UNREACHABLE,
+ * _CONNECT_ERROR, or _REJECTED on this cm. Some RDMA drivers report
+ * these error events after unexpectedly long timeouts, while others do
+ * not report it at all. We are no longer interested in these
+ * events. Destroy the cm and cm_id to avoid leaking it.
+ * This is racing with the event delivery, which drops a reference.
+ */
+ if (test_and_clear_bit(DSB_CONNECTING, &cm->state) ||
+ test_and_clear_bit(DSB_CONNECT_REQ, &cm->state))
+ kref_put(&cm->kref, dtr_destroy_cm);
+
+ kref_put(&cm->kref, dtr_destroy_cm);
+}
+
+static void dtr_reclaim_cm(struct rcu_head *rcu_head)
+{
+ struct dtr_cm *cm = container_of(rcu_head, struct dtr_cm, rcu);
+
+ kfree(cm);
+ module_put(THIS_MODULE);
+}
+
+/* dtr_destroy_cm() might run after the transport was destroyed */
+static void __dtr_destroy_cm(struct kref *kref, bool destroy_id)
+{
+ struct dtr_cm *cm = container_of(kref, struct dtr_cm, kref);
+
+ if (cm->id) {
+ if (cm->id->qp)
+ rdma_destroy_qp(cm->id);
+ cm->id->qp = NULL;
+ }
+
+ if (cm->send_cq) {
+ ib_free_cq(cm->send_cq);
+ cm->send_cq = NULL;
+ }
+
+ if (cm->recv_cq) {
+ ib_free_cq(cm->recv_cq);
+ cm->recv_cq = NULL;
+ }
+
+ if (cm->pd) {
+ ib_dealloc_pd(cm->pd);
+ cm->pd = NULL;
+ }
+
+ if (cm->id) {
+ /* Just in case some callback is still triggered
+ * after we kfree'd path. */
+ cm->id->context = NULL;
+ if (destroy_id)
+ rdma_destroy_id(cm->id);
+ cm->id = NULL;
+ }
+ if (cm->path) {
+ kref_put(&cm->path->path.kref, drbd_destroy_path);
+ cm->path = NULL;
+ }
+
+ call_rcu(&cm->rcu, dtr_reclaim_cm);
+}
+
+static void dtr_destroy_cm(struct kref *kref)
+{
+ __dtr_destroy_cm(kref, true);
+}
+
+static void dtr_destroy_cm_keep_id(struct kref *kref)
+{
+ __dtr_destroy_cm(kref, false);
+}
+
+static void dtr_disconnect_path(struct dtr_path *path)
+{
+ struct dtr_cm *cm;
+
+ if (!path)
+ return;
+
+ __dtr_disconnect_path(path);
+ cancel_work_sync(&path->refill_rx_descs_work);
+
+ cm = xchg(&path->cm, NULL); // RCU xchg
+ if (cm)
+ kref_put(&cm->kref, dtr_destroy_cm);
+}
+
+static void dtr_destroy_listener(struct drbd_listener *generic_listener)
+{
+ struct dtr_listener *listener =
+ container_of(generic_listener, struct dtr_listener, listener);
+
+ if (listener->cm.id)
+ rdma_destroy_id(listener->cm.id);
+}
+
+static int dtr_init_listener(struct drbd_transport *transport, const struct sockaddr *addr, struct net *net, struct drbd_listener *drbd_listener)
+{
+ struct dtr_listener *listener = container_of(drbd_listener, struct dtr_listener, listener);
+ struct sockaddr_storage my_addr;
+ int err = -ENOMEM;
+
+ my_addr = *(struct sockaddr_storage *)addr;
+
+ err = dtr_create_cm_id(&listener->cm, net);
+ if (err) {
+ tr_err(transport, "rdma_create_id() failed\n");
+ goto out;
+ }
+ listener->cm.state = 0; /* listening */
+
+ err = rdma_bind_addr(listener->cm.id, (struct sockaddr *)&my_addr);
+ if (err) {
+ tr_err(transport, "rdma_bind_addr error %d\n", err);
+ goto out;
+ }
+
+ err = rdma_listen(listener->cm.id, 1);
+ if (err) {
+ tr_err(transport, "rdma_listen error %d\n", err);
+ goto out;
+ }
+
+ listener->listener.listen_addr = *(struct sockaddr_storage *)addr;
+
+ return 0;
+out:
+ if (listener->cm.id) {
+ rdma_destroy_id(listener->cm.id);
+ listener->cm.id = NULL;
+ }
+
+ return err;
+}
+
+static int dtr_activate_path(struct dtr_path *path)
+{
+ struct drbd_transport *transport = path->path.transport;
+ struct dtr_connect_state *cs;
+ int err = -ENOMEM;
+
+ cs = &path->cs;
+
+ init_waitqueue_head(&cs->wq);
+
+ atomic_set(&cs->passive_state, PCS_CONNECTING);
+ atomic_set(&cs->active_state, PCS_CONNECTING);
+
+ if (path->path.listener) {
+ tr_warn(transport, "ASSERTION FAILED: in dtr_activate_path() found listener, dropping it\n");
+ drbd_put_listener(&path->path);
+ }
+ err = drbd_get_listener(&path->path);
+ if (err)
+ goto out_no_put;
+
+ /*
+ * Check passive_state after drbd_get_listener() completed.
+ * __dtr_disconnect_path() sets passive_state before calling
+ * drbd_put_listener(). That drbd_put_listner() might return
+ * before the drbd_get_listner() here started.
+ */
+ if (atomic_read(&cs->passive_state) != PCS_CONNECTING ||
+ atomic_read(&cs->active_state) != PCS_CONNECTING)
+ goto out;
+
+ err = dtr_start_try_connect(cs);
+ if (err)
+ goto out;
+
+ return 0;
+
+out:
+ drbd_put_listener(&path->path);
+out_no_put:
+ atomic_set(&cs->passive_state, PCS_INACTIVE);
+ atomic_set(&cs->active_state, PCS_INACTIVE);
+ wake_up(&cs->wq);
+
+ return err;
+}
+
+static int dtr_prepare_connect(struct drbd_transport *transport)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+
+ struct dtr_stream *data_stream = NULL, *control_stream = NULL;
+ struct dtr_path *path;
+ struct net_conf *nc;
+ int timeout, err = -ENOMEM;
+
+ flush_signals(current);
+
+ if (!list_first_or_null_rcu(&transport->paths, struct drbd_path, list))
+ return -EDESTADDRREQ;
+
+ data_stream = &rdma_transport->stream[DATA_STREAM];
+ dtr_re_init_stream(data_stream);
+
+ control_stream = &rdma_transport->stream[CONTROL_STREAM];
+ dtr_re_init_stream(control_stream);
+
+ rcu_read_lock();
+ nc = rcu_dereference(transport->net_conf);
+
+ timeout = nc->timeout * HZ / 10;
+ rcu_read_unlock();
+
+ data_stream->send_timeout = timeout;
+ control_stream->send_timeout = timeout;
+
+ atomic_set(&rdma_transport->first_path_connect_err, 1);
+ init_completion(&rdma_transport->connected);
+
+ rdma_transport->active = true;
+
+ list_for_each_entry(path, &transport->paths, path.list) {
+ err = dtr_activate_path(path);
+ if (err)
+ goto abort;
+ }
+
+ return 0;
+
+abort:
+ rdma_transport->active = false;
+ return err;
+}
+
+static int dtr_connect(struct drbd_transport *transport)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ int i, err = -ENOMEM;
+
+ err = wait_for_completion_interruptible(&rdma_transport->connected);
+ if (err) {
+ flush_signals(current);
+ goto abort;
+ }
+
+ err = atomic_read(&rdma_transport->first_path_connect_err);
+ if (err == 1)
+ err = -EAGAIN;
+ if (err)
+ goto abort;
+
+
+ /* Make sure at least one path has rx_descs... */
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++)
+ dtr_refill_rx_desc(rdma_transport, i);
+
+ /* make sure the other side had time to create rx_descs */
+ schedule_timeout(HZ / 4);
+
+ return 0;
+
+abort:
+ rdma_transport->active = false;
+
+ return err;
+}
+
+static void dtr_finish_connect(struct drbd_transport *transport)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+
+ if (!rdma_transport->active) {
+ struct dtr_path *path;
+
+ list_for_each_entry(path, &transport->paths, path.list)
+ dtr_disconnect_path(path);
+ }
+}
+
+static int dtr_net_conf_change(struct drbd_transport *transport, struct net_conf *new_net_conf)
+{
+ struct net_conf *old_net_conf;
+ struct dtr_transport *dtr_transport = container_of(transport,
+ struct dtr_transport, transport);
+ int ret = 0;
+
+ rcu_read_lock();
+ old_net_conf = rcu_dereference(transport->net_conf);
+ if (old_net_conf && dtr_transport->active) {
+ if (old_net_conf->sndbuf_size != new_net_conf->sndbuf_size) {
+ tr_warn(transport, "online change of sndbuf_size not supported\n");
+ ret = -EINVAL;
+ }
+ if (old_net_conf->rcvbuf_size != new_net_conf->rcvbuf_size) {
+ tr_warn(transport, "online change of rcvbuf_size not supported\n");
+ ret = -EINVAL;
+ }
+ }
+ rcu_read_unlock();
+
+ return ret;
+}
+
+static void dtr_set_rcvtimeo(struct drbd_transport *transport, enum drbd_stream stream, long timeout)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+
+ rdma_transport->stream[stream].recv_timeout = timeout;
+
+ if (stream == CONTROL_STREAM)
+ mod_timer(&rdma_transport->control_timer, jiffies + timeout);
+}
+
+static long dtr_get_rcvtimeo(struct drbd_transport *transport, enum drbd_stream stream)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+
+ return rdma_transport->stream[stream].recv_timeout;
+}
+
+static bool dtr_path_ok(struct dtr_path *path)
+{
+ bool r = false;
+ struct dtr_cm *cm = path->cm;
+
+ rcu_read_lock();
+ cm = rcu_dereference(path->cm);
+ if (cm) {
+ r = cm->id && cm->state == DSM_CONNECTED;
+ }
+ rcu_read_unlock();
+
+ return r;
+}
+
+static bool dtr_transport_ok(struct drbd_transport *transport)
+{
+ struct dtr_path *path;
+ bool r = false;
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(path, &transport->paths, path.list) {
+ r = dtr_path_ok(path);
+ if (r)
+ break;
+ }
+ rcu_read_unlock();
+
+ return r;
+}
+
+static bool dtr_stream_ok(struct drbd_transport *transport, enum drbd_stream stream)
+{
+ return dtr_transport_ok(transport);
+}
+
+static void dtr_update_congested(struct drbd_transport *transport)
+{
+ struct dtr_path *path;
+ bool congested = true;
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(path, &transport->paths, path.list) {
+ struct dtr_flow *flow = &path->flow[DATA_STREAM];
+ bool path_congested = false;
+ int tx_descs_posted;
+
+ if (!dtr_path_ok(path))
+ continue;
+
+ tx_descs_posted = atomic_read(&flow->tx_descs_posted);
+ path_congested |= flow->tx_descs_max - tx_descs_posted < DESCS_LOW_LEVEL;
+ path_congested |= atomic_read(&flow->peer_rx_descs) < DESCS_LOW_LEVEL;
+
+ if (!path_congested) {
+ congested = false;
+ break;
+ }
+ }
+ rcu_read_unlock();
+
+ if (congested)
+ set_bit(NET_CONGESTED, &transport->flags);
+}
+
+static int dtr_send_page(struct drbd_transport *transport, enum drbd_stream stream,
+ struct page *page, int offset, size_t size, unsigned msg_flags)
+{
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct dtr_tx_desc *tx_desc;
+ int err;
+
+ // pr_info("%s: in send_page, size: %zu\n", rdma_stream->name, size);
+
+ if (!dtr_transport_ok(transport))
+ return -ECONNRESET;
+
+ tx_desc = kmalloc(sizeof(*tx_desc) + sizeof(struct ib_sge), GFP_NOIO);
+ if (!tx_desc)
+ return -ENOMEM;
+
+ if (msg_flags & MSG_SPLICE_PAGES) {
+ page = caller_page;
+ get_page(page); /* The put_page() is in dtr_tx_cqe_done() */
+ } else {
+ void *from;
+
+ page = drbd_alloc_pages(transport, GFP_NOIO, PAGE_SIZE);
+ from = kmap_local_page(caller_page);
+ memcpy(page_address(page), from + offset, size);
+ kunmap_local(from);
+ offset = 0;
+ }
+
+ tx_desc->type = SEND_PAGE;
+ tx_desc->page = page;
+ tx_desc->nr_sges = 1;
+ tx_desc->imm = (union dtr_immediate)
+ { .stream = stream,
+ .sequence = rdma_transport->stream[stream].tx_sequence++
+ };
+ tx_desc->sge[0].length = size;
+ tx_desc->sge[0].lkey = offset; /* abusing lkey fild. See dtr_post_tx_desc() */
+
+ err = dtr_post_tx_desc(rdma_transport, tx_desc);
+ if (err) {
+ put_page(page);
+ kfree(tx_desc);
+
+ tr_err(transport, "dtr_post_tx_desc() failed %d\n", err);
+ drbd_control_event(transport, CLOSED_BY_PEER);
+ }
+
+ if (stream == DATA_STREAM)
+ dtr_update_congested(transport);
+
+ return err;
+}
+
+#if SENDER_COMPACTS_BVECS
+static int dtr_send_bio_part(struct dtr_transport *rdma_transport,
+ struct bio *bio, int start, int size_tx_desc, int sges)
+{
+ struct dtr_stream *rdma_stream = &rdma_transport->stream[DATA_STREAM];
+ struct dtr_tx_desc *tx_desc;
+ struct ib_device *device;
+ struct dtr_path *path = NULL;
+ struct bio_vec bvec;
+ struct bvec_iter iter;
+ int i = 0, pos = 0, done = 0, err;
+
+ if (!size_tx_desc)
+ return 0;
+
+ //tr_info(&rdma_transport->transport,
+ // " dtr_send_bio_part(start = %d, size = %d, sges = %d)\n",
+ // start, size_tx_desc, sges);
+
+ tx_desc = kmalloc(sizeof(*tx_desc) + sizeof(struct ib_sge) * sges, GFP_NOIO);
+ if (!tx_desc)
+ return -ENOMEM;
+
+ tx_desc->type = SEND_BIO;
+ tx_desc->bio = bio;
+ tx_desc->nr_sges = sges;
+ device = rdma_stream->cm.id->device;
+
+ bio_for_each_segment(bvec, tx_desc->bio, iter) {
+ struct page *page = bvec.bv_page;
+ int offset = bvec.bv_offset;
+ int size = bvec.bv_len;
+ int shift = 0;
+ get_page(page);
+
+ if (pos < start || done == size_tx_desc) {
+ if (done != size_tx_desc && pos + size > start) {
+ shift = (start - pos);
+ } else {
+ pos += size;
+ continue;
+ }
+ }
+
+ pos += size;
+ offset += shift;
+ size = min(size - shift, size_tx_desc - done);
+
+ //tr_info(&rdma_transport->transport,
+ // " sge (i = %d, offset = %d, size = %d)\n",
+ // i, offset, size);
+
+ tx_desc->sge[i].addr = ib_dma_map_page(device, page, offset, size, DMA_TO_DEVICE);
+ err = ib_dma_mapping_error(device, tx_desc->sge[i].addr);
+ if (err)
+ return err; // FIX THIS
+ tx_desc->sge[i].lkey = dtr_path_to_lkey(path);
+ tx_desc->sge[i].length = size;
+ done += size;
+ i++;
+ }
+
+ TR_ASSERT(&rdma_transport->transport, done == size_tx_desc);
+ tx_desc->imm = (union dtr_immediate)
+ { .stream = ST_DATA,
+ .sequence = rdma_transport->stream[ST_DATA].tx_sequence++
+ };
+
+ err = dtr_post_tx_desc(rdma_stream, tx_desc, &path);
+ if (err) {
+ if (path) {
+ dtr_free_tx_desc(path, tx_desc);
+ } else {
+ bio_for_each_segment(bvec, tx_desc->bio, iter) {
+ put_page(bvec.bv_page);
+ }
+ kfree(tx_desc);
+ }
+
+ tr_err(transport, "dtr_post_tx_desc() failed %d\n", err);
+ drbd_control_event(transport, CLOSED_BY_PEER);
+ }
+
+ return err;
+}
+#endif
+
+static int dtr_send_zc_bio(struct drbd_transport *transport, struct bio *bio)
+{
+#if SENDER_COMPACTS_BVECS
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ int start = 0, sges = 0, size_tx_desc = 0, remaining = 0, err;
+ int sges_max = rdma_transport->sges_max;
+#endif
+ int err = -EINVAL;
+ struct bio_vec bvec;
+ struct bvec_iter iter;
+
+ //tr_info(transport, "in send_zc_bio, size: %d\n", bio->bi_size);
+
+ if (!dtr_transport_ok(transport))
+ return -ECONNRESET;
+
+#if SENDER_COMPACTS_BVECS
+ bio_for_each_segment(bvec, bio, iter) {
+ size_tx_desc += bvec.bv_len;
+ //tr_info(transport, " bvec len = %d\n", bvec.bv_len);
+ if (size_tx_desc > DRBD_SOCKET_BUFFER_SIZE) {
+ remaining = size_tx_desc - DRBD_SOCKET_BUFFER_SIZE;
+ size_tx_desc = DRBD_SOCKET_BUFFER_SIZE;
+ }
+ sges++;
+ if (size_tx_desc == DRBD_SOCKET_BUFFER_SIZE || sges >= sges_max) {
+ err = dtr_send_bio_part(rdma_transport, bio, start, size_tx_desc, sges);
+ if (err)
+ goto out;
+ start += size_tx_desc;
+ sges = 0;
+ size_tx_desc = remaining;
+ if (remaining) {
+ sges++;
+ remaining = 0;
+ }
+ }
+ }
+ err = dtr_send_bio_part(rdma_transport, bio, start, size_tx_desc, sges);
+ start += size_tx_desc;
+
+ TR_ASSERT(transport, start == bio->bi_iter.bi_size);
+out:
+#else
+ bio_for_each_segment(bvec, bio, iter) {
+ err = dtr_send_page(transport, DATA_STREAM,
+ bvec.bv_page, bvec.bv_offset, bvec.bv_len,
+ 0 /* flags currently unused by dtr_send_page */);
+ if (err)
+ break;
+ }
+#endif
+ if (1 /* stream == DATA_STREAM */)
+ dtr_update_congested(transport);
+
+ return err;
+}
+
+static bool dtr_hint(struct drbd_transport *transport, enum drbd_stream stream,
+ enum drbd_tr_hints hint)
+{
+ switch (hint) {
+ default: /* not implemented, but should not trigger error handling */
+ return true;
+ }
+ return true;
+}
+
+static void dtr_debugfs_show_flow(struct dtr_flow *flow, const char *name, struct seq_file *m)
+{
+ seq_printf(m, " %-7s field: posted\t alloc\tdesired\t max\n", name);
+ seq_printf(m, " tx_descs: %5d\t\t\t%5d\n", atomic_read(&flow->tx_descs_posted), flow->tx_descs_max);
+ seq_printf(m, " peer_rx_descs: %5d (receive window at peer)\n", atomic_read(&flow->peer_rx_descs));
+ seq_printf(m, " rx_descs: %5d\t%5d\t%5d\t%5d\n", atomic_read(&flow->rx_descs_posted),
+ atomic_read(&flow->rx_descs_allocated),
+ flow->rx_descs_want_posted, flow->rx_descs_max);
+ seq_printf(m, " rx_peer_knows: %5d (what the peer knows about my receive window)\n\n",
+ atomic_read(&flow->rx_descs_known_to_peer));
+}
+
+static void dtr_debugfs_show_path(struct dtr_path *path, struct seq_file *m)
+{
+ static const char * const stream_names[] = {
+ [ST_DATA] = "data",
+ [ST_CONTROL] = "control",
+ };
+ static const char * const state_names[] = {
+ [0] = "not connected",
+ [DSM_CONNECT_REQ] = "CONNECT_REQ",
+ [DSM_CONNECTING] = "CONNECTING",
+ [DSM_CONNECTING|DSM_CONNECT_REQ] = "CONNECTING|DSM_CONNECT_REQ",
+ [DSM_CONNECTED] = "CONNECTED",
+ [DSM_CONNECTED|DSM_CONNECT_REQ] = "CONNECTED|CONNECT_REQ",
+ [DSM_CONNECTED|DSM_CONNECTING] = "CONNECTED|CONNECTING",
+ [DSM_CONNECTED|DSM_CONNECTING|DSM_CONNECT_REQ] =
+ "CONNECTED|CONNECTING|DSM_CONNECT_REQ",
+ [DSM_ERROR] = "ERROR",
+ [DSM_ERROR|DSM_CONNECT_REQ] = "ERROR|CONNECT_REQ",
+ [DSM_ERROR|DSM_CONNECTING] = "ERROR|CONNECTING",
+ [DSM_ERROR|DSM_CONNECTING|DSM_CONNECT_REQ] = "ERROR|CONNECTING|CONNECT_REQ",
+ [DSM_ERROR|DSM_CONNECTED] = "ERROR|CONNECTED",
+ [DSM_ERROR|DSM_CONNECTED|DSM_CONNECT_REQ] = "ERROR|CONNECTED|CONNECT_REQ",
+ [DSM_ERROR|DSM_CONNECTED|DSM_CONNECTING] = "ERROR|CONNECTED|CONNECTING|",
+ [DSM_ERROR|DSM_CONNECTED|DSM_CONNECTING|DSM_CONNECT_REQ] =
+ "ERROR|CONNECTED|CONNECTING|CONNECT_REQ",
+ };
+
+ enum drbd_stream i;
+ unsigned long s = 0;
+ struct dtr_cm *cm;
+
+ rcu_read_lock();
+ cm = rcu_dereference(path->cm);
+ if (cm)
+ s = cm->state;
+ rcu_read_unlock();
+
+ seq_printf(m, "%pI4 - %pI4: %s\n",
+ &((struct sockaddr_in *)&path->path.my_addr)->sin_addr,
+ &((struct sockaddr_in *)&path->path.peer_addr)->sin_addr,
+ state_names[s]);
+
+ if (dtr_path_ok(path)) {
+ for (i = DATA_STREAM; i <= CONTROL_STREAM ; i++)
+ dtr_debugfs_show_flow(&path->flow[i], stream_names[i], m);
+ }
+}
+
+static void dtr_debugfs_show(struct drbd_transport *transport, struct seq_file *m)
+{
+ struct dtr_path *path;
+
+ /* BUMP me if you change the file format/content/presentation */
+ seq_printf(m, "v: %u\n\n", 1);
+
+ rcu_read_lock();
+ list_for_each_entry_rcu(path, &transport->paths, path.list)
+ dtr_debugfs_show_path(path, m);
+ rcu_read_unlock();
+}
+
+static int dtr_add_path(struct drbd_path *add_path)
+{
+ struct drbd_transport *transport = add_path->transport;
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct dtr_path *path;
+
+ path = container_of(add_path, struct dtr_path, path);
+
+ /* initialize private parts of path */
+ atomic_set(&path->cs.passive_state, PCS_INACTIVE);
+ atomic_set(&path->cs.active_state, PCS_INACTIVE);
+ spin_lock_init(&path->send_flow_control_lock);
+ tasklet_setup(&path->flow_control_tasklet, dtr_flow_control_tasklet_fn);
+ INIT_WORK(&path->refill_rx_descs_work, dtr_refill_rx_descs_work_fn);
+ INIT_DELAYED_WORK(&path->cs.retry_connect_work, dtr_cma_retry_connect_work_fn);
+
+ if (!rdma_transport->active)
+ return 0;
+
+ return dtr_activate_path(path);
+}
+
+static bool dtr_may_remove_path(struct drbd_path *del_path)
+{
+ struct drbd_transport *transport = del_path->transport;
+ struct dtr_transport *rdma_transport =
+ container_of(transport, struct dtr_transport, transport);
+ struct drbd_path *drbd_path, *connected_path = NULL;
+ int connected = 0;
+
+ if (!rdma_transport->active)
+ return true;
+
+ list_for_each_entry(drbd_path, &transport->paths, list) {
+ struct dtr_path *path = container_of(drbd_path, struct dtr_path, path);
+
+ if (dtr_path_ok(path)) {
+ connected++;
+ connected_path = drbd_path;
+ }
+ }
+
+ return connected > 1 || connected_path != del_path;
+}
+
+static void dtr_remove_path(struct drbd_path *del_path)
+{
+ struct dtr_path *path = container_of(del_path, struct dtr_path, path);
+
+ dtr_disconnect_path(path);
+}
+
+static int __init dtr_initialize(void)
+{
+ allocation_size = PAGE_SIZE;
+
+ return drbd_register_transport_class(&rdma_transport_class,
+ DRBD_TRANSPORT_API_VERSION,
+ sizeof(struct drbd_transport));
+}
+
+static void __exit dtr_cleanup(void)
+{
+ drbd_unregister_transport_class(&rdma_transport_class);
+}
+
+module_init(dtr_initialize)
+module_exit(dtr_cleanup)
--
2.53.0
© 2016 - 2026 Red Hat, Inc.