author     santosh.shilimkar@oracle.com <santosh.shilimkar@oracle.com>  2016-03-01 18:20:42 -0500
committer  David S. Miller <davem@davemloft.net>                        2016-03-02 14:13:17 -0500
commit     dcdede0406d30e14a6ed727dce69dfac0518371d (patch)
tree       4aeda07983f9d11bda34553f7f8d8d9823a50e31
parent     afc3de9285968c9e15e34e01090a3ef47032a67d (diff)
RDS: Drop stale iWARP RDMA transport
The RDS iWARP support code has become stale and untestable. As indicated earlier, I am dropping support for it. If new iWARP users show up in the future, we can adapt the RDS IB transport for the special RDMA READ sink case; iWARP needs an MR for the RDMA READ sink.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
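For context on the "RDMA READ sink" point: on iWARP, the local buffer that receives RDMA READ data must be covered by an MR with remote write access, which is why the removed transport registered a DMA MR with remote access flags at device-add time. The sketch below condenses that setup from the deleted net/rds/iw.c (rds_iw_add_one()); the helper name is illustrative only and not part of any kernel API.

#include <linux/err.h>
#include <rdma/ib_verbs.h>

/*
 * Illustrative condensation of what rds_iw_add_one() did: allocate a
 * protection domain and, when the device has no local DMA lkey, register
 * a DMA MR whose access flags include REMOTE_WRITE so that it can act as
 * the RDMA READ sink that iWARP requires.
 */
static int rds_iw_setup_read_sink_mr(struct ib_device *device,
				     struct ib_pd **pd, struct ib_mr **mr)
{
	*pd = ib_alloc_pd(device);
	if (IS_ERR(*pd))
		return PTR_ERR(*pd);

	*mr = ib_get_dma_mr(*pd, IB_ACCESS_REMOTE_READ |
				 IB_ACCESS_REMOTE_WRITE |
				 IB_ACCESS_LOCAL_WRITE);
	if (IS_ERR(*mr)) {
		int ret = PTR_ERR(*mr);

		ib_dealloc_pd(*pd);
		*pd = NULL;
		return ret;
	}
	return 0;
}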
-rw-r--r--  Documentation/networking/rds.txt     4
-rw-r--r--  net/rds/Kconfig                      7
-rw-r--r--  net/rds/Makefile                     4
-rw-r--r--  net/rds/iw.c                       312
-rw-r--r--  net/rds/iw.h                       398
-rw-r--r--  net/rds/iw_cm.c                    769
-rw-r--r--  net/rds/iw_rdma.c                  837
-rw-r--r--  net/rds/iw_recv.c                  904
-rw-r--r--  net/rds/iw_ring.c                  169
-rw-r--r--  net/rds/iw_send.c                  981
-rw-r--r--  net/rds/iw_stats.c                  95
-rw-r--r--  net/rds/iw_sysctl.c                123
-rw-r--r--  net/rds/rdma_transport.c            13
-rw-r--r--  net/rds/rdma_transport.h             5
14 files changed, 7 insertions, 4614 deletions
diff --git a/Documentation/networking/rds.txt b/Documentation/networking/rds.txt
index e1a3d59bbe0f..9d219d856d46 100644
--- a/Documentation/networking/rds.txt
+++ b/Documentation/networking/rds.txt
@@ -19,9 +19,7 @@ to N*N if you use a connection-oriented socket transport like TCP.
 
 RDS is not Infiniband-specific; it was designed to support different
 transports. The current implementation used to support RDS over TCP as well
-as IB. Work is in progress to support RDS over iWARP, and using DCE to
-guarantee no dropped packets on Ethernet, it may be possible to use RDS over
-UDP in the future.
+as IB.
 
 The high-level semantics of RDS from the application's point of view are
 
diff --git a/net/rds/Kconfig b/net/rds/Kconfig
index f2c670ba7b9b..bffde4b46c5d 100644
--- a/net/rds/Kconfig
+++ b/net/rds/Kconfig
@@ -4,14 +4,13 @@ config RDS
 	depends on INET
 	---help---
 	  The RDS (Reliable Datagram Sockets) protocol provides reliable,
-	  sequenced delivery of datagrams over Infiniband, iWARP,
-	  or TCP.
+	  sequenced delivery of datagrams over Infiniband or TCP.
 
 config RDS_RDMA
-	tristate "RDS over Infiniband and iWARP"
+	tristate "RDS over Infiniband"
 	depends on RDS && INFINIBAND && INFINIBAND_ADDR_TRANS
 	---help---
-	  Allow RDS to use Infiniband and iWARP as a transport.
+	  Allow RDS to use Infiniband as a transport.
 	  This transport supports RDMA operations.
 
 config RDS_TCP
diff --git a/net/rds/Makefile b/net/rds/Makefile
index 56d3f6023ced..19e54854ba2d 100644
--- a/net/rds/Makefile
+++ b/net/rds/Makefile
@@ -6,9 +6,7 @@ rds-y := af_rds.o bind.o cong.o connection.o info.o message.o \
 obj-$(CONFIG_RDS_RDMA) += rds_rdma.o
 rds_rdma-y :=	rdma_transport.o \
 	ib.o ib_cm.o ib_recv.o ib_ring.o ib_send.o ib_stats.o \
-	ib_sysctl.o ib_rdma.o \
-	iw.o iw_cm.o iw_recv.o iw_ring.o iw_send.o iw_stats.o \
-	iw_sysctl.o iw_rdma.o
+	ib_sysctl.o ib_rdma.o
 
 
 obj-$(CONFIG_RDS_TCP) += rds_tcp.o
diff --git a/net/rds/iw.c b/net/rds/iw.c
deleted file mode 100644
index f4a9fff829e0..000000000000
--- a/net/rds/iw.c
+++ /dev/null
@@ -1,312 +0,0 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <linux/if.h>
36#include <linux/netdevice.h>
37#include <linux/inetdevice.h>
38#include <linux/if_arp.h>
39#include <linux/delay.h>
40#include <linux/slab.h>
41#include <linux/module.h>
42
43#include "rds.h"
44#include "iw.h"
45
46unsigned int fastreg_pool_size = RDS_FASTREG_POOL_SIZE;
47unsigned int fastreg_message_size = RDS_FASTREG_SIZE + 1; /* +1 allows for unaligned MRs */
48
49module_param(fastreg_pool_size, int, 0444);
50MODULE_PARM_DESC(fastreg_pool_size, " Max number of fastreg MRs per device");
51module_param(fastreg_message_size, int, 0444);
52MODULE_PARM_DESC(fastreg_message_size, " Max size of a RDMA transfer (fastreg MRs)");
53
54struct list_head rds_iw_devices;
55
56/* NOTE: if also grabbing iwdev lock, grab this first */
57DEFINE_SPINLOCK(iw_nodev_conns_lock);
58LIST_HEAD(iw_nodev_conns);
59
60static void rds_iw_add_one(struct ib_device *device)
61{
62 struct rds_iw_device *rds_iwdev;
63
64 /* Only handle iwarp devices */
65 if (device->node_type != RDMA_NODE_RNIC)
66 return;
67
68 rds_iwdev = kmalloc(sizeof *rds_iwdev, GFP_KERNEL);
69 if (!rds_iwdev)
70 return;
71
72 spin_lock_init(&rds_iwdev->spinlock);
73
74 rds_iwdev->dma_local_lkey = !!(device->attrs.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY);
75 rds_iwdev->max_wrs = device->attrs.max_qp_wr;
76 rds_iwdev->max_sge = min(device->attrs.max_sge, RDS_IW_MAX_SGE);
77
78 rds_iwdev->dev = device;
79 rds_iwdev->pd = ib_alloc_pd(device);
80 if (IS_ERR(rds_iwdev->pd))
81 goto free_dev;
82
83 if (!rds_iwdev->dma_local_lkey) {
84 rds_iwdev->mr = ib_get_dma_mr(rds_iwdev->pd,
85 IB_ACCESS_REMOTE_READ |
86 IB_ACCESS_REMOTE_WRITE |
87 IB_ACCESS_LOCAL_WRITE);
88 if (IS_ERR(rds_iwdev->mr))
89 goto err_pd;
90 } else
91 rds_iwdev->mr = NULL;
92
93 rds_iwdev->mr_pool = rds_iw_create_mr_pool(rds_iwdev);
94 if (IS_ERR(rds_iwdev->mr_pool)) {
95 rds_iwdev->mr_pool = NULL;
96 goto err_mr;
97 }
98
99 INIT_LIST_HEAD(&rds_iwdev->cm_id_list);
100 INIT_LIST_HEAD(&rds_iwdev->conn_list);
101 list_add_tail(&rds_iwdev->list, &rds_iw_devices);
102
103 ib_set_client_data(device, &rds_iw_client, rds_iwdev);
104 return;
105
106err_mr:
107 if (rds_iwdev->mr)
108 ib_dereg_mr(rds_iwdev->mr);
109err_pd:
110 ib_dealloc_pd(rds_iwdev->pd);
111free_dev:
112 kfree(rds_iwdev);
113}
114
115static void rds_iw_remove_one(struct ib_device *device, void *client_data)
116{
117 struct rds_iw_device *rds_iwdev = client_data;
118 struct rds_iw_cm_id *i_cm_id, *next;
119
120 if (!rds_iwdev)
121 return;
122
123 spin_lock_irq(&rds_iwdev->spinlock);
124 list_for_each_entry_safe(i_cm_id, next, &rds_iwdev->cm_id_list, list) {
125 list_del(&i_cm_id->list);
126 kfree(i_cm_id);
127 }
128 spin_unlock_irq(&rds_iwdev->spinlock);
129
130 rds_iw_destroy_conns(rds_iwdev);
131
132 if (rds_iwdev->mr_pool)
133 rds_iw_destroy_mr_pool(rds_iwdev->mr_pool);
134
135 if (rds_iwdev->mr)
136 ib_dereg_mr(rds_iwdev->mr);
137
138 ib_dealloc_pd(rds_iwdev->pd);
139
140 list_del(&rds_iwdev->list);
141 kfree(rds_iwdev);
142}
143
144struct ib_client rds_iw_client = {
145 .name = "rds_iw",
146 .add = rds_iw_add_one,
147 .remove = rds_iw_remove_one
148};
149
150static int rds_iw_conn_info_visitor(struct rds_connection *conn,
151 void *buffer)
152{
153 struct rds_info_rdma_connection *iinfo = buffer;
154 struct rds_iw_connection *ic;
155
156 /* We will only ever look at IB transports */
157 if (conn->c_trans != &rds_iw_transport)
158 return 0;
159
160 iinfo->src_addr = conn->c_laddr;
161 iinfo->dst_addr = conn->c_faddr;
162
163 memset(&iinfo->src_gid, 0, sizeof(iinfo->src_gid));
164 memset(&iinfo->dst_gid, 0, sizeof(iinfo->dst_gid));
165 if (rds_conn_state(conn) == RDS_CONN_UP) {
166 struct rds_iw_device *rds_iwdev;
167 struct rdma_dev_addr *dev_addr;
168
169 ic = conn->c_transport_data;
170 dev_addr = &ic->i_cm_id->route.addr.dev_addr;
171
172 rdma_addr_get_sgid(dev_addr, (union ib_gid *) &iinfo->src_gid);
173 rdma_addr_get_dgid(dev_addr, (union ib_gid *) &iinfo->dst_gid);
174
175 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
176 iinfo->max_send_wr = ic->i_send_ring.w_nr;
177 iinfo->max_recv_wr = ic->i_recv_ring.w_nr;
178 iinfo->max_send_sge = rds_iwdev->max_sge;
179 rds_iw_get_mr_info(rds_iwdev, iinfo);
180 }
181 return 1;
182}
183
184static void rds_iw_ic_info(struct socket *sock, unsigned int len,
185 struct rds_info_iterator *iter,
186 struct rds_info_lengths *lens)
187{
188 rds_for_each_conn_info(sock, len, iter, lens,
189 rds_iw_conn_info_visitor,
190 sizeof(struct rds_info_rdma_connection));
191}
192
193
194/*
195 * Early RDS/IB was built to only bind to an address if there is an IPoIB
196 * device with that address set.
197 *
198 * If it were me, I'd advocate for something more flexible. Sending and
199 * receiving should be device-agnostic. Transports would try and maintain
200 * connections between peers who have messages queued. Userspace would be
201 * allowed to influence which paths have priority. We could call userspace
202 * asserting this policy "routing".
203 */
204static int rds_iw_laddr_check(struct net *net, __be32 addr)
205{
206 int ret;
207 struct rdma_cm_id *cm_id;
208 struct sockaddr_in sin;
209
210 /* Create a CMA ID and try to bind it. This catches both
211 * IB and iWARP capable NICs.
212 */
213 cm_id = rdma_create_id(&init_net, NULL, NULL, RDMA_PS_TCP, IB_QPT_RC);
214 if (IS_ERR(cm_id))
215 return PTR_ERR(cm_id);
216
217 memset(&sin, 0, sizeof(sin));
218 sin.sin_family = AF_INET;
219 sin.sin_addr.s_addr = addr;
220
221 /* rdma_bind_addr will only succeed for IB & iWARP devices */
222 ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin);
223 /* due to this, we will claim to support IB devices unless we
224 check node_type. */
225 if (ret || !cm_id->device ||
226 cm_id->device->node_type != RDMA_NODE_RNIC)
227 ret = -EADDRNOTAVAIL;
228
229 rdsdebug("addr %pI4 ret %d node type %d\n",
230 &addr, ret,
231 cm_id->device ? cm_id->device->node_type : -1);
232
233 rdma_destroy_id(cm_id);
234
235 return ret;
236}
237
238void rds_iw_exit(void)
239{
240 rds_info_deregister_func(RDS_INFO_IWARP_CONNECTIONS, rds_iw_ic_info);
241 rds_iw_destroy_nodev_conns();
242 ib_unregister_client(&rds_iw_client);
243 rds_iw_sysctl_exit();
244 rds_iw_recv_exit();
245 rds_trans_unregister(&rds_iw_transport);
246}
247
248struct rds_transport rds_iw_transport = {
249 .laddr_check = rds_iw_laddr_check,
250 .xmit_complete = rds_iw_xmit_complete,
251 .xmit = rds_iw_xmit,
252 .xmit_rdma = rds_iw_xmit_rdma,
253 .recv = rds_iw_recv,
254 .conn_alloc = rds_iw_conn_alloc,
255 .conn_free = rds_iw_conn_free,
256 .conn_connect = rds_iw_conn_connect,
257 .conn_shutdown = rds_iw_conn_shutdown,
258 .inc_copy_to_user = rds_iw_inc_copy_to_user,
259 .inc_free = rds_iw_inc_free,
260 .cm_initiate_connect = rds_iw_cm_initiate_connect,
261 .cm_handle_connect = rds_iw_cm_handle_connect,
262 .cm_connect_complete = rds_iw_cm_connect_complete,
263 .stats_info_copy = rds_iw_stats_info_copy,
264 .exit = rds_iw_exit,
265 .get_mr = rds_iw_get_mr,
266 .sync_mr = rds_iw_sync_mr,
267 .free_mr = rds_iw_free_mr,
268 .flush_mrs = rds_iw_flush_mrs,
269 .t_owner = THIS_MODULE,
270 .t_name = "iwarp",
271 .t_type = RDS_TRANS_IWARP,
272 .t_prefer_loopback = 1,
273};
274
275int rds_iw_init(void)
276{
277 int ret;
278
279 INIT_LIST_HEAD(&rds_iw_devices);
280
281 ret = ib_register_client(&rds_iw_client);
282 if (ret)
283 goto out;
284
285 ret = rds_iw_sysctl_init();
286 if (ret)
287 goto out_ibreg;
288
289 ret = rds_iw_recv_init();
290 if (ret)
291 goto out_sysctl;
292
293 ret = rds_trans_register(&rds_iw_transport);
294 if (ret)
295 goto out_recv;
296
297 rds_info_register_func(RDS_INFO_IWARP_CONNECTIONS, rds_iw_ic_info);
298
299 goto out;
300
301out_recv:
302 rds_iw_recv_exit();
303out_sysctl:
304 rds_iw_sysctl_exit();
305out_ibreg:
306 ib_unregister_client(&rds_iw_client);
307out:
308 return ret;
309}
310
311MODULE_LICENSE("GPL");
312
diff --git a/net/rds/iw.h b/net/rds/iw.h
deleted file mode 100644
index 5af01d1758b3..000000000000
--- a/net/rds/iw.h
+++ /dev/null
@@ -1,398 +0,0 @@
1#ifndef _RDS_IW_H
2#define _RDS_IW_H
3
4#include <linux/interrupt.h>
5#include <rdma/ib_verbs.h>
6#include <rdma/rdma_cm.h>
7#include "rds.h"
8#include "rdma_transport.h"
9
10#define RDS_FASTREG_SIZE 20
11#define RDS_FASTREG_POOL_SIZE 2048
12
13#define RDS_IW_MAX_SGE 8
14#define RDS_IW_RECV_SGE 2
15
16#define RDS_IW_DEFAULT_RECV_WR 1024
17#define RDS_IW_DEFAULT_SEND_WR 256
18
19#define RDS_IW_SUPPORTED_PROTOCOLS 0x00000003 /* minor versions supported */
20
21extern struct list_head rds_iw_devices;
22
23/*
24 * IB posts RDS_FRAG_SIZE fragments of pages to the receive queues to
25 * try and minimize the amount of memory tied up both the device and
26 * socket receive queues.
27 */
28/* page offset of the final full frag that fits in the page */
29#define RDS_PAGE_LAST_OFF (((PAGE_SIZE / RDS_FRAG_SIZE) - 1) * RDS_FRAG_SIZE)
30struct rds_page_frag {
31 struct list_head f_item;
32 struct page *f_page;
33 unsigned long f_offset;
34 dma_addr_t f_mapped;
35};
36
37struct rds_iw_incoming {
38 struct list_head ii_frags;
39 struct rds_incoming ii_inc;
40};
41
42struct rds_iw_connect_private {
43 /* Add new fields at the end, and don't permute existing fields. */
44 __be32 dp_saddr;
45 __be32 dp_daddr;
46 u8 dp_protocol_major;
47 u8 dp_protocol_minor;
48 __be16 dp_protocol_minor_mask; /* bitmask */
49 __be32 dp_reserved1;
50 __be64 dp_ack_seq;
51 __be32 dp_credit; /* non-zero enables flow ctl */
52};
53
54struct rds_iw_scatterlist {
55 struct scatterlist *list;
56 unsigned int len;
57 int dma_len;
58 unsigned int dma_npages;
59 unsigned int bytes;
60};
61
62struct rds_iw_mapping {
63 spinlock_t m_lock; /* protect the mapping struct */
64 struct list_head m_list;
65 struct rds_iw_mr *m_mr;
66 uint32_t m_rkey;
67 struct rds_iw_scatterlist m_sg;
68};
69
70struct rds_iw_send_work {
71 struct rds_message *s_rm;
72
73 /* We should really put these into a union: */
74 struct rm_rdma_op *s_op;
75 struct rds_iw_mapping *s_mapping;
76 struct ib_mr *s_mr;
77 unsigned char s_remap_count;
78
79 union {
80 struct ib_send_wr s_send_wr;
81 struct ib_rdma_wr s_rdma_wr;
82 struct ib_reg_wr s_reg_wr;
83 };
84 struct ib_sge s_sge[RDS_IW_MAX_SGE];
85 unsigned long s_queued;
86};
87
88struct rds_iw_recv_work {
89 struct rds_iw_incoming *r_iwinc;
90 struct rds_page_frag *r_frag;
91 struct ib_recv_wr r_wr;
92 struct ib_sge r_sge[2];
93};
94
95struct rds_iw_work_ring {
96 u32 w_nr;
97 u32 w_alloc_ptr;
98 u32 w_alloc_ctr;
99 u32 w_free_ptr;
100 atomic_t w_free_ctr;
101};
102
103struct rds_iw_device;
104
105struct rds_iw_connection {
106
107 struct list_head iw_node;
108 struct rds_iw_device *rds_iwdev;
109 struct rds_connection *conn;
110
111 /* alphabet soup, IBTA style */
112 struct rdma_cm_id *i_cm_id;
113 struct ib_pd *i_pd;
114 struct ib_mr *i_mr;
115 struct ib_cq *i_send_cq;
116 struct ib_cq *i_recv_cq;
117
118 /* tx */
119 struct rds_iw_work_ring i_send_ring;
120 struct rds_message *i_rm;
121 struct rds_header *i_send_hdrs;
122 u64 i_send_hdrs_dma;
123 struct rds_iw_send_work *i_sends;
124
125 /* rx */
126 struct tasklet_struct i_recv_tasklet;
127 struct mutex i_recv_mutex;
128 struct rds_iw_work_ring i_recv_ring;
129 struct rds_iw_incoming *i_iwinc;
130 u32 i_recv_data_rem;
131 struct rds_header *i_recv_hdrs;
132 u64 i_recv_hdrs_dma;
133 struct rds_iw_recv_work *i_recvs;
134 struct rds_page_frag i_frag;
135 u64 i_ack_recv; /* last ACK received */
136
137 /* sending acks */
138 unsigned long i_ack_flags;
139#ifdef KERNEL_HAS_ATOMIC64
140 atomic64_t i_ack_next; /* next ACK to send */
141#else
142 spinlock_t i_ack_lock; /* protect i_ack_next */
143 u64 i_ack_next; /* next ACK to send */
144#endif
145 struct rds_header *i_ack;
146 struct ib_send_wr i_ack_wr;
147 struct ib_sge i_ack_sge;
148 u64 i_ack_dma;
149 unsigned long i_ack_queued;
150
151 /* Flow control related information
152 *
153 * Our algorithm uses a pair variables that we need to access
154 * atomically - one for the send credits, and one posted
155 * recv credits we need to transfer to remote.
156 * Rather than protect them using a slow spinlock, we put both into
157 * a single atomic_t and update it using cmpxchg
158 */
159 atomic_t i_credits;
160
161 /* Protocol version specific information */
162 unsigned int i_flowctl:1; /* enable/disable flow ctl */
163 unsigned int i_dma_local_lkey:1;
164 unsigned int i_fastreg_posted:1; /* fastreg posted on this connection */
165 /* Batched completions */
166 unsigned int i_unsignaled_wrs;
167 long i_unsignaled_bytes;
168};
169
170/* This assumes that atomic_t is at least 32 bits */
171#define IB_GET_SEND_CREDITS(v) ((v) & 0xffff)
172#define IB_GET_POST_CREDITS(v) ((v) >> 16)
173#define IB_SET_SEND_CREDITS(v) ((v) & 0xffff)
174#define IB_SET_POST_CREDITS(v) ((v) << 16)
175
176struct rds_iw_cm_id {
177 struct list_head list;
178 struct rdma_cm_id *cm_id;
179};
180
181struct rds_iw_device {
182 struct list_head list;
183 struct list_head cm_id_list;
184 struct list_head conn_list;
185 struct ib_device *dev;
186 struct ib_pd *pd;
187 struct ib_mr *mr;
188 struct rds_iw_mr_pool *mr_pool;
189 int max_sge;
190 unsigned int max_wrs;
191 unsigned int dma_local_lkey:1;
192 spinlock_t spinlock; /* protect the above */
193};
194
195/* bits for i_ack_flags */
196#define IB_ACK_IN_FLIGHT 0
197#define IB_ACK_REQUESTED 1
198
199/* Magic WR_ID for ACKs */
200#define RDS_IW_ACK_WR_ID ((u64)0xffffffffffffffffULL)
201#define RDS_IW_REG_WR_ID ((u64)0xefefefefefefefefULL)
202#define RDS_IW_LOCAL_INV_WR_ID ((u64)0xdfdfdfdfdfdfdfdfULL)
203
204struct rds_iw_statistics {
205 uint64_t s_iw_connect_raced;
206 uint64_t s_iw_listen_closed_stale;
207 uint64_t s_iw_tx_cq_call;
208 uint64_t s_iw_tx_cq_event;
209 uint64_t s_iw_tx_ring_full;
210 uint64_t s_iw_tx_throttle;
211 uint64_t s_iw_tx_sg_mapping_failure;
212 uint64_t s_iw_tx_stalled;
213 uint64_t s_iw_tx_credit_updates;
214 uint64_t s_iw_rx_cq_call;
215 uint64_t s_iw_rx_cq_event;
216 uint64_t s_iw_rx_ring_empty;
217 uint64_t s_iw_rx_refill_from_cq;
218 uint64_t s_iw_rx_refill_from_thread;
219 uint64_t s_iw_rx_alloc_limit;
220 uint64_t s_iw_rx_credit_updates;
221 uint64_t s_iw_ack_sent;
222 uint64_t s_iw_ack_send_failure;
223 uint64_t s_iw_ack_send_delayed;
224 uint64_t s_iw_ack_send_piggybacked;
225 uint64_t s_iw_ack_received;
226 uint64_t s_iw_rdma_mr_alloc;
227 uint64_t s_iw_rdma_mr_free;
228 uint64_t s_iw_rdma_mr_used;
229 uint64_t s_iw_rdma_mr_pool_flush;
230 uint64_t s_iw_rdma_mr_pool_wait;
231 uint64_t s_iw_rdma_mr_pool_depleted;
232};
233
234extern struct workqueue_struct *rds_iw_wq;
235
236/*
237 * Fake ib_dma_sync_sg_for_{cpu,device} as long as ib_verbs.h
238 * doesn't define it.
239 */
240static inline void rds_iw_dma_sync_sg_for_cpu(struct ib_device *dev,
241 struct scatterlist *sg, unsigned int sg_dma_len, int direction)
242{
243 unsigned int i;
244
245 for (i = 0; i < sg_dma_len; ++i) {
246 ib_dma_sync_single_for_cpu(dev,
247 ib_sg_dma_address(dev, &sg[i]),
248 ib_sg_dma_len(dev, &sg[i]),
249 direction);
250 }
251}
252#define ib_dma_sync_sg_for_cpu rds_iw_dma_sync_sg_for_cpu
253
254static inline void rds_iw_dma_sync_sg_for_device(struct ib_device *dev,
255 struct scatterlist *sg, unsigned int sg_dma_len, int direction)
256{
257 unsigned int i;
258
259 for (i = 0; i < sg_dma_len; ++i) {
260 ib_dma_sync_single_for_device(dev,
261 ib_sg_dma_address(dev, &sg[i]),
262 ib_sg_dma_len(dev, &sg[i]),
263 direction);
264 }
265}
266#define ib_dma_sync_sg_for_device rds_iw_dma_sync_sg_for_device
267
268static inline u32 rds_iw_local_dma_lkey(struct rds_iw_connection *ic)
269{
270 return ic->i_dma_local_lkey ? ic->i_cm_id->device->local_dma_lkey : ic->i_mr->lkey;
271}
272
273/* ib.c */
274extern struct rds_transport rds_iw_transport;
275extern struct ib_client rds_iw_client;
276
277extern unsigned int fastreg_pool_size;
278extern unsigned int fastreg_message_size;
279
280extern spinlock_t iw_nodev_conns_lock;
281extern struct list_head iw_nodev_conns;
282
283/* ib_cm.c */
284int rds_iw_conn_alloc(struct rds_connection *conn, gfp_t gfp);
285void rds_iw_conn_free(void *arg);
286int rds_iw_conn_connect(struct rds_connection *conn);
287void rds_iw_conn_shutdown(struct rds_connection *conn);
288void rds_iw_state_change(struct sock *sk);
289int rds_iw_listen_init(void);
290void rds_iw_listen_stop(void);
291void __rds_iw_conn_error(struct rds_connection *conn, const char *, ...);
292int rds_iw_cm_handle_connect(struct rdma_cm_id *cm_id,
293 struct rdma_cm_event *event);
294int rds_iw_cm_initiate_connect(struct rdma_cm_id *cm_id);
295void rds_iw_cm_connect_complete(struct rds_connection *conn,
296 struct rdma_cm_event *event);
297
298
299#define rds_iw_conn_error(conn, fmt...) \
300 __rds_iw_conn_error(conn, KERN_WARNING "RDS/IW: " fmt)
301
302/* ib_rdma.c */
303int rds_iw_update_cm_id(struct rds_iw_device *rds_iwdev, struct rdma_cm_id *cm_id);
304void rds_iw_add_conn(struct rds_iw_device *rds_iwdev, struct rds_connection *conn);
305void rds_iw_remove_conn(struct rds_iw_device *rds_iwdev, struct rds_connection *conn);
306void __rds_iw_destroy_conns(struct list_head *list, spinlock_t *list_lock);
307static inline void rds_iw_destroy_nodev_conns(void)
308{
309 __rds_iw_destroy_conns(&iw_nodev_conns, &iw_nodev_conns_lock);
310}
311static inline void rds_iw_destroy_conns(struct rds_iw_device *rds_iwdev)
312{
313 __rds_iw_destroy_conns(&rds_iwdev->conn_list, &rds_iwdev->spinlock);
314}
315struct rds_iw_mr_pool *rds_iw_create_mr_pool(struct rds_iw_device *);
316void rds_iw_get_mr_info(struct rds_iw_device *rds_iwdev, struct rds_info_rdma_connection *iinfo);
317void rds_iw_destroy_mr_pool(struct rds_iw_mr_pool *);
318void *rds_iw_get_mr(struct scatterlist *sg, unsigned long nents,
319 struct rds_sock *rs, u32 *key_ret);
320void rds_iw_sync_mr(void *trans_private, int dir);
321void rds_iw_free_mr(void *trans_private, int invalidate);
322void rds_iw_flush_mrs(void);
323
324/* ib_recv.c */
325int rds_iw_recv_init(void);
326void rds_iw_recv_exit(void);
327int rds_iw_recv(struct rds_connection *conn);
328int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
329 gfp_t page_gfp, int prefill);
330void rds_iw_inc_free(struct rds_incoming *inc);
331int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
332void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context);
333void rds_iw_recv_tasklet_fn(unsigned long data);
334void rds_iw_recv_init_ring(struct rds_iw_connection *ic);
335void rds_iw_recv_clear_ring(struct rds_iw_connection *ic);
336void rds_iw_recv_init_ack(struct rds_iw_connection *ic);
337void rds_iw_attempt_ack(struct rds_iw_connection *ic);
338void rds_iw_ack_send_complete(struct rds_iw_connection *ic);
339u64 rds_iw_piggyb_ack(struct rds_iw_connection *ic);
340
341/* ib_ring.c */
342void rds_iw_ring_init(struct rds_iw_work_ring *ring, u32 nr);
343void rds_iw_ring_resize(struct rds_iw_work_ring *ring, u32 nr);
344u32 rds_iw_ring_alloc(struct rds_iw_work_ring *ring, u32 val, u32 *pos);
345void rds_iw_ring_free(struct rds_iw_work_ring *ring, u32 val);
346void rds_iw_ring_unalloc(struct rds_iw_work_ring *ring, u32 val);
347int rds_iw_ring_empty(struct rds_iw_work_ring *ring);
348int rds_iw_ring_low(struct rds_iw_work_ring *ring);
349u32 rds_iw_ring_oldest(struct rds_iw_work_ring *ring);
350u32 rds_iw_ring_completed(struct rds_iw_work_ring *ring, u32 wr_id, u32 oldest);
351extern wait_queue_head_t rds_iw_ring_empty_wait;
352
353/* ib_send.c */
354void rds_iw_xmit_complete(struct rds_connection *conn);
355int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
356 unsigned int hdr_off, unsigned int sg, unsigned int off);
357void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context);
358void rds_iw_send_init_ring(struct rds_iw_connection *ic);
359void rds_iw_send_clear_ring(struct rds_iw_connection *ic);
360int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op);
361void rds_iw_send_add_credits(struct rds_connection *conn, unsigned int credits);
362void rds_iw_advertise_credits(struct rds_connection *conn, unsigned int posted);
363int rds_iw_send_grab_credits(struct rds_iw_connection *ic, u32 wanted,
364 u32 *adv_credits, int need_posted, int max_posted);
365
366/* ib_stats.c */
367DECLARE_PER_CPU(struct rds_iw_statistics, rds_iw_stats);
368#define rds_iw_stats_inc(member) rds_stats_inc_which(rds_iw_stats, member)
369unsigned int rds_iw_stats_info_copy(struct rds_info_iterator *iter,
370 unsigned int avail);
371
372/* ib_sysctl.c */
373int rds_iw_sysctl_init(void);
374void rds_iw_sysctl_exit(void);
375extern unsigned long rds_iw_sysctl_max_send_wr;
376extern unsigned long rds_iw_sysctl_max_recv_wr;
377extern unsigned long rds_iw_sysctl_max_unsig_wrs;
378extern unsigned long rds_iw_sysctl_max_unsig_bytes;
379extern unsigned long rds_iw_sysctl_max_recv_allocation;
380extern unsigned int rds_iw_sysctl_flow_control;
381
382/*
383 * Helper functions for getting/setting the header and data SGEs in
384 * RDS packets (not RDMA)
385 */
386static inline struct ib_sge *
387rds_iw_header_sge(struct rds_iw_connection *ic, struct ib_sge *sge)
388{
389 return &sge[0];
390}
391
392static inline struct ib_sge *
393rds_iw_data_sge(struct rds_iw_connection *ic, struct ib_sge *sge)
394{
395 return &sge[1];
396}
397
398#endif
diff --git a/net/rds/iw_cm.c b/net/rds/iw_cm.c
deleted file mode 100644
index aea4c911bc76..000000000000
--- a/net/rds/iw_cm.c
+++ /dev/null
@@ -1,769 +0,0 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <linux/slab.h>
36#include <linux/vmalloc.h>
37#include <linux/ratelimit.h>
38
39#include "rds.h"
40#include "iw.h"
41
42/*
43 * Set the selected protocol version
44 */
45static void rds_iw_set_protocol(struct rds_connection *conn, unsigned int version)
46{
47 conn->c_version = version;
48}
49
50/*
51 * Set up flow control
52 */
53static void rds_iw_set_flow_control(struct rds_connection *conn, u32 credits)
54{
55 struct rds_iw_connection *ic = conn->c_transport_data;
56
57 if (rds_iw_sysctl_flow_control && credits != 0) {
58 /* We're doing flow control */
59 ic->i_flowctl = 1;
60 rds_iw_send_add_credits(conn, credits);
61 } else {
62 ic->i_flowctl = 0;
63 }
64}
65
66/*
67 * Connection established.
68 * We get here for both outgoing and incoming connection.
69 */
70void rds_iw_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_event *event)
71{
72 const struct rds_iw_connect_private *dp = NULL;
73 struct rds_iw_connection *ic = conn->c_transport_data;
74 struct rds_iw_device *rds_iwdev;
75 int err;
76
77 if (event->param.conn.private_data_len) {
78 dp = event->param.conn.private_data;
79
80 rds_iw_set_protocol(conn,
81 RDS_PROTOCOL(dp->dp_protocol_major,
82 dp->dp_protocol_minor));
83 rds_iw_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
84 }
85
86 /* update ib_device with this local ipaddr & conn */
87 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
88 err = rds_iw_update_cm_id(rds_iwdev, ic->i_cm_id);
89 if (err)
90 printk(KERN_ERR "rds_iw_update_ipaddr failed (%d)\n", err);
91 rds_iw_add_conn(rds_iwdev, conn);
92
93 /* If the peer gave us the last packet it saw, process this as if
94 * we had received a regular ACK. */
95 if (dp && dp->dp_ack_seq)
96 rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
97
98 printk(KERN_NOTICE "RDS/IW: connected to %pI4<->%pI4 version %u.%u%s\n",
99 &conn->c_laddr, &conn->c_faddr,
100 RDS_PROTOCOL_MAJOR(conn->c_version),
101 RDS_PROTOCOL_MINOR(conn->c_version),
102 ic->i_flowctl ? ", flow control" : "");
103
104 rds_connect_complete(conn);
105}
106
107static void rds_iw_cm_fill_conn_param(struct rds_connection *conn,
108 struct rdma_conn_param *conn_param,
109 struct rds_iw_connect_private *dp,
110 u32 protocol_version)
111{
112 struct rds_iw_connection *ic = conn->c_transport_data;
113
114 memset(conn_param, 0, sizeof(struct rdma_conn_param));
115 /* XXX tune these? */
116 conn_param->responder_resources = 1;
117 conn_param->initiator_depth = 1;
118
119 if (dp) {
120 memset(dp, 0, sizeof(*dp));
121 dp->dp_saddr = conn->c_laddr;
122 dp->dp_daddr = conn->c_faddr;
123 dp->dp_protocol_major = RDS_PROTOCOL_MAJOR(protocol_version);
124 dp->dp_protocol_minor = RDS_PROTOCOL_MINOR(protocol_version);
125 dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IW_SUPPORTED_PROTOCOLS);
126 dp->dp_ack_seq = rds_iw_piggyb_ack(ic);
127
128 /* Advertise flow control */
129 if (ic->i_flowctl) {
130 unsigned int credits;
131
132 credits = IB_GET_POST_CREDITS(atomic_read(&ic->i_credits));
133 dp->dp_credit = cpu_to_be32(credits);
134 atomic_sub(IB_SET_POST_CREDITS(credits), &ic->i_credits);
135 }
136
137 conn_param->private_data = dp;
138 conn_param->private_data_len = sizeof(*dp);
139 }
140}
141
142static void rds_iw_cq_event_handler(struct ib_event *event, void *data)
143{
144 rdsdebug("event %u data %p\n", event->event, data);
145}
146
147static void rds_iw_qp_event_handler(struct ib_event *event, void *data)
148{
149 struct rds_connection *conn = data;
150 struct rds_iw_connection *ic = conn->c_transport_data;
151
152 rdsdebug("conn %p ic %p event %u\n", conn, ic, event->event);
153
154 switch (event->event) {
155 case IB_EVENT_COMM_EST:
156 rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
157 break;
158 case IB_EVENT_QP_REQ_ERR:
159 case IB_EVENT_QP_FATAL:
160 default:
161 rdsdebug("Fatal QP Event %u "
162 "- connection %pI4->%pI4, reconnecting\n",
163 event->event, &conn->c_laddr,
164 &conn->c_faddr);
165 rds_conn_drop(conn);
166 break;
167 }
168}
169
170/*
171 * Create a QP
172 */
173static int rds_iw_init_qp_attrs(struct ib_qp_init_attr *attr,
174 struct rds_iw_device *rds_iwdev,
175 struct rds_iw_work_ring *send_ring,
176 void (*send_cq_handler)(struct ib_cq *, void *),
177 struct rds_iw_work_ring *recv_ring,
178 void (*recv_cq_handler)(struct ib_cq *, void *),
179 void *context)
180{
181 struct ib_device *dev = rds_iwdev->dev;
182 struct ib_cq_init_attr cq_attr = {};
183 unsigned int send_size, recv_size;
184 int ret;
185
186 /* The offset of 1 is to accommodate the additional ACK WR. */
187 send_size = min_t(unsigned int, rds_iwdev->max_wrs, rds_iw_sysctl_max_send_wr + 1);
188 recv_size = min_t(unsigned int, rds_iwdev->max_wrs, rds_iw_sysctl_max_recv_wr + 1);
189 rds_iw_ring_resize(send_ring, send_size - 1);
190 rds_iw_ring_resize(recv_ring, recv_size - 1);
191
192 memset(attr, 0, sizeof(*attr));
193 attr->event_handler = rds_iw_qp_event_handler;
194 attr->qp_context = context;
195 attr->cap.max_send_wr = send_size;
196 attr->cap.max_recv_wr = recv_size;
197 attr->cap.max_send_sge = rds_iwdev->max_sge;
198 attr->cap.max_recv_sge = RDS_IW_RECV_SGE;
199 attr->sq_sig_type = IB_SIGNAL_REQ_WR;
200 attr->qp_type = IB_QPT_RC;
201
202 cq_attr.cqe = send_size;
203 attr->send_cq = ib_create_cq(dev, send_cq_handler,
204 rds_iw_cq_event_handler,
205 context, &cq_attr);
206 if (IS_ERR(attr->send_cq)) {
207 ret = PTR_ERR(attr->send_cq);
208 attr->send_cq = NULL;
209 rdsdebug("ib_create_cq send failed: %d\n", ret);
210 goto out;
211 }
212
213 cq_attr.cqe = recv_size;
214 attr->recv_cq = ib_create_cq(dev, recv_cq_handler,
215 rds_iw_cq_event_handler,
216 context, &cq_attr);
217 if (IS_ERR(attr->recv_cq)) {
218 ret = PTR_ERR(attr->recv_cq);
219 attr->recv_cq = NULL;
220 rdsdebug("ib_create_cq send failed: %d\n", ret);
221 goto out;
222 }
223
224 ret = ib_req_notify_cq(attr->send_cq, IB_CQ_NEXT_COMP);
225 if (ret) {
226 rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
227 goto out;
228 }
229
230 ret = ib_req_notify_cq(attr->recv_cq, IB_CQ_SOLICITED);
231 if (ret) {
232 rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
233 goto out;
234 }
235
236out:
237 if (ret) {
238 if (attr->send_cq)
239 ib_destroy_cq(attr->send_cq);
240 if (attr->recv_cq)
241 ib_destroy_cq(attr->recv_cq);
242 }
243 return ret;
244}
245
246/*
247 * This needs to be very careful to not leave IS_ERR pointers around for
248 * cleanup to trip over.
249 */
250static int rds_iw_setup_qp(struct rds_connection *conn)
251{
252 struct rds_iw_connection *ic = conn->c_transport_data;
253 struct ib_device *dev = ic->i_cm_id->device;
254 struct ib_qp_init_attr attr;
255 struct rds_iw_device *rds_iwdev;
256 int ret;
257
258 /* rds_iw_add_one creates a rds_iw_device object per IB device,
259 * and allocates a protection domain, memory range and MR pool
260 * for each. If that fails for any reason, it will not register
261 * the rds_iwdev at all.
262 */
263 rds_iwdev = ib_get_client_data(dev, &rds_iw_client);
264 if (!rds_iwdev) {
265 printk_ratelimited(KERN_NOTICE "RDS/IW: No client_data for device %s\n",
266 dev->name);
267 return -EOPNOTSUPP;
268 }
269
270 /* Protection domain and memory range */
271 ic->i_pd = rds_iwdev->pd;
272 ic->i_mr = rds_iwdev->mr;
273
274 ret = rds_iw_init_qp_attrs(&attr, rds_iwdev,
275 &ic->i_send_ring, rds_iw_send_cq_comp_handler,
276 &ic->i_recv_ring, rds_iw_recv_cq_comp_handler,
277 conn);
278 if (ret < 0)
279 goto out;
280
281 ic->i_send_cq = attr.send_cq;
282 ic->i_recv_cq = attr.recv_cq;
283
284 /*
285 * XXX this can fail if max_*_wr is too large? Are we supposed
286 * to back off until we get a value that the hardware can support?
287 */
288 ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
289 if (ret) {
290 rdsdebug("rdma_create_qp failed: %d\n", ret);
291 goto out;
292 }
293
294 ic->i_send_hdrs = ib_dma_alloc_coherent(dev,
295 ic->i_send_ring.w_nr *
296 sizeof(struct rds_header),
297 &ic->i_send_hdrs_dma, GFP_KERNEL);
298 if (!ic->i_send_hdrs) {
299 ret = -ENOMEM;
300 rdsdebug("ib_dma_alloc_coherent send failed\n");
301 goto out;
302 }
303
304 ic->i_recv_hdrs = ib_dma_alloc_coherent(dev,
305 ic->i_recv_ring.w_nr *
306 sizeof(struct rds_header),
307 &ic->i_recv_hdrs_dma, GFP_KERNEL);
308 if (!ic->i_recv_hdrs) {
309 ret = -ENOMEM;
310 rdsdebug("ib_dma_alloc_coherent recv failed\n");
311 goto out;
312 }
313
314 ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header),
315 &ic->i_ack_dma, GFP_KERNEL);
316 if (!ic->i_ack) {
317 ret = -ENOMEM;
318 rdsdebug("ib_dma_alloc_coherent ack failed\n");
319 goto out;
320 }
321
322 ic->i_sends = vmalloc(ic->i_send_ring.w_nr * sizeof(struct rds_iw_send_work));
323 if (!ic->i_sends) {
324 ret = -ENOMEM;
325 rdsdebug("send allocation failed\n");
326 goto out;
327 }
328 rds_iw_send_init_ring(ic);
329
330 ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr * sizeof(struct rds_iw_recv_work));
331 if (!ic->i_recvs) {
332 ret = -ENOMEM;
333 rdsdebug("recv allocation failed\n");
334 goto out;
335 }
336
337 rds_iw_recv_init_ring(ic);
338 rds_iw_recv_init_ack(ic);
339
340 /* Post receive buffers - as a side effect, this will update
341 * the posted credit count. */
342 rds_iw_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
343
344 rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr,
345 ic->i_send_cq, ic->i_recv_cq);
346
347out:
348 return ret;
349}
350
351static u32 rds_iw_protocol_compatible(const struct rds_iw_connect_private *dp)
352{
353 u16 common;
354 u32 version = 0;
355
356 /* rdma_cm private data is odd - when there is any private data in the
357 * request, we will be given a pretty large buffer without telling us the
358 * original size. The only way to tell the difference is by looking at
359 * the contents, which are initialized to zero.
360 * If the protocol version fields aren't set, this is a connection attempt
361 * from an older version. This could could be 3.0 or 2.0 - we can't tell.
362 * We really should have changed this for OFED 1.3 :-( */
363 if (dp->dp_protocol_major == 0)
364 return RDS_PROTOCOL_3_0;
365
366 common = be16_to_cpu(dp->dp_protocol_minor_mask) & RDS_IW_SUPPORTED_PROTOCOLS;
367 if (dp->dp_protocol_major == 3 && common) {
368 version = RDS_PROTOCOL_3_0;
369 while ((common >>= 1) != 0)
370 version++;
371 }
372 printk_ratelimited(KERN_NOTICE "RDS: Connection from %pI4 using "
373 "incompatible protocol version %u.%u\n",
374 &dp->dp_saddr,
375 dp->dp_protocol_major,
376 dp->dp_protocol_minor);
377 return version;
378}
379
380int rds_iw_cm_handle_connect(struct rdma_cm_id *cm_id,
381 struct rdma_cm_event *event)
382{
383 const struct rds_iw_connect_private *dp = event->param.conn.private_data;
384 struct rds_iw_connect_private dp_rep;
385 struct rds_connection *conn = NULL;
386 struct rds_iw_connection *ic = NULL;
387 struct rdma_conn_param conn_param;
388 struct rds_iw_device *rds_iwdev;
389 u32 version;
390 int err, destroy = 1;
391
392 /* Check whether the remote protocol version matches ours. */
393 version = rds_iw_protocol_compatible(dp);
394 if (!version)
395 goto out;
396
397 rdsdebug("saddr %pI4 daddr %pI4 RDSv%u.%u\n",
398 &dp->dp_saddr, &dp->dp_daddr,
399 RDS_PROTOCOL_MAJOR(version), RDS_PROTOCOL_MINOR(version));
400
401 /* RDS/IW is not currently netns aware, thus init_net */
402 conn = rds_conn_create(&init_net, dp->dp_daddr, dp->dp_saddr,
403 &rds_iw_transport, GFP_KERNEL);
404 if (IS_ERR(conn)) {
405 rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn));
406 conn = NULL;
407 goto out;
408 }
409
410 /*
411 * The connection request may occur while the
412 * previous connection exist, e.g. in case of failover.
413 * But as connections may be initiated simultaneously
414 * by both hosts, we have a random backoff mechanism -
415 * see the comment above rds_queue_reconnect()
416 */
417 mutex_lock(&conn->c_cm_lock);
418 if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
419 if (rds_conn_state(conn) == RDS_CONN_UP) {
420 rdsdebug("incoming connect while connecting\n");
421 rds_conn_drop(conn);
422 rds_iw_stats_inc(s_iw_listen_closed_stale);
423 } else
424 if (rds_conn_state(conn) == RDS_CONN_CONNECTING) {
425 /* Wait and see - our connect may still be succeeding */
426 rds_iw_stats_inc(s_iw_connect_raced);
427 }
428 mutex_unlock(&conn->c_cm_lock);
429 goto out;
430 }
431
432 ic = conn->c_transport_data;
433
434 rds_iw_set_protocol(conn, version);
435 rds_iw_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
436
437 /* If the peer gave us the last packet it saw, process this as if
438 * we had received a regular ACK. */
439 if (dp->dp_ack_seq)
440 rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
441
442 BUG_ON(cm_id->context);
443 BUG_ON(ic->i_cm_id);
444
445 ic->i_cm_id = cm_id;
446 cm_id->context = conn;
447
448 rds_iwdev = ib_get_client_data(cm_id->device, &rds_iw_client);
449 ic->i_dma_local_lkey = rds_iwdev->dma_local_lkey;
450
451 /* We got halfway through setting up the ib_connection, if we
452 * fail now, we have to take the long route out of this mess. */
453 destroy = 0;
454
455 err = rds_iw_setup_qp(conn);
456 if (err) {
457 rds_iw_conn_error(conn, "rds_iw_setup_qp failed (%d)\n", err);
458 mutex_unlock(&conn->c_cm_lock);
459 goto out;
460 }
461
462 rds_iw_cm_fill_conn_param(conn, &conn_param, &dp_rep, version);
463
464 /* rdma_accept() calls rdma_reject() internally if it fails */
465 err = rdma_accept(cm_id, &conn_param);
466 mutex_unlock(&conn->c_cm_lock);
467 if (err) {
468 rds_iw_conn_error(conn, "rdma_accept failed (%d)\n", err);
469 goto out;
470 }
471
472 return 0;
473
474out:
475 rdma_reject(cm_id, NULL, 0);
476 return destroy;
477}
478
479
480int rds_iw_cm_initiate_connect(struct rdma_cm_id *cm_id)
481{
482 struct rds_connection *conn = cm_id->context;
483 struct rds_iw_connection *ic = conn->c_transport_data;
484 struct rdma_conn_param conn_param;
485 struct rds_iw_connect_private dp;
486 int ret;
487
488 /* If the peer doesn't do protocol negotiation, we must
489 * default to RDSv3.0 */
490 rds_iw_set_protocol(conn, RDS_PROTOCOL_3_0);
491 ic->i_flowctl = rds_iw_sysctl_flow_control; /* advertise flow control */
492
493 ret = rds_iw_setup_qp(conn);
494 if (ret) {
495 rds_iw_conn_error(conn, "rds_iw_setup_qp failed (%d)\n", ret);
496 goto out;
497 }
498
499 rds_iw_cm_fill_conn_param(conn, &conn_param, &dp, RDS_PROTOCOL_VERSION);
500
501 ret = rdma_connect(cm_id, &conn_param);
502 if (ret)
503 rds_iw_conn_error(conn, "rdma_connect failed (%d)\n", ret);
504
505out:
506 /* Beware - returning non-zero tells the rdma_cm to destroy
507 * the cm_id. We should certainly not do it as long as we still
508 * "own" the cm_id. */
509 if (ret) {
510 struct rds_iw_connection *ic = conn->c_transport_data;
511
512 if (ic->i_cm_id == cm_id)
513 ret = 0;
514 }
515 return ret;
516}
517
518int rds_iw_conn_connect(struct rds_connection *conn)
519{
520 struct rds_iw_connection *ic = conn->c_transport_data;
521 struct rds_iw_device *rds_iwdev;
522 struct sockaddr_in src, dest;
523 int ret;
524
525 /* XXX I wonder what affect the port space has */
526 /* delegate cm event handler to rdma_transport */
527 ic->i_cm_id = rdma_create_id(&init_net, rds_rdma_cm_event_handler, conn,
528 RDMA_PS_TCP, IB_QPT_RC);
529 if (IS_ERR(ic->i_cm_id)) {
530 ret = PTR_ERR(ic->i_cm_id);
531 ic->i_cm_id = NULL;
532 rdsdebug("rdma_create_id() failed: %d\n", ret);
533 goto out;
534 }
535
536 rdsdebug("created cm id %p for conn %p\n", ic->i_cm_id, conn);
537
538 src.sin_family = AF_INET;
539 src.sin_addr.s_addr = (__force u32)conn->c_laddr;
540 src.sin_port = (__force u16)htons(0);
541
542 /* First, bind to the local address and device. */
543 ret = rdma_bind_addr(ic->i_cm_id, (struct sockaddr *) &src);
544 if (ret) {
545 rdsdebug("rdma_bind_addr(%pI4) failed: %d\n",
546 &conn->c_laddr, ret);
547 rdma_destroy_id(ic->i_cm_id);
548 ic->i_cm_id = NULL;
549 goto out;
550 }
551
552 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
553 ic->i_dma_local_lkey = rds_iwdev->dma_local_lkey;
554
555 dest.sin_family = AF_INET;
556 dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
557 dest.sin_port = (__force u16)htons(RDS_PORT);
558
559 ret = rdma_resolve_addr(ic->i_cm_id, (struct sockaddr *)&src,
560 (struct sockaddr *)&dest,
561 RDS_RDMA_RESOLVE_TIMEOUT_MS);
562 if (ret) {
563 rdsdebug("addr resolve failed for cm id %p: %d\n", ic->i_cm_id,
564 ret);
565 rdma_destroy_id(ic->i_cm_id);
566 ic->i_cm_id = NULL;
567 }
568
569out:
570 return ret;
571}
572
573/*
574 * This is so careful about only cleaning up resources that were built up
575 * so that it can be called at any point during startup. In fact it
576 * can be called multiple times for a given connection.
577 */
578void rds_iw_conn_shutdown(struct rds_connection *conn)
579{
580 struct rds_iw_connection *ic = conn->c_transport_data;
581 int err = 0;
582 struct ib_qp_attr qp_attr;
583
584 rdsdebug("cm %p pd %p cq %p %p qp %p\n", ic->i_cm_id,
585 ic->i_pd, ic->i_send_cq, ic->i_recv_cq,
586 ic->i_cm_id ? ic->i_cm_id->qp : NULL);
587
588 if (ic->i_cm_id) {
589 struct ib_device *dev = ic->i_cm_id->device;
590
591 rdsdebug("disconnecting cm %p\n", ic->i_cm_id);
592 err = rdma_disconnect(ic->i_cm_id);
593 if (err) {
594 /* Actually this may happen quite frequently, when
595 * an outgoing connect raced with an incoming connect.
596 */
597 rdsdebug("failed to disconnect, cm: %p err %d\n",
598 ic->i_cm_id, err);
599 }
600
601 if (ic->i_cm_id->qp) {
602 qp_attr.qp_state = IB_QPS_ERR;
603 ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
604 }
605
606 wait_event(rds_iw_ring_empty_wait,
607 rds_iw_ring_empty(&ic->i_send_ring) &&
608 rds_iw_ring_empty(&ic->i_recv_ring));
609
610 if (ic->i_send_hdrs)
611 ib_dma_free_coherent(dev,
612 ic->i_send_ring.w_nr *
613 sizeof(struct rds_header),
614 ic->i_send_hdrs,
615 ic->i_send_hdrs_dma);
616
617 if (ic->i_recv_hdrs)
618 ib_dma_free_coherent(dev,
619 ic->i_recv_ring.w_nr *
620 sizeof(struct rds_header),
621 ic->i_recv_hdrs,
622 ic->i_recv_hdrs_dma);
623
624 if (ic->i_ack)
625 ib_dma_free_coherent(dev, sizeof(struct rds_header),
626 ic->i_ack, ic->i_ack_dma);
627
628 if (ic->i_sends)
629 rds_iw_send_clear_ring(ic);
630 if (ic->i_recvs)
631 rds_iw_recv_clear_ring(ic);
632
633 if (ic->i_cm_id->qp)
634 rdma_destroy_qp(ic->i_cm_id);
635 if (ic->i_send_cq)
636 ib_destroy_cq(ic->i_send_cq);
637 if (ic->i_recv_cq)
638 ib_destroy_cq(ic->i_recv_cq);
639
640 /*
641 * If associated with an rds_iw_device:
642 * Move connection back to the nodev list.
643 * Remove cm_id from the device cm_id list.
644 */
645 if (ic->rds_iwdev)
646 rds_iw_remove_conn(ic->rds_iwdev, conn);
647
648 rdma_destroy_id(ic->i_cm_id);
649
650 ic->i_cm_id = NULL;
651 ic->i_pd = NULL;
652 ic->i_mr = NULL;
653 ic->i_send_cq = NULL;
654 ic->i_recv_cq = NULL;
655 ic->i_send_hdrs = NULL;
656 ic->i_recv_hdrs = NULL;
657 ic->i_ack = NULL;
658 }
659 BUG_ON(ic->rds_iwdev);
660
661 /* Clear pending transmit */
662 if (ic->i_rm) {
663 rds_message_put(ic->i_rm);
664 ic->i_rm = NULL;
665 }
666
667 /* Clear the ACK state */
668 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
669#ifdef KERNEL_HAS_ATOMIC64
670 atomic64_set(&ic->i_ack_next, 0);
671#else
672 ic->i_ack_next = 0;
673#endif
674 ic->i_ack_recv = 0;
675
676 /* Clear flow control state */
677 ic->i_flowctl = 0;
678 atomic_set(&ic->i_credits, 0);
679
680 rds_iw_ring_init(&ic->i_send_ring, rds_iw_sysctl_max_send_wr);
681 rds_iw_ring_init(&ic->i_recv_ring, rds_iw_sysctl_max_recv_wr);
682
683 if (ic->i_iwinc) {
684 rds_inc_put(&ic->i_iwinc->ii_inc);
685 ic->i_iwinc = NULL;
686 }
687
688 vfree(ic->i_sends);
689 ic->i_sends = NULL;
690 vfree(ic->i_recvs);
691 ic->i_recvs = NULL;
692 rdsdebug("shutdown complete\n");
693}
694
695int rds_iw_conn_alloc(struct rds_connection *conn, gfp_t gfp)
696{
697 struct rds_iw_connection *ic;
698 unsigned long flags;
699
700 /* XXX too lazy? */
701 ic = kzalloc(sizeof(struct rds_iw_connection), gfp);
702 if (!ic)
703 return -ENOMEM;
704
705 INIT_LIST_HEAD(&ic->iw_node);
706 tasklet_init(&ic->i_recv_tasklet, rds_iw_recv_tasklet_fn,
707 (unsigned long) ic);
708 mutex_init(&ic->i_recv_mutex);
709#ifndef KERNEL_HAS_ATOMIC64
710 spin_lock_init(&ic->i_ack_lock);
711#endif
712
713 /*
714 * rds_iw_conn_shutdown() waits for these to be emptied so they
715 * must be initialized before it can be called.
716 */
717 rds_iw_ring_init(&ic->i_send_ring, rds_iw_sysctl_max_send_wr);
718 rds_iw_ring_init(&ic->i_recv_ring, rds_iw_sysctl_max_recv_wr);
719
720 ic->conn = conn;
721 conn->c_transport_data = ic;
722
723 spin_lock_irqsave(&iw_nodev_conns_lock, flags);
724 list_add_tail(&ic->iw_node, &iw_nodev_conns);
725 spin_unlock_irqrestore(&iw_nodev_conns_lock, flags);
726
727
728 rdsdebug("conn %p conn ic %p\n", conn, conn->c_transport_data);
729 return 0;
730}
731
732/*
733 * Free a connection. Connection must be shut down and not set for reconnect.
734 */
735void rds_iw_conn_free(void *arg)
736{
737 struct rds_iw_connection *ic = arg;
738 spinlock_t *lock_ptr;
739
740 rdsdebug("ic %p\n", ic);
741
742 /*
743 * Conn is either on a dev's list or on the nodev list.
744 * A race with shutdown() or connect() would cause problems
745 * (since rds_iwdev would change) but that should never happen.
746 */
747 lock_ptr = ic->rds_iwdev ? &ic->rds_iwdev->spinlock : &iw_nodev_conns_lock;
748
749 spin_lock_irq(lock_ptr);
750 list_del(&ic->iw_node);
751 spin_unlock_irq(lock_ptr);
752
753 kfree(ic);
754}
755
756/*
757 * An error occurred on the connection
758 */
759void
760__rds_iw_conn_error(struct rds_connection *conn, const char *fmt, ...)
761{
762 va_list ap;
763
764 rds_conn_drop(conn);
765
766 va_start(ap, fmt);
767 vprintk(fmt, ap);
768 va_end(ap);
769}
diff --git a/net/rds/iw_rdma.c b/net/rds/iw_rdma.c
deleted file mode 100644
index b09a40c1adce..000000000000
--- a/net/rds/iw_rdma.c
+++ /dev/null
@@ -1,837 +0,0 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/slab.h>
35#include <linux/ratelimit.h>
36
37#include "rds.h"
38#include "iw.h"
39
40
41/*
42 * This is stored as mr->r_trans_private.
43 */
44struct rds_iw_mr {
45 struct rds_iw_device *device;
46 struct rds_iw_mr_pool *pool;
47 struct rdma_cm_id *cm_id;
48
49 struct ib_mr *mr;
50
51 struct rds_iw_mapping mapping;
52 unsigned char remap_count;
53};
54
55/*
56 * Our own little MR pool
57 */
58struct rds_iw_mr_pool {
59 struct rds_iw_device *device; /* back ptr to the device that owns us */
60
61 struct mutex flush_lock; /* serialize fmr invalidate */
62 struct work_struct flush_worker; /* flush worker */
63
64 spinlock_t list_lock; /* protect variables below */
65 atomic_t item_count; /* total # of MRs */
66 atomic_t dirty_count; /* # dirty of MRs */
67 struct list_head dirty_list; /* dirty mappings */
68 struct list_head clean_list; /* unused & unamapped MRs */
69 atomic_t free_pinned; /* memory pinned by free MRs */
70 unsigned long max_message_size; /* in pages */
71 unsigned long max_items;
72 unsigned long max_items_soft;
73 unsigned long max_free_pinned;
74 int max_pages;
75};
76
77static void rds_iw_flush_mr_pool(struct rds_iw_mr_pool *pool, int free_all);
78static void rds_iw_mr_pool_flush_worker(struct work_struct *work);
79static int rds_iw_init_reg(struct rds_iw_mr_pool *pool, struct rds_iw_mr *ibmr);
80static int rds_iw_map_reg(struct rds_iw_mr_pool *pool,
81 struct rds_iw_mr *ibmr,
82 struct scatterlist *sg, unsigned int nents);
83static void rds_iw_free_fastreg(struct rds_iw_mr_pool *pool, struct rds_iw_mr *ibmr);
84static unsigned int rds_iw_unmap_fastreg_list(struct rds_iw_mr_pool *pool,
85 struct list_head *unmap_list,
86 struct list_head *kill_list,
87 int *unpinned);
88static void rds_iw_destroy_fastreg(struct rds_iw_mr_pool *pool, struct rds_iw_mr *ibmr);
89
90static int rds_iw_get_device(struct sockaddr_in *src, struct sockaddr_in *dst,
91 struct rds_iw_device **rds_iwdev,
92 struct rdma_cm_id **cm_id)
93{
94 struct rds_iw_device *iwdev;
95 struct rds_iw_cm_id *i_cm_id;
96
97 *rds_iwdev = NULL;
98 *cm_id = NULL;
99
100 list_for_each_entry(iwdev, &rds_iw_devices, list) {
101 spin_lock_irq(&iwdev->spinlock);
102 list_for_each_entry(i_cm_id, &iwdev->cm_id_list, list) {
103 struct sockaddr_in *src_addr, *dst_addr;
104
105 src_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.src_addr;
106 dst_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.dst_addr;
107
108 rdsdebug("local ipaddr = %x port %d, "
109 "remote ipaddr = %x port %d"
110 "..looking for %x port %d, "
111 "remote ipaddr = %x port %d\n",
112 src_addr->sin_addr.s_addr,
113 src_addr->sin_port,
114 dst_addr->sin_addr.s_addr,
115 dst_addr->sin_port,
116 src->sin_addr.s_addr,
117 src->sin_port,
118 dst->sin_addr.s_addr,
119 dst->sin_port);
120#ifdef WORKING_TUPLE_DETECTION
121 if (src_addr->sin_addr.s_addr == src->sin_addr.s_addr &&
122 src_addr->sin_port == src->sin_port &&
123 dst_addr->sin_addr.s_addr == dst->sin_addr.s_addr &&
124 dst_addr->sin_port == dst->sin_port) {
125#else
126 /* FIXME - needs to compare the local and remote
127 * ipaddr/port tuple, but the ipaddr is the only
128 * available information in the rds_sock (as the rest are
129 * zero'ed. It doesn't appear to be properly populated
130 * during connection setup...
131 */
132 if (src_addr->sin_addr.s_addr == src->sin_addr.s_addr) {
133#endif
134 spin_unlock_irq(&iwdev->spinlock);
135 *rds_iwdev = iwdev;
136 *cm_id = i_cm_id->cm_id;
137 return 0;
138 }
139 }
140 spin_unlock_irq(&iwdev->spinlock);
141 }
142
143 return 1;
144}
145
146static int rds_iw_add_cm_id(struct rds_iw_device *rds_iwdev, struct rdma_cm_id *cm_id)
147{
148 struct rds_iw_cm_id *i_cm_id;
149
150 i_cm_id = kmalloc(sizeof *i_cm_id, GFP_KERNEL);
151 if (!i_cm_id)
152 return -ENOMEM;
153
154 i_cm_id->cm_id = cm_id;
155
156 spin_lock_irq(&rds_iwdev->spinlock);
157 list_add_tail(&i_cm_id->list, &rds_iwdev->cm_id_list);
158 spin_unlock_irq(&rds_iwdev->spinlock);
159
160 return 0;
161}
162
163static void rds_iw_remove_cm_id(struct rds_iw_device *rds_iwdev,
164 struct rdma_cm_id *cm_id)
165{
166 struct rds_iw_cm_id *i_cm_id;
167
168 spin_lock_irq(&rds_iwdev->spinlock);
169 list_for_each_entry(i_cm_id, &rds_iwdev->cm_id_list, list) {
170 if (i_cm_id->cm_id == cm_id) {
171 list_del(&i_cm_id->list);
172 kfree(i_cm_id);
173 break;
174 }
175 }
176 spin_unlock_irq(&rds_iwdev->spinlock);
177}
178
179
180int rds_iw_update_cm_id(struct rds_iw_device *rds_iwdev, struct rdma_cm_id *cm_id)
181{
182 struct sockaddr_in *src_addr, *dst_addr;
183 struct rds_iw_device *rds_iwdev_old;
184 struct rdma_cm_id *pcm_id;
185 int rc;
186
187 src_addr = (struct sockaddr_in *)&cm_id->route.addr.src_addr;
188 dst_addr = (struct sockaddr_in *)&cm_id->route.addr.dst_addr;
189
190 rc = rds_iw_get_device(src_addr, dst_addr, &rds_iwdev_old, &pcm_id);
191 if (rc)
192 rds_iw_remove_cm_id(rds_iwdev, cm_id);
193
194 return rds_iw_add_cm_id(rds_iwdev, cm_id);
195}
196
197void rds_iw_add_conn(struct rds_iw_device *rds_iwdev, struct rds_connection *conn)
198{
199 struct rds_iw_connection *ic = conn->c_transport_data;
200
201 /* conn was previously on the nodev_conns_list */
202 spin_lock_irq(&iw_nodev_conns_lock);
203 BUG_ON(list_empty(&iw_nodev_conns));
204 BUG_ON(list_empty(&ic->iw_node));
205 list_del(&ic->iw_node);
206
207 spin_lock(&rds_iwdev->spinlock);
208 list_add_tail(&ic->iw_node, &rds_iwdev->conn_list);
209 spin_unlock(&rds_iwdev->spinlock);
210 spin_unlock_irq(&iw_nodev_conns_lock);
211
212 ic->rds_iwdev = rds_iwdev;
213}
214
215void rds_iw_remove_conn(struct rds_iw_device *rds_iwdev, struct rds_connection *conn)
216{
217 struct rds_iw_connection *ic = conn->c_transport_data;
218
219 /* place conn on nodev_conns_list */
220 spin_lock(&iw_nodev_conns_lock);
221
222 spin_lock_irq(&rds_iwdev->spinlock);
223 BUG_ON(list_empty(&ic->iw_node));
224 list_del(&ic->iw_node);
225 spin_unlock_irq(&rds_iwdev->spinlock);
226
227 list_add_tail(&ic->iw_node, &iw_nodev_conns);
228
229 spin_unlock(&iw_nodev_conns_lock);
230
231 rds_iw_remove_cm_id(ic->rds_iwdev, ic->i_cm_id);
232 ic->rds_iwdev = NULL;
233}
234
235void __rds_iw_destroy_conns(struct list_head *list, spinlock_t *list_lock)
236{
237 struct rds_iw_connection *ic, *_ic;
238 LIST_HEAD(tmp_list);
239
240 /* avoid calling conn_destroy with irqs off */
241 spin_lock_irq(list_lock);
242 list_splice(list, &tmp_list);
243 INIT_LIST_HEAD(list);
244 spin_unlock_irq(list_lock);
245
246 list_for_each_entry_safe(ic, _ic, &tmp_list, iw_node)
247 rds_conn_destroy(ic->conn);
248}
249
250static void rds_iw_set_scatterlist(struct rds_iw_scatterlist *sg,
251 struct scatterlist *list, unsigned int sg_len)
252{
253 sg->list = list;
254 sg->len = sg_len;
255 sg->dma_len = 0;
256 sg->dma_npages = 0;
257 sg->bytes = 0;
258}
259
260static int rds_iw_map_scatterlist(struct rds_iw_device *rds_iwdev,
261 struct rds_iw_scatterlist *sg)
262{
263 struct ib_device *dev = rds_iwdev->dev;
264 int i, ret;
265
266 WARN_ON(sg->dma_len);
267
268 sg->dma_len = ib_dma_map_sg(dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
269 if (unlikely(!sg->dma_len)) {
270 printk(KERN_WARNING "RDS/IW: dma_map_sg failed!\n");
271 return -EBUSY;
272 }
273
274 sg->bytes = 0;
275 sg->dma_npages = 0;
276
277 ret = -EINVAL;
278 for (i = 0; i < sg->dma_len; ++i) {
279 unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
280 u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
281 u64 end_addr;
282
283 sg->bytes += dma_len;
284
285 end_addr = dma_addr + dma_len;
286 if (dma_addr & PAGE_MASK) {
287 if (i > 0)
288 goto out_unmap;
289 dma_addr &= ~PAGE_MASK;
290 }
291 if (end_addr & PAGE_MASK) {
292 if (i < sg->dma_len - 1)
293 goto out_unmap;
294 end_addr = (end_addr + PAGE_MASK) & ~PAGE_MASK;
295 }
296
297 sg->dma_npages += (end_addr - dma_addr) >> PAGE_SHIFT;
298 }
299
300 /* Make sure the page count fits within a single fastreg message */
301 if (sg->dma_npages > fastreg_message_size)
302 goto out_unmap;
303
304
305
306 return 0;
307
308out_unmap:
309 ib_dma_unmap_sg(rds_iwdev->dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
310 sg->dma_len = 0;
311 return ret;
312}
313
314
315struct rds_iw_mr_pool *rds_iw_create_mr_pool(struct rds_iw_device *rds_iwdev)
316{
317 struct rds_iw_mr_pool *pool;
318
319 pool = kzalloc(sizeof(*pool), GFP_KERNEL);
320 if (!pool) {
321 printk(KERN_WARNING "RDS/IW: rds_iw_create_mr_pool alloc error\n");
322 return ERR_PTR(-ENOMEM);
323 }
324
325 pool->device = rds_iwdev;
326 INIT_LIST_HEAD(&pool->dirty_list);
327 INIT_LIST_HEAD(&pool->clean_list);
328 mutex_init(&pool->flush_lock);
329 spin_lock_init(&pool->list_lock);
330 INIT_WORK(&pool->flush_worker, rds_iw_mr_pool_flush_worker);
331
332 pool->max_message_size = fastreg_message_size;
333 pool->max_items = fastreg_pool_size;
334 pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
335 pool->max_pages = fastreg_message_size;
336
337 /* We never allow more than max_items MRs to be allocated.
338 * When we exceed max_items_soft, we start freeing
339 * items more aggressively.
340 * Make sure that max_items > max_items_soft > max_items / 2
341 */
342 pool->max_items_soft = pool->max_items * 3 / 4;
343
344 return pool;
345}
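The sizing above is plain arithmetic: the soft limit is three quarters of the hard limit, and max_free_pinned caps how many pages may stay pinned in unused mappings before rds_iw_free_mr() (further down) asks for a flush. A minimal stand-alone sketch of those relationships, using made-up values in place of the fastreg_pool_size and fastreg_message_size module parameters:

/*
 * Hypothetical sketch of the pool sizing rules above.  The numeric values
 * are assumptions for illustration; the kernel takes them from module
 * parameters defined earlier in this file.
 */
#include <stdio.h>

int main(void)
{
	unsigned int fastreg_pool_size = 2048;   /* assumed value */
	unsigned int fastreg_message_size = 21;  /* assumed value */

	unsigned int max_items = fastreg_pool_size;
	unsigned int max_message_size = fastreg_message_size;
	unsigned int max_free_pinned = max_items * max_message_size / 4;
	unsigned int max_items_soft = max_items * 3 / 4;

	/* The soft limit must sit strictly between max_items/2 and max_items. */
	printf("max_items=%u soft=%u free_pinned_limit=%u\n",
	       max_items, max_items_soft, max_free_pinned);
	printf("soft limit in range: %s\n",
	       (max_items_soft > max_items / 2 && max_items_soft < max_items)
	       ? "yes" : "no");
	return 0;
}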
346
347void rds_iw_get_mr_info(struct rds_iw_device *rds_iwdev, struct rds_info_rdma_connection *iinfo)
348{
349 struct rds_iw_mr_pool *pool = rds_iwdev->mr_pool;
350
351 iinfo->rdma_mr_max = pool->max_items;
352 iinfo->rdma_mr_size = pool->max_pages;
353}
354
355void rds_iw_destroy_mr_pool(struct rds_iw_mr_pool *pool)
356{
357 flush_workqueue(rds_wq);
358 rds_iw_flush_mr_pool(pool, 1);
359 BUG_ON(atomic_read(&pool->item_count));
360 BUG_ON(atomic_read(&pool->free_pinned));
361 kfree(pool);
362}
363
364static inline struct rds_iw_mr *rds_iw_reuse_fmr(struct rds_iw_mr_pool *pool)
365{
366 struct rds_iw_mr *ibmr = NULL;
367 unsigned long flags;
368
369 spin_lock_irqsave(&pool->list_lock, flags);
370 if (!list_empty(&pool->clean_list)) {
371 ibmr = list_entry(pool->clean_list.next, struct rds_iw_mr, mapping.m_list);
372 list_del_init(&ibmr->mapping.m_list);
373 }
374 spin_unlock_irqrestore(&pool->list_lock, flags);
375
376 return ibmr;
377}
378
379static struct rds_iw_mr *rds_iw_alloc_mr(struct rds_iw_device *rds_iwdev)
380{
381 struct rds_iw_mr_pool *pool = rds_iwdev->mr_pool;
382 struct rds_iw_mr *ibmr = NULL;
383 int err = 0, iter = 0;
384
385 while (1) {
386 ibmr = rds_iw_reuse_fmr(pool);
387 if (ibmr)
388 return ibmr;
389
390 /* No clean MRs - now we have the choice of either
391 * allocating a fresh MR up to the limit imposed by the
392 * driver, or flushing any dirty unused MRs.
393 * We try to avoid stalling in the send path if possible,
394 * so we allocate as long as we're allowed to.
395 *
396 * We're fussy with enforcing the FMR limit, though. If the driver
397 * tells us we can't use more than N fmrs, we shouldn't start
398 * arguing with it */
399 if (atomic_inc_return(&pool->item_count) <= pool->max_items)
400 break;
401
402 atomic_dec(&pool->item_count);
403
404 if (++iter > 2) {
405 rds_iw_stats_inc(s_iw_rdma_mr_pool_depleted);
406 return ERR_PTR(-EAGAIN);
407 }
408
409 /* We do have some empty MRs. Flush them out. */
410 rds_iw_stats_inc(s_iw_rdma_mr_pool_wait);
411 rds_iw_flush_mr_pool(pool, 0);
412 }
413
414 ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL);
415 if (!ibmr) {
416 err = -ENOMEM;
417 goto out_no_cigar;
418 }
419
420 spin_lock_init(&ibmr->mapping.m_lock);
421 INIT_LIST_HEAD(&ibmr->mapping.m_list);
422 ibmr->mapping.m_mr = ibmr;
423
424 err = rds_iw_init_reg(pool, ibmr);
425 if (err)
426 goto out_no_cigar;
427
428 rds_iw_stats_inc(s_iw_rdma_mr_alloc);
429 return ibmr;
430
431out_no_cigar:
432 if (ibmr) {
433 rds_iw_destroy_fastreg(pool, ibmr);
434 kfree(ibmr);
435 }
436 atomic_dec(&pool->item_count);
437 return ERR_PTR(err);
438}
439
440void rds_iw_sync_mr(void *trans_private, int direction)
441{
442 struct rds_iw_mr *ibmr = trans_private;
443 struct rds_iw_device *rds_iwdev = ibmr->device;
444
445 switch (direction) {
446 case DMA_FROM_DEVICE:
447 ib_dma_sync_sg_for_cpu(rds_iwdev->dev, ibmr->mapping.m_sg.list,
448 ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
449 break;
450 case DMA_TO_DEVICE:
451 ib_dma_sync_sg_for_device(rds_iwdev->dev, ibmr->mapping.m_sg.list,
452 ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
453 break;
454 }
455}
456
457/*
458 * Flush our pool of MRs.
459 * At a minimum, all currently unused MRs are unmapped.
460 * If the number of MRs allocated exceeds the limit, we also try
461 * to free as many MRs as needed to get back to this limit.
462 */
463static void rds_iw_flush_mr_pool(struct rds_iw_mr_pool *pool, int free_all)
464{
465 struct rds_iw_mr *ibmr, *next;
466 LIST_HEAD(unmap_list);
467 LIST_HEAD(kill_list);
468 unsigned long flags;
469 unsigned int nfreed = 0, ncleaned = 0, unpinned = 0;
470
471 rds_iw_stats_inc(s_iw_rdma_mr_pool_flush);
472
473 mutex_lock(&pool->flush_lock);
474
475 spin_lock_irqsave(&pool->list_lock, flags);
476 /* Get the list of all mappings to be destroyed */
477 list_splice_init(&pool->dirty_list, &unmap_list);
478 if (free_all)
479 list_splice_init(&pool->clean_list, &kill_list);
480 spin_unlock_irqrestore(&pool->list_lock, flags);
481
482 /* Batched invalidate of dirty MRs.
483 * For FMR based MRs, the mappings on the unmap list are
484 * actually members of an ibmr (ibmr->mapping). They either
485 * migrate to the kill_list, or have been cleaned and should be
486 * moved to the clean_list.
487 * For fastregs, they will be dynamically allocated, and
488 * will be destroyed by the unmap function.
489 */
490 if (!list_empty(&unmap_list)) {
491 ncleaned = rds_iw_unmap_fastreg_list(pool, &unmap_list,
492 &kill_list, &unpinned);
493 /* If we've been asked to destroy all MRs, move those
494 * that were simply cleaned to the kill list */
495 if (free_all)
496 list_splice_init(&unmap_list, &kill_list);
497 }
498
499 /* Destroy any MRs that are past their best before date */
500 list_for_each_entry_safe(ibmr, next, &kill_list, mapping.m_list) {
501 rds_iw_stats_inc(s_iw_rdma_mr_free);
502 list_del(&ibmr->mapping.m_list);
503 rds_iw_destroy_fastreg(pool, ibmr);
504 kfree(ibmr);
505 nfreed++;
506 }
507
508 /* Any mappings that remain are laundered ibmrs, which we can add
509 * back to the clean list. */
510 if (!list_empty(&unmap_list)) {
511 spin_lock_irqsave(&pool->list_lock, flags);
512 list_splice(&unmap_list, &pool->clean_list);
513 spin_unlock_irqrestore(&pool->list_lock, flags);
514 }
515
516 atomic_sub(unpinned, &pool->free_pinned);
517 atomic_sub(ncleaned, &pool->dirty_count);
518 atomic_sub(nfreed, &pool->item_count);
519
520 mutex_unlock(&pool->flush_lock);
521}
522
523static void rds_iw_mr_pool_flush_worker(struct work_struct *work)
524{
525 struct rds_iw_mr_pool *pool = container_of(work, struct rds_iw_mr_pool, flush_worker);
526
527 rds_iw_flush_mr_pool(pool, 0);
528}
529
530void rds_iw_free_mr(void *trans_private, int invalidate)
531{
532 struct rds_iw_mr *ibmr = trans_private;
533 struct rds_iw_mr_pool *pool = ibmr->device->mr_pool;
534
535 rdsdebug("RDS/IW: free_mr nents %u\n", ibmr->mapping.m_sg.len);
536 if (!pool)
537 return;
538
539 /* Return it to the pool's free list */
540 rds_iw_free_fastreg(pool, ibmr);
541
542 /* If we've pinned too many pages, request a flush */
543 if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned ||
544 atomic_read(&pool->dirty_count) >= pool->max_items / 10)
545 queue_work(rds_wq, &pool->flush_worker);
546
547 if (invalidate) {
548 if (likely(!in_interrupt())) {
549 rds_iw_flush_mr_pool(pool, 0);
550 } else {
551 /* We get here if the user created an MR marked
552 * as use_once and invalidate at the same time. */
553 queue_work(rds_wq, &pool->flush_worker);
554 }
555 }
556}
557
558void rds_iw_flush_mrs(void)
559{
560 struct rds_iw_device *rds_iwdev;
561
562 list_for_each_entry(rds_iwdev, &rds_iw_devices, list) {
563 struct rds_iw_mr_pool *pool = rds_iwdev->mr_pool;
564
565 if (pool)
566 rds_iw_flush_mr_pool(pool, 0);
567 }
568}
569
570void *rds_iw_get_mr(struct scatterlist *sg, unsigned long nents,
571 struct rds_sock *rs, u32 *key_ret)
572{
573 struct rds_iw_device *rds_iwdev;
574 struct rds_iw_mr *ibmr = NULL;
575 struct rdma_cm_id *cm_id;
576 struct sockaddr_in src = {
577 .sin_addr.s_addr = rs->rs_bound_addr,
578 .sin_port = rs->rs_bound_port,
579 };
580 struct sockaddr_in dst = {
581 .sin_addr.s_addr = rs->rs_conn_addr,
582 .sin_port = rs->rs_conn_port,
583 };
584 int ret;
585
586 ret = rds_iw_get_device(&src, &dst, &rds_iwdev, &cm_id);
587 if (ret || !cm_id) {
588 ret = -ENODEV;
589 goto out;
590 }
591
592 if (!rds_iwdev->mr_pool) {
593 ret = -ENODEV;
594 goto out;
595 }
596
597 ibmr = rds_iw_alloc_mr(rds_iwdev);
598 if (IS_ERR(ibmr))
599 return ibmr;
600
601 ibmr->cm_id = cm_id;
602 ibmr->device = rds_iwdev;
603
604 ret = rds_iw_map_reg(rds_iwdev->mr_pool, ibmr, sg, nents);
605 if (ret == 0)
606 *key_ret = ibmr->mr->rkey;
607 else
608 printk(KERN_WARNING "RDS/IW: failed to map mr (errno=%d)\n", ret);
609
610out:
611 if (ret) {
612 if (ibmr)
613 rds_iw_free_mr(ibmr, 0);
614 ibmr = ERR_PTR(ret);
615 }
616 return ibmr;
617}
618
619/*
620 * iWARP reg handling
621 *
622 * The life cycle of a fastreg registration is a bit different from
623 * FMRs.
624 * The idea behind fastreg is to have one MR, to which we bind different
625 * mappings over time. To avoid stalling on the expensive map and invalidate
626 * operations, these operations are pipelined on the same send queue on
627 * which we want to send the message containing the r_key.
628 *
629 * This creates a bit of a problem for us, as we do not have the destination
630 * IP in GET_MR, so the connection must be set up prior to the GET_MR call for
631 * RDMA to be correctly set up.  If a fastreg request is present, rds_iw_xmit
632 * will try to queue a LOCAL_INV (if needed) and a REG_MR work request
633 * before queuing the SEND. When completions for these arrive, they are
634 * dispatched to the MR, which has a bit set showing that RDMA can be performed.
635 *
636 * There is another interesting aspect that's related to invalidation.
637 * The application can request that a mapping is invalidated in FREE_MR.
638 * The expectation there is that this invalidation step includes ALL
639 * PREVIOUSLY FREED MRs.
640 */
641static int rds_iw_init_reg(struct rds_iw_mr_pool *pool,
642 struct rds_iw_mr *ibmr)
643{
644 struct rds_iw_device *rds_iwdev = pool->device;
645 struct ib_mr *mr;
646 int err;
647
648 mr = ib_alloc_mr(rds_iwdev->pd, IB_MR_TYPE_MEM_REG,
649 pool->max_message_size);
650 if (IS_ERR(mr)) {
651 err = PTR_ERR(mr);
652
653 printk(KERN_WARNING "RDS/IW: ib_alloc_mr failed (err=%d)\n", err);
654 return err;
655 }
656
657 ibmr->mr = mr;
658 return 0;
659}
660
661static int rds_iw_rdma_reg_mr(struct rds_iw_mapping *mapping)
662{
663 struct rds_iw_mr *ibmr = mapping->m_mr;
664 struct rds_iw_scatterlist *m_sg = &mapping->m_sg;
665 struct ib_reg_wr reg_wr;
666 struct ib_send_wr *failed_wr;
667 int ret, n;
668
669 n = ib_map_mr_sg_zbva(ibmr->mr, m_sg->list, m_sg->len, PAGE_SIZE);
670 if (unlikely(n != m_sg->len))
671 return n < 0 ? n : -EINVAL;
672
673 reg_wr.wr.next = NULL;
674 reg_wr.wr.opcode = IB_WR_REG_MR;
675 reg_wr.wr.wr_id = RDS_IW_REG_WR_ID;
676 reg_wr.wr.num_sge = 0;
677 reg_wr.mr = ibmr->mr;
678 reg_wr.key = mapping->m_rkey;
679 reg_wr.access = IB_ACCESS_LOCAL_WRITE |
680 IB_ACCESS_REMOTE_READ |
681 IB_ACCESS_REMOTE_WRITE;
682
683 /*
684 * Perform a WR for the reg_mr. Each individual page
685 * in the sg list is added to the fast reg page list and placed
686 * inside the reg_mr WR. The key used is a rolling 8-bit
687 * counter, which should guarantee uniqueness.
688 */
689 ib_update_fast_reg_key(ibmr->mr, ibmr->remap_count++);
690 mapping->m_rkey = ibmr->mr->rkey;
691
692 failed_wr = &reg_wr.wr;
693 ret = ib_post_send(ibmr->cm_id->qp, &reg_wr.wr, &failed_wr);
694 BUG_ON(failed_wr != &reg_wr.wr);
695 if (ret)
696 printk_ratelimited(KERN_WARNING "RDS/IW: %s:%d ib_post_send returned %d\n",
697 __func__, __LINE__, ret);
698 return ret;
699}
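A small userspace sketch of the rolling-key idea from the comment above: only the low byte of the rkey changes between re-registrations, so consecutive registrations of the same MR hand out distinct keys until the 8-bit counter wraps. The base rkey value below is invented, and the helper mirrors what the verbs-layer ib_update_fast_reg_key() is understood to do (mask in a new low byte).

/* Minimal, stand-alone illustration of the rolling 8-bit registration key. */
#include <stdio.h>
#include <stdint.h>

static uint32_t update_fast_reg_key(uint32_t rkey, uint8_t newkey)
{
	/* Replace only the low byte of the key. */
	return (rkey & 0xffffff00u) | newkey;
}

int main(void)
{
	uint32_t rkey = 0x12345600u;	/* assumed base key */
	uint8_t remap_count = 0;
	int i;

	for (i = 0; i < 4; i++) {
		rkey = update_fast_reg_key(rkey, remap_count++);
		printf("registration %d -> rkey 0x%08x\n", i, rkey);
	}
	return 0;
}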
700
701static int rds_iw_rdma_fastreg_inv(struct rds_iw_mr *ibmr)
702{
703 struct ib_send_wr s_wr, *failed_wr;
704 int ret = 0;
705
706 if (!ibmr->cm_id->qp || !ibmr->mr)
707 goto out;
708
709 memset(&s_wr, 0, sizeof(s_wr));
710 s_wr.wr_id = RDS_IW_LOCAL_INV_WR_ID;
711 s_wr.opcode = IB_WR_LOCAL_INV;
712 s_wr.ex.invalidate_rkey = ibmr->mr->rkey;
713 s_wr.send_flags = IB_SEND_SIGNALED;
714
715 failed_wr = &s_wr;
716 ret = ib_post_send(ibmr->cm_id->qp, &s_wr, &failed_wr);
717 if (ret) {
718 printk_ratelimited(KERN_WARNING "RDS/IW: %s:%d ib_post_send returned %d\n",
719 __func__, __LINE__, ret);
720 goto out;
721 }
722out:
723 return ret;
724}
725
726static int rds_iw_map_reg(struct rds_iw_mr_pool *pool,
727 struct rds_iw_mr *ibmr,
728 struct scatterlist *sg,
729 unsigned int sg_len)
730{
731 struct rds_iw_device *rds_iwdev = pool->device;
732 struct rds_iw_mapping *mapping = &ibmr->mapping;
733 u64 *dma_pages = NULL;
734 int ret = 0;
735
736 rds_iw_set_scatterlist(&mapping->m_sg, sg, sg_len);
737
738 ret = rds_iw_map_scatterlist(rds_iwdev, &mapping->m_sg);
739 if (ret) {
740 dma_pages = NULL;
741 goto out;
742 }
743
744 if (mapping->m_sg.dma_len > pool->max_message_size) {
745 ret = -EMSGSIZE;
746 goto out;
747 }
748
749 ret = rds_iw_rdma_reg_mr(mapping);
750 if (ret)
751 goto out;
752
753 rds_iw_stats_inc(s_iw_rdma_mr_used);
754
755out:
756 kfree(dma_pages);
757
758 return ret;
759}
760
761/*
762 * "Free" a fastreg MR.
763 */
764static void rds_iw_free_fastreg(struct rds_iw_mr_pool *pool,
765 struct rds_iw_mr *ibmr)
766{
767 unsigned long flags;
768 int ret;
769
770 if (!ibmr->mapping.m_sg.dma_len)
771 return;
772
773 ret = rds_iw_rdma_fastreg_inv(ibmr);
774 if (ret)
775 return;
776
777 /* Queue the mapping on the dirty list until the pool flush reclaims it. */
778 spin_lock_irqsave(&pool->list_lock, flags);
779
780 list_add_tail(&ibmr->mapping.m_list, &pool->dirty_list);
781 atomic_add(ibmr->mapping.m_sg.len, &pool->free_pinned);
782 atomic_inc(&pool->dirty_count);
783
784 spin_unlock_irqrestore(&pool->list_lock, flags);
785}
786
787static unsigned int rds_iw_unmap_fastreg_list(struct rds_iw_mr_pool *pool,
788 struct list_head *unmap_list,
789 struct list_head *kill_list,
790 int *unpinned)
791{
792 struct rds_iw_mapping *mapping, *next;
793 unsigned int ncleaned = 0;
794 LIST_HEAD(laundered);
795
796 /* Batched invalidation of fastreg MRs.
797 * Why do we do it this way, even though we could pipeline unmap
798 * and remap? The reason is the application semantics - when the
799 * application requests an invalidation of MRs, it expects all
800 * previously released R_Keys to become invalid.
801 *
802 * If we implement MR reuse naively, we risk memory corruption
803 * (this has actually been observed). So the default behavior
804 * requires that an MR goes through an explicit unmap operation before
805 * we can reuse it again.
806 *
807 * We could probably improve on this a little, by allowing immediate
808 * reuse of an MR on the same socket (e.g. you could add a small
809 * cache of unused MRs to struct rds_socket - GET_MR could grab one
810 * of these without requiring an explicit invalidate).
811 */
812 while (!list_empty(unmap_list)) {
813 unsigned long flags;
814
815 spin_lock_irqsave(&pool->list_lock, flags);
816 list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
817 *unpinned += mapping->m_sg.len;
818 list_move(&mapping->m_list, &laundered);
819 ncleaned++;
820 }
821 spin_unlock_irqrestore(&pool->list_lock, flags);
822 }
823
824 /* Move all laundered mappings back to the unmap list.
825 * We do not kill any WRs right now - it doesn't seem the
826 * fastreg API has a max_remap limit. */
827 list_splice_init(&laundered, unmap_list);
828
829 return ncleaned;
830}
831
832static void rds_iw_destroy_fastreg(struct rds_iw_mr_pool *pool,
833 struct rds_iw_mr *ibmr)
834{
835 if (ibmr->mr)
836 ib_dereg_mr(ibmr->mr);
837}
diff --git a/net/rds/iw_recv.c b/net/rds/iw_recv.c
deleted file mode 100644
index a66d1794b2d0..000000000000
--- a/net/rds/iw_recv.c
+++ /dev/null
@@ -1,904 +0,0 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/slab.h>
35#include <linux/pci.h>
36#include <linux/dma-mapping.h>
37#include <rdma/rdma_cm.h>
38
39#include "rds.h"
40#include "iw.h"
41
42static struct kmem_cache *rds_iw_incoming_slab;
43static struct kmem_cache *rds_iw_frag_slab;
44static atomic_t rds_iw_allocation = ATOMIC_INIT(0);
45
46static void rds_iw_frag_drop_page(struct rds_page_frag *frag)
47{
48 rdsdebug("frag %p page %p\n", frag, frag->f_page);
49 __free_page(frag->f_page);
50 frag->f_page = NULL;
51}
52
53static void rds_iw_frag_free(struct rds_page_frag *frag)
54{
55 rdsdebug("frag %p page %p\n", frag, frag->f_page);
56 BUG_ON(frag->f_page);
57 kmem_cache_free(rds_iw_frag_slab, frag);
58}
59
60/*
61 * We map a page at a time. Its fragments are posted in order. This
62 * is called in fragment order as the fragments get completion events.
63 * Only the last frag in the page performs the unmapping.
64 *
65 * It's OK for ring cleanup to call this in whatever order it likes because
66 * DMA is not in flight and so we can unmap while other ring entries still
67 * hold page references in their frags.
68 */
69static void rds_iw_recv_unmap_page(struct rds_iw_connection *ic,
70 struct rds_iw_recv_work *recv)
71{
72 struct rds_page_frag *frag = recv->r_frag;
73
74 rdsdebug("recv %p frag %p page %p\n", recv, frag, frag->f_page);
75 if (frag->f_mapped)
76 ib_dma_unmap_page(ic->i_cm_id->device,
77 frag->f_mapped,
78 RDS_FRAG_SIZE, DMA_FROM_DEVICE);
79 frag->f_mapped = 0;
80}
81
82void rds_iw_recv_init_ring(struct rds_iw_connection *ic)
83{
84 struct rds_iw_recv_work *recv;
85 u32 i;
86
87 for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
88 struct ib_sge *sge;
89
90 recv->r_iwinc = NULL;
91 recv->r_frag = NULL;
92
93 recv->r_wr.next = NULL;
94 recv->r_wr.wr_id = i;
95 recv->r_wr.sg_list = recv->r_sge;
96 recv->r_wr.num_sge = RDS_IW_RECV_SGE;
97
98 sge = rds_iw_data_sge(ic, recv->r_sge);
99 sge->addr = 0;
100 sge->length = RDS_FRAG_SIZE;
101 sge->lkey = 0;
102
103 sge = rds_iw_header_sge(ic, recv->r_sge);
104 sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
105 sge->length = sizeof(struct rds_header);
106 sge->lkey = 0;
107 }
108}
109
110static void rds_iw_recv_clear_one(struct rds_iw_connection *ic,
111 struct rds_iw_recv_work *recv)
112{
113 if (recv->r_iwinc) {
114 rds_inc_put(&recv->r_iwinc->ii_inc);
115 recv->r_iwinc = NULL;
116 }
117 if (recv->r_frag) {
118 rds_iw_recv_unmap_page(ic, recv);
119 if (recv->r_frag->f_page)
120 rds_iw_frag_drop_page(recv->r_frag);
121 rds_iw_frag_free(recv->r_frag);
122 recv->r_frag = NULL;
123 }
124}
125
126void rds_iw_recv_clear_ring(struct rds_iw_connection *ic)
127{
128 u32 i;
129
130 for (i = 0; i < ic->i_recv_ring.w_nr; i++)
131 rds_iw_recv_clear_one(ic, &ic->i_recvs[i]);
132
133 if (ic->i_frag.f_page)
134 rds_iw_frag_drop_page(&ic->i_frag);
135}
136
137static int rds_iw_recv_refill_one(struct rds_connection *conn,
138 struct rds_iw_recv_work *recv,
139 gfp_t kptr_gfp, gfp_t page_gfp)
140{
141 struct rds_iw_connection *ic = conn->c_transport_data;
142 dma_addr_t dma_addr;
143 struct ib_sge *sge;
144 int ret = -ENOMEM;
145
146 if (!recv->r_iwinc) {
147 if (!atomic_add_unless(&rds_iw_allocation, 1, rds_iw_sysctl_max_recv_allocation)) {
148 rds_iw_stats_inc(s_iw_rx_alloc_limit);
149 goto out;
150 }
151 recv->r_iwinc = kmem_cache_alloc(rds_iw_incoming_slab,
152 kptr_gfp);
153 if (!recv->r_iwinc) {
154 atomic_dec(&rds_iw_allocation);
155 goto out;
156 }
157 INIT_LIST_HEAD(&recv->r_iwinc->ii_frags);
158 rds_inc_init(&recv->r_iwinc->ii_inc, conn, conn->c_faddr);
159 }
160
161 if (!recv->r_frag) {
162 recv->r_frag = kmem_cache_alloc(rds_iw_frag_slab, kptr_gfp);
163 if (!recv->r_frag)
164 goto out;
165 INIT_LIST_HEAD(&recv->r_frag->f_item);
166 recv->r_frag->f_page = NULL;
167 }
168
169 if (!ic->i_frag.f_page) {
170 ic->i_frag.f_page = alloc_page(page_gfp);
171 if (!ic->i_frag.f_page)
172 goto out;
173 ic->i_frag.f_offset = 0;
174 }
175
176 dma_addr = ib_dma_map_page(ic->i_cm_id->device,
177 ic->i_frag.f_page,
178 ic->i_frag.f_offset,
179 RDS_FRAG_SIZE,
180 DMA_FROM_DEVICE);
181 if (ib_dma_mapping_error(ic->i_cm_id->device, dma_addr))
182 goto out;
183
184 /*
185 * Once we get the RDS_PAGE_LAST_OFF frag then rds_iw_recv_unmap_page()
186 * must be called on this recv. This happens as completions hit
187 * in order or on connection shutdown.
188 */
189 recv->r_frag->f_page = ic->i_frag.f_page;
190 recv->r_frag->f_offset = ic->i_frag.f_offset;
191 recv->r_frag->f_mapped = dma_addr;
192
193 sge = rds_iw_data_sge(ic, recv->r_sge);
194 sge->addr = dma_addr;
195 sge->length = RDS_FRAG_SIZE;
196
197 sge = rds_iw_header_sge(ic, recv->r_sge);
198 sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
199 sge->length = sizeof(struct rds_header);
200
201 get_page(recv->r_frag->f_page);
202
203 if (ic->i_frag.f_offset < RDS_PAGE_LAST_OFF) {
204 ic->i_frag.f_offset += RDS_FRAG_SIZE;
205 } else {
206 put_page(ic->i_frag.f_page);
207 ic->i_frag.f_page = NULL;
208 ic->i_frag.f_offset = 0;
209 }
210
211 ret = 0;
212out:
213 return ret;
214}
215
216/*
217 * This tries to allocate and post unused work requests after making sure that
218 * they have all the allocations they need to queue received fragments into
219 * sockets. The i_recv_mutex is held here so that ring_alloc and _unalloc
220 * pairs don't go unmatched.
221 *
222 * -1 is returned if posting fails due to temporary resource exhaustion.
223 */
224int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
225 gfp_t page_gfp, int prefill)
226{
227 struct rds_iw_connection *ic = conn->c_transport_data;
228 struct rds_iw_recv_work *recv;
229 struct ib_recv_wr *failed_wr;
230 unsigned int posted = 0;
231 int ret = 0;
232 u32 pos;
233
234 while ((prefill || rds_conn_up(conn)) &&
235 rds_iw_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
236 if (pos >= ic->i_recv_ring.w_nr) {
237 printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
238 pos);
239 ret = -EINVAL;
240 break;
241 }
242
243 recv = &ic->i_recvs[pos];
244 ret = rds_iw_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
245 if (ret) {
246 ret = -1;
247 break;
248 }
249
250 /* XXX when can this fail? */
251 ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
252 rdsdebug("recv %p iwinc %p page %p addr %lu ret %d\n", recv,
253 recv->r_iwinc, recv->r_frag->f_page,
254 (long) recv->r_frag->f_mapped, ret);
255 if (ret) {
256 rds_iw_conn_error(conn, "recv post on "
257 "%pI4 returned %d, disconnecting and "
258 "reconnecting\n", &conn->c_faddr,
259 ret);
260 ret = -1;
261 break;
262 }
263
264 posted++;
265 }
266
267 /* We're doing flow control - update the window. */
268 if (ic->i_flowctl && posted)
269 rds_iw_advertise_credits(conn, posted);
270
271 if (ret)
272 rds_iw_ring_unalloc(&ic->i_recv_ring, 1);
273 return ret;
274}
275
276static void rds_iw_inc_purge(struct rds_incoming *inc)
277{
278 struct rds_iw_incoming *iwinc;
279 struct rds_page_frag *frag;
280 struct rds_page_frag *pos;
281
282 iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
283 rdsdebug("purging iwinc %p inc %p\n", iwinc, inc);
284
285 list_for_each_entry_safe(frag, pos, &iwinc->ii_frags, f_item) {
286 list_del_init(&frag->f_item);
287 rds_iw_frag_drop_page(frag);
288 rds_iw_frag_free(frag);
289 }
290}
291
292void rds_iw_inc_free(struct rds_incoming *inc)
293{
294 struct rds_iw_incoming *iwinc;
295
296 iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
297
298 rds_iw_inc_purge(inc);
299 rdsdebug("freeing iwinc %p inc %p\n", iwinc, inc);
300 BUG_ON(!list_empty(&iwinc->ii_frags));
301 kmem_cache_free(rds_iw_incoming_slab, iwinc);
302 atomic_dec(&rds_iw_allocation);
303 BUG_ON(atomic_read(&rds_iw_allocation) < 0);
304}
305
306int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to)
307{
308 struct rds_iw_incoming *iwinc;
309 struct rds_page_frag *frag;
310 unsigned long to_copy;
311 unsigned long frag_off = 0;
312 int copied = 0;
313 int ret;
314 u32 len;
315
316 iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
317 frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
318 len = be32_to_cpu(inc->i_hdr.h_len);
319
320 while (iov_iter_count(to) && copied < len) {
321 if (frag_off == RDS_FRAG_SIZE) {
322 frag = list_entry(frag->f_item.next,
323 struct rds_page_frag, f_item);
324 frag_off = 0;
325 }
326 to_copy = min_t(unsigned long, iov_iter_count(to),
327 RDS_FRAG_SIZE - frag_off);
328 to_copy = min_t(unsigned long, to_copy, len - copied);
329
330 /* XXX needs + offset for multiple recvs per page */
331 rds_stats_add(s_copy_to_user, to_copy);
332 ret = copy_page_to_iter(frag->f_page,
333 frag->f_offset + frag_off,
334 to_copy,
335 to);
336 if (ret != to_copy)
337 return -EFAULT;
338
339 frag_off += to_copy;
340 copied += to_copy;
341 }
342
343 return copied;
344}
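For reference, a stand-alone sketch of the chunking logic in rds_iw_inc_copy_to_user() above: each copy is bounded by the space left in the destination, the bytes left in the current fragment, and the bytes left in the message, and the walk moves to the next fragment once frag_off reaches the fragment size. The fragment size and contents below are made up for illustration.

#include <stdio.h>
#include <string.h>

#define FRAG_SIZE 8

int main(void)
{
	const char frags[3][FRAG_SIZE + 1] = { "01234567", "89abcdef", "ghijklmn" };
	char dest[32];
	size_t msg_len = 20;			/* h_len: total message bytes */
	size_t dest_room = sizeof(dest);	/* analogous to iov_iter_count() */
	size_t copied = 0, frag_off = 0;
	int frag = 0;

	while (copied < dest_room && copied < msg_len) {
		size_t to_copy;

		if (frag_off == FRAG_SIZE) {	/* current fragment exhausted */
			frag++;
			frag_off = 0;
		}
		/* Bound the chunk by fragment, destination and message space. */
		to_copy = FRAG_SIZE - frag_off;
		if (to_copy > dest_room - copied)
			to_copy = dest_room - copied;
		if (to_copy > msg_len - copied)
			to_copy = msg_len - copied;

		memcpy(dest + copied, frags[frag] + frag_off, to_copy);
		frag_off += to_copy;
		copied += to_copy;
	}
	printf("copied %zu bytes: %.*s\n", copied, (int)copied, dest);
	return 0;
}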
345
346/* ic starts out kzalloc()ed */
347void rds_iw_recv_init_ack(struct rds_iw_connection *ic)
348{
349 struct ib_send_wr *wr = &ic->i_ack_wr;
350 struct ib_sge *sge = &ic->i_ack_sge;
351
352 sge->addr = ic->i_ack_dma;
353 sge->length = sizeof(struct rds_header);
354 sge->lkey = rds_iw_local_dma_lkey(ic);
355
356 wr->sg_list = sge;
357 wr->num_sge = 1;
358 wr->opcode = IB_WR_SEND;
359 wr->wr_id = RDS_IW_ACK_WR_ID;
360 wr->send_flags = IB_SEND_SIGNALED | IB_SEND_SOLICITED;
361}
362
363/*
364 * You'd think that with reliable IB connections you wouldn't need to ack
365 * messages that have been received. The problem is that IB hardware generates
366 * an ack message before it has DMAed the message into memory. This creates a
367 * potential message loss if the HCA is disabled for any reason between when it
368 * sends the ack and before the message is DMAed and processed. This is only a
369 * potential issue if another HCA is available for fail-over.
370 *
371 * When the remote host receives our ack they'll free the sent message from
372 * their send queue. To decrease the latency of this we always send an ack
373 * immediately after we've received messages.
374 *
375 * For simplicity, we only have one ack in flight at a time. This puts
376 * pressure on senders to have deep enough send queues to absorb the latency of
377 * a single ack frame being in flight. This might not be good enough.
378 *
379 * This is implemented by having a long-lived send_wr and sge which point to a
380 * statically allocated ack frame. This ack wr does not fall under the ring
381 * accounting that the tx and rx wrs do. The QP attribute specifically makes
382 * room for it beyond the ring size. Send completion notices its special
383 * wr_id and avoids working with the ring in that case.
384 */
385#ifndef KERNEL_HAS_ATOMIC64
386static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
387 int ack_required)
388{
389 unsigned long flags;
390
391 spin_lock_irqsave(&ic->i_ack_lock, flags);
392 ic->i_ack_next = seq;
393 if (ack_required)
394 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
395 spin_unlock_irqrestore(&ic->i_ack_lock, flags);
396}
397
398static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
399{
400 unsigned long flags;
401 u64 seq;
402
403 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
404
405 spin_lock_irqsave(&ic->i_ack_lock, flags);
406 seq = ic->i_ack_next;
407 spin_unlock_irqrestore(&ic->i_ack_lock, flags);
408
409 return seq;
410}
411#else
412static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
413 int ack_required)
414{
415 atomic64_set(&ic->i_ack_next, seq);
416 if (ack_required) {
417 smp_mb__before_atomic();
418 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
419 }
420}
421
422static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
423{
424 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
425 smp_mb__after_atomic();
426
427 return atomic64_read(&ic->i_ack_next);
428}
429#endif
430
431
432static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credits)
433{
434 struct rds_header *hdr = ic->i_ack;
435 struct ib_send_wr *failed_wr;
436 u64 seq;
437 int ret;
438
439 seq = rds_iw_get_ack(ic);
440
441 rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
442 rds_message_populate_header(hdr, 0, 0, 0);
443 hdr->h_ack = cpu_to_be64(seq);
444 hdr->h_credit = adv_credits;
445 rds_message_make_checksum(hdr);
446 ic->i_ack_queued = jiffies;
447
448 ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, &failed_wr);
449 if (unlikely(ret)) {
450 /* Failed to send. Release the WR, and
451 * force another ACK.
452 */
453 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
454 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
455
456 rds_iw_stats_inc(s_iw_ack_send_failure);
457
458 rds_iw_conn_error(ic->conn, "sending ack failed\n");
459 } else
460 rds_iw_stats_inc(s_iw_ack_sent);
461}
462
463/*
464 * There are 3 ways of getting acknowledgements to the peer:
465 * 1. We call rds_iw_attempt_ack from the recv completion handler
466 * to send an ACK-only frame.
467 * However, there can be only one such frame in the send queue
468 * at any time, so we may have to postpone it.
469 * 2. When another (data) packet is transmitted while there's
470 * an ACK in the queue, we piggyback the ACK sequence number
471 * on the data packet.
472 * 3. If the ACK WR is done sending, we get called from the
473 * send queue completion handler, and check whether there's
474 * another ACK pending (postponed because the WR was on the
475 * queue). If so, we transmit it.
476 *
477 * We maintain 2 variables:
478 * - i_ack_flags, which keeps track of whether the ACK WR
479 * is currently in the send queue or not (IB_ACK_IN_FLIGHT)
480 * - i_ack_next, which is the last sequence number we received
481 *
482 * Potentially, send queue and receive queue handlers can run concurrently.
483 * It would be nice to not have to use a spinlock to synchronize things,
484 * but the one problem that rules this out is that 64bit updates are
485 * not atomic on all platforms. Things would be a lot simpler if
486 * we had atomic64 or maybe cmpxchg64 everywhere.
487 *
488 * Reconnecting complicates this picture just slightly. When we
489 * reconnect, we may be seeing duplicate packets. The peer
490 * is retransmitting them, because it hasn't seen an ACK for
491 * them. It is important that we ACK these.
492 *
493 * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
494 * this flag set *MUST* be acknowledged immediately.
495 */
496
497/*
498 * When we get here, we're called from the recv queue handler.
499 * Check whether we ought to transmit an ACK.
500 */
501void rds_iw_attempt_ack(struct rds_iw_connection *ic)
502{
503 unsigned int adv_credits;
504
505 if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
506 return;
507
508 if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
509 rds_iw_stats_inc(s_iw_ack_send_delayed);
510 return;
511 }
512
513 /* Can we get a send credit? */
514 if (!rds_iw_send_grab_credits(ic, 1, &adv_credits, 0, RDS_MAX_ADV_CREDIT)) {
515 rds_iw_stats_inc(s_iw_tx_throttle);
516 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
517 return;
518 }
519
520 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
521 rds_iw_send_ack(ic, adv_credits);
522}
523
524/*
525 * We get here from the send completion handler, when the
526 * adapter tells us the ACK frame was sent.
527 */
528void rds_iw_ack_send_complete(struct rds_iw_connection *ic)
529{
530 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
531 rds_iw_attempt_ack(ic);
532}
533
534/*
535 * This is called by the regular xmit code when it wants to piggyback
536 * an ACK on an outgoing frame.
537 */
538u64 rds_iw_piggyb_ack(struct rds_iw_connection *ic)
539{
540 if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
541 rds_iw_stats_inc(s_iw_ack_send_piggybacked);
542 return rds_iw_get_ack(ic);
543}
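A toy, single-threaded model of the two flags described in the comment above (IB_ACK_REQUESTED and IB_ACK_IN_FLIGHT). It only mimics the decision logic (no credits, no verbs calls) to show why a request that arrives while an ACK is already in flight is deferred until the send completion retries it.

#include <stdio.h>
#include <stdbool.h>

static bool ack_requested, ack_in_flight;

static void attempt_ack(void)
{
	if (!ack_requested)
		return;
	if (ack_in_flight) {
		printf("  ACK already in flight -> delayed\n");
		return;
	}
	ack_in_flight = true;		/* test_and_set_bit() in the real code */
	ack_requested = false;
	printf("  sending ACK-only frame\n");
}

static void ack_send_complete(void)
{
	ack_in_flight = false;
	attempt_ack();			/* transmit any ACK postponed meanwhile */
}

int main(void)
{
	printf("recv #1 (ACK required)\n");
	ack_requested = true;
	attempt_ack();

	printf("recv #2 (ACK required, first ACK still in flight)\n");
	ack_requested = true;
	attempt_ack();

	printf("send completion for first ACK\n");
	ack_send_complete();
	return 0;
}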
544
545/*
546 * It's kind of lame that we're copying from the posted receive pages into
547 * long-lived bitmaps.  We could have posted the bitmaps and had the peer RDMA
548 * write into them.  But receiving new congestion bitmaps should be a *rare* event, so
549 * hopefully we won't need to invest that complexity in making it more
550 * efficient. By copying we can share a simpler core with TCP which has to
551 * copy.
552 */
553static void rds_iw_cong_recv(struct rds_connection *conn,
554 struct rds_iw_incoming *iwinc)
555{
556 struct rds_cong_map *map;
557 unsigned int map_off;
558 unsigned int map_page;
559 struct rds_page_frag *frag;
560 unsigned long frag_off;
561 unsigned long to_copy;
562 unsigned long copied;
563 uint64_t uncongested = 0;
564 void *addr;
565
566 /* catch completely corrupt packets */
567 if (be32_to_cpu(iwinc->ii_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
568 return;
569
570 map = conn->c_fcong;
571 map_page = 0;
572 map_off = 0;
573
574 frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
575 frag_off = 0;
576
577 copied = 0;
578
579 while (copied < RDS_CONG_MAP_BYTES) {
580 uint64_t *src, *dst;
581 unsigned int k;
582
583 to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
584 BUG_ON(to_copy & 7); /* Must be 64bit aligned. */
585
586 addr = kmap_atomic(frag->f_page);
587
588 src = addr + frag_off;
589 dst = (void *)map->m_page_addrs[map_page] + map_off;
590 for (k = 0; k < to_copy; k += 8) {
591 /* Record ports that became uncongested, i.e.
592 * bits set in our copy but clear in the incoming map. */
593 uncongested |= ~(*src) & *dst;
594 *dst++ = *src++;
595 }
596 kunmap_atomic(addr);
597
598 copied += to_copy;
599
600 map_off += to_copy;
601 if (map_off == PAGE_SIZE) {
602 map_off = 0;
603 map_page++;
604 }
605
606 frag_off += to_copy;
607 if (frag_off == RDS_FRAG_SIZE) {
608 frag = list_entry(frag->f_item.next,
609 struct rds_page_frag, f_item);
610 frag_off = 0;
611 }
612 }
613
614 /* the congestion map is in little endian order */
615 uncongested = le64_to_cpu(uncongested);
616
617 rds_cong_map_updated(map, uncongested);
618}
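A minimal sketch of the merge step in the loop above: for each 64-bit word, the bits that are set in the stored map but clear in the incoming copy are accumulated into the uncongested mask before the word is overwritten. The byte-order fixup (le64_to_cpu) is omitted and the sample words are invented.

#include <stdio.h>
#include <stdint.h>

int main(void)
{
	uint64_t old_map[2] = { 0x00000000000000f0ull, 0x8000000000000001ull };
	uint64_t new_map[2] = { 0x00000000000000c0ull, 0x0000000000000001ull };
	uint64_t uncongested = 0;
	int i;

	for (i = 0; i < 2; i++) {
		/* Collect bits cleared by the remote update. */
		uncongested |= ~new_map[i] & old_map[i];
		old_map[i] = new_map[i];	/* adopt the incoming word */
	}
	/* 0x30 and the top bit of the second word were cleared remotely. */
	printf("uncongested mask: 0x%016llx\n",
	       (unsigned long long)uncongested);
	return 0;
}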
619
620/*
621 * Rings are posted with all the allocations they'll need to queue the
622 * incoming message to the receiving socket so this can't fail.
623 * All fragments start with a header, so we can make sure we're not receiving
624 * garbage, and we can tell a small 8 byte fragment from an ACK frame.
625 */
626struct rds_iw_ack_state {
627 u64 ack_next;
628 u64 ack_recv;
629 unsigned int ack_required:1;
630 unsigned int ack_next_valid:1;
631 unsigned int ack_recv_valid:1;
632};
633
634static void rds_iw_process_recv(struct rds_connection *conn,
635 struct rds_iw_recv_work *recv, u32 byte_len,
636 struct rds_iw_ack_state *state)
637{
638 struct rds_iw_connection *ic = conn->c_transport_data;
639 struct rds_iw_incoming *iwinc = ic->i_iwinc;
640 struct rds_header *ihdr, *hdr;
641
642 /* XXX shut down the connection if port 0,0 are seen? */
643
644 rdsdebug("ic %p iwinc %p recv %p byte len %u\n", ic, iwinc, recv,
645 byte_len);
646
647 if (byte_len < sizeof(struct rds_header)) {
648 rds_iw_conn_error(conn, "incoming message "
649 "from %pI4 didn't include a "
650 "header, disconnecting and "
651 "reconnecting\n",
652 &conn->c_faddr);
653 return;
654 }
655 byte_len -= sizeof(struct rds_header);
656
657 ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];
658
659 /* Validate the checksum. */
660 if (!rds_message_verify_checksum(ihdr)) {
661 rds_iw_conn_error(conn, "incoming message "
662 "from %pI4 has corrupted header - "
663 "forcing a reconnect\n",
664 &conn->c_faddr);
665 rds_stats_inc(s_recv_drop_bad_checksum);
666 return;
667 }
668
669 /* Process the ACK sequence which comes with every packet */
670 state->ack_recv = be64_to_cpu(ihdr->h_ack);
671 state->ack_recv_valid = 1;
672
673 /* Process the credits update if there was one */
674 if (ihdr->h_credit)
675 rds_iw_send_add_credits(conn, ihdr->h_credit);
676
677 if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
678 /* This is an ACK-only packet. The reason it gets
679 * special treatment here is that historically, ACKs
680 * were rather special beasts.
681 */
682 rds_iw_stats_inc(s_iw_ack_received);
683
684 /*
685 * Usually the frags make their way on to incs and are then freed as
686 * the inc is freed. We don't go that route, so we have to drop the
687 * page ref ourselves. We can't just leave the page on the recv
688 * because that confuses the dma mapping of pages and each recv's use
689 * of a partial page. We can leave the frag, though, it will be
690 * reused.
691 *
692 * FIXME: Fold this into the code path below.
693 */
694 rds_iw_frag_drop_page(recv->r_frag);
695 return;
696 }
697
698 /*
699 * If we don't already have an inc on the connection then this
700 * fragment has a header and starts a message.  Copy its header
701 * into the inc and save the inc so we can hang upcoming fragments
702 * off its list.
703 */
704 if (!iwinc) {
705 iwinc = recv->r_iwinc;
706 recv->r_iwinc = NULL;
707 ic->i_iwinc = iwinc;
708
709 hdr = &iwinc->ii_inc.i_hdr;
710 memcpy(hdr, ihdr, sizeof(*hdr));
711 ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
712
713 rdsdebug("ic %p iwinc %p rem %u flag 0x%x\n", ic, iwinc,
714 ic->i_recv_data_rem, hdr->h_flags);
715 } else {
716 hdr = &iwinc->ii_inc.i_hdr;
717 /* We can't just use memcmp here; fragments of a
718 * single message may carry different ACKs */
719 if (hdr->h_sequence != ihdr->h_sequence ||
720 hdr->h_len != ihdr->h_len ||
721 hdr->h_sport != ihdr->h_sport ||
722 hdr->h_dport != ihdr->h_dport) {
723 rds_iw_conn_error(conn,
724 "fragment header mismatch; forcing reconnect\n");
725 return;
726 }
727 }
728
729 list_add_tail(&recv->r_frag->f_item, &iwinc->ii_frags);
730 recv->r_frag = NULL;
731
732 if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
733 ic->i_recv_data_rem -= RDS_FRAG_SIZE;
734 else {
735 ic->i_recv_data_rem = 0;
736 ic->i_iwinc = NULL;
737
738 if (iwinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
739 rds_iw_cong_recv(conn, iwinc);
740 else {
741 rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
742 &iwinc->ii_inc, GFP_ATOMIC);
743 state->ack_next = be64_to_cpu(hdr->h_sequence);
744 state->ack_next_valid = 1;
745 }
746
747 /* Evaluate the ACK_REQUIRED flag *after* we received
748 * the complete frame, and after bumping the next_rx
749 * sequence. */
750 if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
751 rds_stats_inc(s_recv_ack_required);
752 state->ack_required = 1;
753 }
754
755 rds_inc_put(&iwinc->ii_inc);
756 }
757}
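A stand-alone sketch of the reassembly countdown driven by i_recv_data_rem above: the first fragment records h_len, every fragment consumes at most one fragment's worth of it, and the inc is handed up once the remainder fits in the final fragment. The fragment size and message length below are assumptions for illustration only.

#include <stdio.h>

#define FRAG_SIZE 4096	/* stand-in for RDS_FRAG_SIZE */

int main(void)
{
	unsigned int h_len = 10000;	/* from the first fragment's header */
	unsigned int data_rem = h_len;
	unsigned int frags = 0;

	while (1) {
		frags++;
		if (data_rem > FRAG_SIZE) {
			data_rem -= FRAG_SIZE;
			continue;
		}
		data_rem = 0;		/* last fragment: hand inc to the socket */
		break;
	}
	printf("%u-byte message arrived in %u fragments\n", h_len, frags);
	return 0;
}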
758
759/*
760 * Plucking the oldest entry from the ring can be done concurrently with
761 * the thread refilling the ring. Each ring operation is protected by
762 * spinlocks and the transient state of refilling doesn't change the
763 * recording of which entry is oldest.
764 *
765 * This relies on IB only calling one cq comp_handler for each cq so that
766 * there will only be one caller of rds_recv_incoming() per RDS connection.
767 */
768void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
769{
770 struct rds_connection *conn = context;
771 struct rds_iw_connection *ic = conn->c_transport_data;
772
773 rdsdebug("conn %p cq %p\n", conn, cq);
774
775 rds_iw_stats_inc(s_iw_rx_cq_call);
776
777 tasklet_schedule(&ic->i_recv_tasklet);
778}
779
780static inline void rds_poll_cq(struct rds_iw_connection *ic,
781 struct rds_iw_ack_state *state)
782{
783 struct rds_connection *conn = ic->conn;
784 struct ib_wc wc;
785 struct rds_iw_recv_work *recv;
786
787 while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
788 rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
789 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
790 be32_to_cpu(wc.ex.imm_data));
791 rds_iw_stats_inc(s_iw_rx_cq_event);
792
793 recv = &ic->i_recvs[rds_iw_ring_oldest(&ic->i_recv_ring)];
794
795 rds_iw_recv_unmap_page(ic, recv);
796
797 /*
798 * Also process recvs in connecting state because it is possible
799 * to get a recv completion _before_ the rdmacm ESTABLISHED
800 * event is processed.
801 */
802 if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
803 /* We expect errors as the qp is drained during shutdown */
804 if (wc.status == IB_WC_SUCCESS) {
805 rds_iw_process_recv(conn, recv, wc.byte_len, state);
806 } else {
807 rds_iw_conn_error(conn, "recv completion on "
808 "%pI4 had status %u, disconnecting and "
809 "reconnecting\n", &conn->c_faddr,
810 wc.status);
811 }
812 }
813
814 rds_iw_ring_free(&ic->i_recv_ring, 1);
815 }
816}
817
818void rds_iw_recv_tasklet_fn(unsigned long data)
819{
820 struct rds_iw_connection *ic = (struct rds_iw_connection *) data;
821 struct rds_connection *conn = ic->conn;
822 struct rds_iw_ack_state state = { 0, };
823
824 rds_poll_cq(ic, &state);
825 ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
826 rds_poll_cq(ic, &state);
827
828 if (state.ack_next_valid)
829 rds_iw_set_ack(ic, state.ack_next, state.ack_required);
830 if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
831 rds_send_drop_acked(conn, state.ack_recv, NULL);
832 ic->i_ack_recv = state.ack_recv;
833 }
834 if (rds_conn_up(conn))
835 rds_iw_attempt_ack(ic);
836
837 /* If we ever end up with a really empty receive ring, we're
838 * in deep trouble, as the sender will definitely see RNR
839 * timeouts. */
840 if (rds_iw_ring_empty(&ic->i_recv_ring))
841 rds_iw_stats_inc(s_iw_rx_ring_empty);
842
843 /*
844 * If the ring is running low, then schedule the thread to refill.
845 */
846 if (rds_iw_ring_low(&ic->i_recv_ring))
847 queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
848}
849
850int rds_iw_recv(struct rds_connection *conn)
851{
852 struct rds_iw_connection *ic = conn->c_transport_data;
853 int ret = 0;
854
855 rdsdebug("conn %p\n", conn);
856
857 /*
858 * If we get a temporary posting failure in this context then
859 * we're really low and we want the caller to back off for a bit.
860 */
861 mutex_lock(&ic->i_recv_mutex);
862 if (rds_iw_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
863 ret = -ENOMEM;
864 else
865 rds_iw_stats_inc(s_iw_rx_refill_from_thread);
866 mutex_unlock(&ic->i_recv_mutex);
867
868 if (rds_conn_up(conn))
869 rds_iw_attempt_ack(ic);
870
871 return ret;
872}
873
874int rds_iw_recv_init(void)
875{
876 struct sysinfo si;
877 int ret = -ENOMEM;
878
879 /* Default to roughly one third of all available RAM for recv memory */
880 si_meminfo(&si);
881 rds_iw_sysctl_max_recv_allocation = si.totalram / 3 * PAGE_SIZE / RDS_FRAG_SIZE;
882
883 rds_iw_incoming_slab = kmem_cache_create("rds_iw_incoming",
884 sizeof(struct rds_iw_incoming),
885 0, 0, NULL);
886 if (!rds_iw_incoming_slab)
887 goto out;
888
889 rds_iw_frag_slab = kmem_cache_create("rds_iw_frag",
890 sizeof(struct rds_page_frag),
891 0, 0, NULL);
892 if (!rds_iw_frag_slab)
893 kmem_cache_destroy(rds_iw_incoming_slab);
894 else
895 ret = 0;
896out:
897 return ret;
898}
899
900void rds_iw_recv_exit(void)
901{
902 kmem_cache_destroy(rds_iw_incoming_slab);
903 kmem_cache_destroy(rds_iw_frag_slab);
904}
diff --git a/net/rds/iw_ring.c b/net/rds/iw_ring.c
deleted file mode 100644
index da8e3b63f663..000000000000
--- a/net/rds/iw_ring.c
+++ /dev/null
@@ -1,169 +0,0 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34
35#include "rds.h"
36#include "iw.h"
37
38/*
39 * Locking for IB rings.
40 * We assume that allocation is always protected by a mutex
41 * in the caller (this is a valid assumption for the current
42 * implementation).
43 *
44 * Freeing always happens in an interrupt, and hence only
45 * races with allocations, but not with other free()s.
46 *
47 * The interaction between allocation and freeing is that
48 * the alloc code has to determine the number of free entries.
49 * To this end, we maintain two counters; an allocation counter
50 * and a free counter. Both are allowed to run freely, and wrap
51 * around.
52 * The number of used entries is always alloc_ctr - free_ctr (unsigned wraparound keeps this correct).
53 *
54 * The current implementation makes free_ctr atomic. When the
55 * caller finds an allocation fails, it should set an "alloc fail"
56 * bit and retry the allocation. The "alloc fail" bit essentially tells
57 * the CQ completion handlers to wake it up after freeing some
58 * more entries.
59 */
60
61/*
62 * This only happens on shutdown.
63 */
64DECLARE_WAIT_QUEUE_HEAD(rds_iw_ring_empty_wait);
65
66void rds_iw_ring_init(struct rds_iw_work_ring *ring, u32 nr)
67{
68 memset(ring, 0, sizeof(*ring));
69 ring->w_nr = nr;
70 rdsdebug("ring %p nr %u\n", ring, ring->w_nr);
71}
72
73static inline u32 __rds_iw_ring_used(struct rds_iw_work_ring *ring)
74{
75 u32 diff;
76
77 /* This assumes that atomic_t has at least as many bits as u32 */
78 diff = ring->w_alloc_ctr - (u32) atomic_read(&ring->w_free_ctr);
79 BUG_ON(diff > ring->w_nr);
80
81 return diff;
82}
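A userspace illustration of the free-running counters described in the locking comment above: the used count is simply the u32 difference of the two counters, and unsigned wraparound keeps it correct even after the allocation counter passes 2^32. The ring size and counter values are arbitrary examples.

#include <stdio.h>
#include <stdint.h>

int main(void)
{
	const uint32_t w_nr = 256;		/* ring entries */
	uint32_t alloc_ctr = UINT32_MAX - 2;	/* about to wrap */
	uint32_t free_ctr  = UINT32_MAX - 10;	/* 8 entries currently used */

	printf("used before wrap: %u\n", alloc_ctr - free_ctr);

	alloc_ctr += 6;				/* wraps past zero */
	free_ctr  += 4;

	/* 8 + 6 allocated - 4 freed = 10, despite the wrap. */
	printf("used after wrap:  %u (of %u)\n", alloc_ctr - free_ctr, w_nr);
	return 0;
}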
83
84void rds_iw_ring_resize(struct rds_iw_work_ring *ring, u32 nr)
85{
86 /* We only ever get called from the connection setup code,
87 * prior to creating the QP. */
88 BUG_ON(__rds_iw_ring_used(ring));
89 ring->w_nr = nr;
90}
91
92static int __rds_iw_ring_empty(struct rds_iw_work_ring *ring)
93{
94 return __rds_iw_ring_used(ring) == 0;
95}
96
97u32 rds_iw_ring_alloc(struct rds_iw_work_ring *ring, u32 val, u32 *pos)
98{
99 u32 ret = 0, avail;
100
101 avail = ring->w_nr - __rds_iw_ring_used(ring);
102
103 rdsdebug("ring %p val %u next %u free %u\n", ring, val,
104 ring->w_alloc_ptr, avail);
105
106 if (val && avail) {
107 ret = min(val, avail);
108 *pos = ring->w_alloc_ptr;
109
110 ring->w_alloc_ptr = (ring->w_alloc_ptr + ret) % ring->w_nr;
111 ring->w_alloc_ctr += ret;
112 }
113
114 return ret;
115}
116
117void rds_iw_ring_free(struct rds_iw_work_ring *ring, u32 val)
118{
119 ring->w_free_ptr = (ring->w_free_ptr + val) % ring->w_nr;
120 atomic_add(val, &ring->w_free_ctr);
121
122 if (__rds_iw_ring_empty(ring) &&
123 waitqueue_active(&rds_iw_ring_empty_wait))
124 wake_up(&rds_iw_ring_empty_wait);
125}
126
127void rds_iw_ring_unalloc(struct rds_iw_work_ring *ring, u32 val)
128{
129 ring->w_alloc_ptr = (ring->w_alloc_ptr - val) % ring->w_nr;
130 ring->w_alloc_ctr -= val;
131}
132
133int rds_iw_ring_empty(struct rds_iw_work_ring *ring)
134{
135 return __rds_iw_ring_empty(ring);
136}
137
138int rds_iw_ring_low(struct rds_iw_work_ring *ring)
139{
140 return __rds_iw_ring_used(ring) <= (ring->w_nr >> 1);
141}
142
143
144/*
145 * returns the oldest alloced ring entry. This will be the next one
146 * freed. This can't be called if there are none allocated.
147 */
148u32 rds_iw_ring_oldest(struct rds_iw_work_ring *ring)
149{
150 return ring->w_free_ptr;
151}
152
153/*
154 * returns the number of completed work requests.
155 */
156
157u32 rds_iw_ring_completed(struct rds_iw_work_ring *ring, u32 wr_id, u32 oldest)
158{
159 u32 ret;
160
161 if (oldest <= (unsigned long long)wr_id)
162 ret = (unsigned long long)wr_id - oldest + 1;
163 else
164 ret = ring->w_nr - oldest + (unsigned long long)wr_id + 1;
165
166 rdsdebug("ring %p ret %u wr_id %u oldest %u\n", ring, ret,
167 wr_id, oldest);
168 return ret;
169}
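A small sketch of the completed-count arithmetic above, covering both the straight case and the case where wr_id has wrapped past the end of the ring; all values are examples.

#include <stdio.h>
#include <stdint.h>

static uint32_t ring_completed(uint32_t w_nr, uint32_t wr_id, uint32_t oldest)
{
	if (oldest <= wr_id)
		return wr_id - oldest + 1;
	return w_nr - oldest + wr_id + 1;	/* wr_id wrapped around */
}

int main(void)
{
	const uint32_t w_nr = 256;

	/* No wrap: oldest 10, completion for wr_id 13 -> entries 10..13 = 4. */
	printf("no wrap:   %u\n", ring_completed(w_nr, 13, 10));

	/* Wrap: oldest 254, completion for wr_id 1 -> 254,255,0,1 = 4. */
	printf("with wrap: %u\n", ring_completed(w_nr, 1, 254));
	return 0;
}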
diff --git a/net/rds/iw_send.c b/net/rds/iw_send.c
deleted file mode 100644
index e20bd503f4bd..000000000000
--- a/net/rds/iw_send.c
+++ /dev/null
@@ -1,981 +0,0 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <linux/device.h>
36#include <linux/dmapool.h>
37#include <linux/ratelimit.h>
38
39#include "rds.h"
40#include "iw.h"
41
42static void rds_iw_send_rdma_complete(struct rds_message *rm,
43 int wc_status)
44{
45 int notify_status;
46
47 switch (wc_status) {
48 case IB_WC_WR_FLUSH_ERR:
49 return;
50
51 case IB_WC_SUCCESS:
52 notify_status = RDS_RDMA_SUCCESS;
53 break;
54
55 case IB_WC_REM_ACCESS_ERR:
56 notify_status = RDS_RDMA_REMOTE_ERROR;
57 break;
58
59 default:
60 notify_status = RDS_RDMA_OTHER_ERROR;
61 break;
62 }
63 rds_rdma_send_complete(rm, notify_status);
64}
65
66static void rds_iw_send_unmap_rdma(struct rds_iw_connection *ic,
67 struct rm_rdma_op *op)
68{
69 if (op->op_mapped) {
70 ib_dma_unmap_sg(ic->i_cm_id->device,
71 op->op_sg, op->op_nents,
72 op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
73 op->op_mapped = 0;
74 }
75}
76
77static void rds_iw_send_unmap_rm(struct rds_iw_connection *ic,
78 struct rds_iw_send_work *send,
79 int wc_status)
80{
81 struct rds_message *rm = send->s_rm;
82
83 rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
84
85 ib_dma_unmap_sg(ic->i_cm_id->device,
86 rm->data.op_sg, rm->data.op_nents,
87 DMA_TO_DEVICE);
88
89 if (rm->rdma.op_active) {
90 rds_iw_send_unmap_rdma(ic, &rm->rdma);
91
92 /* If the user asked for a completion notification on this
93 * message, we can implement three different semantics:
94 * 1. Notify when we received the ACK on the RDS message
95 * that was queued with the RDMA. This provides reliable
96 * notification of RDMA status at the expense of a one-way
97 * packet delay.
98 * 2. Notify when the IB stack gives us the completion event for
99 * the RDMA operation.
100 * 3. Notify when the IB stack gives us the completion event for
101 * the accompanying RDS messages.
102 * Here, we implement approach #3. To implement approach #2,
103 * call rds_rdma_send_complete from the cq_handler. To implement #1,
104 * don't call rds_rdma_send_complete at all, and fall back to the notify
105 * handling in the ACK processing code.
106 *
107 * Note: There's no need to explicitly sync any RDMA buffers using
108 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
109 * operation itself unmapped the RDMA buffers, which takes care
110 * of synching.
111 */
112 rds_iw_send_rdma_complete(rm, wc_status);
113
114 if (rm->rdma.op_write)
115 rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes);
116 else
117 rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes);
118 }
119
120 /* If anyone waited for this message to get flushed out, wake
121 * them up now */
122 rds_message_unmapped(rm);
123
124 rds_message_put(rm);
125 send->s_rm = NULL;
126}
127
128void rds_iw_send_init_ring(struct rds_iw_connection *ic)
129{
130 struct rds_iw_send_work *send;
131 u32 i;
132
133 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
134 struct ib_sge *sge;
135
136 send->s_rm = NULL;
137 send->s_op = NULL;
138 send->s_mapping = NULL;
139
140 send->s_send_wr.next = NULL;
141 send->s_send_wr.wr_id = i;
142 send->s_send_wr.sg_list = send->s_sge;
143 send->s_send_wr.num_sge = 1;
144 send->s_send_wr.opcode = IB_WR_SEND;
145 send->s_send_wr.send_flags = 0;
146 send->s_send_wr.ex.imm_data = 0;
147
148 sge = rds_iw_data_sge(ic, send->s_sge);
149 sge->lkey = 0;
150
151 sge = rds_iw_header_sge(ic, send->s_sge);
152 sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
153 sge->length = sizeof(struct rds_header);
154 sge->lkey = 0;
155
156 send->s_mr = ib_alloc_mr(ic->i_pd, IB_MR_TYPE_MEM_REG,
157 fastreg_message_size);
158 if (IS_ERR(send->s_mr)) {
159 printk(KERN_WARNING "RDS/IW: ib_alloc_mr failed\n");
160 break;
161 }
162 }
163}
164
165void rds_iw_send_clear_ring(struct rds_iw_connection *ic)
166{
167 struct rds_iw_send_work *send;
168 u32 i;
169
170 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
171 BUG_ON(!send->s_mr);
172 ib_dereg_mr(send->s_mr);
173 if (send->s_send_wr.opcode == 0xdead)
174 continue;
175 if (send->s_rm)
176 rds_iw_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
177 if (send->s_op)
178 rds_iw_send_unmap_rdma(ic, send->s_op);
179 }
180}
181
182/*
183 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
 184 * operations performed in the send path. As the sender allocates and possibly
 185 * unallocates the next free entry in the ring, it never changes which entry is
 186 * the next to be freed, and that is all this completion path cares about.
187 */
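
To make this invariant concrete, here is a minimal, self-contained sketch (not the RDS ring code itself; the real bookkeeping in iw_ring.c uses atomics, and memory-ordering details are deliberately omitted here) of a ring in which the send path only ever advances the allocation head and the completion path only ever advances the free tail:

#include <stdint.h>
#include <stdio.h>

#define RING_SIZE 8

struct ring {
        uint32_t head;  /* next entry to allocate; written only by the sender   */
        uint32_t tail;  /* oldest in-flight entry; written only by completions  */
};

/* Sender side: take "n" entries if they all fit, otherwise take none. */
static uint32_t ring_alloc(struct ring *r, uint32_t n, uint32_t *pos)
{
        if (r->head - r->tail + n > RING_SIZE)
                return 0;
        *pos = r->head % RING_SIZE;
        r->head += n;
        return n;
}

/* Completion side: retire the "n" oldest entries. */
static void ring_free(struct ring *r, uint32_t n)
{
        r->tail += n;
}

int main(void)
{
        struct ring r = { 0, 0 };
        uint32_t pos;

        if (ring_alloc(&r, 3, &pos))
                printf("allocated 3 entries starting at slot %u\n", pos);
        ring_free(&r, 3);
        printf("head=%u tail=%u\n", r.head, r.tail);
        return 0;
}

Because each index has exactly one writer, the alloc/unalloc done while building a send and the free done from the completion path cannot invalidate each other's view of which entry is next to be freed, which is why no shared lock is needed.
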
188void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context)
189{
190 struct rds_connection *conn = context;
191 struct rds_iw_connection *ic = conn->c_transport_data;
192 struct ib_wc wc;
193 struct rds_iw_send_work *send;
194 u32 completed;
195 u32 oldest;
196 u32 i;
197 int ret;
198
199 rdsdebug("cq %p conn %p\n", cq, conn);
200 rds_iw_stats_inc(s_iw_tx_cq_call);
201 ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
202 if (ret)
203 rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
204
205 while (ib_poll_cq(cq, 1, &wc) > 0) {
206 rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
207 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
208 be32_to_cpu(wc.ex.imm_data));
209 rds_iw_stats_inc(s_iw_tx_cq_event);
210
211 if (wc.status != IB_WC_SUCCESS) {
212 printk(KERN_ERR "WC Error: status = %d opcode = %d\n", wc.status, wc.opcode);
213 break;
214 }
215
216 if (wc.opcode == IB_WC_LOCAL_INV && wc.wr_id == RDS_IW_LOCAL_INV_WR_ID) {
217 ic->i_fastreg_posted = 0;
218 continue;
219 }
220
221 if (wc.opcode == IB_WC_REG_MR && wc.wr_id == RDS_IW_REG_WR_ID) {
222 ic->i_fastreg_posted = 1;
223 continue;
224 }
225
226 if (wc.wr_id == RDS_IW_ACK_WR_ID) {
227 if (time_after(jiffies, ic->i_ack_queued + HZ/2))
228 rds_iw_stats_inc(s_iw_tx_stalled);
229 rds_iw_ack_send_complete(ic);
230 continue;
231 }
232
233 oldest = rds_iw_ring_oldest(&ic->i_send_ring);
234
235 completed = rds_iw_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
236
237 for (i = 0; i < completed; i++) {
238 send = &ic->i_sends[oldest];
239
240 /* In the error case, wc.opcode sometimes contains garbage */
241 switch (send->s_send_wr.opcode) {
242 case IB_WR_SEND:
243 if (send->s_rm)
244 rds_iw_send_unmap_rm(ic, send, wc.status);
245 break;
246 case IB_WR_REG_MR:
247 case IB_WR_RDMA_WRITE:
248 case IB_WR_RDMA_READ:
249 case IB_WR_RDMA_READ_WITH_INV:
250 /* Nothing to be done - the SG list will be unmapped
251 * when the SEND completes. */
252 break;
253 default:
254 printk_ratelimited(KERN_NOTICE
255 "RDS/IW: %s: unexpected opcode 0x%x in WR!\n",
256 __func__, send->s_send_wr.opcode);
257 break;
258 }
259
260 send->s_send_wr.opcode = 0xdead;
261 send->s_send_wr.num_sge = 1;
262 if (time_after(jiffies, send->s_queued + HZ/2))
263 rds_iw_stats_inc(s_iw_tx_stalled);
264
 265			/* If an RDMA operation produced an error, signal it right
 266			 * away. If we don't, the subsequent SEND that goes with this
 267			 * RDMA will be flushed with IB_WC_WR_FLUSH_ERR, and the application
 268			 * will never learn that the RDMA failed. */
269 if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
270 struct rds_message *rm;
271
272 rm = rds_send_get_message(conn, send->s_op);
273 if (rm)
274 rds_iw_send_rdma_complete(rm, wc.status);
275 }
276
277 oldest = (oldest + 1) % ic->i_send_ring.w_nr;
278 }
279
280 rds_iw_ring_free(&ic->i_send_ring, completed);
281
282 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
283 test_bit(0, &conn->c_map_queued))
284 queue_delayed_work(rds_wq, &conn->c_send_w, 0);
285
286 /* We expect errors as the qp is drained during shutdown */
287 if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
288 rds_iw_conn_error(conn,
289 "send completion on %pI4 "
290 "had status %u, disconnecting and reconnecting\n",
291 &conn->c_faddr, wc.status);
292 }
293 }
294}
295
296/*
297 * This is the main function for allocating credits when sending
298 * messages.
299 *
300 * Conceptually, we have two counters:
301 * - send credits: this tells us how many WRs we're allowed
 302 *    to submit without overrunning the receiver's queue. For
303 * each SEND WR we post, we decrement this by one.
304 *
305 * - posted credits: this tells us how many WRs we recently
306 * posted to the receive queue. This value is transferred
 307 *    to the peer as a "credit update" in an RDS header field.
308 * Every time we transmit credits to the peer, we subtract
309 * the amount of transferred credits from this counter.
310 *
311 * It is essential that we avoid situations where both sides have
312 * exhausted their send credits, and are unable to send new credits
313 * to the peer. We achieve this by requiring that we send at least
314 * one credit update to the peer before exhausting our credits.
315 * When new credits arrive, we subtract one credit that is withheld
316 * until we've posted new buffers and are ready to transmit these
317 * credits (see rds_iw_send_add_credits below).
318 *
319 * The RDS send code is essentially single-threaded; rds_send_xmit
320 * grabs c_send_lock to ensure exclusive access to the send ring.
321 * However, the ACK sending code is independent and can race with
322 * message SENDs.
323 *
324 * In the send path, we need to update the counters for send credits
325 * and the counter of posted buffers atomically - when we use the
326 * last available credit, we cannot allow another thread to race us
327 * and grab the posted credits counter. Hence, we have to use a
328 * spinlock to protect the credit counter, or use atomics.
329 *
330 * Spinlocks shared between the send and the receive path are bad,
331 * because they create unnecessary delays. An early implementation
332 * using a spinlock showed a 5% degradation in throughput at some
333 * loads.
334 *
335 * This implementation avoids spinlocks completely, putting both
336 * counters into a single atomic, and updating that atomic using
337 * atomic_add (in the receive path, when receiving fresh credits),
338 * and using atomic_cmpxchg when updating the two counters.
339 */
340int rds_iw_send_grab_credits(struct rds_iw_connection *ic,
341 u32 wanted, u32 *adv_credits, int need_posted, int max_posted)
342{
343 unsigned int avail, posted, got = 0, advertise;
344 long oldval, newval;
345
346 *adv_credits = 0;
347 if (!ic->i_flowctl)
348 return wanted;
349
350try_again:
351 advertise = 0;
352 oldval = newval = atomic_read(&ic->i_credits);
353 posted = IB_GET_POST_CREDITS(oldval);
354 avail = IB_GET_SEND_CREDITS(oldval);
355
356 rdsdebug("wanted=%u credits=%u posted=%u\n",
357 wanted, avail, posted);
358
359 /* The last credit must be used to send a credit update. */
360 if (avail && !posted)
361 avail--;
362
363 if (avail < wanted) {
364 struct rds_connection *conn = ic->i_cm_id->context;
365
366 /* Oops, there aren't that many credits left! */
367 set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
368 got = avail;
369 } else {
370 /* Sometimes you get what you want, lalala. */
371 got = wanted;
372 }
373 newval -= IB_SET_SEND_CREDITS(got);
374
375 /*
 376	 * If need_posted is non-zero, then the caller wants us to
 377	 * advertise the posted credits regardless of whether any send
 378	 * credits are available.
379 */
380 if (posted && (got || need_posted)) {
381 advertise = min_t(unsigned int, posted, max_posted);
382 newval -= IB_SET_POST_CREDITS(advertise);
383 }
384
385 /* Finally bill everything */
386 if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
387 goto try_again;
388
389 *adv_credits = advertise;
390 return got;
391}
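
The scheme described in the comment above can be illustrated with a small, self-contained sketch. This is not the kernel implementation (the real code uses the IB_GET_*/IB_SET_* credit macros on ic->i_credits, and it also withholds the last send credit for a credit update, which this sketch omits); it only demonstrates the core idea, assuming two 16-bit counters packed into one 32-bit word and updated together with a single compare-and-swap via C11 atomics:

#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

/* Illustrative packing: send credits in the low 16 bits, posted
 * (receive-buffer) credits in the high 16 bits of one word. */
#define SEND_CREDITS(v)   ((v) & 0xffffu)
#define POST_CREDITS(v)   (((v) >> 16) & 0xffffu)
#define PACK(send, post)  (((uint32_t)(post) << 16) | (uint32_t)(send))

static _Atomic uint32_t credits = PACK(4, 2);

/* Take up to "wanted" send credits and drain the posted counter for
 * advertisement, all in one lock-free update; retry if another thread
 * changed the word underneath us. */
static unsigned int grab_credits(unsigned int wanted, unsigned int *advertise)
{
        uint32_t oldval, newval;
        unsigned int got;

        do {
                oldval = atomic_load(&credits);
                got = SEND_CREDITS(oldval);
                if (got > wanted)
                        got = wanted;
                *advertise = POST_CREDITS(oldval);
                newval = PACK(SEND_CREDITS(oldval) - got, 0);
        } while (!atomic_compare_exchange_weak(&credits, &oldval, newval));

        return got;
}

int main(void)
{
        unsigned int adv;
        unsigned int got = grab_credits(3, &adv);

        printf("got %u send credits, advertising %u posted credits\n", got, adv);
        return 0;
}

A single compare-and-swap over the packed word is what lets the send path consume the last send credit and claim the posted credits atomically, without a spinlock shared with the receive path.
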
392
393void rds_iw_send_add_credits(struct rds_connection *conn, unsigned int credits)
394{
395 struct rds_iw_connection *ic = conn->c_transport_data;
396
397 if (credits == 0)
398 return;
399
400 rdsdebug("credits=%u current=%u%s\n",
401 credits,
402 IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
403 test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");
404
405 atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
406 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
407 queue_delayed_work(rds_wq, &conn->c_send_w, 0);
408
409 WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
410
411 rds_iw_stats_inc(s_iw_rx_credit_updates);
412}
413
414void rds_iw_advertise_credits(struct rds_connection *conn, unsigned int posted)
415{
416 struct rds_iw_connection *ic = conn->c_transport_data;
417
418 if (posted == 0)
419 return;
420
421 atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
422
423 /* Decide whether to send an update to the peer now.
424 * If we would send a credit update for every single buffer we
425 * post, we would end up with an ACK storm (ACK arrives,
426 * consumes buffer, we refill the ring, send ACK to remote
 427	 * consumes buffer, we refill the ring, send ACK to remote
428 *
429 * Performance pretty much depends on how often we send
430 * credit updates - too frequent updates mean lots of ACKs.
431 * Too infrequent updates, and the peer will run out of
 432	 * credits and have to throttle.
433 * For the time being, 16 seems to be a good compromise.
434 */
435 if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
436 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
437}
438
439static inline void
440rds_iw_xmit_populate_wr(struct rds_iw_connection *ic,
441 struct rds_iw_send_work *send, unsigned int pos,
442 unsigned long buffer, unsigned int length,
443 int send_flags)
444{
445 struct ib_sge *sge;
446
447 WARN_ON(pos != send - ic->i_sends);
448
449 send->s_send_wr.send_flags = send_flags;
450 send->s_send_wr.opcode = IB_WR_SEND;
451 send->s_send_wr.num_sge = 2;
452 send->s_send_wr.next = NULL;
453 send->s_queued = jiffies;
454 send->s_op = NULL;
455
456 if (length != 0) {
457 sge = rds_iw_data_sge(ic, send->s_sge);
458 sge->addr = buffer;
459 sge->length = length;
460 sge->lkey = rds_iw_local_dma_lkey(ic);
461
462 sge = rds_iw_header_sge(ic, send->s_sge);
463 } else {
464 /* We're sending a packet with no payload. There is only
465 * one SGE */
466 send->s_send_wr.num_sge = 1;
467 sge = &send->s_sge[0];
468 }
469
470 sge->addr = ic->i_send_hdrs_dma + (pos * sizeof(struct rds_header));
471 sge->length = sizeof(struct rds_header);
472 sge->lkey = rds_iw_local_dma_lkey(ic);
473}
474
475/*
476 * This can be called multiple times for a given message. The first time
 477 * we see a message, we DMA-map its scatterlist for the IB device so that
 478 * we can provide the mapped addresses to the scatter/gather entries
 479 * in the IB work requests. We translate the scatterlist into a series
480 * of work requests that fragment the message. These work requests complete
481 * in order so we pass ownership of the message to the completion handler
482 * once we send the final fragment.
483 *
484 * The RDS core uses the c_send_lock to only enter this function once
485 * per connection. This makes sure that the tx ring alloc/unalloc pairs
486 * don't get out of sync and confuse the ring.
487 */
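
The fragmentation mentioned above comes down to one work request per RDS_FRAG_SIZE chunk of payload, plus the special case of a zero-length message that still needs a single WR to carry the header; that is the ceil() computation near the top of rds_iw_xmit() below. A stand-alone sketch of the same arithmetic, assuming the page-sized RDS_FRAG_SIZE of 4096 bytes used by these sources:

#include <stdio.h>

#define RDS_FRAG_SIZE 4096u     /* assumed RDS fragment size (one page) */

/* Number of send work requests needed for a payload of "len" bytes;
 * a zero-length message still consumes one WR for the RDS header. */
static unsigned int wrs_for_payload(unsigned int len)
{
        if (len == 0)
                return 1;
        return (len + RDS_FRAG_SIZE - 1) / RDS_FRAG_SIZE;       /* ceil(len / frag) */
}

int main(void)
{
        printf("%u\n", wrs_for_payload(0));     /* 1 */
        printf("%u\n", wrs_for_payload(4096));  /* 1 */
        printf("%u\n", wrs_for_payload(9000));  /* 3 */
        return 0;
}
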
488int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
489 unsigned int hdr_off, unsigned int sg, unsigned int off)
490{
491 struct rds_iw_connection *ic = conn->c_transport_data;
492 struct ib_device *dev = ic->i_cm_id->device;
493 struct rds_iw_send_work *send = NULL;
494 struct rds_iw_send_work *first;
495 struct rds_iw_send_work *prev;
496 struct ib_send_wr *failed_wr;
497 struct scatterlist *scat;
498 u32 pos;
499 u32 i;
500 u32 work_alloc;
501 u32 credit_alloc;
502 u32 posted;
503 u32 adv_credits = 0;
504 int send_flags = 0;
505 int sent;
506 int ret;
507 int flow_controlled = 0;
508
509 BUG_ON(off % RDS_FRAG_SIZE);
510 BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
511
512 /* Fastreg support */
513 if (rds_rdma_cookie_key(rm->m_rdma_cookie) && !ic->i_fastreg_posted) {
514 ret = -EAGAIN;
515 goto out;
516 }
517
518 /* FIXME we may overallocate here */
519 if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
520 i = 1;
521 else
522 i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
523
524 work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos);
525 if (work_alloc == 0) {
526 set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
527 rds_iw_stats_inc(s_iw_tx_ring_full);
528 ret = -ENOMEM;
529 goto out;
530 }
531
532 credit_alloc = work_alloc;
533 if (ic->i_flowctl) {
534 credit_alloc = rds_iw_send_grab_credits(ic, work_alloc, &posted, 0, RDS_MAX_ADV_CREDIT);
535 adv_credits += posted;
536 if (credit_alloc < work_alloc) {
537 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
538 work_alloc = credit_alloc;
539 flow_controlled++;
540 }
541 if (work_alloc == 0) {
542 set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
543 rds_iw_stats_inc(s_iw_tx_throttle);
544 ret = -ENOMEM;
545 goto out;
546 }
547 }
548
549 /* map the message the first time we see it */
550 if (!ic->i_rm) {
551 /*
552 printk(KERN_NOTICE "rds_iw_xmit prep msg dport=%u flags=0x%x len=%d\n",
553 be16_to_cpu(rm->m_inc.i_hdr.h_dport),
554 rm->m_inc.i_hdr.h_flags,
555 be32_to_cpu(rm->m_inc.i_hdr.h_len));
556 */
557 if (rm->data.op_nents) {
558 rm->data.op_count = ib_dma_map_sg(dev,
559 rm->data.op_sg,
560 rm->data.op_nents,
561 DMA_TO_DEVICE);
562 rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->data.op_count);
563 if (rm->data.op_count == 0) {
564 rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
565 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
566 ret = -ENOMEM; /* XXX ? */
567 goto out;
568 }
569 } else {
570 rm->data.op_count = 0;
571 }
572
573 ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
574 ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes;
575 rds_message_addref(rm);
576 rm->data.op_dmasg = 0;
577 rm->data.op_dmaoff = 0;
578 ic->i_rm = rm;
579
580 /* Finalize the header */
581 if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
582 rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
583 if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
584 rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;
585
586 /* If it has a RDMA op, tell the peer we did it. This is
587 * used by the peer to release use-once RDMA MRs. */
588 if (rm->rdma.op_active) {
589 struct rds_ext_header_rdma ext_hdr;
590
591 ext_hdr.h_rdma_rkey = cpu_to_be32(rm->rdma.op_rkey);
592 rds_message_add_extension(&rm->m_inc.i_hdr,
593 RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
594 }
595 if (rm->m_rdma_cookie) {
596 rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
597 rds_rdma_cookie_key(rm->m_rdma_cookie),
598 rds_rdma_cookie_offset(rm->m_rdma_cookie));
599 }
600
601 /* Note - rds_iw_piggyb_ack clears the ACK_REQUIRED bit, so
602 * we should not do this unless we have a chance of at least
 603		 * sticking the header into the send ring, which is why we
 604		 * call rds_iw_ring_alloc first. */
605 rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_iw_piggyb_ack(ic));
606 rds_message_make_checksum(&rm->m_inc.i_hdr);
607
608 /*
609 * Update adv_credits since we reset the ACK_REQUIRED bit.
610 */
611 rds_iw_send_grab_credits(ic, 0, &posted, 1, RDS_MAX_ADV_CREDIT - adv_credits);
612 adv_credits += posted;
613 BUG_ON(adv_credits > 255);
614 }
615
616 send = &ic->i_sends[pos];
617 first = send;
618 prev = NULL;
619 scat = &rm->data.op_sg[rm->data.op_dmasg];
620 sent = 0;
621 i = 0;
622
623 /* Sometimes you want to put a fence between an RDMA
624 * READ and the following SEND.
625 * We could either do this all the time
 626	 * or only when requested by the user. Right now, we let
627 * the application choose.
628 */
629 if (rm->rdma.op_active && rm->rdma.op_fence)
630 send_flags = IB_SEND_FENCE;
631
632 /*
633 * We could be copying the header into the unused tail of the page.
634 * That would need to be changed in the future when those pages might
635 * be mapped userspace pages or page cache pages. So instead we always
636 * use a second sge and our long-lived ring of mapped headers. We send
637 * the header after the data so that the data payload can be aligned on
638 * the receiver.
639 */
640
641 /* handle a 0-len message */
642 if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) {
643 rds_iw_xmit_populate_wr(ic, send, pos, 0, 0, send_flags);
644 goto add_header;
645 }
646
647 /* if there's data reference it with a chain of work reqs */
648 for (; i < work_alloc && scat != &rm->data.op_sg[rm->data.op_count]; i++) {
649 unsigned int len;
650
651 send = &ic->i_sends[pos];
652
653 len = min(RDS_FRAG_SIZE,
654 ib_sg_dma_len(dev, scat) - rm->data.op_dmaoff);
655 rds_iw_xmit_populate_wr(ic, send, pos,
656 ib_sg_dma_address(dev, scat) + rm->data.op_dmaoff, len,
657 send_flags);
658
659 /*
660 * We want to delay signaling completions just enough to get
661 * the batching benefits but not so much that we create dead time
662 * on the wire.
663 */
664 if (ic->i_unsignaled_wrs-- == 0) {
665 ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
666 send->s_send_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
667 }
668
669 ic->i_unsignaled_bytes -= len;
670 if (ic->i_unsignaled_bytes <= 0) {
671 ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes;
672 send->s_send_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
673 }
674
675 /*
676 * Always signal the last one if we're stopping due to flow control.
677 */
678 if (flow_controlled && i == (work_alloc-1))
679 send->s_send_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
680
681 rdsdebug("send %p wr %p num_sge %u next %p\n", send,
682 &send->s_send_wr, send->s_send_wr.num_sge, send->s_send_wr.next);
683
684 sent += len;
685 rm->data.op_dmaoff += len;
686 if (rm->data.op_dmaoff == ib_sg_dma_len(dev, scat)) {
687 scat++;
688 rm->data.op_dmaoff = 0;
689 rm->data.op_dmasg++;
690 }
691
692add_header:
693 /* Tack on the header after the data. The header SGE should already
694 * have been set up to point to the right header buffer. */
695 memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
696
697 if (0) {
698 struct rds_header *hdr = &ic->i_send_hdrs[pos];
699
700 printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n",
701 be16_to_cpu(hdr->h_dport),
702 hdr->h_flags,
703 be32_to_cpu(hdr->h_len));
704 }
705 if (adv_credits) {
706 struct rds_header *hdr = &ic->i_send_hdrs[pos];
707
708 /* add credit and redo the header checksum */
709 hdr->h_credit = adv_credits;
710 rds_message_make_checksum(hdr);
711 adv_credits = 0;
712 rds_iw_stats_inc(s_iw_tx_credit_updates);
713 }
714
715 if (prev)
716 prev->s_send_wr.next = &send->s_send_wr;
717 prev = send;
718
719 pos = (pos + 1) % ic->i_send_ring.w_nr;
720 }
721
722 /* Account the RDS header in the number of bytes we sent, but just once.
723 * The caller has no concept of fragmentation. */
724 if (hdr_off == 0)
725 sent += sizeof(struct rds_header);
726
727 /* if we finished the message then send completion owns it */
728 if (scat == &rm->data.op_sg[rm->data.op_count]) {
729 prev->s_rm = ic->i_rm;
730 prev->s_send_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
731 ic->i_rm = NULL;
732 }
733
734 if (i < work_alloc) {
735 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i);
736 work_alloc = i;
737 }
738 if (ic->i_flowctl && i < credit_alloc)
739 rds_iw_send_add_credits(conn, credit_alloc - i);
740
741 /* XXX need to worry about failed_wr and partial sends. */
742 failed_wr = &first->s_send_wr;
743 ret = ib_post_send(ic->i_cm_id->qp, &first->s_send_wr, &failed_wr);
744 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
745 first, &first->s_send_wr, ret, failed_wr);
746 BUG_ON(failed_wr != &first->s_send_wr);
747 if (ret) {
748 printk(KERN_WARNING "RDS/IW: ib_post_send to %pI4 "
749 "returned %d\n", &conn->c_faddr, ret);
750 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
751 if (prev->s_rm) {
752 ic->i_rm = prev->s_rm;
753 prev->s_rm = NULL;
754 }
755 goto out;
756 }
757
758 ret = sent;
759out:
760 BUG_ON(adv_credits);
761 return ret;
762}
763
764static int rds_iw_build_send_reg(struct rds_iw_send_work *send,
765 struct scatterlist *sg,
766 int sg_nents)
767{
768 int n;
769
770 n = ib_map_mr_sg(send->s_mr, sg, sg_nents, PAGE_SIZE);
771 if (unlikely(n != sg_nents))
772 return n < 0 ? n : -EINVAL;
773
774 send->s_reg_wr.wr.opcode = IB_WR_REG_MR;
775 send->s_reg_wr.wr.wr_id = 0;
776 send->s_reg_wr.wr.num_sge = 0;
777 send->s_reg_wr.mr = send->s_mr;
778 send->s_reg_wr.key = send->s_mr->rkey;
779 send->s_reg_wr.access = IB_ACCESS_REMOTE_WRITE;
780
781 ib_update_fast_reg_key(send->s_mr, send->s_remap_count++);
782
783 return 0;
784}
785
786int rds_iw_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
787{
788 struct rds_iw_connection *ic = conn->c_transport_data;
789 struct rds_iw_send_work *send = NULL;
790 struct rds_iw_send_work *first;
791 struct rds_iw_send_work *prev;
792 struct ib_send_wr *failed_wr;
793 struct rds_iw_device *rds_iwdev;
794 struct scatterlist *scat;
795 unsigned long len;
796 u64 remote_addr = op->op_remote_addr;
797 u32 pos, fr_pos;
798 u32 work_alloc;
799 u32 i;
800 u32 j;
801 int sent;
802 int ret;
803 int num_sge;
804 int sg_nents;
805
806 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
807
808 /* map the message the first time we see it */
809 if (!op->op_mapped) {
810 op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
811 op->op_sg, op->op_nents, (op->op_write) ?
812 DMA_TO_DEVICE : DMA_FROM_DEVICE);
813 rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->op_count);
814 if (op->op_count == 0) {
815 rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
816 ret = -ENOMEM; /* XXX ? */
817 goto out;
818 }
819
820 op->op_mapped = 1;
821 }
822
823 if (!op->op_write) {
824 /* Alloc space on the send queue for the fastreg */
825 work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, 1, &fr_pos);
826 if (work_alloc != 1) {
827 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
828 rds_iw_stats_inc(s_iw_tx_ring_full);
829 ret = -ENOMEM;
830 goto out;
831 }
832 }
833
834 /*
 835	 * Rather than supporting a partial RDMA read/write, we insist that there
836 * be enough work requests to send the entire message.
837 */
838 i = ceil(op->op_count, rds_iwdev->max_sge);
839
840 work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos);
841 if (work_alloc != i) {
842 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
843 rds_iw_stats_inc(s_iw_tx_ring_full);
844 ret = -ENOMEM;
845 goto out;
846 }
847
848 send = &ic->i_sends[pos];
849 if (!op->op_write) {
850 first = prev = &ic->i_sends[fr_pos];
851 } else {
852 first = send;
853 prev = NULL;
854 }
855 scat = &op->op_sg[0];
856 sent = 0;
857 num_sge = op->op_count;
858 sg_nents = 0;
859
860 for (i = 0; i < work_alloc && scat != &op->op_sg[op->op_count]; i++) {
861 send->s_rdma_wr.wr.send_flags = 0;
862 send->s_queued = jiffies;
863
864 /*
865 * We want to delay signaling completions just enough to get
866 * the batching benefits but not so much that we create dead time on the wire.
867 */
868 if (ic->i_unsignaled_wrs-- == 0) {
869 ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
870 send->s_rdma_wr.wr.send_flags = IB_SEND_SIGNALED;
871 }
872
 873		/* To avoid needing extra plumbing to invalidate the fastreg MR used
 874		 * for local access once RDS is finished with it, we use
 875		 * IB_WR_RDMA_READ_WITH_INV, which invalidates it after the read has completed.
876 */
877 if (op->op_write)
878 send->s_rdma_wr.wr.opcode = IB_WR_RDMA_WRITE;
879 else
880 send->s_rdma_wr.wr.opcode = IB_WR_RDMA_READ_WITH_INV;
881
882 send->s_rdma_wr.remote_addr = remote_addr;
883 send->s_rdma_wr.rkey = op->op_rkey;
884 send->s_op = op;
885
886 if (num_sge > rds_iwdev->max_sge) {
887 send->s_rdma_wr.wr.num_sge = rds_iwdev->max_sge;
888 num_sge -= rds_iwdev->max_sge;
889 } else
890 send->s_rdma_wr.wr.num_sge = num_sge;
891
892 send->s_rdma_wr.wr.next = NULL;
893
894 if (prev)
895 prev->s_send_wr.next = &send->s_rdma_wr.wr;
896
897 for (j = 0; j < send->s_rdma_wr.wr.num_sge &&
898 scat != &op->op_sg[op->op_count]; j++) {
899 len = ib_sg_dma_len(ic->i_cm_id->device, scat);
900
901 if (send->s_rdma_wr.wr.opcode == IB_WR_RDMA_READ_WITH_INV)
902 sg_nents++;
903 else {
904 send->s_sge[j].addr = ib_sg_dma_address(ic->i_cm_id->device, scat);
905 send->s_sge[j].length = len;
906 send->s_sge[j].lkey = rds_iw_local_dma_lkey(ic);
907 }
908
909 sent += len;
910 rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
911 remote_addr += len;
912
913 scat++;
914 }
915
916 if (send->s_rdma_wr.wr.opcode == IB_WR_RDMA_READ_WITH_INV) {
917 send->s_rdma_wr.wr.num_sge = 1;
918 send->s_sge[0].addr = conn->c_xmit_rm->m_rs->rs_user_addr;
919 send->s_sge[0].length = conn->c_xmit_rm->m_rs->rs_user_bytes;
920 send->s_sge[0].lkey = ic->i_sends[fr_pos].s_mr->lkey;
921 }
922
923 rdsdebug("send %p wr %p num_sge %u next %p\n", send,
924 &send->s_rdma_wr,
925 send->s_rdma_wr.wr.num_sge,
926 send->s_rdma_wr.wr.next);
927
928 prev = send;
929 if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
930 send = ic->i_sends;
931 }
932
933 /* if we finished the message then send completion owns it */
934 if (scat == &op->op_sg[op->op_count])
935 first->s_rdma_wr.wr.send_flags = IB_SEND_SIGNALED;
936
937 if (i < work_alloc) {
938 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i);
939 work_alloc = i;
940 }
941
 942	/* On iWARP, local memory access by a remote system (i.e., RDMA Read) is not
 943	 * recommended. Putting the lkey on the wire is a security hole, as it can
 944	 * allow access to all of the memory on the remote system. Some adapters
 945	 * do not allow using the lkey for this at all. To bypass this, use a
 946	 * fastreg MR (or possibly a dma_mr) instead.
947 */
948 if (!op->op_write) {
949 ret = rds_iw_build_send_reg(&ic->i_sends[fr_pos],
950 &op->op_sg[0], sg_nents);
951 if (ret) {
952 printk(KERN_WARNING "RDS/IW: failed to reg send mem\n");
953 goto out;
954 }
955 work_alloc++;
956 }
957
958 failed_wr = &first->s_rdma_wr.wr;
959 ret = ib_post_send(ic->i_cm_id->qp, &first->s_rdma_wr.wr, &failed_wr);
960 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
961 first, &first->s_rdma_wr, ret, failed_wr);
962 BUG_ON(failed_wr != &first->s_rdma_wr.wr);
963 if (ret) {
964 printk(KERN_WARNING "RDS/IW: rdma ib_post_send to %pI4 "
965 "returned %d\n", &conn->c_faddr, ret);
966 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
967 goto out;
968 }
969
970out:
971 return ret;
972}
973
974void rds_iw_xmit_complete(struct rds_connection *conn)
975{
976 struct rds_iw_connection *ic = conn->c_transport_data;
977
978 /* We may have a pending ACK or window update we were unable
979 * to send previously (due to flow control). Try again. */
980 rds_iw_attempt_ack(ic);
981}
diff --git a/net/rds/iw_stats.c b/net/rds/iw_stats.c
deleted file mode 100644
index 5fe67f6a1d80..000000000000
--- a/net/rds/iw_stats.c
+++ /dev/null
@@ -1,95 +0,0 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/percpu.h>
34#include <linux/seq_file.h>
35#include <linux/proc_fs.h>
36
37#include "rds.h"
38#include "iw.h"
39
40DEFINE_PER_CPU_SHARED_ALIGNED(struct rds_iw_statistics, rds_iw_stats);
41
42static const char *const rds_iw_stat_names[] = {
43 "iw_connect_raced",
44 "iw_listen_closed_stale",
45 "iw_tx_cq_call",
46 "iw_tx_cq_event",
47 "iw_tx_ring_full",
48 "iw_tx_throttle",
49 "iw_tx_sg_mapping_failure",
50 "iw_tx_stalled",
51 "iw_tx_credit_updates",
52 "iw_rx_cq_call",
53 "iw_rx_cq_event",
54 "iw_rx_ring_empty",
55 "iw_rx_refill_from_cq",
56 "iw_rx_refill_from_thread",
57 "iw_rx_alloc_limit",
58 "iw_rx_credit_updates",
59 "iw_ack_sent",
60 "iw_ack_send_failure",
61 "iw_ack_send_delayed",
62 "iw_ack_send_piggybacked",
63 "iw_ack_received",
64 "iw_rdma_mr_alloc",
65 "iw_rdma_mr_free",
66 "iw_rdma_mr_used",
67 "iw_rdma_mr_pool_flush",
68 "iw_rdma_mr_pool_wait",
69 "iw_rdma_mr_pool_depleted",
70};
71
72unsigned int rds_iw_stats_info_copy(struct rds_info_iterator *iter,
73 unsigned int avail)
74{
75 struct rds_iw_statistics stats = {0, };
76 uint64_t *src;
77 uint64_t *sum;
78 size_t i;
79 int cpu;
80
81 if (avail < ARRAY_SIZE(rds_iw_stat_names))
82 goto out;
83
84 for_each_online_cpu(cpu) {
85 src = (uint64_t *)&(per_cpu(rds_iw_stats, cpu));
86 sum = (uint64_t *)&stats;
87 for (i = 0; i < sizeof(stats) / sizeof(uint64_t); i++)
88 *(sum++) += *(src++);
89 }
90
91 rds_stats_info_copy(iter, (uint64_t *)&stats, rds_iw_stat_names,
92 ARRAY_SIZE(rds_iw_stat_names));
93out:
94 return ARRAY_SIZE(rds_iw_stat_names);
95}
diff --git a/net/rds/iw_sysctl.c b/net/rds/iw_sysctl.c
deleted file mode 100644
index 139239d2cb22..000000000000
--- a/net/rds/iw_sysctl.c
+++ /dev/null
@@ -1,123 +0,0 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/sysctl.h>
35#include <linux/proc_fs.h>
36
37#include "iw.h"
38
39static struct ctl_table_header *rds_iw_sysctl_hdr;
40
41unsigned long rds_iw_sysctl_max_send_wr = RDS_IW_DEFAULT_SEND_WR;
42unsigned long rds_iw_sysctl_max_recv_wr = RDS_IW_DEFAULT_RECV_WR;
43unsigned long rds_iw_sysctl_max_recv_allocation = (128 * 1024 * 1024) / RDS_FRAG_SIZE;
44static unsigned long rds_iw_sysctl_max_wr_min = 1;
45/* hardware will fail CQ creation long before this */
46static unsigned long rds_iw_sysctl_max_wr_max = (u32)~0;
47
48unsigned long rds_iw_sysctl_max_unsig_wrs = 16;
49static unsigned long rds_iw_sysctl_max_unsig_wr_min = 1;
50static unsigned long rds_iw_sysctl_max_unsig_wr_max = 64;
51
52unsigned long rds_iw_sysctl_max_unsig_bytes = (16 << 20);
53static unsigned long rds_iw_sysctl_max_unsig_bytes_min = 1;
54static unsigned long rds_iw_sysctl_max_unsig_bytes_max = ~0UL;
55
56unsigned int rds_iw_sysctl_flow_control = 1;
57
58static struct ctl_table rds_iw_sysctl_table[] = {
59 {
60 .procname = "max_send_wr",
61 .data = &rds_iw_sysctl_max_send_wr,
62 .maxlen = sizeof(unsigned long),
63 .mode = 0644,
64 .proc_handler = proc_doulongvec_minmax,
65 .extra1 = &rds_iw_sysctl_max_wr_min,
66 .extra2 = &rds_iw_sysctl_max_wr_max,
67 },
68 {
69 .procname = "max_recv_wr",
70 .data = &rds_iw_sysctl_max_recv_wr,
71 .maxlen = sizeof(unsigned long),
72 .mode = 0644,
73 .proc_handler = proc_doulongvec_minmax,
74 .extra1 = &rds_iw_sysctl_max_wr_min,
75 .extra2 = &rds_iw_sysctl_max_wr_max,
76 },
77 {
78 .procname = "max_unsignaled_wr",
79 .data = &rds_iw_sysctl_max_unsig_wrs,
80 .maxlen = sizeof(unsigned long),
81 .mode = 0644,
82 .proc_handler = proc_doulongvec_minmax,
83 .extra1 = &rds_iw_sysctl_max_unsig_wr_min,
84 .extra2 = &rds_iw_sysctl_max_unsig_wr_max,
85 },
86 {
87 .procname = "max_unsignaled_bytes",
88 .data = &rds_iw_sysctl_max_unsig_bytes,
89 .maxlen = sizeof(unsigned long),
90 .mode = 0644,
91 .proc_handler = proc_doulongvec_minmax,
92 .extra1 = &rds_iw_sysctl_max_unsig_bytes_min,
93 .extra2 = &rds_iw_sysctl_max_unsig_bytes_max,
94 },
95 {
96 .procname = "max_recv_allocation",
97 .data = &rds_iw_sysctl_max_recv_allocation,
98 .maxlen = sizeof(unsigned long),
99 .mode = 0644,
100 .proc_handler = proc_doulongvec_minmax,
101 },
102 {
103 .procname = "flow_control",
104 .data = &rds_iw_sysctl_flow_control,
105 .maxlen = sizeof(rds_iw_sysctl_flow_control),
106 .mode = 0644,
107 .proc_handler = proc_dointvec,
108 },
109 { }
110};
111
112void rds_iw_sysctl_exit(void)
113{
114 unregister_net_sysctl_table(rds_iw_sysctl_hdr);
115}
116
117int rds_iw_sysctl_init(void)
118{
119 rds_iw_sysctl_hdr = register_net_sysctl(&init_net, "net/rds/iw", rds_iw_sysctl_table);
120 if (!rds_iw_sysctl_hdr)
121 return -ENOMEM;
122 return 0;
123}
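
Since the table above is registered with register_net_sysctl() under "net/rds/iw", its entries appear as files under /proc/sys/net/rds/iw/. A small userspace sketch (illustrative only; it simply assumes that path and a loaded rds_rdma module) that reads the flow_control knob:

#include <stdio.h>

int main(void)
{
        /* Path follows from the "net/rds/iw" registration above. */
        FILE *f = fopen("/proc/sys/net/rds/iw/flow_control", "r");
        int val;

        if (!f) {
                perror("fopen");
                return 1;
        }
        if (fscanf(f, "%d", &val) == 1)
                printf("rds_iw flow control: %d\n", val);
        fclose(f);
        return 0;
}
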
diff --git a/net/rds/rdma_transport.c b/net/rds/rdma_transport.c
index 9c1fed81bf0f..4f4b3d88319d 100644
--- a/net/rds/rdma_transport.c
+++ b/net/rds/rdma_transport.c
@@ -49,9 +49,7 @@ int rds_rdma_cm_event_handler(struct rdma_cm_id *cm_id,
49 rdsdebug("conn %p id %p handling event %u (%s)\n", conn, cm_id, 49 rdsdebug("conn %p id %p handling event %u (%s)\n", conn, cm_id,
50 event->event, rdma_event_msg(event->event)); 50 event->event, rdma_event_msg(event->event));
51 51
52 if (cm_id->device->node_type == RDMA_NODE_RNIC) 52 if (cm_id->device->node_type == RDMA_NODE_IB_CA)
53 trans = &rds_iw_transport;
54 else
55 trans = &rds_ib_transport; 53 trans = &rds_ib_transport;
56 54
57 /* Prevent shutdown from tearing down the connection 55 /* Prevent shutdown from tearing down the connection
@@ -200,10 +198,6 @@ static int rds_rdma_init(void)
200 if (ret) 198 if (ret)
201 goto out; 199 goto out;
202 200
203 ret = rds_iw_init();
204 if (ret)
205 goto err_iw_init;
206
207 ret = rds_ib_init(); 201 ret = rds_ib_init();
208 if (ret) 202 if (ret)
209 goto err_ib_init; 203 goto err_ib_init;
@@ -211,8 +205,6 @@ static int rds_rdma_init(void)
211 goto out; 205 goto out;
212 206
213err_ib_init: 207err_ib_init:
214 rds_iw_exit();
215err_iw_init:
216 rds_rdma_listen_stop(); 208 rds_rdma_listen_stop();
217out: 209out:
218 return ret; 210 return ret;
@@ -224,11 +216,10 @@ static void rds_rdma_exit(void)
224 /* stop listening first to ensure no new connections are attempted */ 216 /* stop listening first to ensure no new connections are attempted */
225 rds_rdma_listen_stop(); 217 rds_rdma_listen_stop();
226 rds_ib_exit(); 218 rds_ib_exit();
227 rds_iw_exit();
228} 219}
229module_exit(rds_rdma_exit); 220module_exit(rds_rdma_exit);
230 221
231MODULE_AUTHOR("Oracle Corporation <rds-devel@oss.oracle.com>"); 222MODULE_AUTHOR("Oracle Corporation <rds-devel@oss.oracle.com>");
232MODULE_DESCRIPTION("RDS: IB/iWARP transport"); 223MODULE_DESCRIPTION("RDS: IB transport");
233MODULE_LICENSE("Dual BSD/GPL"); 224MODULE_LICENSE("Dual BSD/GPL");
234 225
diff --git a/net/rds/rdma_transport.h b/net/rds/rdma_transport.h
index faba4e382695..ff2010e9d20c 100644
--- a/net/rds/rdma_transport.h
+++ b/net/rds/rdma_transport.h
@@ -16,9 +16,4 @@ extern struct rds_transport rds_ib_transport;
16int rds_ib_init(void); 16int rds_ib_init(void);
17void rds_ib_exit(void); 17void rds_ib_exit(void);
18 18
19/* from iw.c */
20extern struct rds_transport rds_iw_transport;
21int rds_iw_init(void);
22void rds_iw_exit(void);
23
24#endif 19#endif