-rw-r--r--	net/rds/iw.c	333
-rw-r--r--	net/rds/iw.h	395
-rw-r--r--	net/rds/iw_cm.c	750
-rw-r--r--	net/rds/iw_rdma.c	888
-rw-r--r--	net/rds/iw_recv.c	869
-rw-r--r--	net/rds/iw_ring.c	169
-rw-r--r--	net/rds/iw_send.c	975
-rw-r--r--	net/rds/iw_stats.c	95
-rw-r--r--	net/rds/iw_sysctl.c	137
9 files changed, 4611 insertions, 0 deletions
diff --git a/net/rds/iw.c b/net/rds/iw.c
new file mode 100644
index 000000000000..1b56905c4c08
--- /dev/null
+++ b/net/rds/iw.c
@@ -0,0 +1,333 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <linux/if.h>
36#include <linux/netdevice.h>
37#include <linux/inetdevice.h>
38#include <linux/if_arp.h>
39#include <linux/delay.h>
40
41#include "rds.h"
42#include "iw.h"
43
44unsigned int fastreg_pool_size = RDS_FASTREG_POOL_SIZE;
45unsigned int fastreg_message_size = RDS_FASTREG_SIZE + 1; /* +1 allows for unaligned MRs */
46
47module_param(fastreg_pool_size, int, 0444);
48MODULE_PARM_DESC(fastreg_pool_size, " Max number of fastreg MRs per device");
49module_param(fastreg_message_size, int, 0444);
50MODULE_PARM_DESC(fastreg_message_size, " Max size of a RDMA transfer (fastreg MRs)");
51
52struct list_head rds_iw_devices;
53
54DEFINE_SPINLOCK(iw_nodev_conns_lock);
55LIST_HEAD(iw_nodev_conns);
56
57void rds_iw_add_one(struct ib_device *device)
58{
59 struct rds_iw_device *rds_iwdev;
60 struct ib_device_attr *dev_attr;
61
62 /* Only handle iwarp devices */
63 if (device->node_type != RDMA_NODE_RNIC)
64 return;
65
66 dev_attr = kmalloc(sizeof *dev_attr, GFP_KERNEL);
67 if (!dev_attr)
68 return;
69
70 if (ib_query_device(device, dev_attr)) {
71 rdsdebug("Query device failed for %s\n", device->name);
72 goto free_attr;
73 }
74
75 rds_iwdev = kmalloc(sizeof *rds_iwdev, GFP_KERNEL);
76 if (!rds_iwdev)
77 goto free_attr;
78
79 spin_lock_init(&rds_iwdev->spinlock);
80
81 rds_iwdev->dma_local_lkey = !!(dev_attr->device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY);
82 rds_iwdev->max_wrs = dev_attr->max_qp_wr;
83 rds_iwdev->max_sge = min(dev_attr->max_sge, RDS_IW_MAX_SGE);
84
85 rds_iwdev->page_shift = max(PAGE_SHIFT, ffs(dev_attr->page_size_cap) - 1);
86
87 rds_iwdev->dev = device;
88 rds_iwdev->pd = ib_alloc_pd(device);
89 if (IS_ERR(rds_iwdev->pd))
90 goto free_dev;
91
92 if (!rds_iwdev->dma_local_lkey) {
93 if (device->node_type != RDMA_NODE_RNIC) {
94 rds_iwdev->mr = ib_get_dma_mr(rds_iwdev->pd,
95 IB_ACCESS_LOCAL_WRITE);
96 } else {
97 rds_iwdev->mr = ib_get_dma_mr(rds_iwdev->pd,
98 IB_ACCESS_REMOTE_READ |
99 IB_ACCESS_REMOTE_WRITE |
100 IB_ACCESS_LOCAL_WRITE);
101 }
102 if (IS_ERR(rds_iwdev->mr))
103 goto err_pd;
104 } else
105 rds_iwdev->mr = NULL;
106
107 rds_iwdev->mr_pool = rds_iw_create_mr_pool(rds_iwdev);
108 if (IS_ERR(rds_iwdev->mr_pool)) {
109 rds_iwdev->mr_pool = NULL;
110 goto err_mr;
111 }
112
113 INIT_LIST_HEAD(&rds_iwdev->cm_id_list);
114 INIT_LIST_HEAD(&rds_iwdev->conn_list);
115 list_add_tail(&rds_iwdev->list, &rds_iw_devices);
116
117 ib_set_client_data(device, &rds_iw_client, rds_iwdev);
118
119 goto free_attr;
120
121err_mr:
122 if (rds_iwdev->mr)
123 ib_dereg_mr(rds_iwdev->mr);
124err_pd:
125 ib_dealloc_pd(rds_iwdev->pd);
126free_dev:
127 kfree(rds_iwdev);
128free_attr:
129 kfree(dev_attr);
130}
131
132void rds_iw_remove_one(struct ib_device *device)
133{
134 struct rds_iw_device *rds_iwdev;
135 struct rds_iw_cm_id *i_cm_id, *next;
136
137 rds_iwdev = ib_get_client_data(device, &rds_iw_client);
138 if (!rds_iwdev)
139 return;
140
141 spin_lock_irq(&rds_iwdev->spinlock);
142 list_for_each_entry_safe(i_cm_id, next, &rds_iwdev->cm_id_list, list) {
143 list_del(&i_cm_id->list);
144 kfree(i_cm_id);
145 }
146 spin_unlock_irq(&rds_iwdev->spinlock);
147
148 rds_iw_remove_conns(rds_iwdev);
149
150 if (rds_iwdev->mr_pool)
151 rds_iw_destroy_mr_pool(rds_iwdev->mr_pool);
152
153 if (rds_iwdev->mr)
154 ib_dereg_mr(rds_iwdev->mr);
155
156 while (ib_dealloc_pd(rds_iwdev->pd)) {
157 rdsdebug("Failed to dealloc pd %p\n", rds_iwdev->pd);
158 msleep(1);
159 }
160
161 list_del(&rds_iwdev->list);
162 kfree(rds_iwdev);
163}
164
165struct ib_client rds_iw_client = {
166 .name = "rds_iw",
167 .add = rds_iw_add_one,
168 .remove = rds_iw_remove_one
169};
170
171static int rds_iw_conn_info_visitor(struct rds_connection *conn,
172 void *buffer)
173{
174 struct rds_info_rdma_connection *iinfo = buffer;
175 struct rds_iw_connection *ic;
176
177	/* We will only ever look at iWARP transports */
178 if (conn->c_trans != &rds_iw_transport)
179 return 0;
180
181 iinfo->src_addr = conn->c_laddr;
182 iinfo->dst_addr = conn->c_faddr;
183
184 memset(&iinfo->src_gid, 0, sizeof(iinfo->src_gid));
185 memset(&iinfo->dst_gid, 0, sizeof(iinfo->dst_gid));
186 if (rds_conn_state(conn) == RDS_CONN_UP) {
187 struct rds_iw_device *rds_iwdev;
188 struct rdma_dev_addr *dev_addr;
189
190 ic = conn->c_transport_data;
191 dev_addr = &ic->i_cm_id->route.addr.dev_addr;
192
193 ib_addr_get_sgid(dev_addr, (union ib_gid *) &iinfo->src_gid);
194 ib_addr_get_dgid(dev_addr, (union ib_gid *) &iinfo->dst_gid);
195
196 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
197 iinfo->max_send_wr = ic->i_send_ring.w_nr;
198 iinfo->max_recv_wr = ic->i_recv_ring.w_nr;
199 iinfo->max_send_sge = rds_iwdev->max_sge;
200 rds_iw_get_mr_info(rds_iwdev, iinfo);
201 }
202 return 1;
203}
204
205static void rds_iw_ic_info(struct socket *sock, unsigned int len,
206 struct rds_info_iterator *iter,
207 struct rds_info_lengths *lens)
208{
209 rds_for_each_conn_info(sock, len, iter, lens,
210 rds_iw_conn_info_visitor,
211 sizeof(struct rds_info_rdma_connection));
212}
213
214
215/*
216 * Early RDS/IB was built to only bind to an address if there is an IPoIB
217 * device with that address set.
218 *
219 * If it were me, I'd advocate for something more flexible. Sending and
220 * receiving should be device-agnostic. Transports would try and maintain
221 * connections between peers who have messages queued. Userspace would be
222 * allowed to influence which paths have priority. We could call userspace
223 * asserting this policy "routing".
224 */
225static int rds_iw_laddr_check(__be32 addr)
226{
227 int ret;
228 struct rdma_cm_id *cm_id;
229 struct sockaddr_in sin;
230
231 /* Create a CMA ID and try to bind it. This catches both
232 * IB and iWARP capable NICs.
233 */
234 cm_id = rdma_create_id(NULL, NULL, RDMA_PS_TCP);
235 if (!cm_id)
236 return -EADDRNOTAVAIL;
237
238 memset(&sin, 0, sizeof(sin));
239 sin.sin_family = AF_INET;
240 sin.sin_addr.s_addr = addr;
241
242 /* rdma_bind_addr will only succeed for IB & iWARP devices */
243 ret = rdma_bind_addr(cm_id, (struct sockaddr *)&sin);
244 /* due to this, we will claim to support IB devices unless we
245 check node_type. */
246 if (ret || cm_id->device->node_type != RDMA_NODE_RNIC)
247 ret = -EADDRNOTAVAIL;
248
249 rdsdebug("addr %pI4 ret %d node type %d\n",
250 &addr, ret,
251 cm_id->device ? cm_id->device->node_type : -1);
252
253 rdma_destroy_id(cm_id);
254
255 return ret;
256}
257
258void rds_iw_exit(void)
259{
260 rds_info_deregister_func(RDS_INFO_IWARP_CONNECTIONS, rds_iw_ic_info);
261 rds_iw_remove_nodev_conns();
262 ib_unregister_client(&rds_iw_client);
263 rds_iw_sysctl_exit();
264 rds_iw_recv_exit();
265 rds_trans_unregister(&rds_iw_transport);
266}
267
268struct rds_transport rds_iw_transport = {
269 .laddr_check = rds_iw_laddr_check,
270 .xmit_complete = rds_iw_xmit_complete,
271 .xmit = rds_iw_xmit,
272 .xmit_cong_map = NULL,
273 .xmit_rdma = rds_iw_xmit_rdma,
274 .recv = rds_iw_recv,
275 .conn_alloc = rds_iw_conn_alloc,
276 .conn_free = rds_iw_conn_free,
277 .conn_connect = rds_iw_conn_connect,
278 .conn_shutdown = rds_iw_conn_shutdown,
279 .inc_copy_to_user = rds_iw_inc_copy_to_user,
280 .inc_purge = rds_iw_inc_purge,
281 .inc_free = rds_iw_inc_free,
282 .cm_initiate_connect = rds_iw_cm_initiate_connect,
283 .cm_handle_connect = rds_iw_cm_handle_connect,
284 .cm_connect_complete = rds_iw_cm_connect_complete,
285 .stats_info_copy = rds_iw_stats_info_copy,
286 .exit = rds_iw_exit,
287 .get_mr = rds_iw_get_mr,
288 .sync_mr = rds_iw_sync_mr,
289 .free_mr = rds_iw_free_mr,
290 .flush_mrs = rds_iw_flush_mrs,
291 .t_owner = THIS_MODULE,
292 .t_name = "iwarp",
293 .t_prefer_loopback = 1,
294};
295
296int __init rds_iw_init(void)
297{
298 int ret;
299
300 INIT_LIST_HEAD(&rds_iw_devices);
301
302 ret = ib_register_client(&rds_iw_client);
303 if (ret)
304 goto out;
305
306 ret = rds_iw_sysctl_init();
307 if (ret)
308 goto out_ibreg;
309
310 ret = rds_iw_recv_init();
311 if (ret)
312 goto out_sysctl;
313
314 ret = rds_trans_register(&rds_iw_transport);
315 if (ret)
316 goto out_recv;
317
318 rds_info_register_func(RDS_INFO_IWARP_CONNECTIONS, rds_iw_ic_info);
319
320 goto out;
321
322out_recv:
323 rds_iw_recv_exit();
324out_sysctl:
325 rds_iw_sysctl_exit();
326out_ibreg:
327 ib_unregister_client(&rds_iw_client);
328out:
329 return ret;
330}
331
332MODULE_LICENSE("GPL");
333
diff --git a/net/rds/iw.h b/net/rds/iw.h
new file mode 100644
index 000000000000..0ddda34f2a1c
--- /dev/null
+++ b/net/rds/iw.h
@@ -0,0 +1,395 @@
1#ifndef _RDS_IW_H
2#define _RDS_IW_H
3
4#include <rdma/ib_verbs.h>
5#include <rdma/rdma_cm.h>
6#include "rds.h"
7#include "rdma_transport.h"
8
9#define RDS_FASTREG_SIZE 20
10#define RDS_FASTREG_POOL_SIZE 2048
11
12#define RDS_IW_MAX_SGE 8
13#define RDS_IW_RECV_SGE 2
14
15#define RDS_IW_DEFAULT_RECV_WR 1024
16#define RDS_IW_DEFAULT_SEND_WR 256
17
18#define RDS_IW_SUPPORTED_PROTOCOLS 0x00000003 /* minor versions supported */
19
20extern struct list_head rds_iw_devices;
21
22/*
23 * IB posts RDS_FRAG_SIZE fragments of pages to the receive queues to
24 * try and minimize the amount of memory tied up in both the device and
25 * socket receive queues.
26 */
27/* page offset of the final full frag that fits in the page */
28#define RDS_PAGE_LAST_OFF (((PAGE_SIZE / RDS_FRAG_SIZE) - 1) * RDS_FRAG_SIZE)
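/*
 * Worked example (illustrative values only, not part of this patch): if
 * PAGE_SIZE were 16K and RDS_FRAG_SIZE 4K, RDS_PAGE_LAST_OFF would be
 * (4 - 1) * 4K = 12K, i.e. the offset at which the last full fragment
 * that fits in a page begins.
 */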
29struct rds_page_frag {
30 struct list_head f_item;
31 struct page *f_page;
32 unsigned long f_offset;
33 dma_addr_t f_mapped;
34};
35
36struct rds_iw_incoming {
37 struct list_head ii_frags;
38 struct rds_incoming ii_inc;
39};
40
41struct rds_iw_connect_private {
42 /* Add new fields at the end, and don't permute existing fields. */
43 __be32 dp_saddr;
44 __be32 dp_daddr;
45 u8 dp_protocol_major;
46 u8 dp_protocol_minor;
47 __be16 dp_protocol_minor_mask; /* bitmask */
48 __be32 dp_reserved1;
49 __be64 dp_ack_seq;
50 __be32 dp_credit; /* non-zero enables flow ctl */
51};
52
53struct rds_iw_scatterlist {
54 struct scatterlist *list;
55 unsigned int len;
56 int dma_len;
57 unsigned int dma_npages;
58 unsigned int bytes;
59};
60
61struct rds_iw_mapping {
62 spinlock_t m_lock; /* protect the mapping struct */
63 struct list_head m_list;
64 struct rds_iw_mr *m_mr;
65 uint32_t m_rkey;
66 struct rds_iw_scatterlist m_sg;
67};
68
69struct rds_iw_send_work {
70 struct rds_message *s_rm;
71
72 /* We should really put these into a union: */
73 struct rds_rdma_op *s_op;
74 struct rds_iw_mapping *s_mapping;
75 struct ib_mr *s_mr;
76 struct ib_fast_reg_page_list *s_page_list;
77 unsigned char s_remap_count;
78
79 struct ib_send_wr s_wr;
80 struct ib_sge s_sge[RDS_IW_MAX_SGE];
81 unsigned long s_queued;
82};
83
84struct rds_iw_recv_work {
85 struct rds_iw_incoming *r_iwinc;
86 struct rds_page_frag *r_frag;
87 struct ib_recv_wr r_wr;
88 struct ib_sge r_sge[2];
89};
90
91struct rds_iw_work_ring {
92 u32 w_nr;
93 u32 w_alloc_ptr;
94 u32 w_alloc_ctr;
95 u32 w_free_ptr;
96 atomic_t w_free_ctr;
97};
98
99struct rds_iw_device;
100
101struct rds_iw_connection {
102
103 struct list_head iw_node;
104 struct rds_iw_device *rds_iwdev;
105 struct rds_connection *conn;
106
107 /* alphabet soup, IBTA style */
108 struct rdma_cm_id *i_cm_id;
109 struct ib_pd *i_pd;
110 struct ib_mr *i_mr;
111 struct ib_cq *i_send_cq;
112 struct ib_cq *i_recv_cq;
113
114 /* tx */
115 struct rds_iw_work_ring i_send_ring;
116 struct rds_message *i_rm;
117 struct rds_header *i_send_hdrs;
118 u64 i_send_hdrs_dma;
119 struct rds_iw_send_work *i_sends;
120
121 /* rx */
122 struct mutex i_recv_mutex;
123 struct rds_iw_work_ring i_recv_ring;
124 struct rds_iw_incoming *i_iwinc;
125 u32 i_recv_data_rem;
126 struct rds_header *i_recv_hdrs;
127 u64 i_recv_hdrs_dma;
128 struct rds_iw_recv_work *i_recvs;
129 struct rds_page_frag i_frag;
130 u64 i_ack_recv; /* last ACK received */
131
132 /* sending acks */
133 unsigned long i_ack_flags;
134 u64 i_ack_next; /* next ACK to send */
135 struct rds_header *i_ack;
136 struct ib_send_wr i_ack_wr;
137 struct ib_sge i_ack_sge;
138 u64 i_ack_dma;
139 unsigned long i_ack_queued;
140
141 /* Flow control related information
142 *
143 * Our algorithm uses a pair of variables that we need to access
144 * atomically - one for the send credits, and one for the posted
145 * recv credits we need to transfer to the remote.
146 * Rather than protect them using a slow spinlock, we put both into
147 * a single atomic_t and update it using cmpxchg
148 */
149 atomic_t i_credits;
150
151 /* Protocol version specific information */
152 unsigned int i_flowctl:1; /* enable/disable flow ctl */
153 unsigned int i_dma_local_lkey:1;
154 unsigned int i_fastreg_posted:1; /* fastreg posted on this connection */
155 /* Batched completions */
156 unsigned int i_unsignaled_wrs;
157 long i_unsignaled_bytes;
158};
159
160/* This assumes that atomic_t is at least 32 bits */
161#define IB_GET_SEND_CREDITS(v) ((v) & 0xffff)
162#define IB_GET_POST_CREDITS(v) ((v) >> 16)
163#define IB_SET_SEND_CREDITS(v) ((v) & 0xffff)
164#define IB_SET_POST_CREDITS(v) ((v) << 16)
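/*
 * Illustrative sketch, not part of this patch: one way a sender could
 * atomically consume a single send credit from the packed counter
 * described above.  The _example suffix marks the function as
 * hypothetical; only atomic_read()/atomic_cmpxchg() and the macros
 * above are assumed.
 */
static inline int rds_iw_take_send_credit_example(atomic_t *credits)
{
	int oldval, newval;

	do {
		oldval = atomic_read(credits);
		if (IB_GET_SEND_CREDITS(oldval) == 0)
			return 0;	/* no send credits left */
		newval = oldval - 1;	/* send credits live in the low 16 bits */
	} while (atomic_cmpxchg(credits, oldval, newval) != oldval);

	return 1;
}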
165
166struct rds_iw_cm_id {
167 struct list_head list;
168 struct rdma_cm_id *cm_id;
169};
170
171struct rds_iw_device {
172 struct list_head list;
173 struct list_head cm_id_list;
174 struct list_head conn_list;
175 struct ib_device *dev;
176 struct ib_pd *pd;
177 struct ib_mr *mr;
178 struct rds_iw_mr_pool *mr_pool;
179 int page_shift;
180 int max_sge;
181 unsigned int max_wrs;
182 unsigned int dma_local_lkey:1;
183 spinlock_t spinlock; /* protect the above */
184};
185
186/* bits for i_ack_flags */
187#define IB_ACK_IN_FLIGHT 0
188#define IB_ACK_REQUESTED 1
189
190/* Magic WR_ID for ACKs */
191#define RDS_IW_ACK_WR_ID ((u64)0xffffffffffffffffULL)
192#define RDS_IW_FAST_REG_WR_ID ((u64)0xefefefefefefefefULL)
193#define RDS_IW_LOCAL_INV_WR_ID ((u64)0xdfdfdfdfdfdfdfdfULL)
194
195struct rds_iw_statistics {
196 uint64_t s_iw_connect_raced;
197 uint64_t s_iw_listen_closed_stale;
198 uint64_t s_iw_tx_cq_call;
199 uint64_t s_iw_tx_cq_event;
200 uint64_t s_iw_tx_ring_full;
201 uint64_t s_iw_tx_throttle;
202 uint64_t s_iw_tx_sg_mapping_failure;
203 uint64_t s_iw_tx_stalled;
204 uint64_t s_iw_tx_credit_updates;
205 uint64_t s_iw_rx_cq_call;
206 uint64_t s_iw_rx_cq_event;
207 uint64_t s_iw_rx_ring_empty;
208 uint64_t s_iw_rx_refill_from_cq;
209 uint64_t s_iw_rx_refill_from_thread;
210 uint64_t s_iw_rx_alloc_limit;
211 uint64_t s_iw_rx_credit_updates;
212 uint64_t s_iw_ack_sent;
213 uint64_t s_iw_ack_send_failure;
214 uint64_t s_iw_ack_send_delayed;
215 uint64_t s_iw_ack_send_piggybacked;
216 uint64_t s_iw_ack_received;
217 uint64_t s_iw_rdma_mr_alloc;
218 uint64_t s_iw_rdma_mr_free;
219 uint64_t s_iw_rdma_mr_used;
220 uint64_t s_iw_rdma_mr_pool_flush;
221 uint64_t s_iw_rdma_mr_pool_wait;
222 uint64_t s_iw_rdma_mr_pool_depleted;
223};
224
225extern struct workqueue_struct *rds_iw_wq;
226
227/*
228 * Fake ib_dma_sync_sg_for_{cpu,device} as long as ib_verbs.h
229 * doesn't define it.
230 */
231static inline void rds_iw_dma_sync_sg_for_cpu(struct ib_device *dev,
232 struct scatterlist *sg, unsigned int sg_dma_len, int direction)
233{
234 unsigned int i;
235
236 for (i = 0; i < sg_dma_len; ++i) {
237 ib_dma_sync_single_for_cpu(dev,
238 ib_sg_dma_address(dev, &sg[i]),
239 ib_sg_dma_len(dev, &sg[i]),
240 direction);
241 }
242}
243#define ib_dma_sync_sg_for_cpu rds_iw_dma_sync_sg_for_cpu
244
245static inline void rds_iw_dma_sync_sg_for_device(struct ib_device *dev,
246 struct scatterlist *sg, unsigned int sg_dma_len, int direction)
247{
248 unsigned int i;
249
250 for (i = 0; i < sg_dma_len; ++i) {
251 ib_dma_sync_single_for_device(dev,
252 ib_sg_dma_address(dev, &sg[i]),
253 ib_sg_dma_len(dev, &sg[i]),
254 direction);
255 }
256}
257#define ib_dma_sync_sg_for_device rds_iw_dma_sync_sg_for_device
258
259static inline u32 rds_iw_local_dma_lkey(struct rds_iw_connection *ic)
260{
261 return ic->i_dma_local_lkey ? ic->i_cm_id->device->local_dma_lkey : ic->i_mr->lkey;
262}
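/*
 * Illustrative sketch, not part of this patch: how the helper above
 * might be used to fill a header SGE.  The function name and the
 * dma_addr argument are hypothetical.
 */
static inline void rds_iw_fill_header_sge_example(struct rds_iw_connection *ic,
						   struct ib_sge *sge, u64 dma_addr)
{
	sge->addr   = dma_addr;
	sge->length = sizeof(struct rds_header);
	sge->lkey   = rds_iw_local_dma_lkey(ic);
}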
263
264/* ib.c */
265extern struct rds_transport rds_iw_transport;
266extern void rds_iw_add_one(struct ib_device *device);
267extern void rds_iw_remove_one(struct ib_device *device);
268extern struct ib_client rds_iw_client;
269
270extern unsigned int fastreg_pool_size;
271extern unsigned int fastreg_message_size;
272
273extern spinlock_t iw_nodev_conns_lock;
274extern struct list_head iw_nodev_conns;
275
276/* ib_cm.c */
277int rds_iw_conn_alloc(struct rds_connection *conn, gfp_t gfp);
278void rds_iw_conn_free(void *arg);
279int rds_iw_conn_connect(struct rds_connection *conn);
280void rds_iw_conn_shutdown(struct rds_connection *conn);
281void rds_iw_state_change(struct sock *sk);
282int __init rds_iw_listen_init(void);
283void rds_iw_listen_stop(void);
284void __rds_iw_conn_error(struct rds_connection *conn, const char *, ...);
285int rds_iw_cm_handle_connect(struct rdma_cm_id *cm_id,
286 struct rdma_cm_event *event);
287int rds_iw_cm_initiate_connect(struct rdma_cm_id *cm_id);
288void rds_iw_cm_connect_complete(struct rds_connection *conn,
289 struct rdma_cm_event *event);
290
291
292#define rds_iw_conn_error(conn, fmt...) \
293 __rds_iw_conn_error(conn, KERN_WARNING "RDS/IW: " fmt)
294
295/* ib_rdma.c */
296int rds_iw_update_cm_id(struct rds_iw_device *rds_iwdev, struct rdma_cm_id *cm_id);
297int rds_iw_add_conn(struct rds_iw_device *rds_iwdev, struct rds_connection *conn);
298void rds_iw_remove_nodev_conns(void);
299void rds_iw_remove_conns(struct rds_iw_device *rds_iwdev);
300struct rds_iw_mr_pool *rds_iw_create_mr_pool(struct rds_iw_device *);
301void rds_iw_get_mr_info(struct rds_iw_device *rds_iwdev, struct rds_info_rdma_connection *iinfo);
302void rds_iw_destroy_mr_pool(struct rds_iw_mr_pool *);
303void *rds_iw_get_mr(struct scatterlist *sg, unsigned long nents,
304 struct rds_sock *rs, u32 *key_ret);
305void rds_iw_sync_mr(void *trans_private, int dir);
306void rds_iw_free_mr(void *trans_private, int invalidate);
307void rds_iw_flush_mrs(void);
308void rds_iw_remove_cm_id(struct rds_iw_device *rds_iwdev, struct rdma_cm_id *cm_id);
309
310/* ib_recv.c */
311int __init rds_iw_recv_init(void);
312void rds_iw_recv_exit(void);
313int rds_iw_recv(struct rds_connection *conn);
314int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
315 gfp_t page_gfp, int prefill);
316void rds_iw_inc_purge(struct rds_incoming *inc);
317void rds_iw_inc_free(struct rds_incoming *inc);
318int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iovec *iov,
319 size_t size);
320void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context);
321void rds_iw_recv_init_ring(struct rds_iw_connection *ic);
322void rds_iw_recv_clear_ring(struct rds_iw_connection *ic);
323void rds_iw_recv_init_ack(struct rds_iw_connection *ic);
324void rds_iw_attempt_ack(struct rds_iw_connection *ic);
325void rds_iw_ack_send_complete(struct rds_iw_connection *ic);
326u64 rds_iw_piggyb_ack(struct rds_iw_connection *ic);
327
328/* ib_ring.c */
329void rds_iw_ring_init(struct rds_iw_work_ring *ring, u32 nr);
330void rds_iw_ring_resize(struct rds_iw_work_ring *ring, u32 nr);
331u32 rds_iw_ring_alloc(struct rds_iw_work_ring *ring, u32 val, u32 *pos);
332void rds_iw_ring_free(struct rds_iw_work_ring *ring, u32 val);
333void rds_iw_ring_unalloc(struct rds_iw_work_ring *ring, u32 val);
334int rds_iw_ring_empty(struct rds_iw_work_ring *ring);
335int rds_iw_ring_low(struct rds_iw_work_ring *ring);
336u32 rds_iw_ring_oldest(struct rds_iw_work_ring *ring);
337u32 rds_iw_ring_completed(struct rds_iw_work_ring *ring, u32 wr_id, u32 oldest);
338extern wait_queue_head_t rds_iw_ring_empty_wait;
339
340/* ib_send.c */
341void rds_iw_xmit_complete(struct rds_connection *conn);
342int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
343 unsigned int hdr_off, unsigned int sg, unsigned int off);
344void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context);
345void rds_iw_send_init_ring(struct rds_iw_connection *ic);
346void rds_iw_send_clear_ring(struct rds_iw_connection *ic);
347int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op);
348void rds_iw_send_add_credits(struct rds_connection *conn, unsigned int credits);
349void rds_iw_advertise_credits(struct rds_connection *conn, unsigned int posted);
350int rds_iw_send_grab_credits(struct rds_iw_connection *ic, u32 wanted,
351 u32 *adv_credits, int need_posted);
352
353/* ib_stats.c */
354DECLARE_PER_CPU(struct rds_iw_statistics, rds_iw_stats);
355#define rds_iw_stats_inc(member) rds_stats_inc_which(rds_iw_stats, member)
356unsigned int rds_iw_stats_info_copy(struct rds_info_iterator *iter,
357 unsigned int avail);
358
359/* ib_sysctl.c */
360int __init rds_iw_sysctl_init(void);
361void rds_iw_sysctl_exit(void);
362extern unsigned long rds_iw_sysctl_max_send_wr;
363extern unsigned long rds_iw_sysctl_max_recv_wr;
364extern unsigned long rds_iw_sysctl_max_unsig_wrs;
365extern unsigned long rds_iw_sysctl_max_unsig_bytes;
366extern unsigned long rds_iw_sysctl_max_recv_allocation;
367extern unsigned int rds_iw_sysctl_flow_control;
368extern ctl_table rds_iw_sysctl_table[];
369
370/*
371 * Helper functions for getting/setting the header and data SGEs in
372 * RDS packets (not RDMA)
373 */
374static inline struct ib_sge *
375rds_iw_header_sge(struct rds_iw_connection *ic, struct ib_sge *sge)
376{
377 return &sge[0];
378}
379
380static inline struct ib_sge *
381rds_iw_data_sge(struct rds_iw_connection *ic, struct ib_sge *sge)
382{
383 return &sge[1];
384}
385
386static inline void rds_iw_set_64bit(u64 *ptr, u64 val)
387{
388#if BITS_PER_LONG == 64
389 *ptr = val;
390#else
391 set_64bit(ptr, val);
392#endif
393}
394
395#endif
diff --git a/net/rds/iw_cm.c b/net/rds/iw_cm.c
new file mode 100644
index 000000000000..57ecb3d4b8a5
--- /dev/null
+++ b/net/rds/iw_cm.c
@@ -0,0 +1,750 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <linux/vmalloc.h>
36
37#include "rds.h"
38#include "iw.h"
39
40/*
41 * Set the selected protocol version
42 */
43static void rds_iw_set_protocol(struct rds_connection *conn, unsigned int version)
44{
45 conn->c_version = version;
46}
47
48/*
49 * Set up flow control
50 */
51static void rds_iw_set_flow_control(struct rds_connection *conn, u32 credits)
52{
53 struct rds_iw_connection *ic = conn->c_transport_data;
54
55 if (rds_iw_sysctl_flow_control && credits != 0) {
56 /* We're doing flow control */
57 ic->i_flowctl = 1;
58 rds_iw_send_add_credits(conn, credits);
59 } else {
60 ic->i_flowctl = 0;
61 }
62}
63
64/*
65 * Connection established.
66 * We get here for both outgoing and incoming connection.
67 */
68void rds_iw_cm_connect_complete(struct rds_connection *conn, struct rdma_cm_event *event)
69{
70 const struct rds_iw_connect_private *dp = NULL;
71 struct rds_iw_connection *ic = conn->c_transport_data;
72 struct rds_iw_device *rds_iwdev;
73 int err;
74
75 if (event->param.conn.private_data_len) {
76 dp = event->param.conn.private_data;
77
78 rds_iw_set_protocol(conn,
79 RDS_PROTOCOL(dp->dp_protocol_major,
80 dp->dp_protocol_minor));
81 rds_iw_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
82 }
83
84 /* update ib_device with this local ipaddr & conn */
85 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
86 err = rds_iw_update_cm_id(rds_iwdev, ic->i_cm_id);
87 if (err)
88		printk(KERN_ERR "rds_iw_update_cm_id failed (%d)\n", err);
89 err = rds_iw_add_conn(rds_iwdev, conn);
90 if (err)
91 printk(KERN_ERR "rds_iw_add_conn failed (%d)\n", err);
92
93 /* If the peer gave us the last packet it saw, process this as if
94 * we had received a regular ACK. */
95 if (dp && dp->dp_ack_seq)
96 rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
97
98 printk(KERN_NOTICE "RDS/IW: connected to %pI4<->%pI4 version %u.%u%s\n",
99 &conn->c_laddr, &conn->c_faddr,
100 RDS_PROTOCOL_MAJOR(conn->c_version),
101 RDS_PROTOCOL_MINOR(conn->c_version),
102 ic->i_flowctl ? ", flow control" : "");
103
104 rds_connect_complete(conn);
105}
106
107static void rds_iw_cm_fill_conn_param(struct rds_connection *conn,
108 struct rdma_conn_param *conn_param,
109 struct rds_iw_connect_private *dp,
110 u32 protocol_version)
111{
112 struct rds_iw_connection *ic = conn->c_transport_data;
113
114 memset(conn_param, 0, sizeof(struct rdma_conn_param));
115 /* XXX tune these? */
116 conn_param->responder_resources = 1;
117 conn_param->initiator_depth = 1;
118
119 if (dp) {
120 memset(dp, 0, sizeof(*dp));
121 dp->dp_saddr = conn->c_laddr;
122 dp->dp_daddr = conn->c_faddr;
123 dp->dp_protocol_major = RDS_PROTOCOL_MAJOR(protocol_version);
124 dp->dp_protocol_minor = RDS_PROTOCOL_MINOR(protocol_version);
125 dp->dp_protocol_minor_mask = cpu_to_be16(RDS_IW_SUPPORTED_PROTOCOLS);
126 dp->dp_ack_seq = rds_iw_piggyb_ack(ic);
127
128 /* Advertise flow control */
129 if (ic->i_flowctl) {
130 unsigned int credits;
131
132 credits = IB_GET_POST_CREDITS(atomic_read(&ic->i_credits));
133 dp->dp_credit = cpu_to_be32(credits);
134 atomic_sub(IB_SET_POST_CREDITS(credits), &ic->i_credits);
135 }
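		/*
		 * Worked example for the advertisement above (illustrative
		 * only): if 16 receives had been posted but not yet
		 * advertised, dp_credit becomes 16 and the posted-credit
		 * half of i_credits drops back to zero, while the
		 * send-credit half is left untouched.
		 */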
136
137 conn_param->private_data = dp;
138 conn_param->private_data_len = sizeof(*dp);
139 }
140}
141
142static void rds_iw_cq_event_handler(struct ib_event *event, void *data)
143{
144 rdsdebug("event %u data %p\n", event->event, data);
145}
146
147static void rds_iw_qp_event_handler(struct ib_event *event, void *data)
148{
149 struct rds_connection *conn = data;
150 struct rds_iw_connection *ic = conn->c_transport_data;
151
152 rdsdebug("conn %p ic %p event %u\n", conn, ic, event->event);
153
154 switch (event->event) {
155 case IB_EVENT_COMM_EST:
156 rdma_notify(ic->i_cm_id, IB_EVENT_COMM_EST);
157 break;
158 case IB_EVENT_QP_REQ_ERR:
159 case IB_EVENT_QP_FATAL:
160 default:
161 rds_iw_conn_error(conn, "RDS/IW: Fatal QP Event %u - connection %pI4->%pI4...reconnecting\n",
162 event->event, &conn->c_laddr,
163 &conn->c_faddr);
164 break;
165 }
166}
167
168/*
169 * Create a QP
170 */
171static int rds_iw_init_qp_attrs(struct ib_qp_init_attr *attr,
172 struct rds_iw_device *rds_iwdev,
173 struct rds_iw_work_ring *send_ring,
174 void (*send_cq_handler)(struct ib_cq *, void *),
175 struct rds_iw_work_ring *recv_ring,
176 void (*recv_cq_handler)(struct ib_cq *, void *),
177 void *context)
178{
179 struct ib_device *dev = rds_iwdev->dev;
180 unsigned int send_size, recv_size;
181 int ret;
182
183	/* The offset of 1 is to accommodate the additional ACK WR. */
184 send_size = min_t(unsigned int, rds_iwdev->max_wrs, rds_iw_sysctl_max_send_wr + 1);
185 recv_size = min_t(unsigned int, rds_iwdev->max_wrs, rds_iw_sysctl_max_recv_wr + 1);
186 rds_iw_ring_resize(send_ring, send_size - 1);
187 rds_iw_ring_resize(recv_ring, recv_size - 1);
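	/*
	 * Worked example (illustrative value only): if rds_iw_sysctl_max_send_wr
	 * were 256 and the device supported more, send_size would be 257 -
	 * a ring of 256 data WRs plus the one WR reserved for the ACK.
	 */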
188
189 memset(attr, 0, sizeof(*attr));
190 attr->event_handler = rds_iw_qp_event_handler;
191 attr->qp_context = context;
192 attr->cap.max_send_wr = send_size;
193 attr->cap.max_recv_wr = recv_size;
194 attr->cap.max_send_sge = rds_iwdev->max_sge;
195 attr->cap.max_recv_sge = RDS_IW_RECV_SGE;
196 attr->sq_sig_type = IB_SIGNAL_REQ_WR;
197 attr->qp_type = IB_QPT_RC;
198
199 attr->send_cq = ib_create_cq(dev, send_cq_handler,
200 rds_iw_cq_event_handler,
201 context, send_size, 0);
202 if (IS_ERR(attr->send_cq)) {
203 ret = PTR_ERR(attr->send_cq);
204 attr->send_cq = NULL;
205 rdsdebug("ib_create_cq send failed: %d\n", ret);
206 goto out;
207 }
208
209 attr->recv_cq = ib_create_cq(dev, recv_cq_handler,
210 rds_iw_cq_event_handler,
211 context, recv_size, 0);
212 if (IS_ERR(attr->recv_cq)) {
213 ret = PTR_ERR(attr->recv_cq);
214 attr->recv_cq = NULL;
215		rdsdebug("ib_create_cq recv failed: %d\n", ret);
216 goto out;
217 }
218
219 ret = ib_req_notify_cq(attr->send_cq, IB_CQ_NEXT_COMP);
220 if (ret) {
221 rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
222 goto out;
223 }
224
225 ret = ib_req_notify_cq(attr->recv_cq, IB_CQ_SOLICITED);
226 if (ret) {
227 rdsdebug("ib_req_notify_cq recv failed: %d\n", ret);
228 goto out;
229 }
230
231out:
232 if (ret) {
233 if (attr->send_cq)
234 ib_destroy_cq(attr->send_cq);
235 if (attr->recv_cq)
236 ib_destroy_cq(attr->recv_cq);
237 }
238 return ret;
239}
240
241/*
242 * This needs to be very careful to not leave IS_ERR pointers around for
243 * cleanup to trip over.
244 */
245static int rds_iw_setup_qp(struct rds_connection *conn)
246{
247 struct rds_iw_connection *ic = conn->c_transport_data;
248 struct ib_device *dev = ic->i_cm_id->device;
249 struct ib_qp_init_attr attr;
250 struct rds_iw_device *rds_iwdev;
251 int ret;
252
253 /* rds_iw_add_one creates a rds_iw_device object per IB device,
254	 * and allocates a protection domain, memory region and MR pool
255 * for each. If that fails for any reason, it will not register
256 * the rds_iwdev at all.
257 */
258 rds_iwdev = ib_get_client_data(dev, &rds_iw_client);
259 if (rds_iwdev == NULL) {
260 if (printk_ratelimit())
261 printk(KERN_NOTICE "RDS/IW: No client_data for device %s\n",
262 dev->name);
263 return -EOPNOTSUPP;
264 }
265
266	/* Protection domain and memory region */
267 ic->i_pd = rds_iwdev->pd;
268 ic->i_mr = rds_iwdev->mr;
269
270 ret = rds_iw_init_qp_attrs(&attr, rds_iwdev,
271 &ic->i_send_ring, rds_iw_send_cq_comp_handler,
272 &ic->i_recv_ring, rds_iw_recv_cq_comp_handler,
273 conn);
274 if (ret < 0)
275 goto out;
276
277 ic->i_send_cq = attr.send_cq;
278 ic->i_recv_cq = attr.recv_cq;
279
280 /*
281 * XXX this can fail if max_*_wr is too large? Are we supposed
282 * to back off until we get a value that the hardware can support?
283 */
284 ret = rdma_create_qp(ic->i_cm_id, ic->i_pd, &attr);
285 if (ret) {
286 rdsdebug("rdma_create_qp failed: %d\n", ret);
287 goto out;
288 }
289
290 ic->i_send_hdrs = ib_dma_alloc_coherent(dev,
291 ic->i_send_ring.w_nr *
292 sizeof(struct rds_header),
293 &ic->i_send_hdrs_dma, GFP_KERNEL);
294 if (ic->i_send_hdrs == NULL) {
295 ret = -ENOMEM;
296 rdsdebug("ib_dma_alloc_coherent send failed\n");
297 goto out;
298 }
299
300 ic->i_recv_hdrs = ib_dma_alloc_coherent(dev,
301 ic->i_recv_ring.w_nr *
302 sizeof(struct rds_header),
303 &ic->i_recv_hdrs_dma, GFP_KERNEL);
304 if (ic->i_recv_hdrs == NULL) {
305 ret = -ENOMEM;
306 rdsdebug("ib_dma_alloc_coherent recv failed\n");
307 goto out;
308 }
309
310 ic->i_ack = ib_dma_alloc_coherent(dev, sizeof(struct rds_header),
311 &ic->i_ack_dma, GFP_KERNEL);
312 if (ic->i_ack == NULL) {
313 ret = -ENOMEM;
314 rdsdebug("ib_dma_alloc_coherent ack failed\n");
315 goto out;
316 }
317
318 ic->i_sends = vmalloc(ic->i_send_ring.w_nr * sizeof(struct rds_iw_send_work));
319 if (ic->i_sends == NULL) {
320 ret = -ENOMEM;
321 rdsdebug("send allocation failed\n");
322 goto out;
323 }
324 rds_iw_send_init_ring(ic);
325
326 ic->i_recvs = vmalloc(ic->i_recv_ring.w_nr * sizeof(struct rds_iw_recv_work));
327 if (ic->i_recvs == NULL) {
328 ret = -ENOMEM;
329 rdsdebug("recv allocation failed\n");
330 goto out;
331 }
332
333 rds_iw_recv_init_ring(ic);
334 rds_iw_recv_init_ack(ic);
335
336 /* Post receive buffers - as a side effect, this will update
337 * the posted credit count. */
338 rds_iw_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 1);
339
340 rdsdebug("conn %p pd %p mr %p cq %p %p\n", conn, ic->i_pd, ic->i_mr,
341 ic->i_send_cq, ic->i_recv_cq);
342
343out:
344 return ret;
345}
346
347static u32 rds_iw_protocol_compatible(const struct rds_iw_connect_private *dp)
348{
349 u16 common;
350 u32 version = 0;
351
352 /* rdma_cm private data is odd - when there is any private data in the
353	 * request, we will be given a pretty large buffer without being told the
354 * original size. The only way to tell the difference is by looking at
355 * the contents, which are initialized to zero.
356 * If the protocol version fields aren't set, this is a connection attempt
357	 * from an older version. This could be 3.0 or 2.0 - we can't tell.
358 * We really should have changed this for OFED 1.3 :-( */
359 if (dp->dp_protocol_major == 0)
360 return RDS_PROTOCOL_3_0;
361
362 common = be16_to_cpu(dp->dp_protocol_minor_mask) & RDS_IW_SUPPORTED_PROTOCOLS;
363 if (dp->dp_protocol_major == 3 && common) {
364 version = RDS_PROTOCOL_3_0;
365 while ((common >>= 1) != 0)
366 version++;
367 } else if (printk_ratelimit()) {
368 printk(KERN_NOTICE "RDS: Connection from %pI4 using "
369 "incompatible protocol version %u.%u\n",
370 &dp->dp_saddr,
371 dp->dp_protocol_major,
372 dp->dp_protocol_minor);
373 }
374 return version;
375}
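/*
 * Worked example (illustrative only): a peer advertising dp_protocol_major 3
 * and dp_protocol_minor_mask 0x0003 shares minor versions 0 and 1 with us
 * (RDS_IW_SUPPORTED_PROTOCOLS is 0x0003), so the loop above counts up to the
 * highest common bit and the negotiated version is 3.1.
 */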
376
377int rds_iw_cm_handle_connect(struct rdma_cm_id *cm_id,
378 struct rdma_cm_event *event)
379{
380 const struct rds_iw_connect_private *dp = event->param.conn.private_data;
381 struct rds_iw_connect_private dp_rep;
382 struct rds_connection *conn = NULL;
383 struct rds_iw_connection *ic = NULL;
384 struct rdma_conn_param conn_param;
385 struct rds_iw_device *rds_iwdev;
386 u32 version;
387 int err, destroy = 1;
388
389 /* Check whether the remote protocol version matches ours. */
390 version = rds_iw_protocol_compatible(dp);
391 if (!version)
392 goto out;
393
394 rdsdebug("saddr %pI4 daddr %pI4 RDSv%u.%u\n",
395 &dp->dp_saddr, &dp->dp_daddr,
396 RDS_PROTOCOL_MAJOR(version), RDS_PROTOCOL_MINOR(version));
397
398 conn = rds_conn_create(dp->dp_daddr, dp->dp_saddr, &rds_iw_transport,
399 GFP_KERNEL);
400 if (IS_ERR(conn)) {
401 rdsdebug("rds_conn_create failed (%ld)\n", PTR_ERR(conn));
402 conn = NULL;
403 goto out;
404 }
405
406 /*
407 * The connection request may occur while the
408	 * previous connection still exists, e.g. in case of failover.
409 * But as connections may be initiated simultaneously
410 * by both hosts, we have a random backoff mechanism -
411 * see the comment above rds_queue_reconnect()
412 */
413 mutex_lock(&conn->c_cm_lock);
414 if (!rds_conn_transition(conn, RDS_CONN_DOWN, RDS_CONN_CONNECTING)) {
415 if (rds_conn_state(conn) == RDS_CONN_UP) {
416 rdsdebug("incoming connect while connecting\n");
417 rds_conn_drop(conn);
418 rds_iw_stats_inc(s_iw_listen_closed_stale);
419 } else
420 if (rds_conn_state(conn) == RDS_CONN_CONNECTING) {
421 /* Wait and see - our connect may still be succeeding */
422 rds_iw_stats_inc(s_iw_connect_raced);
423 }
424 mutex_unlock(&conn->c_cm_lock);
425 goto out;
426 }
427
428 ic = conn->c_transport_data;
429
430 rds_iw_set_protocol(conn, version);
431 rds_iw_set_flow_control(conn, be32_to_cpu(dp->dp_credit));
432
433 /* If the peer gave us the last packet it saw, process this as if
434 * we had received a regular ACK. */
435 if (dp->dp_ack_seq)
436 rds_send_drop_acked(conn, be64_to_cpu(dp->dp_ack_seq), NULL);
437
438 BUG_ON(cm_id->context);
439 BUG_ON(ic->i_cm_id);
440
441 ic->i_cm_id = cm_id;
442 cm_id->context = conn;
443
444 rds_iwdev = ib_get_client_data(cm_id->device, &rds_iw_client);
445 ic->i_dma_local_lkey = rds_iwdev->dma_local_lkey;
446
447 /* We got halfway through setting up the ib_connection, if we
448 * fail now, we have to take the long route out of this mess. */
449 destroy = 0;
450
451 err = rds_iw_setup_qp(conn);
452 if (err) {
453 rds_iw_conn_error(conn, "rds_iw_setup_qp failed (%d)\n", err);
454 goto out;
455 }
456
457 rds_iw_cm_fill_conn_param(conn, &conn_param, &dp_rep, version);
458
459 /* rdma_accept() calls rdma_reject() internally if it fails */
460 err = rdma_accept(cm_id, &conn_param);
461 mutex_unlock(&conn->c_cm_lock);
462 if (err) {
463 rds_iw_conn_error(conn, "rdma_accept failed (%d)\n", err);
464 goto out;
465 }
466
467 return 0;
468
469out:
470 rdma_reject(cm_id, NULL, 0);
471 return destroy;
472}
473
474
475int rds_iw_cm_initiate_connect(struct rdma_cm_id *cm_id)
476{
477 struct rds_connection *conn = cm_id->context;
478 struct rds_iw_connection *ic = conn->c_transport_data;
479 struct rdma_conn_param conn_param;
480 struct rds_iw_connect_private dp;
481 int ret;
482
483 /* If the peer doesn't do protocol negotiation, we must
484 * default to RDSv3.0 */
485 rds_iw_set_protocol(conn, RDS_PROTOCOL_3_0);
486 ic->i_flowctl = rds_iw_sysctl_flow_control; /* advertise flow control */
487
488 ret = rds_iw_setup_qp(conn);
489 if (ret) {
490 rds_iw_conn_error(conn, "rds_iw_setup_qp failed (%d)\n", ret);
491 goto out;
492 }
493
494 rds_iw_cm_fill_conn_param(conn, &conn_param, &dp, RDS_PROTOCOL_VERSION);
495
496 ret = rdma_connect(cm_id, &conn_param);
497 if (ret)
498 rds_iw_conn_error(conn, "rdma_connect failed (%d)\n", ret);
499
500out:
501 /* Beware - returning non-zero tells the rdma_cm to destroy
502 * the cm_id. We should certainly not do it as long as we still
503 * "own" the cm_id. */
504 if (ret) {
505 struct rds_iw_connection *ic = conn->c_transport_data;
506
507 if (ic->i_cm_id == cm_id)
508 ret = 0;
509 }
510 return ret;
511}
512
513int rds_iw_conn_connect(struct rds_connection *conn)
514{
515 struct rds_iw_connection *ic = conn->c_transport_data;
516 struct rds_iw_device *rds_iwdev;
517 struct sockaddr_in src, dest;
518 int ret;
519
520	/* XXX I wonder what effect the port space has */
521 /* delegate cm event handler to rdma_transport */
522 ic->i_cm_id = rdma_create_id(rds_rdma_cm_event_handler, conn,
523 RDMA_PS_TCP);
524 if (IS_ERR(ic->i_cm_id)) {
525 ret = PTR_ERR(ic->i_cm_id);
526 ic->i_cm_id = NULL;
527 rdsdebug("rdma_create_id() failed: %d\n", ret);
528 goto out;
529 }
530
531 rdsdebug("created cm id %p for conn %p\n", ic->i_cm_id, conn);
532
533 src.sin_family = AF_INET;
534 src.sin_addr.s_addr = (__force u32)conn->c_laddr;
535 src.sin_port = (__force u16)htons(0);
536
537 /* First, bind to the local address and device. */
538 ret = rdma_bind_addr(ic->i_cm_id, (struct sockaddr *) &src);
539 if (ret) {
540 rdsdebug("rdma_bind_addr(%pI4) failed: %d\n",
541 &conn->c_laddr, ret);
542 rdma_destroy_id(ic->i_cm_id);
543 ic->i_cm_id = NULL;
544 goto out;
545 }
546
547 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
548 ic->i_dma_local_lkey = rds_iwdev->dma_local_lkey;
549
550 dest.sin_family = AF_INET;
551 dest.sin_addr.s_addr = (__force u32)conn->c_faddr;
552 dest.sin_port = (__force u16)htons(RDS_PORT);
553
554 ret = rdma_resolve_addr(ic->i_cm_id, (struct sockaddr *)&src,
555 (struct sockaddr *)&dest,
556 RDS_RDMA_RESOLVE_TIMEOUT_MS);
557 if (ret) {
558 rdsdebug("addr resolve failed for cm id %p: %d\n", ic->i_cm_id,
559 ret);
560 rdma_destroy_id(ic->i_cm_id);
561 ic->i_cm_id = NULL;
562 }
563
564out:
565 return ret;
566}
567
568/*
569 * This is careful to clean up only those resources that were actually
570 * built up, so that it can be called at any point during startup. In
571 * fact it can be called multiple times for a given connection.
572 */
573void rds_iw_conn_shutdown(struct rds_connection *conn)
574{
575 struct rds_iw_connection *ic = conn->c_transport_data;
576 int err = 0;
577 struct ib_qp_attr qp_attr;
578
579 rdsdebug("cm %p pd %p cq %p %p qp %p\n", ic->i_cm_id,
580 ic->i_pd, ic->i_send_cq, ic->i_recv_cq,
581 ic->i_cm_id ? ic->i_cm_id->qp : NULL);
582
583 if (ic->i_cm_id) {
584 struct ib_device *dev = ic->i_cm_id->device;
585
586 rdsdebug("disconnecting cm %p\n", ic->i_cm_id);
587 err = rdma_disconnect(ic->i_cm_id);
588 if (err) {
589 /* Actually this may happen quite frequently, when
590 * an outgoing connect raced with an incoming connect.
591 */
592 rdsdebug("rds_iw_conn_shutdown: failed to disconnect,"
593 " cm: %p err %d\n", ic->i_cm_id, err);
594 }
595
596 if (ic->i_cm_id->qp) {
597 qp_attr.qp_state = IB_QPS_ERR;
598 ib_modify_qp(ic->i_cm_id->qp, &qp_attr, IB_QP_STATE);
599 }
600
601 wait_event(rds_iw_ring_empty_wait,
602 rds_iw_ring_empty(&ic->i_send_ring) &&
603 rds_iw_ring_empty(&ic->i_recv_ring));
604
605 if (ic->i_send_hdrs)
606 ib_dma_free_coherent(dev,
607 ic->i_send_ring.w_nr *
608 sizeof(struct rds_header),
609 ic->i_send_hdrs,
610 ic->i_send_hdrs_dma);
611
612 if (ic->i_recv_hdrs)
613 ib_dma_free_coherent(dev,
614 ic->i_recv_ring.w_nr *
615 sizeof(struct rds_header),
616 ic->i_recv_hdrs,
617 ic->i_recv_hdrs_dma);
618
619 if (ic->i_ack)
620 ib_dma_free_coherent(dev, sizeof(struct rds_header),
621 ic->i_ack, ic->i_ack_dma);
622
623 if (ic->i_sends)
624 rds_iw_send_clear_ring(ic);
625 if (ic->i_recvs)
626 rds_iw_recv_clear_ring(ic);
627
628 if (ic->i_cm_id->qp)
629 rdma_destroy_qp(ic->i_cm_id);
630 if (ic->i_send_cq)
631 ib_destroy_cq(ic->i_send_cq);
632 if (ic->i_recv_cq)
633 ib_destroy_cq(ic->i_recv_cq);
634
635 /*
636 * If associated with an rds_iw_device:
637 * Move connection back to the nodev list.
638 * Remove cm_id from the device cm_id list.
639 */
640 if (ic->rds_iwdev) {
641
642 spin_lock_irq(&ic->rds_iwdev->spinlock);
643 BUG_ON(list_empty(&ic->iw_node));
644 list_del(&ic->iw_node);
645 spin_unlock_irq(&ic->rds_iwdev->spinlock);
646
647 spin_lock_irq(&iw_nodev_conns_lock);
648 list_add_tail(&ic->iw_node, &iw_nodev_conns);
649 spin_unlock_irq(&iw_nodev_conns_lock);
650 rds_iw_remove_cm_id(ic->rds_iwdev, ic->i_cm_id);
651 ic->rds_iwdev = NULL;
652 }
653
654 rdma_destroy_id(ic->i_cm_id);
655
656 ic->i_cm_id = NULL;
657 ic->i_pd = NULL;
658 ic->i_mr = NULL;
659 ic->i_send_cq = NULL;
660 ic->i_recv_cq = NULL;
661 ic->i_send_hdrs = NULL;
662 ic->i_recv_hdrs = NULL;
663 ic->i_ack = NULL;
664 }
665 BUG_ON(ic->rds_iwdev);
666
667 /* Clear pending transmit */
668 if (ic->i_rm) {
669 rds_message_put(ic->i_rm);
670 ic->i_rm = NULL;
671 }
672
673 /* Clear the ACK state */
674 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
675 rds_iw_set_64bit(&ic->i_ack_next, 0);
676 ic->i_ack_recv = 0;
677
678 /* Clear flow control state */
679 ic->i_flowctl = 0;
680 atomic_set(&ic->i_credits, 0);
681
682 rds_iw_ring_init(&ic->i_send_ring, rds_iw_sysctl_max_send_wr);
683 rds_iw_ring_init(&ic->i_recv_ring, rds_iw_sysctl_max_recv_wr);
684
685 if (ic->i_iwinc) {
686 rds_inc_put(&ic->i_iwinc->ii_inc);
687 ic->i_iwinc = NULL;
688 }
689
690 vfree(ic->i_sends);
691 ic->i_sends = NULL;
692 vfree(ic->i_recvs);
693 ic->i_recvs = NULL;
694 rdsdebug("shutdown complete\n");
695}
696
697int rds_iw_conn_alloc(struct rds_connection *conn, gfp_t gfp)
698{
699 struct rds_iw_connection *ic;
700 unsigned long flags;
701
702 /* XXX too lazy? */
703 ic = kzalloc(sizeof(struct rds_iw_connection), GFP_KERNEL);
704 if (ic == NULL)
705 return -ENOMEM;
706
707 INIT_LIST_HEAD(&ic->iw_node);
708 mutex_init(&ic->i_recv_mutex);
709
710 /*
711 * rds_iw_conn_shutdown() waits for these to be emptied so they
712 * must be initialized before it can be called.
713 */
714 rds_iw_ring_init(&ic->i_send_ring, rds_iw_sysctl_max_send_wr);
715 rds_iw_ring_init(&ic->i_recv_ring, rds_iw_sysctl_max_recv_wr);
716
717 ic->conn = conn;
718 conn->c_transport_data = ic;
719
720 spin_lock_irqsave(&iw_nodev_conns_lock, flags);
721 list_add_tail(&ic->iw_node, &iw_nodev_conns);
722 spin_unlock_irqrestore(&iw_nodev_conns_lock, flags);
723
724
725 rdsdebug("conn %p conn ic %p\n", conn, conn->c_transport_data);
726 return 0;
727}
728
729void rds_iw_conn_free(void *arg)
730{
731 struct rds_iw_connection *ic = arg;
732 rdsdebug("ic %p\n", ic);
733 list_del(&ic->iw_node);
734 kfree(ic);
735}
736
737/*
738 * An error occurred on the connection
739 */
740void
741__rds_iw_conn_error(struct rds_connection *conn, const char *fmt, ...)
742{
743 va_list ap;
744
745 rds_conn_drop(conn);
746
747 va_start(ap, fmt);
748 vprintk(fmt, ap);
749 va_end(ap);
750}
diff --git a/net/rds/iw_rdma.c b/net/rds/iw_rdma.c
new file mode 100644
index 000000000000..1c02a8f952d0
--- /dev/null
+++ b/net/rds/iw_rdma.c
@@ -0,0 +1,888 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34
35#include "rds.h"
36#include "rdma.h"
37#include "iw.h"
38
39
40/*
41 * This is stored as mr->r_trans_private.
42 */
43struct rds_iw_mr {
44 struct rds_iw_device *device;
45 struct rds_iw_mr_pool *pool;
46 struct rdma_cm_id *cm_id;
47
48 struct ib_mr *mr;
49 struct ib_fast_reg_page_list *page_list;
50
51 struct rds_iw_mapping mapping;
52 unsigned char remap_count;
53};
54
55/*
56 * Our own little MR pool
57 */
58struct rds_iw_mr_pool {
59 struct rds_iw_device *device; /* back ptr to the device that owns us */
60
61 struct mutex flush_lock; /* serialize fmr invalidate */
62 struct work_struct flush_worker; /* flush worker */
63
64 spinlock_t list_lock; /* protect variables below */
65 atomic_t item_count; /* total # of MRs */
66	atomic_t		dirty_count;		/* # of dirty MRs */
67 struct list_head dirty_list; /* dirty mappings */
68	struct list_head	clean_list;		/* unused & unmapped MRs */
69 atomic_t free_pinned; /* memory pinned by free MRs */
70 unsigned long max_message_size; /* in pages */
71 unsigned long max_items;
72 unsigned long max_items_soft;
73 unsigned long max_free_pinned;
74 int max_pages;
75};
76
77static int rds_iw_flush_mr_pool(struct rds_iw_mr_pool *pool, int free_all);
78static void rds_iw_mr_pool_flush_worker(struct work_struct *work);
79static int rds_iw_init_fastreg(struct rds_iw_mr_pool *pool, struct rds_iw_mr *ibmr);
80static int rds_iw_map_fastreg(struct rds_iw_mr_pool *pool,
81 struct rds_iw_mr *ibmr,
82 struct scatterlist *sg, unsigned int nents);
83static void rds_iw_free_fastreg(struct rds_iw_mr_pool *pool, struct rds_iw_mr *ibmr);
84static unsigned int rds_iw_unmap_fastreg_list(struct rds_iw_mr_pool *pool,
85 struct list_head *unmap_list,
86 struct list_head *kill_list);
87static void rds_iw_destroy_fastreg(struct rds_iw_mr_pool *pool, struct rds_iw_mr *ibmr);
88
89static int rds_iw_get_device(struct rds_sock *rs, struct rds_iw_device **rds_iwdev, struct rdma_cm_id **cm_id)
90{
91 struct rds_iw_device *iwdev;
92 struct rds_iw_cm_id *i_cm_id;
93
94 *rds_iwdev = NULL;
95 *cm_id = NULL;
96
97 list_for_each_entry(iwdev, &rds_iw_devices, list) {
98 spin_lock_irq(&iwdev->spinlock);
99 list_for_each_entry(i_cm_id, &iwdev->cm_id_list, list) {
100 struct sockaddr_in *src_addr, *dst_addr;
101
102 src_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.src_addr;
103 dst_addr = (struct sockaddr_in *)&i_cm_id->cm_id->route.addr.dst_addr;
104
105 rdsdebug("local ipaddr = %x port %d, "
106 "remote ipaddr = %x port %d"
107 "..looking for %x port %d, "
108 "remote ipaddr = %x port %d\n",
109 src_addr->sin_addr.s_addr,
110 src_addr->sin_port,
111 dst_addr->sin_addr.s_addr,
112 dst_addr->sin_port,
113 rs->rs_bound_addr,
114 rs->rs_bound_port,
115 rs->rs_conn_addr,
116 rs->rs_conn_port);
117#ifdef WORKING_TUPLE_DETECTION
118 if (src_addr->sin_addr.s_addr == rs->rs_bound_addr &&
119 src_addr->sin_port == rs->rs_bound_port &&
120 dst_addr->sin_addr.s_addr == rs->rs_conn_addr &&
121 dst_addr->sin_port == rs->rs_conn_port) {
122#else
123 /* FIXME - needs to compare the local and remote
124 * ipaddr/port tuple, but the ipaddr is the only
125 * available infomation in the rds_sock (as the rest are
126 * zero'ed. It doesn't appear to be properly populated
127 * during connection setup...
128 */
129 if (src_addr->sin_addr.s_addr == rs->rs_bound_addr) {
130#endif
131 spin_unlock_irq(&iwdev->spinlock);
132 *rds_iwdev = iwdev;
133 *cm_id = i_cm_id->cm_id;
134 return 0;
135 }
136 }
137 spin_unlock_irq(&iwdev->spinlock);
138 }
139
140 return 1;
141}
142
143static int rds_iw_add_cm_id(struct rds_iw_device *rds_iwdev, struct rdma_cm_id *cm_id)
144{
145 struct rds_iw_cm_id *i_cm_id;
146
147 i_cm_id = kmalloc(sizeof *i_cm_id, GFP_KERNEL);
148 if (!i_cm_id)
149 return -ENOMEM;
150
151 i_cm_id->cm_id = cm_id;
152
153 spin_lock_irq(&rds_iwdev->spinlock);
154 list_add_tail(&i_cm_id->list, &rds_iwdev->cm_id_list);
155 spin_unlock_irq(&rds_iwdev->spinlock);
156
157 return 0;
158}
159
160void rds_iw_remove_cm_id(struct rds_iw_device *rds_iwdev, struct rdma_cm_id *cm_id)
161{
162 struct rds_iw_cm_id *i_cm_id;
163
164 spin_lock_irq(&rds_iwdev->spinlock);
165 list_for_each_entry(i_cm_id, &rds_iwdev->cm_id_list, list) {
166 if (i_cm_id->cm_id == cm_id) {
167 list_del(&i_cm_id->list);
168 kfree(i_cm_id);
169 break;
170 }
171 }
172 spin_unlock_irq(&rds_iwdev->spinlock);
173}
174
175
176int rds_iw_update_cm_id(struct rds_iw_device *rds_iwdev, struct rdma_cm_id *cm_id)
177{
178 struct sockaddr_in *src_addr, *dst_addr;
179 struct rds_iw_device *rds_iwdev_old;
180 struct rds_sock rs;
181 struct rdma_cm_id *pcm_id;
182 int rc;
183
184 src_addr = (struct sockaddr_in *)&cm_id->route.addr.src_addr;
185 dst_addr = (struct sockaddr_in *)&cm_id->route.addr.dst_addr;
186
187 rs.rs_bound_addr = src_addr->sin_addr.s_addr;
188 rs.rs_bound_port = src_addr->sin_port;
189 rs.rs_conn_addr = dst_addr->sin_addr.s_addr;
190 rs.rs_conn_port = dst_addr->sin_port;
191
192 rc = rds_iw_get_device(&rs, &rds_iwdev_old, &pcm_id);
193 if (rc)
194 rds_iw_remove_cm_id(rds_iwdev, cm_id);
195
196 return rds_iw_add_cm_id(rds_iwdev, cm_id);
197}
198
199int rds_iw_add_conn(struct rds_iw_device *rds_iwdev, struct rds_connection *conn)
200{
201 struct rds_iw_connection *ic = conn->c_transport_data;
202
203 /* conn was previously on the nodev_conns_list */
204 spin_lock_irq(&iw_nodev_conns_lock);
205 BUG_ON(list_empty(&iw_nodev_conns));
206 BUG_ON(list_empty(&ic->iw_node));
207 list_del(&ic->iw_node);
208 spin_unlock_irq(&iw_nodev_conns_lock);
209
210 spin_lock_irq(&rds_iwdev->spinlock);
211 list_add_tail(&ic->iw_node, &rds_iwdev->conn_list);
212 spin_unlock_irq(&rds_iwdev->spinlock);
213
214 ic->rds_iwdev = rds_iwdev;
215
216 return 0;
217}
218
219void rds_iw_remove_nodev_conns(void)
220{
221 struct rds_iw_connection *ic, *_ic;
222 LIST_HEAD(tmp_list);
223
224 /* avoid calling conn_destroy with irqs off */
225 spin_lock_irq(&iw_nodev_conns_lock);
226 list_splice(&iw_nodev_conns, &tmp_list);
227 INIT_LIST_HEAD(&iw_nodev_conns);
228 spin_unlock_irq(&iw_nodev_conns_lock);
229
230 list_for_each_entry_safe(ic, _ic, &tmp_list, iw_node) {
231 if (ic->conn->c_passive)
232 rds_conn_destroy(ic->conn->c_passive);
233 rds_conn_destroy(ic->conn);
234 }
235}
236
237void rds_iw_remove_conns(struct rds_iw_device *rds_iwdev)
238{
239 struct rds_iw_connection *ic, *_ic;
240 LIST_HEAD(tmp_list);
241
242 /* avoid calling conn_destroy with irqs off */
243 spin_lock_irq(&rds_iwdev->spinlock);
244 list_splice(&rds_iwdev->conn_list, &tmp_list);
245 INIT_LIST_HEAD(&rds_iwdev->conn_list);
246 spin_unlock_irq(&rds_iwdev->spinlock);
247
248 list_for_each_entry_safe(ic, _ic, &tmp_list, iw_node) {
249 if (ic->conn->c_passive)
250 rds_conn_destroy(ic->conn->c_passive);
251 rds_conn_destroy(ic->conn);
252 }
253}
254
255static void rds_iw_set_scatterlist(struct rds_iw_scatterlist *sg,
256 struct scatterlist *list, unsigned int sg_len)
257{
258 sg->list = list;
259 sg->len = sg_len;
260 sg->dma_len = 0;
261 sg->dma_npages = 0;
262 sg->bytes = 0;
263}
264
265static u64 *rds_iw_map_scatterlist(struct rds_iw_device *rds_iwdev,
266 struct rds_iw_scatterlist *sg,
267 unsigned int dma_page_shift)
268{
269 struct ib_device *dev = rds_iwdev->dev;
270 u64 *dma_pages = NULL;
271 u64 dma_mask;
272 unsigned int dma_page_size;
273 int i, j, ret;
274
275 dma_page_size = 1 << dma_page_shift;
276 dma_mask = dma_page_size - 1;
277
278 WARN_ON(sg->dma_len);
279
280 sg->dma_len = ib_dma_map_sg(dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
281 if (unlikely(!sg->dma_len)) {
282 printk(KERN_WARNING "RDS/IW: dma_map_sg failed!\n");
283 return ERR_PTR(-EBUSY);
284 }
285
286 sg->bytes = 0;
287 sg->dma_npages = 0;
288
289 ret = -EINVAL;
290 for (i = 0; i < sg->dma_len; ++i) {
291 unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
292 u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
293 u64 end_addr;
294
295 sg->bytes += dma_len;
296
297 end_addr = dma_addr + dma_len;
298 if (dma_addr & dma_mask) {
299 if (i > 0)
300 goto out_unmap;
301 dma_addr &= ~dma_mask;
302 }
303 if (end_addr & dma_mask) {
304 if (i < sg->dma_len - 1)
305 goto out_unmap;
306 end_addr = (end_addr + dma_mask) & ~dma_mask;
307 }
308
309 sg->dma_npages += (end_addr - dma_addr) >> dma_page_shift;
310 }
311
312 /* Now gather the dma addrs into one list */
313 if (sg->dma_npages > fastreg_message_size)
314 goto out_unmap;
315
316 dma_pages = kmalloc(sizeof(u64) * sg->dma_npages, GFP_ATOMIC);
317 if (!dma_pages) {
318 ret = -ENOMEM;
319 goto out_unmap;
320 }
321
322 for (i = j = 0; i < sg->dma_len; ++i) {
323 unsigned int dma_len = ib_sg_dma_len(dev, &sg->list[i]);
324 u64 dma_addr = ib_sg_dma_address(dev, &sg->list[i]);
325 u64 end_addr;
326
327 end_addr = dma_addr + dma_len;
328 dma_addr &= ~dma_mask;
329 for (; dma_addr < end_addr; dma_addr += dma_page_size)
330 dma_pages[j++] = dma_addr;
331 BUG_ON(j > sg->dma_npages);
332 }
333
334 return dma_pages;
335
336out_unmap:
337 ib_dma_unmap_sg(rds_iwdev->dev, sg->list, sg->len, DMA_BIDIRECTIONAL);
338 sg->dma_len = 0;
339 kfree(dma_pages);
340 return ERR_PTR(ret);
341}
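
A minimal sketch of the per-segment rounding performed above, assuming a standalone helper (the name example_dma_npages is hypothetical and not part of this patch): the start address is rounded down and the end rounded up to the dma page size, and the page count follows from the difference. For a 4 KB dma page, a 100-byte segment straddling a page boundary therefore counts as two pages.

/* Illustration only: dma pages spanned by one segment [addr, addr + len)
 * after rounding the start down and the end up to the dma page size. */
static inline unsigned int example_dma_npages(u64 addr, unsigned int len,
					      unsigned int dma_page_shift)
{
	u64 mask = (1ULL << dma_page_shift) - 1;
	u64 start = addr & ~mask;
	u64 end = (addr + len + mask) & ~mask;

	return (end - start) >> dma_page_shift;
}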
342
343
344struct rds_iw_mr_pool *rds_iw_create_mr_pool(struct rds_iw_device *rds_iwdev)
345{
346 struct rds_iw_mr_pool *pool;
347
348 pool = kzalloc(sizeof(*pool), GFP_KERNEL);
349 if (!pool) {
350 printk(KERN_WARNING "RDS/IW: rds_iw_create_mr_pool alloc error\n");
351 return ERR_PTR(-ENOMEM);
352 }
353
354 pool->device = rds_iwdev;
355 INIT_LIST_HEAD(&pool->dirty_list);
356 INIT_LIST_HEAD(&pool->clean_list);
357 mutex_init(&pool->flush_lock);
358 spin_lock_init(&pool->list_lock);
359 INIT_WORK(&pool->flush_worker, rds_iw_mr_pool_flush_worker);
360
361 pool->max_message_size = fastreg_message_size;
362 pool->max_items = fastreg_pool_size;
363 pool->max_free_pinned = pool->max_items * pool->max_message_size / 4;
364 pool->max_pages = fastreg_message_size;
365
366 /* We never allow more than max_items MRs to be allocated.
367 * When we exceed max_items_soft, we start freeing
368 * items more aggressively.
369 * Make sure that max_items > max_items_soft > max_items / 2
370 */
371 pool->max_items_soft = pool->max_items * 3 / 4;
372
373 return pool;
374}
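
The pool limits set above reduce to simple arithmetic on the two module parameters; the helpers below are only an illustration of that sizing (the example_* names are hypothetical), not additional kernel code.

/* Illustration only: how the pool limits derive from the parameters.
 * The soft limit sits at 3/4 of the hard limit, satisfying
 * max_items > max_items_soft > max_items / 2. */
static inline unsigned int example_max_items_soft(unsigned int max_items)
{
	return max_items * 3 / 4;
}

/* Pinned-but-unused pages are capped at a quarter of the total
 * mappable pages before a flush is requested. */
static inline unsigned int example_max_free_pinned(unsigned int max_items,
						   unsigned int max_pages)
{
	return max_items * max_pages / 4;
}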
375
376void rds_iw_get_mr_info(struct rds_iw_device *rds_iwdev, struct rds_info_rdma_connection *iinfo)
377{
378 struct rds_iw_mr_pool *pool = rds_iwdev->mr_pool;
379
380 iinfo->rdma_mr_max = pool->max_items;
381 iinfo->rdma_mr_size = pool->max_pages;
382}
383
384void rds_iw_destroy_mr_pool(struct rds_iw_mr_pool *pool)
385{
386 flush_workqueue(rds_wq);
387 rds_iw_flush_mr_pool(pool, 1);
388 BUG_ON(atomic_read(&pool->item_count));
389 BUG_ON(atomic_read(&pool->free_pinned));
390 kfree(pool);
391}
392
393static inline struct rds_iw_mr *rds_iw_reuse_fmr(struct rds_iw_mr_pool *pool)
394{
395 struct rds_iw_mr *ibmr = NULL;
396 unsigned long flags;
397
398 spin_lock_irqsave(&pool->list_lock, flags);
399 if (!list_empty(&pool->clean_list)) {
400 ibmr = list_entry(pool->clean_list.next, struct rds_iw_mr, mapping.m_list);
401 list_del_init(&ibmr->mapping.m_list);
402 }
403 spin_unlock_irqrestore(&pool->list_lock, flags);
404
405 return ibmr;
406}
407
408static struct rds_iw_mr *rds_iw_alloc_mr(struct rds_iw_device *rds_iwdev)
409{
410 struct rds_iw_mr_pool *pool = rds_iwdev->mr_pool;
411 struct rds_iw_mr *ibmr = NULL;
412 int err = 0, iter = 0;
413
414 while (1) {
415 ibmr = rds_iw_reuse_fmr(pool);
416 if (ibmr)
417 return ibmr;
418
419 /* No clean MRs - now we have the choice of either
420 * allocating a fresh MR up to the limit imposed by the
421 * driver, or flushing any dirty unused MRs.
422 * We try to avoid stalling in the send path if possible,
423 * so we allocate as long as we're allowed to.
424 *
425 * We're fussy with enforcing the FMR limit, though. If the driver
426 * tells us we can't use more than N fmrs, we shouldn't start
427 * arguing with it */
428 if (atomic_inc_return(&pool->item_count) <= pool->max_items)
429 break;
430
431 atomic_dec(&pool->item_count);
432
433 if (++iter > 2) {
434 rds_iw_stats_inc(s_iw_rdma_mr_pool_depleted);
435 return ERR_PTR(-EAGAIN);
436 }
437
438 /* We do have some empty MRs. Flush them out. */
439 rds_iw_stats_inc(s_iw_rdma_mr_pool_wait);
440 rds_iw_flush_mr_pool(pool, 0);
441 }
442
443 ibmr = kzalloc(sizeof(*ibmr), GFP_KERNEL);
444 if (!ibmr) {
445 err = -ENOMEM;
446 goto out_no_cigar;
447 }
448
449 spin_lock_init(&ibmr->mapping.m_lock);
450 INIT_LIST_HEAD(&ibmr->mapping.m_list);
451 ibmr->mapping.m_mr = ibmr;
452
453 err = rds_iw_init_fastreg(pool, ibmr);
454 if (err)
455 goto out_no_cigar;
456
457 rds_iw_stats_inc(s_iw_rdma_mr_alloc);
458 return ibmr;
459
460out_no_cigar:
461 if (ibmr) {
462 rds_iw_destroy_fastreg(pool, ibmr);
463 kfree(ibmr);
464 }
465 atomic_dec(&pool->item_count);
466 return ERR_PTR(err);
467}
468
469void rds_iw_sync_mr(void *trans_private, int direction)
470{
471 struct rds_iw_mr *ibmr = trans_private;
472 struct rds_iw_device *rds_iwdev = ibmr->device;
473
474 switch (direction) {
475 case DMA_FROM_DEVICE:
476 ib_dma_sync_sg_for_cpu(rds_iwdev->dev, ibmr->mapping.m_sg.list,
477 ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
478 break;
479 case DMA_TO_DEVICE:
480 ib_dma_sync_sg_for_device(rds_iwdev->dev, ibmr->mapping.m_sg.list,
481 ibmr->mapping.m_sg.dma_len, DMA_BIDIRECTIONAL);
482 break;
483 }
484}
485
486static inline unsigned int rds_iw_flush_goal(struct rds_iw_mr_pool *pool, int free_all)
487{
488 unsigned int item_count;
489
490 item_count = atomic_read(&pool->item_count);
491 if (free_all)
492 return item_count;
493
494 return 0;
495}
496
497/*
498 * Flush our pool of MRs.
499 * At a minimum, all currently unused MRs are unmapped.
500 * If the number of MRs allocated exceeds the limit, we also try
501 * to free as many MRs as needed to get back to this limit.
502 */
503static int rds_iw_flush_mr_pool(struct rds_iw_mr_pool *pool, int free_all)
504{
505 struct rds_iw_mr *ibmr, *next;
506 LIST_HEAD(unmap_list);
507 LIST_HEAD(kill_list);
508 unsigned long flags;
509 unsigned int nfreed = 0, ncleaned = 0, free_goal;
510 int ret = 0;
511
512 rds_iw_stats_inc(s_iw_rdma_mr_pool_flush);
513
514 mutex_lock(&pool->flush_lock);
515
516 spin_lock_irqsave(&pool->list_lock, flags);
517 /* Get the list of all mappings to be destroyed */
518 list_splice_init(&pool->dirty_list, &unmap_list);
519 if (free_all)
520 list_splice_init(&pool->clean_list, &kill_list);
521 spin_unlock_irqrestore(&pool->list_lock, flags);
522
523 free_goal = rds_iw_flush_goal(pool, free_all);
524
525 /* Batched invalidate of dirty MRs.
526 * For FMR based MRs, the mappings on the unmap list are
527 * actually members of an ibmr (ibmr->mapping). They either
528 * migrate to the kill_list, or have been cleaned and should be
529 * moved to the clean_list.
530 * For fastregs, they will be dynamically allocated, and
531 * will be destroyed by the unmap function.
532 */
533 if (!list_empty(&unmap_list)) {
534 ncleaned = rds_iw_unmap_fastreg_list(pool, &unmap_list, &kill_list);
535 /* If we've been asked to destroy all MRs, move those
536 * that were simply cleaned to the kill list */
537 if (free_all)
538 list_splice_init(&unmap_list, &kill_list);
539 }
540
541 /* Destroy any MRs that are past their best-before date */
542 list_for_each_entry_safe(ibmr, next, &kill_list, mapping.m_list) {
543 rds_iw_stats_inc(s_iw_rdma_mr_free);
544 list_del(&ibmr->mapping.m_list);
545 rds_iw_destroy_fastreg(pool, ibmr);
546 kfree(ibmr);
547 nfreed++;
548 }
549
550 /* Anything that remains are laundered ibmrs, which we can add
551 * back to the clean list. */
552 if (!list_empty(&unmap_list)) {
553 spin_lock_irqsave(&pool->list_lock, flags);
554 list_splice(&unmap_list, &pool->clean_list);
555 spin_unlock_irqrestore(&pool->list_lock, flags);
556 }
557
558 atomic_sub(ncleaned, &pool->dirty_count);
559 atomic_sub(nfreed, &pool->item_count);
560
561 mutex_unlock(&pool->flush_lock);
562 return ret;
563}
564
565static void rds_iw_mr_pool_flush_worker(struct work_struct *work)
566{
567 struct rds_iw_mr_pool *pool = container_of(work, struct rds_iw_mr_pool, flush_worker);
568
569 rds_iw_flush_mr_pool(pool, 0);
570}
571
572void rds_iw_free_mr(void *trans_private, int invalidate)
573{
574 struct rds_iw_mr *ibmr = trans_private;
575 struct rds_iw_mr_pool *pool = ibmr->device->mr_pool;
576
577 rdsdebug("RDS/IW: free_mr nents %u\n", ibmr->mapping.m_sg.len);
578 if (!pool)
579 return;
580
581 /* Return it to the pool's free list */
582 rds_iw_free_fastreg(pool, ibmr);
583
584 /* If we've pinned too many pages, request a flush */
585 if (atomic_read(&pool->free_pinned) >= pool->max_free_pinned
586 || atomic_read(&pool->dirty_count) >= pool->max_items / 10)
587 queue_work(rds_wq, &pool->flush_worker);
588
589 if (invalidate) {
590 if (likely(!in_interrupt())) {
591 rds_iw_flush_mr_pool(pool, 0);
592 } else {
593 /* We get here if the user created an MR with both
594 * the use_once and invalidate flags set. */
595 queue_work(rds_wq, &pool->flush_worker);
596 }
597 }
598}
599
600void rds_iw_flush_mrs(void)
601{
602 struct rds_iw_device *rds_iwdev;
603
604 list_for_each_entry(rds_iwdev, &rds_iw_devices, list) {
605 struct rds_iw_mr_pool *pool = rds_iwdev->mr_pool;
606
607 if (pool)
608 rds_iw_flush_mr_pool(pool, 0);
609 }
610}
611
612void *rds_iw_get_mr(struct scatterlist *sg, unsigned long nents,
613 struct rds_sock *rs, u32 *key_ret)
614{
615 struct rds_iw_device *rds_iwdev;
616 struct rds_iw_mr *ibmr = NULL;
617 struct rdma_cm_id *cm_id;
618 int ret;
619
620 ret = rds_iw_get_device(rs, &rds_iwdev, &cm_id);
621 if (ret || !cm_id) {
622 ret = -ENODEV;
623 goto out;
624 }
625
626 if (!rds_iwdev->mr_pool) {
627 ret = -ENODEV;
628 goto out;
629 }
630
631 ibmr = rds_iw_alloc_mr(rds_iwdev);
632 if (IS_ERR(ibmr))
633 return ibmr;
634
635 ibmr->cm_id = cm_id;
636 ibmr->device = rds_iwdev;
637
638 ret = rds_iw_map_fastreg(rds_iwdev->mr_pool, ibmr, sg, nents);
639 if (ret == 0)
640 *key_ret = ibmr->mr->rkey;
641 else
642 printk(KERN_WARNING "RDS/IW: failed to map mr (errno=%d)\n", ret);
643
644out:
645 if (ret) {
646 if (ibmr)
647 rds_iw_free_mr(ibmr, 0);
648 ibmr = ERR_PTR(ret);
649 }
650 return ibmr;
651}
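
The return convention above (ERR_PTR on failure, the fastreg R_Key written through key_ret on success) can be summarized with a hypothetical caller; the sketch below is illustrative only and is not part of this patch.

/* Hypothetical caller: rds_iw_get_mr() returns either ERR_PTR() or an
 * opaque MR handle, later released with rds_iw_free_mr(). */
static int example_register_mr(struct scatterlist *sg, unsigned long nents,
			       struct rds_sock *rs, u32 *rkey)
{
	void *mr = rds_iw_get_mr(sg, nents, rs, rkey);

	if (IS_ERR(mr))
		return PTR_ERR(mr);

	/* ... advertise *rkey to the peer and perform RDMA ... */

	rds_iw_free_mr(mr, 0);	/* 0: no forced invalidate */
	return 0;
}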
652
653/*
654 * iWARP fastreg handling
655 *
656 * The life cycle of a fastreg registration is a bit different from
657 * FMRs.
658 * The idea behind fastreg is to have one MR, to which we bind different
659 * mappings over time. To avoid stalling on the expensive map and invalidate
660 * operations, these operations are pipelined on the same send queue on
661 * which we want to send the message containing the r_key.
662 *
663 * This creates a bit of a problem for us, as we do not have the destination
664 * IP in GET_MR, so the connection must be set up prior to the GET_MR call for
665 * RDMA to be set up correctly. If a fastreg request is present, rds_iw_xmit
666 * will try to queue a LOCAL_INV (if needed) and a FAST_REG_MR work request
667 * before queuing the SEND. When the completions for these arrive, the MR is
668 * marked with a bit showing that RDMA can be performed.
669 *
670 * There is another interesting aspect that's related to invalidation.
671 * The application can request that a mapping is invalidated in FREE_MR.
672 * The expectation there is that this invalidation step includes ALL
673 * PREVIOUSLY FREED MRs.
674 */
675static int rds_iw_init_fastreg(struct rds_iw_mr_pool *pool,
676 struct rds_iw_mr *ibmr)
677{
678 struct rds_iw_device *rds_iwdev = pool->device;
679 struct ib_fast_reg_page_list *page_list = NULL;
680 struct ib_mr *mr;
681 int err;
682
683 mr = ib_alloc_fast_reg_mr(rds_iwdev->pd, pool->max_message_size);
684 if (IS_ERR(mr)) {
685 err = PTR_ERR(mr);
686
687 printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_mr failed (err=%d)\n", err);
688 return err;
689 }
690
691 /* FIXME - this is overkill, but mapping->m_sg.dma_len/mapping->m_sg.dma_npages
692 * is not filled in.
693 */
694 page_list = ib_alloc_fast_reg_page_list(rds_iwdev->dev, pool->max_message_size);
695 if (IS_ERR(page_list)) {
696 err = PTR_ERR(page_list);
697
698 printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_page_list failed (err=%d)\n", err);
699 ib_dereg_mr(mr);
700 return err;
701 }
702
703 ibmr->page_list = page_list;
704 ibmr->mr = mr;
705 return 0;
706}
707
708static int rds_iw_rdma_build_fastreg(struct rds_iw_mapping *mapping)
709{
710 struct rds_iw_mr *ibmr = mapping->m_mr;
711 struct ib_send_wr f_wr, *failed_wr;
712 int ret;
713
714 /*
715 * Perform a WR for the fast_reg_mr. Each individual page
716 * in the sg list is added to the fast reg page list and placed
717 * inside the fast_reg_mr WR. The key used is a rolling 8bit
718 * counter, which should guarantee uniqueness.
719 */
720 ib_update_fast_reg_key(ibmr->mr, ibmr->remap_count++);
721 mapping->m_rkey = ibmr->mr->rkey;
722
723 memset(&f_wr, 0, sizeof(f_wr));
724 f_wr.wr_id = RDS_IW_FAST_REG_WR_ID;
725 f_wr.opcode = IB_WR_FAST_REG_MR;
726 f_wr.wr.fast_reg.length = mapping->m_sg.bytes;
727 f_wr.wr.fast_reg.rkey = mapping->m_rkey;
728 f_wr.wr.fast_reg.page_list = ibmr->page_list;
729 f_wr.wr.fast_reg.page_list_len = mapping->m_sg.dma_len;
730 f_wr.wr.fast_reg.page_shift = ibmr->device->page_shift;
731 f_wr.wr.fast_reg.access_flags = IB_ACCESS_LOCAL_WRITE |
732 IB_ACCESS_REMOTE_READ |
733 IB_ACCESS_REMOTE_WRITE;
734 f_wr.wr.fast_reg.iova_start = 0;
735 f_wr.send_flags = IB_SEND_SIGNALED;
736
737 failed_wr = &f_wr;
738 ret = ib_post_send(ibmr->cm_id->qp, &f_wr, &failed_wr);
739 BUG_ON(failed_wr != &f_wr);
740 if (ret && printk_ratelimit())
741 printk(KERN_WARNING "RDS/IW: %s:%d ib_post_send returned %d\n",
742 __func__, __LINE__, ret);
743 return ret;
744}
745
746static int rds_iw_rdma_fastreg_inv(struct rds_iw_mr *ibmr)
747{
748 struct ib_send_wr s_wr, *failed_wr;
749 int ret = 0;
750
751 if (!ibmr->cm_id->qp || !ibmr->mr)
752 goto out;
753
754 memset(&s_wr, 0, sizeof(s_wr));
755 s_wr.wr_id = RDS_IW_LOCAL_INV_WR_ID;
756 s_wr.opcode = IB_WR_LOCAL_INV;
757 s_wr.ex.invalidate_rkey = ibmr->mr->rkey;
758 s_wr.send_flags = IB_SEND_SIGNALED;
759
760 failed_wr = &s_wr;
761 ret = ib_post_send(ibmr->cm_id->qp, &s_wr, &failed_wr);
762 if (ret && printk_ratelimit()) {
763 printk(KERN_WARNING "RDS/IW: %s:%d ib_post_send returned %d\n",
764 __func__, __LINE__, ret);
765 goto out;
766 }
767out:
768 return ret;
769}
770
771static int rds_iw_map_fastreg(struct rds_iw_mr_pool *pool,
772 struct rds_iw_mr *ibmr,
773 struct scatterlist *sg,
774 unsigned int sg_len)
775{
776 struct rds_iw_device *rds_iwdev = pool->device;
777 struct rds_iw_mapping *mapping = &ibmr->mapping;
778 u64 *dma_pages;
779 int i, ret = 0;
780
781 rds_iw_set_scatterlist(&mapping->m_sg, sg, sg_len);
782
783 dma_pages = rds_iw_map_scatterlist(rds_iwdev,
784 &mapping->m_sg,
785 rds_iwdev->page_shift);
786 if (IS_ERR(dma_pages)) {
787 ret = PTR_ERR(dma_pages);
788 dma_pages = NULL;
789 goto out;
790 }
791
792 if (mapping->m_sg.dma_len > pool->max_message_size) {
793 ret = -EMSGSIZE;
794 goto out;
795 }
796
797 for (i = 0; i < mapping->m_sg.dma_npages; ++i)
798 ibmr->page_list->page_list[i] = dma_pages[i];
799
800 ret = rds_iw_rdma_build_fastreg(mapping);
801 if (ret)
802 goto out;
803
804 rds_iw_stats_inc(s_iw_rdma_mr_used);
805
806out:
807 kfree(dma_pages);
808
809 return ret;
810}
811
812/*
813 * "Free" a fastreg MR.
814 */
815static void rds_iw_free_fastreg(struct rds_iw_mr_pool *pool,
816 struct rds_iw_mr *ibmr)
817{
818 unsigned long flags;
819 int ret;
820
821 if (!ibmr->mapping.m_sg.dma_len)
822 return;
823
824 ret = rds_iw_rdma_fastreg_inv(ibmr);
825 if (ret)
826 return;
827
828 /* Try to post the LOCAL_INV WR to the queue. */
829 spin_lock_irqsave(&pool->list_lock, flags);
830
831 list_add_tail(&ibmr->mapping.m_list, &pool->dirty_list);
832 atomic_add(ibmr->mapping.m_sg.len, &pool->free_pinned);
833 atomic_inc(&pool->dirty_count);
834
835 spin_unlock_irqrestore(&pool->list_lock, flags);
836}
837
838static unsigned int rds_iw_unmap_fastreg_list(struct rds_iw_mr_pool *pool,
839 struct list_head *unmap_list,
840 struct list_head *kill_list)
841{
842 struct rds_iw_mapping *mapping, *next;
843 unsigned int ncleaned = 0;
844 LIST_HEAD(laundered);
845
846 /* Batched invalidation of fastreg MRs.
847 * Why do we do it this way, even though we could pipeline unmap
848 * and remap? The reason is the application semantics - when the
849 * application requests an invalidation of MRs, it expects all
850 * previously released R_Keys to become invalid.
851 *
852 * If we implement MR reuse naively, we risk memory corruption
853 * (this has actually been observed). So the default behavior
854 * requires that an MR goes through an explicit unmap operation before
855 * we can reuse it again.
856 *
857 * We could probably improve on this a little, by allowing immediate
858 * reuse of an MR on the same socket (e.g. you could add a small
859 * cache of unused MRs to struct rds_socket - GET_MR could grab one
860 * of these without requiring an explicit invalidate).
861 */
862 while (!list_empty(unmap_list)) {
863 unsigned long flags;
864
865 spin_lock_irqsave(&pool->list_lock, flags);
866 list_for_each_entry_safe(mapping, next, unmap_list, m_list) {
867 list_move(&mapping->m_list, &laundered);
868 ncleaned++;
869 }
870 spin_unlock_irqrestore(&pool->list_lock, flags);
871 }
872
873 /* Move all laundered mappings back to the unmap list.
874 * We do not kill any WRs right now - it doesn't seem the
875 * fastreg API has a max_remap limit. */
876 list_splice_init(&laundered, unmap_list);
877
878 return ncleaned;
879}
880
881static void rds_iw_destroy_fastreg(struct rds_iw_mr_pool *pool,
882 struct rds_iw_mr *ibmr)
883{
884 if (ibmr->page_list)
885 ib_free_fast_reg_page_list(ibmr->page_list);
886 if (ibmr->mr)
887 ib_dereg_mr(ibmr->mr);
888}
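
To make the fastreg pipelining described earlier in this file concrete, here is a minimal, hypothetical sketch of the ordering idea: a LOCAL_INV (when the MR was previously registered), a FAST_REG_MR and the data SEND placed on the same send queue, so the HCA processes them in order before the peer sees the R_Key. The transmit path in this patch manages these work requests through its own send ring; the single chain below is only a simplified illustration.

/* Illustration only: ordering guarantee from posting on one QP. */
static int example_pipeline_fastreg(struct ib_qp *qp,
				    struct ib_send_wr *inv_wr,
				    struct ib_send_wr *fastreg_wr,
				    struct ib_send_wr *send_wr)
{
	struct ib_send_wr *failed_wr;

	inv_wr->next = fastreg_wr;	/* invalidate the old mapping first */
	fastreg_wr->next = send_wr;	/* then re-register */
	send_wr->next = NULL;		/* then send the message with the R_Key */

	return ib_post_send(qp, inv_wr, &failed_wr);
}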
diff --git a/net/rds/iw_recv.c b/net/rds/iw_recv.c
new file mode 100644
index 000000000000..a1931f0027a2
--- /dev/null
+++ b/net/rds/iw_recv.c
@@ -0,0 +1,869 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/pci.h>
35#include <linux/dma-mapping.h>
36#include <rdma/rdma_cm.h>
37
38#include "rds.h"
39#include "iw.h"
40
41static struct kmem_cache *rds_iw_incoming_slab;
42static struct kmem_cache *rds_iw_frag_slab;
43static atomic_t rds_iw_allocation = ATOMIC_INIT(0);
44
45static void rds_iw_frag_drop_page(struct rds_page_frag *frag)
46{
47 rdsdebug("frag %p page %p\n", frag, frag->f_page);
48 __free_page(frag->f_page);
49 frag->f_page = NULL;
50}
51
52static void rds_iw_frag_free(struct rds_page_frag *frag)
53{
54 rdsdebug("frag %p page %p\n", frag, frag->f_page);
55 BUG_ON(frag->f_page != NULL);
56 kmem_cache_free(rds_iw_frag_slab, frag);
57}
58
59/*
60 * We map a page at a time. Its fragments are posted in order. This
61 * is called in fragment order as the fragments get receive completion events.
62 * Only the last frag in the page performs the unmapping.
63 *
64 * It's OK for ring cleanup to call this in whatever order it likes because
65 * DMA is not in flight and so we can unmap while other ring entries still
66 * hold page references in their frags.
67 */
68static void rds_iw_recv_unmap_page(struct rds_iw_connection *ic,
69 struct rds_iw_recv_work *recv)
70{
71 struct rds_page_frag *frag = recv->r_frag;
72
73 rdsdebug("recv %p frag %p page %p\n", recv, frag, frag->f_page);
74 if (frag->f_mapped)
75 ib_dma_unmap_page(ic->i_cm_id->device,
76 frag->f_mapped,
77 RDS_FRAG_SIZE, DMA_FROM_DEVICE);
78 frag->f_mapped = 0;
79}
80
81void rds_iw_recv_init_ring(struct rds_iw_connection *ic)
82{
83 struct rds_iw_recv_work *recv;
84 u32 i;
85
86 for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
87 struct ib_sge *sge;
88
89 recv->r_iwinc = NULL;
90 recv->r_frag = NULL;
91
92 recv->r_wr.next = NULL;
93 recv->r_wr.wr_id = i;
94 recv->r_wr.sg_list = recv->r_sge;
95 recv->r_wr.num_sge = RDS_IW_RECV_SGE;
96
97 sge = rds_iw_data_sge(ic, recv->r_sge);
98 sge->addr = 0;
99 sge->length = RDS_FRAG_SIZE;
100 sge->lkey = 0;
101
102 sge = rds_iw_header_sge(ic, recv->r_sge);
103 sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
104 sge->length = sizeof(struct rds_header);
105 sge->lkey = 0;
106 }
107}
108
109static void rds_iw_recv_clear_one(struct rds_iw_connection *ic,
110 struct rds_iw_recv_work *recv)
111{
112 if (recv->r_iwinc) {
113 rds_inc_put(&recv->r_iwinc->ii_inc);
114 recv->r_iwinc = NULL;
115 }
116 if (recv->r_frag) {
117 rds_iw_recv_unmap_page(ic, recv);
118 if (recv->r_frag->f_page)
119 rds_iw_frag_drop_page(recv->r_frag);
120 rds_iw_frag_free(recv->r_frag);
121 recv->r_frag = NULL;
122 }
123}
124
125void rds_iw_recv_clear_ring(struct rds_iw_connection *ic)
126{
127 u32 i;
128
129 for (i = 0; i < ic->i_recv_ring.w_nr; i++)
130 rds_iw_recv_clear_one(ic, &ic->i_recvs[i]);
131
132 if (ic->i_frag.f_page)
133 rds_iw_frag_drop_page(&ic->i_frag);
134}
135
136static int rds_iw_recv_refill_one(struct rds_connection *conn,
137 struct rds_iw_recv_work *recv,
138 gfp_t kptr_gfp, gfp_t page_gfp)
139{
140 struct rds_iw_connection *ic = conn->c_transport_data;
141 dma_addr_t dma_addr;
142 struct ib_sge *sge;
143 int ret = -ENOMEM;
144
145 if (recv->r_iwinc == NULL) {
146 if (atomic_read(&rds_iw_allocation) >= rds_iw_sysctl_max_recv_allocation) {
147 rds_iw_stats_inc(s_iw_rx_alloc_limit);
148 goto out;
149 }
150 recv->r_iwinc = kmem_cache_alloc(rds_iw_incoming_slab,
151 kptr_gfp);
152 if (recv->r_iwinc == NULL)
153 goto out;
154 atomic_inc(&rds_iw_allocation);
155 INIT_LIST_HEAD(&recv->r_iwinc->ii_frags);
156 rds_inc_init(&recv->r_iwinc->ii_inc, conn, conn->c_faddr);
157 }
158
159 if (recv->r_frag == NULL) {
160 recv->r_frag = kmem_cache_alloc(rds_iw_frag_slab, kptr_gfp);
161 if (recv->r_frag == NULL)
162 goto out;
163 INIT_LIST_HEAD(&recv->r_frag->f_item);
164 recv->r_frag->f_page = NULL;
165 }
166
167 if (ic->i_frag.f_page == NULL) {
168 ic->i_frag.f_page = alloc_page(page_gfp);
169 if (ic->i_frag.f_page == NULL)
170 goto out;
171 ic->i_frag.f_offset = 0;
172 }
173
174 dma_addr = ib_dma_map_page(ic->i_cm_id->device,
175 ic->i_frag.f_page,
176 ic->i_frag.f_offset,
177 RDS_FRAG_SIZE,
178 DMA_FROM_DEVICE);
179 if (ib_dma_mapping_error(ic->i_cm_id->device, dma_addr))
180 goto out;
181
182 /*
183 * Once we get the RDS_PAGE_LAST_OFF frag then rds_iw_recv_unmap_page()
184 * must be called on this recv. This happens as completions hit
185 * in order or on connection shutdown.
186 */
187 recv->r_frag->f_page = ic->i_frag.f_page;
188 recv->r_frag->f_offset = ic->i_frag.f_offset;
189 recv->r_frag->f_mapped = dma_addr;
190
191 sge = rds_iw_data_sge(ic, recv->r_sge);
192 sge->addr = dma_addr;
193 sge->length = RDS_FRAG_SIZE;
194
195 sge = rds_iw_header_sge(ic, recv->r_sge);
196 sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
197 sge->length = sizeof(struct rds_header);
198
199 get_page(recv->r_frag->f_page);
200
201 if (ic->i_frag.f_offset < RDS_PAGE_LAST_OFF) {
202 ic->i_frag.f_offset += RDS_FRAG_SIZE;
203 } else {
204 put_page(ic->i_frag.f_page);
205 ic->i_frag.f_page = NULL;
206 ic->i_frag.f_offset = 0;
207 }
208
209 ret = 0;
210out:
211 return ret;
212}
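
The page bookkeeping above hands out one page in RDS_FRAG_SIZE steps until RDS_PAGE_LAST_OFF is reached, at which point the connection's page reference is dropped. The sketch below only restates that stepping, assuming RDS_PAGE_LAST_OFF is the offset of the last fragment that fits in a page, as the code suggests.

/* Illustration only: advance a fragment cursor through a page; returns
 * true when the page is exhausted and its reference should be dropped. */
static bool example_advance_frag(unsigned long *offset)
{
	if (*offset < RDS_PAGE_LAST_OFF) {
		*offset += RDS_FRAG_SIZE;
		return false;		/* more fragments left in this page */
	}
	*offset = 0;
	return true;			/* page exhausted */
}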
213
214/*
215 * This tries to allocate and post unused work requests after making sure that
216 * they have all the allocations they need to queue received fragments into
217 * sockets. The i_recv_mutex is held here so that ring_alloc and _unalloc
218 * pairs don't go unmatched.
219 *
220 * -1 is returned if posting fails due to temporary resource exhaustion.
221 */
222int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
223 gfp_t page_gfp, int prefill)
224{
225 struct rds_iw_connection *ic = conn->c_transport_data;
226 struct rds_iw_recv_work *recv;
227 struct ib_recv_wr *failed_wr;
228 unsigned int posted = 0;
229 int ret = 0;
230 u32 pos;
231
232 while ((prefill || rds_conn_up(conn))
233 && rds_iw_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
234 if (pos >= ic->i_recv_ring.w_nr) {
235 printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
236 pos);
237 ret = -EINVAL;
238 break;
239 }
240
241 recv = &ic->i_recvs[pos];
242 ret = rds_iw_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
243 if (ret) {
244 ret = -1;
245 break;
246 }
247
248 /* XXX when can this fail? */
249 ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
250 rdsdebug("recv %p iwinc %p page %p addr %lu ret %d\n", recv,
251 recv->r_iwinc, recv->r_frag->f_page,
252 (long) recv->r_frag->f_mapped, ret);
253 if (ret) {
254 rds_iw_conn_error(conn, "recv post on "
255 "%pI4 returned %d, disconnecting and "
256 "reconnecting\n", &conn->c_faddr,
257 ret);
258 ret = -1;
259 break;
260 }
261
262 posted++;
263 }
264
265 /* We're doing flow control - update the window. */
266 if (ic->i_flowctl && posted)
267 rds_iw_advertise_credits(conn, posted);
268
269 if (ret)
270 rds_iw_ring_unalloc(&ic->i_recv_ring, 1);
271 return ret;
272}
273
274void rds_iw_inc_purge(struct rds_incoming *inc)
275{
276 struct rds_iw_incoming *iwinc;
277 struct rds_page_frag *frag;
278 struct rds_page_frag *pos;
279
280 iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
281 rdsdebug("purging iwinc %p inc %p\n", iwinc, inc);
282
283 list_for_each_entry_safe(frag, pos, &iwinc->ii_frags, f_item) {
284 list_del_init(&frag->f_item);
285 rds_iw_frag_drop_page(frag);
286 rds_iw_frag_free(frag);
287 }
288}
289
290void rds_iw_inc_free(struct rds_incoming *inc)
291{
292 struct rds_iw_incoming *iwinc;
293
294 iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
295
296 rds_iw_inc_purge(inc);
297 rdsdebug("freeing iwinc %p inc %p\n", iwinc, inc);
298 BUG_ON(!list_empty(&iwinc->ii_frags));
299 kmem_cache_free(rds_iw_incoming_slab, iwinc);
300 atomic_dec(&rds_iw_allocation);
301 BUG_ON(atomic_read(&rds_iw_allocation) < 0);
302}
303
304int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
305 size_t size)
306{
307 struct rds_iw_incoming *iwinc;
308 struct rds_page_frag *frag;
309 struct iovec *iov = first_iov;
310 unsigned long to_copy;
311 unsigned long frag_off = 0;
312 unsigned long iov_off = 0;
313 int copied = 0;
314 int ret;
315 u32 len;
316
317 iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
318 frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
319 len = be32_to_cpu(inc->i_hdr.h_len);
320
321 while (copied < size && copied < len) {
322 if (frag_off == RDS_FRAG_SIZE) {
323 frag = list_entry(frag->f_item.next,
324 struct rds_page_frag, f_item);
325 frag_off = 0;
326 }
327 while (iov_off == iov->iov_len) {
328 iov_off = 0;
329 iov++;
330 }
331
332 to_copy = min(iov->iov_len - iov_off, RDS_FRAG_SIZE - frag_off);
333 to_copy = min_t(size_t, to_copy, size - copied);
334 to_copy = min_t(unsigned long, to_copy, len - copied);
335
336 rdsdebug("%lu bytes to user [%p, %zu] + %lu from frag "
337 "[%p, %lu] + %lu\n",
338 to_copy, iov->iov_base, iov->iov_len, iov_off,
339 frag->f_page, frag->f_offset, frag_off);
340
341 /* XXX needs + offset for multiple recvs per page */
342 ret = rds_page_copy_to_user(frag->f_page,
343 frag->f_offset + frag_off,
344 iov->iov_base + iov_off,
345 to_copy);
346 if (ret) {
347 copied = ret;
348 break;
349 }
350
351 iov_off += to_copy;
352 frag_off += to_copy;
353 copied += to_copy;
354 }
355
356 return copied;
357}
358
359/* ic starts out kzalloc()ed */
360void rds_iw_recv_init_ack(struct rds_iw_connection *ic)
361{
362 struct ib_send_wr *wr = &ic->i_ack_wr;
363 struct ib_sge *sge = &ic->i_ack_sge;
364
365 sge->addr = ic->i_ack_dma;
366 sge->length = sizeof(struct rds_header);
367 sge->lkey = rds_iw_local_dma_lkey(ic);
368
369 wr->sg_list = sge;
370 wr->num_sge = 1;
371 wr->opcode = IB_WR_SEND;
372 wr->wr_id = RDS_IW_ACK_WR_ID;
373 wr->send_flags = IB_SEND_SIGNALED | IB_SEND_SOLICITED;
374}
375
376/*
377 * You'd think that with reliable IB connections you wouldn't need to ack
378 * messages that have been received. The problem is that IB hardware generates
379 * an ack message before it has DMAed the message into memory. This creates a
380 * potential message loss if the HCA is disabled for any reason between when it
381 * sends the ack and before the message is DMAed and processed. This is only a
382 * potential issue if another HCA is available for fail-over.
383 *
384 * When the remote host receives our ack they'll free the sent message from
385 * their send queue. To decrease the latency of this we always send an ack
386 * immediately after we've received messages.
387 *
388 * For simplicity, we only have one ack in flight at a time. This puts
389 * pressure on senders to have deep enough send queues to absorb the latency of
390 * a single ack frame being in flight. This might not be good enough.
391 *
392 * This is implemented by having a long-lived send_wr and sge which point to a
393 * statically allocated ack frame. This ack wr does not fall under the ring
394 * accounting that the tx and rx wrs do. The QP attribute specifically makes
395 * room for it beyond the ring size. Send completion notices its special
396 * wr_id and avoids working with the ring in that case.
397 */
398static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
399 int ack_required)
400{
401 rds_iw_set_64bit(&ic->i_ack_next, seq);
402 if (ack_required) {
403 smp_mb__before_clear_bit();
404 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
405 }
406}
407
408static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
409{
410 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
411 smp_mb__after_clear_bit();
412
413 return ic->i_ack_next;
414}
415
416static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credits)
417{
418 struct rds_header *hdr = ic->i_ack;
419 struct ib_send_wr *failed_wr;
420 u64 seq;
421 int ret;
422
423 seq = rds_iw_get_ack(ic);
424
425 rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
426 rds_message_populate_header(hdr, 0, 0, 0);
427 hdr->h_ack = cpu_to_be64(seq);
428 hdr->h_credit = adv_credits;
429 rds_message_make_checksum(hdr);
430 ic->i_ack_queued = jiffies;
431
432 ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, &failed_wr);
433 if (unlikely(ret)) {
434 /* Failed to send. Release the WR, and
435 * force another ACK.
436 */
437 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
438 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
439
440 rds_iw_stats_inc(s_iw_ack_send_failure);
441 /* Need to finesse this later. */
442 BUG();
443 } else
444 rds_iw_stats_inc(s_iw_ack_sent);
445}
446
447/*
448 * There are 3 ways of getting acknowledgements to the peer:
449 * 1. We call rds_iw_attempt_ack from the recv completion handler
450 * to send an ACK-only frame.
451 * However, there can be only one such frame in the send queue
452 * at any time, so we may have to postpone it.
453 * 2. When another (data) packet is transmitted while there's
454 * an ACK in the queue, we piggyback the ACK sequence number
455 * on the data packet.
456 * 3. If the ACK WR is done sending, we get called from the
457 * send queue completion handler, and check whether there's
458 * another ACK pending (postponed because the WR was on the
459 * queue). If so, we transmit it.
460 *
461 * We maintain 2 variables:
462 * - i_ack_flags, which keeps track of whether the ACK WR
463 * is currently in the send queue or not (IB_ACK_IN_FLIGHT)
464 * - i_ack_next, which is the last sequence number we received
465 *
466 * Potentially, send queue and receive queue handlers can run concurrently.
467 *
468 * Reconnecting complicates this picture just slightly. When we
469 * reconnect, we may be seeing duplicate packets. The peer
470 * is retransmitting them, because it hasn't seen an ACK for
471 * them. It is important that we ACK these.
472 *
473 * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
474 * this flag set *MUST* be acknowledged immediately.
475 */
476
477/*
478 * When we get here, we're called from the recv queue handler.
479 * Check whether we ought to transmit an ACK.
480 */
481void rds_iw_attempt_ack(struct rds_iw_connection *ic)
482{
483 unsigned int adv_credits;
484
485 if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
486 return;
487
488 if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
489 rds_iw_stats_inc(s_iw_ack_send_delayed);
490 return;
491 }
492
493 /* Can we get a send credit? */
494 if (!rds_iw_send_grab_credits(ic, 1, &adv_credits, 0)) {
495 rds_iw_stats_inc(s_iw_tx_throttle);
496 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
497 return;
498 }
499
500 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
501 rds_iw_send_ack(ic, adv_credits);
502}
503
504/*
505 * We get here from the send completion handler, when the
506 * adapter tells us the ACK frame was sent.
507 */
508void rds_iw_ack_send_complete(struct rds_iw_connection *ic)
509{
510 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
511 rds_iw_attempt_ack(ic);
512}
513
514/*
515 * This is called by the regular xmit code when it wants to piggyback
516 * an ACK on an outgoing frame.
517 */
518u64 rds_iw_piggyb_ack(struct rds_iw_connection *ic)
519{
520 if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
521 rds_iw_stats_inc(s_iw_ack_send_piggybacked);
522 return rds_iw_get_ack(ic);
523}
524
525/*
526 * It's kind of lame that we're copying from the posted receive pages into
527 * long-lived bitmaps. We could have posted the bitmaps and rdma written into
528 * them. But receiving new congestion bitmaps should be a *rare* event, so
529 * hopefully we won't need to invest that complexity in making it more
530 * efficient. By copying we can share a simpler core with TCP which has to
531 * copy.
532 */
533static void rds_iw_cong_recv(struct rds_connection *conn,
534 struct rds_iw_incoming *iwinc)
535{
536 struct rds_cong_map *map;
537 unsigned int map_off;
538 unsigned int map_page;
539 struct rds_page_frag *frag;
540 unsigned long frag_off;
541 unsigned long to_copy;
542 unsigned long copied;
543 uint64_t uncongested = 0;
544 void *addr;
545
546 /* catch completely corrupt packets */
547 if (be32_to_cpu(iwinc->ii_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
548 return;
549
550 map = conn->c_fcong;
551 map_page = 0;
552 map_off = 0;
553
554 frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
555 frag_off = 0;
556
557 copied = 0;
558
559 while (copied < RDS_CONG_MAP_BYTES) {
560 uint64_t *src, *dst;
561 unsigned int k;
562
563 to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
564 BUG_ON(to_copy & 7); /* Must be 64bit aligned. */
565
566 addr = kmap_atomic(frag->f_page, KM_SOFTIRQ0);
567
568 src = addr + frag_off;
569 dst = (void *)map->m_page_addrs[map_page] + map_off;
570 for (k = 0; k < to_copy; k += 8) {
571 /* Record ports that became uncongested, ie
572 * bits that changed from 0 to 1. */
573 uncongested |= ~(*src) & *dst;
574 *dst++ = *src++;
575 }
576 kunmap_atomic(addr, KM_SOFTIRQ0);
577
578 copied += to_copy;
579
580 map_off += to_copy;
581 if (map_off == PAGE_SIZE) {
582 map_off = 0;
583 map_page++;
584 }
585
586 frag_off += to_copy;
587 if (frag_off == RDS_FRAG_SIZE) {
588 frag = list_entry(frag->f_item.next,
589 struct rds_page_frag, f_item);
590 frag_off = 0;
591 }
592 }
593
594 /* the congestion map is in little endian order */
595 uncongested = le64_to_cpu(uncongested);
596
597 rds_cong_map_updated(map, uncongested);
598}
599
600/*
601 * Rings are posted with all the allocations they'll need to queue the
602 * incoming message to the receiving socket so this can't fail.
603 * All fragments start with a header, so we can make sure we're not receiving
604 * garbage, and we can tell a small 8 byte fragment from an ACK frame.
605 */
606struct rds_iw_ack_state {
607 u64 ack_next;
608 u64 ack_recv;
609 unsigned int ack_required:1;
610 unsigned int ack_next_valid:1;
611 unsigned int ack_recv_valid:1;
612};
613
614static void rds_iw_process_recv(struct rds_connection *conn,
615 struct rds_iw_recv_work *recv, u32 byte_len,
616 struct rds_iw_ack_state *state)
617{
618 struct rds_iw_connection *ic = conn->c_transport_data;
619 struct rds_iw_incoming *iwinc = ic->i_iwinc;
620 struct rds_header *ihdr, *hdr;
621
622 /* XXX shut down the connection if port 0,0 are seen? */
623
624 rdsdebug("ic %p iwinc %p recv %p byte len %u\n", ic, iwinc, recv,
625 byte_len);
626
627 if (byte_len < sizeof(struct rds_header)) {
628 rds_iw_conn_error(conn, "incoming message "
629 "from %pI4 didn't include a "
630 "header, disconnecting and "
631 "reconnecting\n",
632 &conn->c_faddr);
633 return;
634 }
635 byte_len -= sizeof(struct rds_header);
636
637 ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];
638
639 /* Validate the checksum. */
640 if (!rds_message_verify_checksum(ihdr)) {
641 rds_iw_conn_error(conn, "incoming message "
642 "from %pI4 has corrupted header - "
643 "forcing a reconnect\n",
644 &conn->c_faddr);
645 rds_stats_inc(s_recv_drop_bad_checksum);
646 return;
647 }
648
649 /* Process the ACK sequence which comes with every packet */
650 state->ack_recv = be64_to_cpu(ihdr->h_ack);
651 state->ack_recv_valid = 1;
652
653 /* Process the credits update if there was one */
654 if (ihdr->h_credit)
655 rds_iw_send_add_credits(conn, ihdr->h_credit);
656
657 if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
658 /* This is an ACK-only packet. It gets special
659 * treatment here because, historically, ACKs
660 * were rather special beasts.
661 */
662 rds_iw_stats_inc(s_iw_ack_received);
663
664 /*
665 * Usually the frags make their way on to incs and are then freed as
666 * the inc is freed. We don't go that route, so we have to drop the
667 * page ref ourselves. We can't just leave the page on the recv
668 * because that confuses the dma mapping of pages and each recv's use
669 * of a partial page. We can leave the frag, though, it will be
670 * reused.
671 *
672 * FIXME: Fold this into the code path below.
673 */
674 rds_iw_frag_drop_page(recv->r_frag);
675 return;
676 }
677
678 /*
679 * If we don't already have an inc on the connection then this
680 * fragment has a header and starts a message; copy its header
681 * into the inc and save the inc so we can hang upcoming fragments
682 * off its list.
683 */
684 if (iwinc == NULL) {
685 iwinc = recv->r_iwinc;
686 recv->r_iwinc = NULL;
687 ic->i_iwinc = iwinc;
688
689 hdr = &iwinc->ii_inc.i_hdr;
690 memcpy(hdr, ihdr, sizeof(*hdr));
691 ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
692
693 rdsdebug("ic %p iwinc %p rem %u flag 0x%x\n", ic, iwinc,
694 ic->i_recv_data_rem, hdr->h_flags);
695 } else {
696 hdr = &iwinc->ii_inc.i_hdr;
697 /* We can't just use memcmp here; fragments of a
698 * single message may carry different ACKs */
699 if (hdr->h_sequence != ihdr->h_sequence
700 || hdr->h_len != ihdr->h_len
701 || hdr->h_sport != ihdr->h_sport
702 || hdr->h_dport != ihdr->h_dport) {
703 rds_iw_conn_error(conn,
704 "fragment header mismatch; forcing reconnect\n");
705 return;
706 }
707 }
708
709 list_add_tail(&recv->r_frag->f_item, &iwinc->ii_frags);
710 recv->r_frag = NULL;
711
712 if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
713 ic->i_recv_data_rem -= RDS_FRAG_SIZE;
714 else {
715 ic->i_recv_data_rem = 0;
716 ic->i_iwinc = NULL;
717
718 if (iwinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
719 rds_iw_cong_recv(conn, iwinc);
720 else {
721 rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
722 &iwinc->ii_inc, GFP_ATOMIC,
723 KM_SOFTIRQ0);
724 state->ack_next = be64_to_cpu(hdr->h_sequence);
725 state->ack_next_valid = 1;
726 }
727
728 /* Evaluate the ACK_REQUIRED flag *after* we received
729 * the complete frame, and after bumping the next_rx
730 * sequence. */
731 if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
732 rds_stats_inc(s_recv_ack_required);
733 state->ack_required = 1;
734 }
735
736 rds_inc_put(&iwinc->ii_inc);
737 }
738}
739
740/*
741 * Plucking the oldest entry from the ring can be done concurrently with
742 * the thread refilling the ring. Each ring operation is protected by
743 * spinlocks and the transient state of refilling doesn't change the
744 * recording of which entry is oldest.
745 *
746 * This relies on IB only calling one cq comp_handler for each cq so that
747 * there will only be one caller of rds_recv_incoming() per RDS connection.
748 */
749void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
750{
751 struct rds_connection *conn = context;
752 struct rds_iw_connection *ic = conn->c_transport_data;
753 struct ib_wc wc;
754 struct rds_iw_ack_state state = { 0, };
755 struct rds_iw_recv_work *recv;
756
757 rdsdebug("conn %p cq %p\n", conn, cq);
758
759 rds_iw_stats_inc(s_iw_rx_cq_call);
760
761 ib_req_notify_cq(cq, IB_CQ_SOLICITED);
762
763 while (ib_poll_cq(cq, 1, &wc) > 0) {
764 rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
765 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
766 be32_to_cpu(wc.ex.imm_data));
767 rds_iw_stats_inc(s_iw_rx_cq_event);
768
769 recv = &ic->i_recvs[rds_iw_ring_oldest(&ic->i_recv_ring)];
770
771 rds_iw_recv_unmap_page(ic, recv);
772
773 /*
774 * Also process recvs in connecting state because it is possible
775 * to get a recv completion _before_ the rdmacm ESTABLISHED
776 * event is processed.
777 */
778 if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
779 /* We expect errors as the qp is drained during shutdown */
780 if (wc.status == IB_WC_SUCCESS) {
781 rds_iw_process_recv(conn, recv, wc.byte_len, &state);
782 } else {
783 rds_iw_conn_error(conn, "recv completion on "
784 "%pI4 had status %u, disconnecting and "
785 "reconnecting\n", &conn->c_faddr,
786 wc.status);
787 }
788 }
789
790 rds_iw_ring_free(&ic->i_recv_ring, 1);
791 }
792
793 if (state.ack_next_valid)
794 rds_iw_set_ack(ic, state.ack_next, state.ack_required);
795 if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
796 rds_send_drop_acked(conn, state.ack_recv, NULL);
797 ic->i_ack_recv = state.ack_recv;
798 }
799 if (rds_conn_up(conn))
800 rds_iw_attempt_ack(ic);
801
802 /* If we ever end up with a really empty receive ring, we're
803 * in deep trouble, as the sender will definitely see RNR
804 * timeouts. */
805 if (rds_iw_ring_empty(&ic->i_recv_ring))
806 rds_iw_stats_inc(s_iw_rx_ring_empty);
807
808 /*
809 * If the ring is running low, then schedule the thread to refill.
810 */
811 if (rds_iw_ring_low(&ic->i_recv_ring))
812 queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
813}
814
815int rds_iw_recv(struct rds_connection *conn)
816{
817 struct rds_iw_connection *ic = conn->c_transport_data;
818 int ret = 0;
819
820 rdsdebug("conn %p\n", conn);
821
822 /*
823 * If we get a temporary posting failure in this context then
824 * we're really low and we want the caller to back off for a bit.
825 */
826 mutex_lock(&ic->i_recv_mutex);
827 if (rds_iw_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
828 ret = -ENOMEM;
829 else
830 rds_iw_stats_inc(s_iw_rx_refill_from_thread);
831 mutex_unlock(&ic->i_recv_mutex);
832
833 if (rds_conn_up(conn))
834 rds_iw_attempt_ack(ic);
835
836 return ret;
837}
838
839int __init rds_iw_recv_init(void)
840{
841 struct sysinfo si;
842 int ret = -ENOMEM;
843
844 /* Default to 30% of all available RAM for recv memory */
845 si_meminfo(&si);
846 rds_iw_sysctl_max_recv_allocation = si.totalram / 3 * PAGE_SIZE / RDS_FRAG_SIZE;
847
848 rds_iw_incoming_slab = kmem_cache_create("rds_iw_incoming",
849 sizeof(struct rds_iw_incoming),
850 0, 0, NULL);
851 if (rds_iw_incoming_slab == NULL)
852 goto out;
853
854 rds_iw_frag_slab = kmem_cache_create("rds_iw_frag",
855 sizeof(struct rds_page_frag),
856 0, 0, NULL);
857 if (rds_iw_frag_slab == NULL)
858 kmem_cache_destroy(rds_iw_incoming_slab);
859 else
860 ret = 0;
861out:
862 return ret;
863}
864
865void rds_iw_recv_exit(void)
866{
867 kmem_cache_destroy(rds_iw_incoming_slab);
868 kmem_cache_destroy(rds_iw_frag_slab);
869}
diff --git a/net/rds/iw_ring.c b/net/rds/iw_ring.c
new file mode 100644
index 000000000000..d422d4b5deef
--- /dev/null
+++ b/net/rds/iw_ring.c
@@ -0,0 +1,169 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34
35#include "rds.h"
36#include "iw.h"
37
38/*
39 * Locking for IB rings.
40 * We assume that allocation is always protected by a mutex
41 * in the caller (this is a valid assumption for the current
42 * implementation).
43 *
44 * Freeing always happens in an interrupt, and hence only
45 * races with allocations, but not with other free()s.
46 *
47 * The interaction between allocation and freeing is that
48 * the alloc code has to determine the number of free entries.
49 * To this end, we maintain two counters; an allocation counter
50 * and a free counter. Both are allowed to run freely, and wrap
51 * around.
52 * The number of used entries is always (alloc_ctr - free_ctr) % NR.
53 *
54 * The current implementation makes free_ctr atomic. When the
55 * caller finds that an allocation fails, it should set an "alloc fail"
56 * bit and retry the allocation. The "alloc fail" bit essentially tells
57 * the CQ completion handlers to wake it up after freeing some
58 * more entries.
59 */
60
61/*
62 * This only happens on shutdown.
63 */
64DECLARE_WAIT_QUEUE_HEAD(rds_iw_ring_empty_wait);
65
66void rds_iw_ring_init(struct rds_iw_work_ring *ring, u32 nr)
67{
68 memset(ring, 0, sizeof(*ring));
69 ring->w_nr = nr;
70 rdsdebug("ring %p nr %u\n", ring, ring->w_nr);
71}
72
73static inline u32 __rds_iw_ring_used(struct rds_iw_work_ring *ring)
74{
75 u32 diff;
76
77 /* This assumes that atomic_t has at least as many bits as u32 */
78 diff = ring->w_alloc_ctr - (u32) atomic_read(&ring->w_free_ctr);
79 BUG_ON(diff > ring->w_nr);
80
81 return diff;
82}
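
A brief aside on the counter arithmetic: because both counters are unsigned 32-bit values, the subtraction above stays correct even after the allocation counter wraps. A hypothetical worked example, with an assumed helper name:

/* Illustration only: unsigned 32-bit subtraction handles wraparound.
 * If alloc_ctr has wrapped to 2 while free_ctr is still 0xfffffffe,
 * then (u32)(2 - 0xfffffffe) == 4 used entries. */
static u32 example_ring_used(u32 alloc_ctr, u32 free_ctr)
{
	return alloc_ctr - free_ctr;	/* modulo 2^32 */
}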
83
84void rds_iw_ring_resize(struct rds_iw_work_ring *ring, u32 nr)
85{
86 /* We only ever get called from the connection setup code,
87 * prior to creating the QP. */
88 BUG_ON(__rds_iw_ring_used(ring));
89 ring->w_nr = nr;
90}
91
92static int __rds_iw_ring_empty(struct rds_iw_work_ring *ring)
93{
94 return __rds_iw_ring_used(ring) == 0;
95}
96
97u32 rds_iw_ring_alloc(struct rds_iw_work_ring *ring, u32 val, u32 *pos)
98{
99 u32 ret = 0, avail;
100
101 avail = ring->w_nr - __rds_iw_ring_used(ring);
102
103 rdsdebug("ring %p val %u next %u free %u\n", ring, val,
104 ring->w_alloc_ptr, avail);
105
106 if (val && avail) {
107 ret = min(val, avail);
108 *pos = ring->w_alloc_ptr;
109
110 ring->w_alloc_ptr = (ring->w_alloc_ptr + ret) % ring->w_nr;
111 ring->w_alloc_ctr += ret;
112 }
113
114 return ret;
115}
116
117void rds_iw_ring_free(struct rds_iw_work_ring *ring, u32 val)
118{
119 ring->w_free_ptr = (ring->w_free_ptr + val) % ring->w_nr;
120 atomic_add(val, &ring->w_free_ctr);
121
122 if (__rds_iw_ring_empty(ring) &&
123 waitqueue_active(&rds_iw_ring_empty_wait))
124 wake_up(&rds_iw_ring_empty_wait);
125}
126
127void rds_iw_ring_unalloc(struct rds_iw_work_ring *ring, u32 val)
128{
129 ring->w_alloc_ptr = (ring->w_alloc_ptr - val) % ring->w_nr;
130 ring->w_alloc_ctr -= val;
131}
132
133int rds_iw_ring_empty(struct rds_iw_work_ring *ring)
134{
135 return __rds_iw_ring_empty(ring);
136}
137
138int rds_iw_ring_low(struct rds_iw_work_ring *ring)
139{
140 return __rds_iw_ring_used(ring) <= (ring->w_nr >> 2);
141}
142
143
144/*
145 * returns the oldest alloced ring entry. This will be the next one
146 * freed. This can't be called if there are none allocated.
147 */
148u32 rds_iw_ring_oldest(struct rds_iw_work_ring *ring)
149{
150 return ring->w_free_ptr;
151}
152
153/*
154 * returns the number of completed work requests.
155 */
156
157u32 rds_iw_ring_completed(struct rds_iw_work_ring *ring, u32 wr_id, u32 oldest)
158{
159 u32 ret;
160
161 if (oldest <= (unsigned long long)wr_id)
162 ret = (unsigned long long)wr_id - oldest + 1;
163 else
164 ret = ring->w_nr - oldest + (unsigned long long)wr_id + 1;
165
166 rdsdebug("ring %p ret %u wr_id %u oldest %u\n", ring, ret,
167 wr_id, oldest);
168 return ret;
169}
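
For completeness, a hypothetical worked example of the completion count above in the wrapped case: with w_nr = 256, oldest = 250 and wr_id = 3, the completed count is 256 - 250 + 3 + 1 = 10, i.e. entries 250..255 followed by 0..3.

/* Illustration only: the same computation without the casts. */
static u32 example_completed(u32 w_nr, u32 wr_id, u32 oldest)
{
	if (oldest <= wr_id)
		return wr_id - oldest + 1;
	return w_nr - oldest + wr_id + 1;	/* e.g. 256 - 250 + 3 + 1 = 10 */
}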
diff --git a/net/rds/iw_send.c b/net/rds/iw_send.c
new file mode 100644
index 000000000000..22dd38ffd608
--- /dev/null
+++ b/net/rds/iw_send.c
@@ -0,0 +1,975 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/in.h>
35#include <linux/device.h>
36#include <linux/dmapool.h>
37
38#include "rds.h"
39#include "rdma.h"
40#include "iw.h"
41
42static void rds_iw_send_rdma_complete(struct rds_message *rm,
43 int wc_status)
44{
45 int notify_status;
46
47 switch (wc_status) {
48 case IB_WC_WR_FLUSH_ERR:
49 return;
50
51 case IB_WC_SUCCESS:
52 notify_status = RDS_RDMA_SUCCESS;
53 break;
54
55 case IB_WC_REM_ACCESS_ERR:
56 notify_status = RDS_RDMA_REMOTE_ERROR;
57 break;
58
59 default:
60 notify_status = RDS_RDMA_OTHER_ERROR;
61 break;
62 }
63 rds_rdma_send_complete(rm, notify_status);
64}
65
66static void rds_iw_send_unmap_rdma(struct rds_iw_connection *ic,
67 struct rds_rdma_op *op)
68{
69 if (op->r_mapped) {
70 ib_dma_unmap_sg(ic->i_cm_id->device,
71 op->r_sg, op->r_nents,
72 op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
73 op->r_mapped = 0;
74 }
75}
76
77static void rds_iw_send_unmap_rm(struct rds_iw_connection *ic,
78 struct rds_iw_send_work *send,
79 int wc_status)
80{
81 struct rds_message *rm = send->s_rm;
82
83 rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
84
85 ib_dma_unmap_sg(ic->i_cm_id->device,
86 rm->m_sg, rm->m_nents,
87 DMA_TO_DEVICE);
88
89 if (rm->m_rdma_op != NULL) {
90 rds_iw_send_unmap_rdma(ic, rm->m_rdma_op);
91
92 /* If the user asked for a completion notification on this
93 * message, we can implement three different semantics:
94 * 1. Notify when we received the ACK on the RDS message
95 * that was queued with the RDMA. This provides reliable
96 * notification of RDMA status at the expense of a one-way
97 * packet delay.
98 * 2. Notify when the IB stack gives us the completion event for
99 * the RDMA operation.
100 * 3. Notify when the IB stack gives us the completion event for
101 * the accompanying RDS messages.
102 * Here, we implement approach #3. To implement approach #2,
103 * call rds_rdma_send_complete from the cq_handler. To implement #1,
104 * don't call rds_rdma_send_complete at all, and fall back to the notify
105 * handling in the ACK processing code.
106 *
107 * Note: There's no need to explicitly sync any RDMA buffers using
108 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
109 * operation itself unmapped the RDMA buffers, which takes care
110 * of synching.
111 */
112 rds_iw_send_rdma_complete(rm, wc_status);
113
114 if (rm->m_rdma_op->r_write)
115 rds_stats_add(s_send_rdma_bytes, rm->m_rdma_op->r_bytes);
116 else
117 rds_stats_add(s_recv_rdma_bytes, rm->m_rdma_op->r_bytes);
118 }
119
120 /* If anyone waited for this message to get flushed out, wake
121 * them up now */
122 rds_message_unmapped(rm);
123
124 rds_message_put(rm);
125 send->s_rm = NULL;
126}
127
128void rds_iw_send_init_ring(struct rds_iw_connection *ic)
129{
130 struct rds_iw_send_work *send;
131 u32 i;
132
133 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
134 struct ib_sge *sge;
135
136 send->s_rm = NULL;
137 send->s_op = NULL;
138 send->s_mapping = NULL;
139
140 send->s_wr.next = NULL;
141 send->s_wr.wr_id = i;
142 send->s_wr.sg_list = send->s_sge;
143 send->s_wr.num_sge = 1;
144 send->s_wr.opcode = IB_WR_SEND;
145 send->s_wr.send_flags = 0;
146 send->s_wr.ex.imm_data = 0;
147
148 sge = rds_iw_data_sge(ic, send->s_sge);
149 sge->lkey = 0;
150
151 sge = rds_iw_header_sge(ic, send->s_sge);
152 sge->addr = ic->i_send_hdrs_dma + (i * sizeof(struct rds_header));
153 sge->length = sizeof(struct rds_header);
154 sge->lkey = 0;
155
156 send->s_mr = ib_alloc_fast_reg_mr(ic->i_pd, fastreg_message_size);
157 if (IS_ERR(send->s_mr)) {
158 printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_mr failed\n");
159 break;
160 }
161
162 send->s_page_list = ib_alloc_fast_reg_page_list(
163 ic->i_cm_id->device, fastreg_message_size);
164 if (IS_ERR(send->s_page_list)) {
165 printk(KERN_WARNING "RDS/IW: ib_alloc_fast_reg_page_list failed\n");
166 break;
167 }
168 }
169}
170
171void rds_iw_send_clear_ring(struct rds_iw_connection *ic)
172{
173 struct rds_iw_send_work *send;
174 u32 i;
175
176 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
177 BUG_ON(!send->s_mr);
178 ib_dereg_mr(send->s_mr);
179 BUG_ON(!send->s_page_list);
180 ib_free_fast_reg_page_list(send->s_page_list);
181 if (send->s_wr.opcode == 0xdead)
182 continue;
183 if (send->s_rm)
184 rds_iw_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
185 if (send->s_op)
186 rds_iw_send_unmap_rdma(ic, send->s_op);
187 }
188}
189
190/*
191 * The _oldest/_free ring operations here race cleanly with the alloc/unalloc
192 * operations performed in the send path. As the sender allocs and potentially
193 * unallocs the next free entry in the ring it doesn't alter which is
194 * the next to be freed, which is what this is concerned with.
195 */
196void rds_iw_send_cq_comp_handler(struct ib_cq *cq, void *context)
197{
198 struct rds_connection *conn = context;
199 struct rds_iw_connection *ic = conn->c_transport_data;
200 struct ib_wc wc;
201 struct rds_iw_send_work *send;
202 u32 completed;
203 u32 oldest;
204 u32 i;
205 int ret;
206
207 rdsdebug("cq %p conn %p\n", cq, conn);
208 rds_iw_stats_inc(s_iw_tx_cq_call);
209 ret = ib_req_notify_cq(cq, IB_CQ_NEXT_COMP);
210 if (ret)
211 rdsdebug("ib_req_notify_cq send failed: %d\n", ret);
212
213 while (ib_poll_cq(cq, 1, &wc) > 0) {
214 rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
215 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
216 be32_to_cpu(wc.ex.imm_data));
217 rds_iw_stats_inc(s_iw_tx_cq_event);
218
219 if (wc.status != IB_WC_SUCCESS) {
220 printk(KERN_ERR "WC Error: status = %d opcode = %d\n", wc.status, wc.opcode);
221 break;
222 }
223
224 if (wc.opcode == IB_WC_LOCAL_INV && wc.wr_id == RDS_IW_LOCAL_INV_WR_ID) {
225 ic->i_fastreg_posted = 0;
226 continue;
227 }
228
229 if (wc.opcode == IB_WC_FAST_REG_MR && wc.wr_id == RDS_IW_FAST_REG_WR_ID) {
230 ic->i_fastreg_posted = 1;
231 continue;
232 }
233
234 if (wc.wr_id == RDS_IW_ACK_WR_ID) {
235 if (ic->i_ack_queued + HZ/2 < jiffies)
236 rds_iw_stats_inc(s_iw_tx_stalled);
237 rds_iw_ack_send_complete(ic);
238 continue;
239 }
240
241 oldest = rds_iw_ring_oldest(&ic->i_send_ring);
242
243 completed = rds_iw_ring_completed(&ic->i_send_ring, wc.wr_id, oldest);
244
245 for (i = 0; i < completed; i++) {
246 send = &ic->i_sends[oldest];
247
248 /* In the error case, wc.opcode sometimes contains garbage */
249 switch (send->s_wr.opcode) {
250 case IB_WR_SEND:
251 if (send->s_rm)
252 rds_iw_send_unmap_rm(ic, send, wc.status);
253 break;
254 case IB_WR_FAST_REG_MR:
255 case IB_WR_RDMA_WRITE:
256 case IB_WR_RDMA_READ:
257 case IB_WR_RDMA_READ_WITH_INV:
258 /* Nothing to be done - the SG list will be unmapped
259 * when the SEND completes. */
260 break;
261 default:
262 if (printk_ratelimit())
263 printk(KERN_NOTICE
264 "RDS/IW: %s: unexpected opcode 0x%x in WR!\n",
265 __func__, send->s_wr.opcode);
266 break;
267 }
268
269 send->s_wr.opcode = 0xdead;
270 send->s_wr.num_sge = 1;
271 if (send->s_queued + HZ/2 < jiffies)
272 rds_iw_stats_inc(s_iw_tx_stalled);
273
274 /* If an RDMA operation produced an error, signal this right
275 * away. If we don't, the subsequent SEND that goes with this
276 * RDMA will be canceled with ERR_WFLUSH, and the application
277 * will never learn that the RDMA failed. */
278 if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
279 struct rds_message *rm;
280
281 rm = rds_send_get_message(conn, send->s_op);
282 if (rm)
283 rds_iw_send_rdma_complete(rm, wc.status);
284 }
285
286 oldest = (oldest + 1) % ic->i_send_ring.w_nr;
287 }
288
289 rds_iw_ring_free(&ic->i_send_ring, completed);
290
291 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags)
292 || test_bit(0, &conn->c_map_queued))
293 queue_delayed_work(rds_wq, &conn->c_send_w, 0);
294
295 /* We expect errors as the qp is drained during shutdown */
296 if (wc.status != IB_WC_SUCCESS && rds_conn_up(conn)) {
297 rds_iw_conn_error(conn,
298 "send completion on %pI4 "
299 "had status %u, disconnecting and reconnecting\n",
300 &conn->c_faddr, wc.status);
301 }
302 }
303}
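
/*
 * Illustrative userspace sketch (not part of this patch) of the ring
 * accounting done by the completion handler above.  rds_iw_ring_completed()
 * lives in iw_ring.c; the helper below is an assumption about its
 * semantics: it returns how many ring entries, counting inclusively from
 * the oldest outstanding entry up to the entry whose wr_id just completed,
 * can now be reaped.
 */
#include <stdio.h>

static unsigned int ring_completed(unsigned int nr, unsigned int oldest,
				   unsigned int wr_id)
{
	if (oldest <= wr_id)
		return wr_id - oldest + 1;
	/* The completion wrapped past the end of the ring. */
	return nr - oldest + wr_id + 1;
}

int main(void)
{
	/* Ring of 256 entries; the oldest outstanding send is slot 250 and a
	 * completion just arrived for slot 3, so ten entries are done. */
	printf("%u entries completed\n", ring_completed(256, 250, 3));
	return 0;
}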
304
305/*
306 * This is the main function for allocating credits when sending
307 * messages.
308 *
309 * Conceptually, we have two counters:
310 * - send credits: this tells us how many WRs we're allowed
311 * to submit without overrunning the receiver's queue. For
312 * each SEND WR we post, we decrement this by one.
313 *
314 * - posted credits: this tells us how many WRs we recently
315 * posted to the receive queue. This value is transferred
316 * to the peer as a "credit update" in an RDS header field.
317 * Every time we transmit credits to the peer, we subtract
318 * the amount of transferred credits from this counter.
319 *
320 * It is essential that we avoid situations where both sides have
321 * exhausted their send credits, and are unable to send new credits
322 * to the peer. We achieve this by requiring that we send at least
323 * one credit update to the peer before exhausting our credits.
324 * When new credits arrive, we subtract one credit that is withheld
325 * until we've posted new buffers and are ready to transmit these
326 * credits (see rds_iw_send_add_credits below).
327 *
328 * The RDS send code is essentially single-threaded; rds_send_xmit
329 * grabs c_send_lock to ensure exclusive access to the send ring.
330 * However, the ACK sending code is independent and can race with
331 * message SENDs.
332 *
333 * In the send path, we need to update the counters for send credits
334 * and the counter of posted buffers atomically - when we use the
335 * last available credit, we cannot allow another thread to race us
336 * and grab the posted credits counter. Hence, we have to use a
337 * spinlock to protect the credit counter, or use atomics.
338 *
339 * Spinlocks shared between the send and the receive path are bad,
340 * because they create unnecessary delays. An early implementation
341 * using a spinlock showed a 5% degradation in throughput at some
342 * loads.
343 *
344 * This implementation avoids spinlocks completely, putting both
345 * counters into a single atomic, and updating that atomic using
346 * atomic_add (in the receive path, when receiving fresh credits),
347 * and using atomic_cmpxchg when updating the two counters.
348 */
349int rds_iw_send_grab_credits(struct rds_iw_connection *ic,
350 u32 wanted, u32 *adv_credits, int need_posted)
351{
352 unsigned int avail, posted, got = 0, advertise;
353 long oldval, newval;
354
355 *adv_credits = 0;
356 if (!ic->i_flowctl)
357 return wanted;
358
359try_again:
360 advertise = 0;
361 oldval = newval = atomic_read(&ic->i_credits);
362 posted = IB_GET_POST_CREDITS(oldval);
363 avail = IB_GET_SEND_CREDITS(oldval);
364
365 rdsdebug("rds_iw_send_grab_credits(%u): credits=%u posted=%u\n",
366 wanted, avail, posted);
367
368 /* The last credit must be used to send a credit update. */
369 if (avail && !posted)
370 avail--;
371
372 if (avail < wanted) {
373 struct rds_connection *conn = ic->i_cm_id->context;
374
375 /* Oops, there aren't that many credits left! */
376 set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
377 got = avail;
378 } else {
379 /* Sometimes you get what you want, lalala. */
380 got = wanted;
381 }
382 newval -= IB_SET_SEND_CREDITS(got);
383
384 /*
385 * If need_posted is non-zero, then the caller wants us to
386 * advertise the posted credits regardless of whether any send
387 * credits are available.
388 */
389 if (posted && (got || need_posted)) {
390 advertise = min_t(unsigned int, posted, RDS_MAX_ADV_CREDIT);
391 newval -= IB_SET_POST_CREDITS(advertise);
392 }
393
394 /* Finally bill everything */
395 if (atomic_cmpxchg(&ic->i_credits, oldval, newval) != oldval)
396 goto try_again;
397
398 *adv_credits = advertise;
399 return got;
400}
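
/*
 * Illustrative userspace sketch (not part of this patch) of the credit
 * scheme described above rds_iw_send_grab_credits().  It assumes the
 * IB_GET_SEND_CREDITS / IB_GET_POST_CREDITS / IB_SET_ helpers in iw.h
 * pack the send-credit count into the low 16 bits and the posted-credit
 * count into the high 16 bits of one atomic word; the exact bit split is
 * an assumption, and the RDS_MAX_ADV_CREDIT clamping and need_posted
 * handling are omitted for brevity.
 */
#include <stdatomic.h>
#include <stdio.h>

#define SET_SEND_CREDITS(v)	((unsigned int)(v) & 0xffffu)
#define SET_POST_CREDITS(v)	((unsigned int)(v) << 16)
#define GET_SEND_CREDITS(v)	((v) & 0xffffu)
#define GET_POST_CREDITS(v)	((v) >> 16)

static atomic_uint credits;

/* Grab up to "wanted" send credits and collect posted credits to advertise. */
static unsigned int grab_credits(unsigned int wanted, unsigned int *advertise)
{
	unsigned int oldval, newval, avail, posted, got;

	do {
		oldval = atomic_load(&credits);
		avail  = GET_SEND_CREDITS(oldval);
		posted = GET_POST_CREDITS(oldval);

		/* Withhold the last credit so a credit update can still go out. */
		if (avail && !posted)
			avail--;

		got = (avail < wanted) ? avail : wanted;
		*advertise = posted;

		newval = oldval - SET_SEND_CREDITS(got) - SET_POST_CREDITS(posted);
	} while (!atomic_compare_exchange_weak(&credits, &oldval, newval));

	return got;
}

int main(void)
{
	unsigned int adv;

	/* The peer granted 8 send credits and we have 3 freshly posted buffers. */
	atomic_fetch_add(&credits, SET_SEND_CREDITS(8) + SET_POST_CREDITS(3));
	printf("got %u send credits, advertising %u\n", grab_credits(5, &adv), adv);
	return 0;
}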
401
402void rds_iw_send_add_credits(struct rds_connection *conn, unsigned int credits)
403{
404 struct rds_iw_connection *ic = conn->c_transport_data;
405
406 if (credits == 0)
407 return;
408
409 rdsdebug("rds_iw_send_add_credits(%u): current=%u%s\n",
410 credits,
411 IB_GET_SEND_CREDITS(atomic_read(&ic->i_credits)),
412 test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ? ", ll_send_full" : "");
413
414 atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
415 if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
416 queue_delayed_work(rds_wq, &conn->c_send_w, 0);
417
418 WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
419
420 rds_iw_stats_inc(s_iw_rx_credit_updates);
421}
422
423void rds_iw_advertise_credits(struct rds_connection *conn, unsigned int posted)
424{
425 struct rds_iw_connection *ic = conn->c_transport_data;
426
427 if (posted == 0)
428 return;
429
430 atomic_add(IB_SET_POST_CREDITS(posted), &ic->i_credits);
431
432 /* Decide whether to send an update to the peer now.
433 * If we sent a credit update for every single buffer we
434 * post, we would end up with an ACK storm (ACK arrives,
435 * consumes buffer, we refill the ring, send ACK to remote
436 * advertising the newly posted buffer... ad infinitum).
437 *
438 * Performance pretty much depends on how often we send
439 * credit updates - too frequent updates mean lots of ACKs.
440 * Too infrequent updates, and the peer will run out of
441 * credits and have to throttle.
442 * For the time being, 16 seems to be a good compromise.
443 */
444 if (IB_GET_POST_CREDITS(atomic_read(&ic->i_credits)) >= 16)
445 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
446}
447
448static inline void
449rds_iw_xmit_populate_wr(struct rds_iw_connection *ic,
450 struct rds_iw_send_work *send, unsigned int pos,
451 unsigned long buffer, unsigned int length,
452 int send_flags)
453{
454 struct ib_sge *sge;
455
456 WARN_ON(pos != send - ic->i_sends);
457
458 send->s_wr.send_flags = send_flags;
459 send->s_wr.opcode = IB_WR_SEND;
460 send->s_wr.num_sge = 2;
461 send->s_wr.next = NULL;
462 send->s_queued = jiffies;
463 send->s_op = NULL;
464
465 if (length != 0) {
466 sge = rds_iw_data_sge(ic, send->s_sge);
467 sge->addr = buffer;
468 sge->length = length;
469 sge->lkey = rds_iw_local_dma_lkey(ic);
470
471 sge = rds_iw_header_sge(ic, send->s_sge);
472 } else {
473 /* We're sending a packet with no payload. There is only
474 * one SGE */
475 send->s_wr.num_sge = 1;
476 sge = &send->s_sge[0];
477 }
478
479 sge->addr = ic->i_send_hdrs_dma + (pos * sizeof(struct rds_header));
480 sge->length = sizeof(struct rds_header);
481 sge->lkey = rds_iw_local_dma_lkey(ic);
482}
483
484/*
485 * This can be called multiple times for a given message. The first time
486 * we see a message we map its scatterlist into the IB device so that
487 * we can provide that mapped address to the IB scatter gather entries
488 * in the IB work requests. We translate the scatterlist into a series
489 * of work requests that fragment the message. These work requests complete
490 * in order so we pass ownership of the message to the completion handler
491 * once we send the final fragment.
492 *
493 * The RDS core holds c_send_lock so that only one thread enters this
494 * function at a time per connection. This makes sure that the tx ring
495 * alloc/unalloc pairs don't get out of sync and confuse the ring.
496 */
497int rds_iw_xmit(struct rds_connection *conn, struct rds_message *rm,
498 unsigned int hdr_off, unsigned int sg, unsigned int off)
499{
500 struct rds_iw_connection *ic = conn->c_transport_data;
501 struct ib_device *dev = ic->i_cm_id->device;
502 struct rds_iw_send_work *send = NULL;
503 struct rds_iw_send_work *first;
504 struct rds_iw_send_work *prev;
505 struct ib_send_wr *failed_wr;
506 struct scatterlist *scat;
507 u32 pos;
508 u32 i;
509 u32 work_alloc;
510 u32 credit_alloc;
511 u32 posted;
512 u32 adv_credits = 0;
513 int send_flags = 0;
514 int sent;
515 int ret;
516 int flow_controlled = 0;
517
518 BUG_ON(off % RDS_FRAG_SIZE);
519 BUG_ON(hdr_off != 0 && hdr_off != sizeof(struct rds_header));
520
521 /* Fastreg support */
522 if (rds_rdma_cookie_key(rm->m_rdma_cookie)
523 && !ic->i_fastreg_posted) {
524 ret = -EAGAIN;
525 goto out;
526 }
527
528 /* FIXME we may overallocate here */
529 if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0)
530 i = 1;
531 else
532 i = ceil(be32_to_cpu(rm->m_inc.i_hdr.h_len), RDS_FRAG_SIZE);
533
534 work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos);
535 if (work_alloc == 0) {
536 set_bit(RDS_LL_SEND_FULL, &conn->c_flags);
537 rds_iw_stats_inc(s_iw_tx_ring_full);
538 ret = -ENOMEM;
539 goto out;
540 }
541
542 credit_alloc = work_alloc;
543 if (ic->i_flowctl) {
544 credit_alloc = rds_iw_send_grab_credits(ic, work_alloc, &posted, 0);
545 adv_credits += posted;
546 if (credit_alloc < work_alloc) {
547 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - credit_alloc);
548 work_alloc = credit_alloc;
549 flow_controlled++;
550 }
551 if (work_alloc == 0) {
552 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
553 rds_iw_stats_inc(s_iw_tx_throttle);
554 ret = -ENOMEM;
555 goto out;
556 }
557 }
558
559 /* map the message the first time we see it */
560 if (ic->i_rm == NULL) {
561 /*
562 printk(KERN_NOTICE "rds_iw_xmit prep msg dport=%u flags=0x%x len=%d\n",
563 be16_to_cpu(rm->m_inc.i_hdr.h_dport),
564 rm->m_inc.i_hdr.h_flags,
565 be32_to_cpu(rm->m_inc.i_hdr.h_len));
566 */
567 if (rm->m_nents) {
568 rm->m_count = ib_dma_map_sg(dev,
569 rm->m_sg, rm->m_nents, DMA_TO_DEVICE);
570 rdsdebug("ic %p mapping rm %p: %d\n", ic, rm, rm->m_count);
571 if (rm->m_count == 0) {
572 rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
573 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
574 ret = -ENOMEM; /* XXX ? */
575 goto out;
576 }
577 } else {
578 rm->m_count = 0;
579 }
580
581 ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
582 ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes;
583 rds_message_addref(rm);
584 ic->i_rm = rm;
585
586 /* Finalize the header */
587 if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
588 rm->m_inc.i_hdr.h_flags |= RDS_FLAG_ACK_REQUIRED;
589 if (test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags))
590 rm->m_inc.i_hdr.h_flags |= RDS_FLAG_RETRANSMITTED;
591
592 /* If it has an RDMA op, tell the peer we did it. This is
593 * used by the peer to release use-once RDMA MRs. */
594 if (rm->m_rdma_op) {
595 struct rds_ext_header_rdma ext_hdr;
596
597 ext_hdr.h_rdma_rkey = cpu_to_be32(rm->m_rdma_op->r_key);
598 rds_message_add_extension(&rm->m_inc.i_hdr,
599 RDS_EXTHDR_RDMA, &ext_hdr, sizeof(ext_hdr));
600 }
601 if (rm->m_rdma_cookie) {
602 rds_message_add_rdma_dest_extension(&rm->m_inc.i_hdr,
603 rds_rdma_cookie_key(rm->m_rdma_cookie),
604 rds_rdma_cookie_offset(rm->m_rdma_cookie));
605 }
606
607 /* Note - rds_iw_piggyb_ack clears the ACK_REQUIRED bit, so
608 * we should not do this unless we have a chance of at least
609 * sticking the header into the send ring, which is why we
610 * call rds_iw_ring_alloc first. */
611 rm->m_inc.i_hdr.h_ack = cpu_to_be64(rds_iw_piggyb_ack(ic));
612 rds_message_make_checksum(&rm->m_inc.i_hdr);
613
614 /*
615 * Update adv_credits since we reset the ACK_REQUIRED bit.
616 */
617 rds_iw_send_grab_credits(ic, 0, &posted, 1);
618 adv_credits += posted;
619 BUG_ON(adv_credits > 255);
620 } else if (ic->i_rm != rm)
621 BUG();
622
623 send = &ic->i_sends[pos];
624 first = send;
625 prev = NULL;
626 scat = &rm->m_sg[sg];
627 sent = 0;
628 i = 0;
629
630 /* Sometimes you want to put a fence between an RDMA
631 * READ and the following SEND.
632 * We could either do this all the time
633 * or when requested by the user. Right now, we let
634 * the application choose.
635 */
636 if (rm->m_rdma_op && rm->m_rdma_op->r_fence)
637 send_flags = IB_SEND_FENCE;
638
639 /*
640 * We could be copying the header into the unused tail of the page.
641 * That would need to be changed in the future when those pages might
642 * be mapped userspace pages or page cache pages. So instead we always
643 * use a second sge and our long-lived ring of mapped headers. We send
644 * the header after the data so that the data payload can be aligned on
645 * the receiver.
646 */
647
648 /* handle a 0-len message */
649 if (be32_to_cpu(rm->m_inc.i_hdr.h_len) == 0) {
650 rds_iw_xmit_populate_wr(ic, send, pos, 0, 0, send_flags);
651 goto add_header;
652 }
653
654 /* if there's data, reference it with a chain of work reqs */
655 for (; i < work_alloc && scat != &rm->m_sg[rm->m_count]; i++) {
656 unsigned int len;
657
658 send = &ic->i_sends[pos];
659
660 len = min(RDS_FRAG_SIZE, ib_sg_dma_len(dev, scat) - off);
661 rds_iw_xmit_populate_wr(ic, send, pos,
662 ib_sg_dma_address(dev, scat) + off, len,
663 send_flags);
664
665 /*
666 * We want to delay signaling completions just enough to get
667 * the batching benefits but not so much that we create dead time
668 * on the wire.
669 */
670 if (ic->i_unsignaled_wrs-- == 0) {
671 ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
672 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
673 }
674
675 ic->i_unsignaled_bytes -= len;
676 if (ic->i_unsignaled_bytes <= 0) {
677 ic->i_unsignaled_bytes = rds_iw_sysctl_max_unsig_bytes;
678 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
679 }
680
681 /*
682 * Always signal the last one if we're stopping due to flow control.
683 */
684 if (flow_controlled && i == (work_alloc-1))
685 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
686
687 rdsdebug("send %p wr %p num_sge %u next %p\n", send,
688 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
689
690 sent += len;
691 off += len;
692 if (off == ib_sg_dma_len(dev, scat)) {
693 scat++;
694 off = 0;
695 }
696
697add_header:
698 /* Tack on the header after the data. The header SGE should already
699 * have been set up to point to the right header buffer. */
700 memcpy(&ic->i_send_hdrs[pos], &rm->m_inc.i_hdr, sizeof(struct rds_header));
701
702 if (0) {
703 struct rds_header *hdr = &ic->i_send_hdrs[pos];
704
705 printk(KERN_NOTICE "send WR dport=%u flags=0x%x len=%d\n",
706 be16_to_cpu(hdr->h_dport),
707 hdr->h_flags,
708 be32_to_cpu(hdr->h_len));
709 }
710 if (adv_credits) {
711 struct rds_header *hdr = &ic->i_send_hdrs[pos];
712
713 /* add credit and redo the header checksum */
714 hdr->h_credit = adv_credits;
715 rds_message_make_checksum(hdr);
716 adv_credits = 0;
717 rds_iw_stats_inc(s_iw_tx_credit_updates);
718 }
719
720 if (prev)
721 prev->s_wr.next = &send->s_wr;
722 prev = send;
723
724 pos = (pos + 1) % ic->i_send_ring.w_nr;
725 }
726
727 /* Account the RDS header in the number of bytes we sent, but just once.
728 * The caller has no concept of fragmentation. */
729 if (hdr_off == 0)
730 sent += sizeof(struct rds_header);
731
732 /* if we finished the message then send completion owns it */
733 if (scat == &rm->m_sg[rm->m_count]) {
734 prev->s_rm = ic->i_rm;
735 prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
736 ic->i_rm = NULL;
737 }
738
739 if (i < work_alloc) {
740 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i);
741 work_alloc = i;
742 }
743 if (ic->i_flowctl && i < credit_alloc)
744 rds_iw_send_add_credits(conn, credit_alloc - i);
745
746 /* XXX need to worry about failed_wr and partial sends. */
747 failed_wr = &first->s_wr;
748 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
749 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
750 first, &first->s_wr, ret, failed_wr);
751 BUG_ON(failed_wr != &first->s_wr);
752 if (ret) {
753 printk(KERN_WARNING "RDS/IW: ib_post_send to %pI4 "
754 "returned %d\n", &conn->c_faddr, ret);
755 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
756 if (prev->s_rm) {
757 ic->i_rm = prev->s_rm;
758 prev->s_rm = NULL;
759 }
760 goto out;
761 }
762
763 ret = sent;
764out:
765 BUG_ON(adv_credits);
766 return ret;
767}
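
/*
 * Illustrative userspace sketch (not part of this patch) of the
 * fragmentation performed by rds_iw_xmit() above: one SEND work request
 * per RDS_FRAG_SIZE chunk of payload, each carrying a data SGE plus a
 * header SGE pointing into the long-lived ring of pre-mapped headers,
 * with the header sent after the data.  The 4 KiB fragment size and the
 * 10000-byte message length are assumptions for the example.
 */
#include <stdio.h>

#define FRAG_SIZE 4096u

int main(void)
{
	unsigned int len = 10000;
	unsigned int frags = len ? (len + FRAG_SIZE - 1) / FRAG_SIZE : 1;
	unsigned int off, i;

	for (i = 0, off = 0; i < frags; i++, off += FRAG_SIZE) {
		unsigned int chunk = (len - off < FRAG_SIZE) ? len - off : FRAG_SIZE;

		printf("WR %u: data sge of %u bytes at offset %u, then the header sge\n",
		       i, chunk, off);
	}
	return 0;
}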
768
769static void rds_iw_build_send_fastreg(struct rds_iw_device *rds_iwdev, struct rds_iw_connection *ic, struct rds_iw_send_work *send, int nent, int len, u64 sg_addr)
770{
771 BUG_ON(nent > send->s_page_list->max_page_list_len);
772 /*
773 * Perform a WR for the fast_reg_mr. Each individual page
774 * in the sg list is added to the fast reg page list and placed
775 * inside the fast_reg_mr WR.
776 */
777 send->s_wr.opcode = IB_WR_FAST_REG_MR;
778 send->s_wr.wr.fast_reg.length = len;
779 send->s_wr.wr.fast_reg.rkey = send->s_mr->rkey;
780 send->s_wr.wr.fast_reg.page_list = send->s_page_list;
781 send->s_wr.wr.fast_reg.page_list_len = nent;
782 send->s_wr.wr.fast_reg.page_shift = rds_iwdev->page_shift;
783 send->s_wr.wr.fast_reg.access_flags = IB_ACCESS_REMOTE_WRITE;
784 send->s_wr.wr.fast_reg.iova_start = sg_addr;
785
786 ib_update_fast_reg_key(send->s_mr, send->s_remap_count++);
787}
788
789int rds_iw_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
790{
791 struct rds_iw_connection *ic = conn->c_transport_data;
792 struct rds_iw_send_work *send = NULL;
793 struct rds_iw_send_work *first;
794 struct rds_iw_send_work *prev;
795 struct ib_send_wr *failed_wr;
796 struct rds_iw_device *rds_iwdev;
797 struct scatterlist *scat;
798 unsigned long len;
799 u64 remote_addr = op->r_remote_addr;
800 u32 pos, fr_pos;
801 u32 work_alloc;
802 u32 i;
803 u32 j;
804 int sent;
805 int ret;
806 int num_sge;
807
808 rds_iwdev = ib_get_client_data(ic->i_cm_id->device, &rds_iw_client);
809
810 /* map the message the first time we see it */
811 if (!op->r_mapped) {
812 op->r_count = ib_dma_map_sg(ic->i_cm_id->device,
813 op->r_sg, op->r_nents, (op->r_write) ?
814 DMA_TO_DEVICE : DMA_FROM_DEVICE);
815 rdsdebug("ic %p mapping op %p: %d\n", ic, op, op->r_count);
816 if (op->r_count == 0) {
817 rds_iw_stats_inc(s_iw_tx_sg_mapping_failure);
818 ret = -ENOMEM; /* XXX ? */
819 goto out;
820 }
821
822 op->r_mapped = 1;
823 }
824
825 if (!op->r_write) {
826 /* Alloc space on the send queue for the fastreg */
827 work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, 1, &fr_pos);
828 if (work_alloc != 1) {
829 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
830 rds_iw_stats_inc(s_iw_tx_ring_full);
831 ret = -ENOMEM;
832 goto out;
833 }
834 }
835
836 /*
837 * Rather than supporting partial RDMA reads/writes, we insist that there
838 * be enough work requests to send the entire message.
839 */
840 i = ceil(op->r_count, rds_iwdev->max_sge);
841
842 work_alloc = rds_iw_ring_alloc(&ic->i_send_ring, i, &pos);
843 if (work_alloc != i) {
844 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
845 rds_iw_stats_inc(s_iw_tx_ring_full);
846 ret = -ENOMEM;
847 goto out;
848 }
849
850 send = &ic->i_sends[pos];
851 if (!op->r_write) {
852 first = prev = &ic->i_sends[fr_pos];
853 } else {
854 first = send;
855 prev = NULL;
856 }
857 scat = &op->r_sg[0];
858 sent = 0;
859 num_sge = op->r_count;
860
861 for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) {
862 send->s_wr.send_flags = 0;
863 send->s_queued = jiffies;
864
865 /*
866 * We want to delay signaling completions just enough to get
867 * the batching benefits but not so much that we create dead time on the wire.
868 */
869 if (ic->i_unsignaled_wrs-- == 0) {
870 ic->i_unsignaled_wrs = rds_iw_sysctl_max_unsig_wrs;
871 send->s_wr.send_flags = IB_SEND_SIGNALED;
872 }
873
874 /* To avoid needing extra plumbing to invalidate the fastreg_mr used
875 * for local access once RDS is finished with it, we use
876 * IB_WR_RDMA_READ_WITH_INV, which invalidates it after the read has completed.
877 */
878 if (op->r_write)
879 send->s_wr.opcode = IB_WR_RDMA_WRITE;
880 else
881 send->s_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
882
883 send->s_wr.wr.rdma.remote_addr = remote_addr;
884 send->s_wr.wr.rdma.rkey = op->r_key;
885 send->s_op = op;
886
887 if (num_sge > rds_iwdev->max_sge) {
888 send->s_wr.num_sge = rds_iwdev->max_sge;
889 num_sge -= rds_iwdev->max_sge;
890 } else
891 send->s_wr.num_sge = num_sge;
892
893 send->s_wr.next = NULL;
894
895 if (prev)
896 prev->s_wr.next = &send->s_wr;
897
898 for (j = 0; j < send->s_wr.num_sge && scat != &op->r_sg[op->r_count]; j++) {
899 len = ib_sg_dma_len(ic->i_cm_id->device, scat);
900
901 if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV)
902 send->s_page_list->page_list[j] = ib_sg_dma_address(ic->i_cm_id->device, scat);
903 else {
904 send->s_sge[j].addr = ib_sg_dma_address(ic->i_cm_id->device, scat);
905 send->s_sge[j].length = len;
906 send->s_sge[j].lkey = rds_iw_local_dma_lkey(ic);
907 }
908
909 sent += len;
910 rdsdebug("ic %p sent %d remote_addr %llu\n", ic, sent, remote_addr);
911 remote_addr += len;
912
913 scat++;
914 }
915
916 if (send->s_wr.opcode == IB_WR_RDMA_READ_WITH_INV) {
917 send->s_wr.num_sge = 1;
918 send->s_sge[0].addr = conn->c_xmit_rm->m_rs->rs_user_addr;
919 send->s_sge[0].length = conn->c_xmit_rm->m_rs->rs_user_bytes;
920 send->s_sge[0].lkey = ic->i_sends[fr_pos].s_mr->lkey;
921 }
922
923 rdsdebug("send %p wr %p num_sge %u next %p\n", send,
924 &send->s_wr, send->s_wr.num_sge, send->s_wr.next);
925
926 prev = send;
927 if (++send == &ic->i_sends[ic->i_send_ring.w_nr])
928 send = ic->i_sends;
929 }
930
931 /* if we finished the message then send completion owns it */
932 if (scat == &op->r_sg[op->r_count])
933 first->s_wr.send_flags = IB_SEND_SIGNALED;
934
935 if (i < work_alloc) {
936 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc - i);
937 work_alloc = i;
938 }
939
940 /* On iWARP, local memory access by a remote system (i.e., RDMA Read) is not
941 * recommended. Putting the lkey on the wire is a security hole, as it can
942 * allow access to all of the memory on the remote system. Some
943 * adapters do not allow using the lkey for this at all. To bypass this, use a
944 * fastreg_mr (or possibly a dma_mr).
945 */
946 if (!op->r_write) {
947 rds_iw_build_send_fastreg(rds_iwdev, ic, &ic->i_sends[fr_pos],
948 op->r_count, sent, conn->c_xmit_rm->m_rs->rs_user_addr);
949 work_alloc++;
950 }
951
952 failed_wr = &first->s_wr;
953 ret = ib_post_send(ic->i_cm_id->qp, &first->s_wr, &failed_wr);
954 rdsdebug("ic %p first %p (wr %p) ret %d wr %p\n", ic,
955 first, &first->s_wr, ret, failed_wr);
956 BUG_ON(failed_wr != &first->s_wr);
957 if (ret) {
958 printk(KERN_WARNING "RDS/IW: rdma ib_post_send to %pI4 "
959 "returned %d\n", &conn->c_faddr, ret);
960 rds_iw_ring_unalloc(&ic->i_send_ring, work_alloc);
961 goto out;
962 }
963
964out:
965 return ret;
966}
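
/*
 * Illustrative userspace sketch (not part of this patch) of the work
 * request accounting in rds_iw_xmit_rdma() above: the mapped scatterlist
 * is split across RDMA work requests of at most max_sge entries each, and
 * an RDMA read additionally consumes one work request for the fastreg MR
 * that covers the local buffer.  The counts below are made up.
 */
#include <stdio.h>

int main(void)
{
	unsigned int r_count = 37;	/* hypothetical mapped sg entries */
	unsigned int max_sge = 8;	/* hypothetical device SGE limit */
	int is_read = 1;

	unsigned int data_wrs = (r_count + max_sge - 1) / max_sge;

	printf("%u RDMA WRs%s\n", data_wrs,
	       is_read ? " plus 1 fastreg WR for the local MR" : "");
	return 0;
}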
967
968void rds_iw_xmit_complete(struct rds_connection *conn)
969{
970 struct rds_iw_connection *ic = conn->c_transport_data;
971
972 /* We may have a pending ACK or window update we were unable
973 * to send previously (due to flow control). Try again. */
974 rds_iw_attempt_ack(ic);
975}
diff --git a/net/rds/iw_stats.c b/net/rds/iw_stats.c
new file mode 100644
index 000000000000..ccc7e8f0bf0e
--- /dev/null
+++ b/net/rds/iw_stats.c
@@ -0,0 +1,95 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/percpu.h>
34#include <linux/seq_file.h>
35#include <linux/proc_fs.h>
36
37#include "rds.h"
38#include "iw.h"
39
40DEFINE_PER_CPU(struct rds_iw_statistics, rds_iw_stats) ____cacheline_aligned;
41
42static char *rds_iw_stat_names[] = {
43 "iw_connect_raced",
44 "iw_listen_closed_stale",
45 "iw_tx_cq_call",
46 "iw_tx_cq_event",
47 "iw_tx_ring_full",
48 "iw_tx_throttle",
49 "iw_tx_sg_mapping_failure",
50 "iw_tx_stalled",
51 "iw_tx_credit_updates",
52 "iw_rx_cq_call",
53 "iw_rx_cq_event",
54 "iw_rx_ring_empty",
55 "iw_rx_refill_from_cq",
56 "iw_rx_refill_from_thread",
57 "iw_rx_alloc_limit",
58 "iw_rx_credit_updates",
59 "iw_ack_sent",
60 "iw_ack_send_failure",
61 "iw_ack_send_delayed",
62 "iw_ack_send_piggybacked",
63 "iw_ack_received",
64 "iw_rdma_mr_alloc",
65 "iw_rdma_mr_free",
66 "iw_rdma_mr_used",
67 "iw_rdma_mr_pool_flush",
68 "iw_rdma_mr_pool_wait",
69 "iw_rdma_mr_pool_depleted",
70};
71
72unsigned int rds_iw_stats_info_copy(struct rds_info_iterator *iter,
73 unsigned int avail)
74{
75 struct rds_iw_statistics stats = {0, };
76 uint64_t *src;
77 uint64_t *sum;
78 size_t i;
79 int cpu;
80
81 if (avail < ARRAY_SIZE(rds_iw_stat_names))
82 goto out;
83
84 for_each_online_cpu(cpu) {
85 src = (uint64_t *)&(per_cpu(rds_iw_stats, cpu));
86 sum = (uint64_t *)&stats;
87 for (i = 0; i < sizeof(stats) / sizeof(uint64_t); i++)
88 *(sum++) += *(src++);
89 }
90
91 rds_stats_info_copy(iter, (uint64_t *)&stats, rds_iw_stat_names,
92 ARRAY_SIZE(rds_iw_stat_names));
93out:
94 return ARRAY_SIZE(rds_iw_stat_names);
95}
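
/*
 * Illustrative userspace sketch (not part of this patch) of the summation
 * trick used by rds_iw_stats_info_copy() above: because the statistics
 * structure consists solely of 64-bit counters, the per-CPU copies can be
 * summed by walking each struct as a flat array of uint64_t.  The struct
 * and CPU count below are invented for the example.
 */
#include <stdint.h>
#include <stdio.h>

struct stats {
	uint64_t tx_cq_event;
	uint64_t rx_cq_event;
	uint64_t ack_sent;
};

int main(void)
{
	struct stats per_cpu[2] = { { 1, 2, 3 }, { 10, 20, 30 } };
	struct stats sum = { 0 };
	size_t i;
	int cpu;

	for (cpu = 0; cpu < 2; cpu++) {
		const uint64_t *src = (const uint64_t *)&per_cpu[cpu];
		uint64_t *dst = (uint64_t *)&sum;

		for (i = 0; i < sizeof(sum) / sizeof(uint64_t); i++)
			dst[i] += src[i];
	}

	printf("tx_cq_event=%llu rx_cq_event=%llu ack_sent=%llu\n",
	       (unsigned long long)sum.tx_cq_event,
	       (unsigned long long)sum.rx_cq_event,
	       (unsigned long long)sum.ack_sent);
	return 0;
}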
diff --git a/net/rds/iw_sysctl.c b/net/rds/iw_sysctl.c
new file mode 100644
index 000000000000..9590678cd616
--- /dev/null
+++ b/net/rds/iw_sysctl.c
@@ -0,0 +1,137 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/sysctl.h>
35#include <linux/proc_fs.h>
36
37#include "iw.h"
38
39static struct ctl_table_header *rds_iw_sysctl_hdr;
40
41unsigned long rds_iw_sysctl_max_send_wr = RDS_IW_DEFAULT_SEND_WR;
42unsigned long rds_iw_sysctl_max_recv_wr = RDS_IW_DEFAULT_RECV_WR;
43unsigned long rds_iw_sysctl_max_recv_allocation = (128 * 1024 * 1024) / RDS_FRAG_SIZE;
44static unsigned long rds_iw_sysctl_max_wr_min = 1;
45/* hardware will fail CQ creation long before this */
46static unsigned long rds_iw_sysctl_max_wr_max = (u32)~0;
47
48unsigned long rds_iw_sysctl_max_unsig_wrs = 16;
49static unsigned long rds_iw_sysctl_max_unsig_wr_min = 1;
50static unsigned long rds_iw_sysctl_max_unsig_wr_max = 64;
51
52unsigned long rds_iw_sysctl_max_unsig_bytes = (16 << 20);
53static unsigned long rds_iw_sysctl_max_unsig_bytes_min = 1;
54static unsigned long rds_iw_sysctl_max_unsig_bytes_max = ~0UL;
55
56unsigned int rds_iw_sysctl_flow_control = 1;
57
58ctl_table rds_iw_sysctl_table[] = {
59 {
60 .ctl_name = CTL_UNNUMBERED,
61 .procname = "max_send_wr",
62 .data = &rds_iw_sysctl_max_send_wr,
63 .maxlen = sizeof(unsigned long),
64 .mode = 0644,
65 .proc_handler = &proc_doulongvec_minmax,
66 .extra1 = &rds_iw_sysctl_max_wr_min,
67 .extra2 = &rds_iw_sysctl_max_wr_max,
68 },
69 {
70 .ctl_name = CTL_UNNUMBERED,
71 .procname = "max_recv_wr",
72 .data = &rds_iw_sysctl_max_recv_wr,
73 .maxlen = sizeof(unsigned long),
74 .mode = 0644,
75 .proc_handler = &proc_doulongvec_minmax,
76 .extra1 = &rds_iw_sysctl_max_wr_min,
77 .extra2 = &rds_iw_sysctl_max_wr_max,
78 },
79 {
80 .ctl_name = CTL_UNNUMBERED,
81 .procname = "max_unsignaled_wr",
82 .data = &rds_iw_sysctl_max_unsig_wrs,
83 .maxlen = sizeof(unsigned long),
84 .mode = 0644,
85 .proc_handler = &proc_doulongvec_minmax,
86 .extra1 = &rds_iw_sysctl_max_unsig_wr_min,
87 .extra2 = &rds_iw_sysctl_max_unsig_wr_max,
88 },
89 {
90 .ctl_name = CTL_UNNUMBERED,
91 .procname = "max_unsignaled_bytes",
92 .data = &rds_iw_sysctl_max_unsig_bytes,
93 .maxlen = sizeof(unsigned long),
94 .mode = 0644,
95 .proc_handler = &proc_doulongvec_minmax,
96 .extra1 = &rds_iw_sysctl_max_unsig_bytes_min,
97 .extra2 = &rds_iw_sysctl_max_unsig_bytes_max,
98 },
99 {
100 .ctl_name = CTL_UNNUMBERED,
101 .procname = "max_recv_allocation",
102 .data = &rds_iw_sysctl_max_recv_allocation,
103 .maxlen = sizeof(unsigned long),
104 .mode = 0644,
105 .proc_handler = &proc_doulongvec_minmax,
106 },
107 {
108 .ctl_name = CTL_UNNUMBERED,
109 .procname = "flow_control",
110 .data = &rds_iw_sysctl_flow_control,
111 .maxlen = sizeof(rds_iw_sysctl_flow_control),
112 .mode = 0644,
113 .proc_handler = &proc_dointvec,
114 },
115 { .ctl_name = 0}
116};
117
118static struct ctl_path rds_iw_sysctl_path[] = {
119 { .procname = "net", .ctl_name = CTL_NET, },
120 { .procname = "rds", .ctl_name = CTL_UNNUMBERED, },
121 { .procname = "iw", .ctl_name = CTL_UNNUMBERED, },
122 { }
123};
124
125void rds_iw_sysctl_exit(void)
126{
127 if (rds_iw_sysctl_hdr)
128 unregister_sysctl_table(rds_iw_sysctl_hdr);
129}
130
131int __init rds_iw_sysctl_init(void)
132{
133 rds_iw_sysctl_hdr = register_sysctl_paths(rds_iw_sysctl_path, rds_iw_sysctl_table);
134 if (rds_iw_sysctl_hdr == NULL)
135 return -ENOMEM;
136 return 0;
137}