path: root/net/sunrpc
author	J. Bruce Fields <bfields@citi.umich.edu>	2008-07-03 16:24:06 -0400
committer	J. Bruce Fields <bfields@citi.umich.edu>	2008-07-03 16:24:06 -0400
commit	e86322f611eef95aafaf726fd3965e5b211f1985 (patch)
tree	28547e26df4fc6ae671dc8cc6912a53717e4db08 /net/sunrpc
parent	b001a1b6aa960949a24c2cdc28257dfcc9428d74 (diff)
parent	8948896c9e098c6fd31a6a698a598a7cbd7fa40e (diff)
Merge branch 'for-bfields' of git://linux-nfs.org/~tomtucker/xprt-switch-2.6 into for-2.6.27
Diffstat (limited to 'net/sunrpc')
-rw-r--r--	net/sunrpc/auth_generic.c	8
-rw-r--r--	net/sunrpc/svc_xprt.c	23
-rw-r--r--	net/sunrpc/svcauth_unix.c	4
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma.c	35
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_recvfrom.c	186
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_sendto.c	177
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_transport.c	419
7 files changed, 414 insertions, 438 deletions
diff --git a/net/sunrpc/auth_generic.c b/net/sunrpc/auth_generic.c
index d927d9f57412..744b79fdcb19 100644
--- a/net/sunrpc/auth_generic.c
+++ b/net/sunrpc/auth_generic.c
@@ -17,8 +17,8 @@
 # define RPCDBG_FACILITY	RPCDBG_AUTH
 #endif
 
-#define RPC_ANONYMOUS_USERID ((uid_t)-2)
-#define RPC_ANONYMOUS_GROUPID ((gid_t)-2)
+#define RPC_MACHINE_CRED_USERID ((uid_t)0)
+#define RPC_MACHINE_CRED_GROUPID ((gid_t)0)
 
 struct generic_cred {
 	struct rpc_cred gc_base;
@@ -44,8 +44,8 @@ EXPORT_SYMBOL_GPL(rpc_lookup_cred);
 struct rpc_cred *rpc_lookup_machine_cred(void)
 {
 	struct auth_cred acred = {
-		.uid = RPC_ANONYMOUS_USERID,
-		.gid = RPC_ANONYMOUS_GROUPID,
+		.uid = RPC_MACHINE_CRED_USERID,
+		.gid = RPC_MACHINE_CRED_GROUPID,
 		.machine_cred = 1,
 	};
 
diff --git a/net/sunrpc/svc_xprt.c b/net/sunrpc/svc_xprt.c
index d8e8d79a8451..e46c825f4954 100644
--- a/net/sunrpc/svc_xprt.c
+++ b/net/sunrpc/svc_xprt.c
@@ -6,30 +6,9 @@
 
 #include <linux/sched.h>
 #include <linux/errno.h>
-#include <linux/fcntl.h>
-#include <linux/net.h>
-#include <linux/in.h>
-#include <linux/inet.h>
-#include <linux/udp.h>
-#include <linux/tcp.h>
-#include <linux/unistd.h>
-#include <linux/slab.h>
-#include <linux/netdevice.h>
-#include <linux/skbuff.h>
-#include <linux/file.h>
 #include <linux/freezer.h>
 #include <linux/kthread.h>
 #include <net/sock.h>
-#include <net/checksum.h>
-#include <net/ip.h>
-#include <net/ipv6.h>
-#include <net/tcp_states.h>
-#include <linux/uaccess.h>
-#include <asm/ioctls.h>
-
-#include <linux/sunrpc/types.h>
-#include <linux/sunrpc/clnt.h>
-#include <linux/sunrpc/xdr.h>
 #include <linux/sunrpc/stats.h>
 #include <linux/sunrpc/svc_xprt.h>
 
@@ -296,8 +275,6 @@ void svc_xprt_enqueue(struct svc_xprt *xprt)
 	if (!(xprt->xpt_flags &
 	      ((1<<XPT_CONN)|(1<<XPT_DATA)|(1<<XPT_CLOSE)|(1<<XPT_DEFERRED))))
 		return;
-	if (test_bit(XPT_DEAD, &xprt->xpt_flags))
-		return;
 
 	cpu = get_cpu();
 	pool = svc_pool_for_cpu(xprt->xpt_server, cpu);
diff --git a/net/sunrpc/svcauth_unix.c b/net/sunrpc/svcauth_unix.c
index 3f30ee6006ae..f24800f2c098 100644
--- a/net/sunrpc/svcauth_unix.c
+++ b/net/sunrpc/svcauth_unix.c
@@ -278,7 +278,7 @@ static int ip_map_show(struct seq_file *m,
 		dom = im->m_client->h.name;
 
 	if (ipv6_addr_v4mapped(&addr)) {
-		seq_printf(m, "%s" NIPQUAD_FMT "%s\n",
+		seq_printf(m, "%s " NIPQUAD_FMT " %s\n",
 			   im->m_class,
 			   ntohl(addr.s6_addr32[3]) >> 24 & 0xff,
 			   ntohl(addr.s6_addr32[3]) >> 16 & 0xff,
@@ -286,7 +286,7 @@ static int ip_map_show(struct seq_file *m,
 			   ntohl(addr.s6_addr32[3]) >>  0 & 0xff,
 			   dom);
 	} else {
-		seq_printf(m, "%s" NIP6_FMT "%s\n",
+		seq_printf(m, "%s " NIP6_FMT " %s\n",
 			   im->m_class, NIP6(addr), dom);
 	}
 	return 0;
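
The only change above is whitespace in the seq_file output: without separators the class, address, and domain columns run together and a whitespace-splitting reader of the proc file cannot recover them. A standalone C sketch of the difference (plain printf in place of seq_printf; show_entry is an illustrative name, not a kernel function):

#include <stdio.h>

/* Without the spaces, class, address, and domain fuse into one token
 * ("nfsd1.2.3.4*"); with them, each column parses cleanly. */
static void show_entry(const char *class, unsigned a, unsigned b,
                       unsigned c, unsigned d, const char *dom)
{
        printf("%s%u.%u.%u.%u%s\n", class, a, b, c, d, dom);   /* old: nfsd1.2.3.4* */
        printf("%s %u.%u.%u.%u %s\n", class, a, b, c, d, dom); /* new: nfsd 1.2.3.4 * */
}

int main(void)
{
        show_entry("nfsd", 1, 2, 3, 4, "*");
        return 0;
}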
diff --git a/net/sunrpc/xprtrdma/svc_rdma.c b/net/sunrpc/xprtrdma/svc_rdma.c
index 88c0ca20bb1e..87101177825b 100644
--- a/net/sunrpc/xprtrdma/svc_rdma.c
+++ b/net/sunrpc/xprtrdma/svc_rdma.c
@@ -69,6 +69,10 @@ atomic_t rdma_stat_rq_prod;
 atomic_t rdma_stat_sq_poll;
 atomic_t rdma_stat_sq_prod;
 
+/* Temporary NFS request map and context caches */
+struct kmem_cache *svc_rdma_map_cachep;
+struct kmem_cache *svc_rdma_ctxt_cachep;
+
 /*
  * This function implements reading and resetting an atomic_t stat
  * variable through read/write to a proc file. Any write to the file
@@ -236,11 +240,14 @@ static ctl_table svcrdma_root_table[] = {
 void svc_rdma_cleanup(void)
 {
 	dprintk("SVCRDMA Module Removed, deregister RPC RDMA transport\n");
+	flush_scheduled_work();
 	if (svcrdma_table_header) {
 		unregister_sysctl_table(svcrdma_table_header);
 		svcrdma_table_header = NULL;
 	}
 	svc_unreg_xprt_class(&svc_rdma_class);
+	kmem_cache_destroy(svc_rdma_map_cachep);
+	kmem_cache_destroy(svc_rdma_ctxt_cachep);
 }
 
 int svc_rdma_init(void)
@@ -255,9 +262,37 @@ int svc_rdma_init(void)
 	svcrdma_table_header =
 		register_sysctl_table(svcrdma_root_table);
 
+	/* Create the temporary map cache */
+	svc_rdma_map_cachep = kmem_cache_create("svc_rdma_map_cache",
+						sizeof(struct svc_rdma_req_map),
+						0,
+						SLAB_HWCACHE_ALIGN,
+						NULL);
+	if (!svc_rdma_map_cachep) {
+		printk(KERN_INFO "Could not allocate map cache.\n");
+		goto err0;
+	}
+
+	/* Create the temporary context cache */
+	svc_rdma_ctxt_cachep =
+		kmem_cache_create("svc_rdma_ctxt_cache",
+				  sizeof(struct svc_rdma_op_ctxt),
+				  0,
+				  SLAB_HWCACHE_ALIGN,
+				  NULL);
+	if (!svc_rdma_ctxt_cachep) {
+		printk(KERN_INFO "Could not allocate WR ctxt cache.\n");
+		goto err1;
+	}
+
 	/* Register RDMA with the SVC transport switch */
 	svc_reg_xprt_class(&svc_rdma_class);
 	return 0;
+ err1:
+	kmem_cache_destroy(svc_rdma_map_cachep);
+ err0:
+	unregister_sysctl_table(svcrdma_table_header);
+	return -ENOMEM;
 }
 MODULE_AUTHOR("Tom Tucker <tom@opengridcomputing.com>");
 MODULE_DESCRIPTION("SVC RDMA Transport");
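
svc_rdma_init() above creates both slab caches before registering the transport and unwinds in reverse order on failure: err1 destroys the map cache, err0 drops the sysctl registration. A standalone userspace sketch of the same goto-based unwind shape (malloc stands in for kmem_cache_create, and the sysctl step is reduced to a comment; all names here are illustrative):

#include <stdio.h>
#include <stdlib.h>

/* Stand-ins for the two kmem_cache pointers created in svc_rdma_init(). */
static void *map_cache;
static void *ctxt_cache;

static int example_init(void)
{
        map_cache = malloc(64);         /* kmem_cache_create("svc_rdma_map_cache", ...) */
        if (!map_cache)
                goto err0;
        ctxt_cache = malloc(64);        /* kmem_cache_create("svc_rdma_ctxt_cache", ...) */
        if (!ctxt_cache)
                goto err1;
        return 0;                       /* both caches live; register the transport */
 err1:
        free(map_cache);                /* undo only what step 1 did */
 err0:
        /* the kernel code also unregisters its sysctl table here */
        return -1;                      /* -ENOMEM in the kernel code */
}

int main(void)
{
        printf("init: %d\n", example_init());
        return 0;
}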
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index c22d6b6f2db4..b4b17f44cb29 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -112,11 +112,6 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
 	rqstp->rq_arg.tail[0].iov_len = 0;
 }
 
-struct chunk_sge {
-	int start;		/* sge no for this chunk */
-	int count;		/* sge count for this chunk */
-};
-
 /* Encode a read-chunk-list as an array of IB SGE
  *
  * Assumptions:
@@ -134,8 +129,8 @@ static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
 			   struct svc_rqst *rqstp,
 			   struct svc_rdma_op_ctxt *head,
 			   struct rpcrdma_msg *rmsgp,
-			   struct ib_sge *sge,
-			   struct chunk_sge *ch_sge_ary,
+			   struct svc_rdma_req_map *rpl_map,
+			   struct svc_rdma_req_map *chl_map,
 			   int ch_count,
 			   int byte_count)
 {
@@ -156,22 +151,18 @@ static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
 	head->arg.head[0] = rqstp->rq_arg.head[0];
 	head->arg.tail[0] = rqstp->rq_arg.tail[0];
 	head->arg.pages = &head->pages[head->count];
-	head->sge[0].length = head->count; /* save count of hdr pages */
+	head->hdr_count = head->count; /* save count of hdr pages */
 	head->arg.page_base = 0;
 	head->arg.page_len = ch_bytes;
 	head->arg.len = rqstp->rq_arg.len + ch_bytes;
 	head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes;
 	head->count++;
-	ch_sge_ary[0].start = 0;
+	chl_map->ch[0].start = 0;
 	while (byte_count) {
+		rpl_map->sge[sge_no].iov_base =
+			page_address(rqstp->rq_arg.pages[page_no]) + page_off;
 		sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes);
-		sge[sge_no].addr =
-			ib_dma_map_page(xprt->sc_cm_id->device,
-					rqstp->rq_arg.pages[page_no],
-					page_off, sge_bytes,
-					DMA_FROM_DEVICE);
-		sge[sge_no].length = sge_bytes;
-		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+		rpl_map->sge[sge_no].iov_len = sge_bytes;
 		/*
 		 * Don't bump head->count here because the same page
 		 * may be used by multiple SGE.
@@ -187,11 +178,11 @@ static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
 		 * SGE, move to the next SGE
 		 */
 		if (ch_bytes == 0) {
-			ch_sge_ary[ch_no].count =
-				sge_no - ch_sge_ary[ch_no].start;
+			chl_map->ch[ch_no].count =
+				sge_no - chl_map->ch[ch_no].start;
 			ch_no++;
 			ch++;
-			ch_sge_ary[ch_no].start = sge_no;
+			chl_map->ch[ch_no].start = sge_no;
 			ch_bytes = ch->rc_target.rs_length;
 			/* If bytes remaining account for next chunk */
 			if (byte_count) {
@@ -220,18 +211,25 @@ static int rdma_rcl_to_sge(struct svcxprt_rdma *xprt,
 	return sge_no;
 }
 
-static void rdma_set_ctxt_sge(struct svc_rdma_op_ctxt *ctxt,
-			      struct ib_sge *sge,
+static void rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
+			      struct svc_rdma_op_ctxt *ctxt,
+			      struct kvec *vec,
 			      u64 *sgl_offset,
 			      int count)
 {
 	int i;
 
 	ctxt->count = count;
+	ctxt->direction = DMA_FROM_DEVICE;
 	for (i = 0; i < count; i++) {
-		ctxt->sge[i].addr = sge[i].addr;
-		ctxt->sge[i].length = sge[i].length;
-		*sgl_offset = *sgl_offset + sge[i].length;
+		atomic_inc(&xprt->sc_dma_used);
+		ctxt->sge[i].addr =
+			ib_dma_map_single(xprt->sc_cm_id->device,
+					  vec[i].iov_base, vec[i].iov_len,
+					  DMA_FROM_DEVICE);
+		ctxt->sge[i].length = vec[i].iov_len;
+		ctxt->sge[i].lkey = xprt->sc_phys_mr->lkey;
+		*sgl_offset = *sgl_offset + vec[i].iov_len;
 	}
 }
 
@@ -260,11 +258,16 @@ static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
 * On our side, we need to read into a pagelist. The first page immediately
 * follows the RPC header.
 *
- * This function returns 1 to indicate success. The data is not yet in
+ * This function returns:
+ * 0 - No error and no read-list found.
+ *
+ * 1 - Successful read-list processing. The data is not yet in
 * the pagelist and therefore the RPC request must be deferred. The
 * I/O completion will enqueue the transport again and
 * svc_rdma_recvfrom will complete the request.
 *
+ * <0 - Error processing/posting read-list.
+ *
 * NOTE: The ctxt must not be touched after the last WR has been posted
 * because the I/O completion processing may occur on another
 * processor and free / modify the context. Ne touche pas!
@@ -277,50 +280,38 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
 	struct ib_send_wr read_wr;
 	int err = 0;
 	int ch_no;
-	struct ib_sge *sge;
 	int ch_count;
 	int byte_count;
 	int sge_count;
 	u64 sgl_offset;
 	struct rpcrdma_read_chunk *ch;
 	struct svc_rdma_op_ctxt *ctxt = NULL;
-	struct svc_rdma_op_ctxt *head;
-	struct svc_rdma_op_ctxt *tmp_sge_ctxt;
-	struct svc_rdma_op_ctxt *tmp_ch_ctxt;
-	struct chunk_sge *ch_sge_ary;
+	struct svc_rdma_req_map *rpl_map;
+	struct svc_rdma_req_map *chl_map;
 
 	/* If no read list is present, return 0 */
 	ch = svc_rdma_get_read_chunk(rmsgp);
 	if (!ch)
 		return 0;
 
-	/* Allocate temporary contexts to keep SGE */
-	BUG_ON(sizeof(struct ib_sge) < sizeof(struct chunk_sge));
-	tmp_sge_ctxt = svc_rdma_get_context(xprt);
-	sge = tmp_sge_ctxt->sge;
-	tmp_ch_ctxt = svc_rdma_get_context(xprt);
-	ch_sge_ary = (struct chunk_sge *)tmp_ch_ctxt->sge;
+	/* Allocate temporary reply and chunk maps */
+	rpl_map = svc_rdma_get_req_map();
+	chl_map = svc_rdma_get_req_map();
 
 	svc_rdma_rcl_chunk_counts(ch, &ch_count, &byte_count);
+	if (ch_count > RPCSVC_MAXPAGES)
+		return -EINVAL;
 	sge_count = rdma_rcl_to_sge(xprt, rqstp, hdr_ctxt, rmsgp,
-				    sge, ch_sge_ary,
+				    rpl_map, chl_map,
 				    ch_count, byte_count);
-	head = svc_rdma_get_context(xprt);
 	sgl_offset = 0;
 	ch_no = 0;
 
 	for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
 	     ch->rc_discrim != 0; ch++, ch_no++) {
 next_sge:
-		if (!ctxt)
-			ctxt = head;
-		else {
-			ctxt->next = svc_rdma_get_context(xprt);
-			ctxt = ctxt->next;
-		}
-		ctxt->next = NULL;
+		ctxt = svc_rdma_get_context(xprt);
 		ctxt->direction = DMA_FROM_DEVICE;
-		clear_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
 		clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
 
 		/* Prepare READ WR */
@@ -333,50 +324,46 @@ next_sge:
 		read_wr.wr.rdma.remote_addr =
 			get_unaligned(&(ch->rc_target.rs_offset)) +
 			sgl_offset;
-		read_wr.sg_list = &sge[ch_sge_ary[ch_no].start];
+		read_wr.sg_list = ctxt->sge;
 		read_wr.num_sge =
-			rdma_read_max_sge(xprt, ch_sge_ary[ch_no].count);
-		rdma_set_ctxt_sge(ctxt, &sge[ch_sge_ary[ch_no].start],
+			rdma_read_max_sge(xprt, chl_map->ch[ch_no].count);
+		rdma_set_ctxt_sge(xprt, ctxt,
+				  &rpl_map->sge[chl_map->ch[ch_no].start],
 				  &sgl_offset,
 				  read_wr.num_sge);
 		if (((ch+1)->rc_discrim == 0) &&
-		    (read_wr.num_sge == ch_sge_ary[ch_no].count)) {
+		    (read_wr.num_sge == chl_map->ch[ch_no].count)) {
 			/*
 			 * Mark the last RDMA_READ with a bit to
 			 * indicate all RPC data has been fetched from
 			 * the client and the RPC needs to be enqueued.
 			 */
 			set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
-			ctxt->next = hdr_ctxt;
-			hdr_ctxt->next = head;
+			ctxt->read_hdr = hdr_ctxt;
 		}
 		/* Post the read */
 		err = svc_rdma_send(xprt, &read_wr);
 		if (err) {
-			printk(KERN_ERR "svcrdma: Error posting send = %d\n",
+			printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
 			       err);
-			/*
-			 * Break the circular list so free knows when
-			 * to stop if the error happened to occur on
-			 * the last read
-			 */
-			ctxt->next = NULL;
+			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+			svc_rdma_put_context(ctxt, 0);
 			goto out;
 		}
 		atomic_inc(&rdma_stat_read);
 
-		if (read_wr.num_sge < ch_sge_ary[ch_no].count) {
-			ch_sge_ary[ch_no].count -= read_wr.num_sge;
-			ch_sge_ary[ch_no].start += read_wr.num_sge;
+		if (read_wr.num_sge < chl_map->ch[ch_no].count) {
+			chl_map->ch[ch_no].count -= read_wr.num_sge;
+			chl_map->ch[ch_no].start += read_wr.num_sge;
 			goto next_sge;
 		}
 		sgl_offset = 0;
-		err = 0;
+		err = 1;
 	}
 
  out:
-	svc_rdma_put_context(tmp_sge_ctxt, 0);
-	svc_rdma_put_context(tmp_ch_ctxt, 0);
+	svc_rdma_put_req_map(rpl_map);
+	svc_rdma_put_req_map(chl_map);
 
 	/* Detach arg pages. svc_recv will replenish them */
 	for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++)
@@ -389,25 +376,12 @@ next_sge:
 	while (rqstp->rq_resused)
 		rqstp->rq_respages[--rqstp->rq_resused] = NULL;
 
-	if (err) {
-		printk(KERN_ERR "svcrdma : RDMA_READ error = %d\n", err);
-		set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
-		/* Free the linked list of read contexts */
-		while (head != NULL) {
-			ctxt = head->next;
-			svc_rdma_put_context(head, 1);
-			head = ctxt;
-		}
-		return 0;
-	}
-
-	return 1;
+	return err;
 }
 
 static int rdma_read_complete(struct svc_rqst *rqstp,
-			      struct svc_rdma_op_ctxt *data)
+			      struct svc_rdma_op_ctxt *head)
 {
-	struct svc_rdma_op_ctxt *head = data->next;
 	int page_no;
 	int ret;
 
@@ -419,7 +393,7 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
 		rqstp->rq_pages[page_no] = head->pages[page_no];
 	}
 	/* Point rq_arg.pages past header */
-	rqstp->rq_arg.pages = &rqstp->rq_pages[head->sge[0].length];
+	rqstp->rq_arg.pages = &rqstp->rq_pages[head->hdr_count];
 	rqstp->rq_arg.page_len = head->arg.page_len;
 	rqstp->rq_arg.page_base = head->arg.page_base;
 
@@ -433,21 +407,12 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
 	rqstp->rq_arg.len = head->arg.len;
 	rqstp->rq_arg.buflen = head->arg.buflen;
 
+	/* Free the context */
+	svc_rdma_put_context(head, 0);
+
 	/* XXX: What should this be? */
 	rqstp->rq_prot = IPPROTO_MAX;
-
-	/*
-	 * Free the contexts we used to build the RDMA_READ. We have
-	 * to be careful here because the context list uses the same
-	 * next pointer used to chain the contexts associated with the
-	 * RDMA_READ
-	 */
-	data->next = NULL;	/* terminate circular list */
-	do {
-		data = head->next;
-		svc_rdma_put_context(head, 0);
-		head = data;
-	} while (head != NULL);
+	svc_xprt_copy_addrs(rqstp, rqstp->rq_xprt);
 
 	ret = rqstp->rq_arg.head[0].iov_len
 	    + rqstp->rq_arg.page_len
@@ -457,8 +422,6 @@ static int rdma_read_complete(struct svc_rqst *rqstp,
 		ret, rqstp->rq_arg.len, rqstp->rq_arg.head[0].iov_base,
 		rqstp->rq_arg.head[0].iov_len);
 
-	/* Indicate that we've consumed an RQ credit */
-	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
 	svc_xprt_received(rqstp->rq_xprt);
 	return ret;
 }
@@ -480,13 +443,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 
 	dprintk("svcrdma: rqstp=%p\n", rqstp);
 
-	/*
-	 * The rq_xprt_ctxt indicates if we've consumed an RQ credit
-	 * or not. It is used in the rdma xpo_release_rqst function to
-	 * determine whether or not to return an RQ WQE to the RQ.
-	 */
-	rqstp->rq_xprt_ctxt = NULL;
-
 	spin_lock_bh(&rdma_xprt->sc_read_complete_lock);
 	if (!list_empty(&rdma_xprt->sc_read_complete_q)) {
 		ctxt = list_entry(rdma_xprt->sc_read_complete_q.next,
@@ -537,21 +493,22 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	/* If the request is invalid, reply with an error */
 	if (len < 0) {
 		if (len == -ENOSYS)
-			(void)svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
+			svc_rdma_send_error(rdma_xprt, rmsgp, ERR_VERS);
 		goto close_out;
 	}
 
-	/* Read read-list data. If we would need to wait, defer
-	 * it. Not that in this case, we don't return the RQ credit
-	 * until after the read completes.
-	 */
-	if (rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt)) {
+	/* Read read-list data. */
+	ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
+	if (ret > 0) {
+		/* read-list posted, defer until data received from client. */
 		svc_xprt_received(xprt);
 		return 0;
 	}
-
-	/* Indicate we've consumed an RQ credit */
-	rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
+	if (ret < 0) {
+		/* Post of read-list failed, free context. */
+		svc_rdma_put_context(ctxt, 1);
+		return 0;
+	}
 
 	ret = rqstp->rq_arg.head[0].iov_len
 	    + rqstp->rq_arg.page_len
@@ -569,11 +526,8 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	return ret;
 
  close_out:
-	if (ctxt) {
+	if (ctxt)
 		svc_rdma_put_context(ctxt, 1);
-		/* Indicate we've consumed an RQ credit */
-		rqstp->rq_xprt_ctxt = rqstp->rq_xprt;
-	}
 	dprintk("svcrdma: transport %p is closing\n", xprt);
 	/*
 	 * Set the close bit and enqueue it. svc_recv will see the
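
rdma_read_xdr() now distinguishes "no read list" (0), "reads posted, defer" (>0), and "post failed" (<0), and svc_rdma_recvfrom() dispatches on the sign of the result, as in the hunk above. A standalone C sketch of that tri-state dispatch (read_xdr() and recvfrom() are stand-ins for the kernel functions, not their real signatures):

#include <stdio.h>

/* Stand-in for rdma_read_xdr(): the scenario argument plays the role of
 * its computed result. */
static int read_xdr(int scenario) { return scenario; }

static int recvfrom(int scenario)
{
        int ret = read_xdr(scenario);

        if (ret > 0)            /* read-list posted; defer until I/O completes */
                return 0;
        if (ret < 0)            /* posting failed; free resources, caller closes */
                return 0;
        /* ret == 0: no read list, the request is complete in-line */
        return 1;
}

int main(void)
{
        printf("%d %d %d\n", recvfrom(0), recvfrom(1), recvfrom(-22));
        return 0;
}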
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 981f190c1b39..a19b22b452a3 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -63,52 +63,44 @@
 * SGE[2..sge_count-2] data from xdr->pages[]
 * SGE[sge_count-1] data from xdr->tail.
 *
+ * The max SGE we need is the length of the XDR / pagesize + one for
+ * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES
+ * reserves a page for both the request and the reply header, and this
+ * array is only concerned with the reply we are assured that we have
+ * one extra page for the RPCRDMA header.
 */
-static struct ib_sge *xdr_to_sge(struct svcxprt_rdma *xprt,
-				 struct xdr_buf *xdr,
-				 struct ib_sge *sge,
-				 int *sge_count)
+static void xdr_to_sge(struct svcxprt_rdma *xprt,
+		       struct xdr_buf *xdr,
+		       struct svc_rdma_req_map *vec)
 {
-	/* Max we need is the length of the XDR / pagesize + one for
-	 * head + one for tail + one for RPCRDMA header
-	 */
 	int sge_max = (xdr->len+PAGE_SIZE-1) / PAGE_SIZE + 3;
 	int sge_no;
-	u32 byte_count = xdr->len;
 	u32 sge_bytes;
 	u32 page_bytes;
-	int page_off;
+	u32 page_off;
 	int page_no;
 
+	BUG_ON(xdr->len !=
+	       (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
+
 	/* Skip the first sge, this is for the RPCRDMA header */
 	sge_no = 1;
 
 	/* Head SGE */
-	sge[sge_no].addr = ib_dma_map_single(xprt->sc_cm_id->device,
-					     xdr->head[0].iov_base,
-					     xdr->head[0].iov_len,
-					     DMA_TO_DEVICE);
-	sge_bytes = min_t(u32, byte_count, xdr->head[0].iov_len);
-	byte_count -= sge_bytes;
-	sge[sge_no].length = sge_bytes;
-	sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+	vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
+	vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
 	sge_no++;
 
 	/* pages SGE */
 	page_no = 0;
 	page_bytes = xdr->page_len;
 	page_off = xdr->page_base;
-	while (byte_count && page_bytes) {
-		sge_bytes = min_t(u32, byte_count, (PAGE_SIZE-page_off));
-		sge[sge_no].addr =
-			ib_dma_map_page(xprt->sc_cm_id->device,
-					xdr->pages[page_no], page_off,
-					sge_bytes, DMA_TO_DEVICE);
-		sge_bytes = min(sge_bytes, page_bytes);
-		byte_count -= sge_bytes;
+	while (page_bytes) {
+		vec->sge[sge_no].iov_base =
+			page_address(xdr->pages[page_no]) + page_off;
+		sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
 		page_bytes -= sge_bytes;
-		sge[sge_no].length = sge_bytes;
-		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+		vec->sge[sge_no].iov_len = sge_bytes;
 
 		sge_no++;
 		page_no++;
@@ -116,36 +108,24 @@ static struct ib_sge *xdr_to_sge(struct svcxprt_rdma *xprt,
 	}
 
 	/* Tail SGE */
-	if (byte_count && xdr->tail[0].iov_len) {
-		sge[sge_no].addr =
-			ib_dma_map_single(xprt->sc_cm_id->device,
-					  xdr->tail[0].iov_base,
-					  xdr->tail[0].iov_len,
-					  DMA_TO_DEVICE);
-		sge_bytes = min_t(u32, byte_count, xdr->tail[0].iov_len);
-		byte_count -= sge_bytes;
-		sge[sge_no].length = sge_bytes;
-		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
+	if (xdr->tail[0].iov_len) {
+		vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
+		vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
 		sge_no++;
 	}
 
 	BUG_ON(sge_no > sge_max);
-	BUG_ON(byte_count != 0);
-
-	*sge_count = sge_no;
-	return sge;
+	vec->count = sge_no;
 }
 
-
 /* Assumptions:
  * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
  */
 static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 		      u32 rmr, u64 to,
 		      u32 xdr_off, int write_len,
-		      struct ib_sge *xdr_sge, int sge_count)
+		      struct svc_rdma_req_map *vec)
 {
-	struct svc_rdma_op_ctxt *tmp_sge_ctxt;
 	struct ib_send_wr write_wr;
 	struct ib_sge *sge;
 	int xdr_sge_no;
@@ -154,25 +134,23 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 	int sge_off;
 	int bc;
 	struct svc_rdma_op_ctxt *ctxt;
-	int ret = 0;
 
-	BUG_ON(sge_count > RPCSVC_MAXPAGES);
+	BUG_ON(vec->count > RPCSVC_MAXPAGES);
 	dprintk("svcrdma: RDMA_WRITE rmr=%x, to=%llx, xdr_off=%d, "
-		"write_len=%d, xdr_sge=%p, sge_count=%d\n",
+		"write_len=%d, vec->sge=%p, vec->count=%lu\n",
 		rmr, (unsigned long long)to, xdr_off,
-		write_len, xdr_sge, sge_count);
+		write_len, vec->sge, vec->count);
 
 	ctxt = svc_rdma_get_context(xprt);
-	ctxt->count = 0;
-	tmp_sge_ctxt = svc_rdma_get_context(xprt);
-	sge = tmp_sge_ctxt->sge;
+	ctxt->direction = DMA_TO_DEVICE;
+	sge = ctxt->sge;
 
 	/* Find the SGE associated with xdr_off */
-	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < sge_count;
+	for (bc = xdr_off, xdr_sge_no = 1; bc && xdr_sge_no < vec->count;
 	     xdr_sge_no++) {
-		if (xdr_sge[xdr_sge_no].length > bc)
+		if (vec->sge[xdr_sge_no].iov_len > bc)
 			break;
-		bc -= xdr_sge[xdr_sge_no].length;
+		bc -= vec->sge[xdr_sge_no].iov_len;
 	}
 
 	sge_off = bc;
@@ -180,21 +158,28 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 	sge_no = 0;
 
 	/* Copy the remaining SGE */
-	while (bc != 0 && xdr_sge_no < sge_count) {
-		sge[sge_no].addr = xdr_sge[xdr_sge_no].addr + sge_off;
-		sge[sge_no].lkey = xdr_sge[xdr_sge_no].lkey;
+	while (bc != 0 && xdr_sge_no < vec->count) {
+		sge[sge_no].lkey = xprt->sc_phys_mr->lkey;
 		sge_bytes = min((size_t)bc,
-				(size_t)(xdr_sge[xdr_sge_no].length-sge_off));
+				(size_t)(vec->sge[xdr_sge_no].iov_len-sge_off));
 		sge[sge_no].length = sge_bytes;
-
+		atomic_inc(&xprt->sc_dma_used);
+		sge[sge_no].addr =
+			ib_dma_map_single(xprt->sc_cm_id->device,
+					  (void *)
+					  vec->sge[xdr_sge_no].iov_base + sge_off,
+					  sge_bytes, DMA_TO_DEVICE);
+		if (dma_mapping_error(sge[sge_no].addr))
+			goto err;
 		sge_off = 0;
 		sge_no++;
+		ctxt->count++;
 		xdr_sge_no++;
 		bc -= sge_bytes;
 	}
 
 	BUG_ON(bc != 0);
-	BUG_ON(xdr_sge_no > sge_count);
+	BUG_ON(xdr_sge_no > vec->count);
 
 	/* Prepare WRITE WR */
 	memset(&write_wr, 0, sizeof write_wr);
@@ -209,21 +194,20 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 
 	/* Post It */
 	atomic_inc(&rdma_stat_write);
-	if (svc_rdma_send(xprt, &write_wr)) {
-		svc_rdma_put_context(ctxt, 1);
-		/* Fatal error, close transport */
-		ret = -EIO;
-	}
-	svc_rdma_put_context(tmp_sge_ctxt, 0);
-	return ret;
+	if (svc_rdma_send(xprt, &write_wr))
+		goto err;
+	return 0;
+ err:
+	svc_rdma_put_context(ctxt, 0);
+	/* Fatal error, close transport */
+	return -EIO;
 }
 
 static int send_write_chunks(struct svcxprt_rdma *xprt,
 			     struct rpcrdma_msg *rdma_argp,
 			     struct rpcrdma_msg *rdma_resp,
 			     struct svc_rqst *rqstp,
-			     struct ib_sge *sge,
-			     int sge_count)
+			     struct svc_rdma_req_map *vec)
 {
 	u32 xfer_len = rqstp->rq_res.page_len + rqstp->rq_res.tail[0].iov_len;
 	int write_len;
@@ -269,8 +253,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
 					 rs_offset + chunk_off,
 					 xdr_off,
 					 this_write,
-					 sge,
-					 sge_count);
+					 vec);
 		if (ret) {
 			dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
 				ret);
@@ -292,8 +275,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
 			     struct rpcrdma_msg *rdma_argp,
 			     struct rpcrdma_msg *rdma_resp,
 			     struct svc_rqst *rqstp,
-			     struct ib_sge *sge,
-			     int sge_count)
+			     struct svc_rdma_req_map *vec)
 {
 	u32 xfer_len = rqstp->rq_res.len;
 	int write_len;
@@ -341,8 +323,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
 					 rs_offset + chunk_off,
 					 xdr_off,
 					 this_write,
-					 sge,
-					 sge_count);
+					 vec);
 		if (ret) {
 			dprintk("svcrdma: RDMA_WRITE failed, ret=%d\n",
 				ret);
@@ -380,7 +361,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		      struct page *page,
 		      struct rpcrdma_msg *rdma_resp,
 		      struct svc_rdma_op_ctxt *ctxt,
-		      int sge_count,
+		      struct svc_rdma_req_map *vec,
 		      int byte_count)
 {
 	struct ib_send_wr send_wr;
@@ -389,11 +370,23 @@ static int send_reply(struct svcxprt_rdma *rdma,
 	int page_no;
 	int ret;
 
+	/* Post a recv buffer to handle another request. */
+	ret = svc_rdma_post_recv(rdma);
+	if (ret) {
+		printk(KERN_INFO
+		       "svcrdma: could not post a receive buffer, err=%d."
+		       "Closing transport %p.\n", ret, rdma);
+		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+		svc_rdma_put_context(ctxt, 0);
+		return -ENOTCONN;
+	}
+
 	/* Prepare the context */
 	ctxt->pages[0] = page;
 	ctxt->count = 1;
 
 	/* Prepare the SGE for the RPCRDMA Header */
+	atomic_inc(&rdma->sc_dma_used);
 	ctxt->sge[0].addr =
 		ib_dma_map_page(rdma->sc_cm_id->device,
 				page, 0, PAGE_SIZE, DMA_TO_DEVICE);
@@ -402,10 +395,16 @@ static int send_reply(struct svcxprt_rdma *rdma,
 	ctxt->sge[0].lkey = rdma->sc_phys_mr->lkey;
 
 	/* Determine how many of our SGE are to be transmitted */
-	for (sge_no = 1; byte_count && sge_no < sge_count; sge_no++) {
-		sge_bytes = min((size_t)ctxt->sge[sge_no].length,
-				(size_t)byte_count);
+	for (sge_no = 1; byte_count && sge_no < vec->count; sge_no++) {
+		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
 		byte_count -= sge_bytes;
+		atomic_inc(&rdma->sc_dma_used);
+		ctxt->sge[sge_no].addr =
+			ib_dma_map_single(rdma->sc_cm_id->device,
+					  vec->sge[sge_no].iov_base,
+					  sge_bytes, DMA_TO_DEVICE);
+		ctxt->sge[sge_no].length = sge_bytes;
+		ctxt->sge[sge_no].lkey = rdma->sc_phys_mr->lkey;
 	}
 	BUG_ON(byte_count != 0);
 
@@ -417,8 +416,10 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		ctxt->pages[page_no+1] = rqstp->rq_respages[page_no];
 		ctxt->count++;
 		rqstp->rq_respages[page_no] = NULL;
+		/* If there are more pages than SGE, terminate SGE list */
+		if (page_no+1 >= sge_no)
+			ctxt->sge[page_no+1].length = 0;
 	}
-
 	BUG_ON(sge_no > rdma->sc_max_sge);
 	memset(&send_wr, 0, sizeof send_wr);
 	ctxt->wr_op = IB_WR_SEND;
@@ -462,20 +463,20 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	enum rpcrdma_proc reply_type;
 	int ret;
 	int inline_bytes;
-	struct ib_sge *sge;
-	int sge_count = 0;
 	struct page *res_page;
 	struct svc_rdma_op_ctxt *ctxt;
+	struct svc_rdma_req_map *vec;
 
 	dprintk("svcrdma: sending response for rqstp=%p\n", rqstp);
 
 	/* Get the RDMA request header. */
 	rdma_argp = xdr_start(&rqstp->rq_arg);
 
-	/* Build an SGE for the XDR */
+	/* Build a req vec for the XDR */
 	ctxt = svc_rdma_get_context(rdma);
 	ctxt->direction = DMA_TO_DEVICE;
-	sge = xdr_to_sge(rdma, &rqstp->rq_res, ctxt->sge, &sge_count);
+	vec = svc_rdma_get_req_map();
+	xdr_to_sge(rdma, &rqstp->rq_res, vec);
 
 	inline_bytes = rqstp->rq_res.len;
 
@@ -492,7 +493,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 
 	/* Send any write-chunk data and build resp write-list */
 	ret = send_write_chunks(rdma, rdma_argp, rdma_resp,
-				rqstp, sge, sge_count);
+				rqstp, vec);
 	if (ret < 0) {
 		printk(KERN_ERR "svcrdma: failed to send write chunks, rc=%d\n",
 		       ret);
@@ -502,7 +503,7 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 
 	/* Send any reply-list data and update resp reply-list */
 	ret = send_reply_chunks(rdma, rdma_argp, rdma_resp,
-				rqstp, sge, sge_count);
+				rqstp, vec);
 	if (ret < 0) {
 		printk(KERN_ERR "svcrdma: failed to send reply chunks, rc=%d\n",
 		       ret);
@@ -510,11 +511,13 @@ int svc_rdma_sendto(struct svc_rqst *rqstp)
 	}
 	inline_bytes -= ret;
 
-	ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, sge_count,
+	ret = send_reply(rdma, rqstp, res_page, rdma_resp, ctxt, vec,
 			 inline_bytes);
+	svc_rdma_put_req_map(vec);
 	dprintk("svcrdma: send_reply returns %d\n", ret);
 	return ret;
  error:
+	svc_rdma_put_req_map(vec);
 	svc_rdma_put_context(ctxt, 0);
 	put_page(res_page);
 	return ret;
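
The sendto rewrite above splits gathering from mapping: xdr_to_sge() now only records iov_base/iov_len pairs for the head, the page data, and the tail, and DMA mapping happens later against exactly the bytes being posted. A standalone C sketch of that flattening step (struct seg stands in for struct kvec, build_vec for xdr_to_sge; slot 0 is left free for the RPCRDMA header, as in svc_rdma_req_map):

#include <stdio.h>
#include <stddef.h>

#define PAGE_SIZE 4096

struct seg { void *base; size_t len; };	/* stands in for struct kvec */

/* Flatten head + page data + tail into a segment vector; no mapping here,
 * callers map only the segments they actually post. */
static size_t build_vec(void *head, size_t head_len,
                        void **pages, size_t page_len, size_t page_off,
                        void *tail, size_t tail_len, struct seg *vec)
{
        size_t n = 1, pg = 0;	/* slot 0 reserved for the header */

        vec[n].base = head; vec[n].len = head_len; n++;
        while (page_len) {
                size_t chunk = PAGE_SIZE - page_off;
                if (chunk > page_len)
                        chunk = page_len;
                vec[n].base = (char *)pages[pg] + page_off;
                vec[n].len = chunk;
                page_len -= chunk;
                page_off = 0;
                pg++; n++;
        }
        if (tail_len) {
                vec[n].base = tail; vec[n].len = tail_len; n++;
        }
        return n;	/* plays the role of vec->count */
}

int main(void)
{
        char head[100], p0[PAGE_SIZE], p1[PAGE_SIZE], tail[8];
        void *pages[2] = { p0, p1 };
        struct seg vec[8];
        size_t n = build_vec(head, sizeof head, pages, 5000, 0,
                             tail, sizeof tail, vec);
        printf("%zu segments\n", n);	/* head + two page chunks + tail */
        return 0;
}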
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index af408fc12634..19ddc382b777 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -84,67 +84,37 @@ struct svc_xprt_class svc_rdma_class = {
 	.xcl_max_payload = RPCSVC_MAXPAYLOAD_TCP,
 };
 
-static int rdma_bump_context_cache(struct svcxprt_rdma *xprt)
+/* WR context cache. Created in svc_rdma.c */
+extern struct kmem_cache *svc_rdma_ctxt_cachep;
+
+struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
 {
-	int target;
-	int at_least_one = 0;
 	struct svc_rdma_op_ctxt *ctxt;
 
-	target = min(xprt->sc_ctxt_cnt + xprt->sc_ctxt_bump,
-		     xprt->sc_ctxt_max);
-
-	spin_lock_bh(&xprt->sc_ctxt_lock);
-	while (xprt->sc_ctxt_cnt < target) {
-		xprt->sc_ctxt_cnt++;
-		spin_unlock_bh(&xprt->sc_ctxt_lock);
-
-		ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
-
-		spin_lock_bh(&xprt->sc_ctxt_lock);
-		if (ctxt) {
-			at_least_one = 1;
-			ctxt->next = xprt->sc_ctxt_head;
-			xprt->sc_ctxt_head = ctxt;
-		} else {
-			/* kmalloc failed...give up for now */
-			xprt->sc_ctxt_cnt--;
+	while (1) {
+		ctxt = kmem_cache_alloc(svc_rdma_ctxt_cachep, GFP_KERNEL);
+		if (ctxt)
 			break;
-		}
+		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
 	}
-	spin_unlock_bh(&xprt->sc_ctxt_lock);
-	dprintk("svcrdma: sc_ctxt_max=%d, sc_ctxt_cnt=%d\n",
-		xprt->sc_ctxt_max, xprt->sc_ctxt_cnt);
-	return at_least_one;
+	ctxt->xprt = xprt;
+	INIT_LIST_HEAD(&ctxt->dto_q);
+	ctxt->count = 0;
+	atomic_inc(&xprt->sc_ctxt_used);
+	return ctxt;
 }
 
-struct svc_rdma_op_ctxt *svc_rdma_get_context(struct svcxprt_rdma *xprt)
+static void svc_rdma_unmap_dma(struct svc_rdma_op_ctxt *ctxt)
 {
-	struct svc_rdma_op_ctxt *ctxt;
-
-	while (1) {
-		spin_lock_bh(&xprt->sc_ctxt_lock);
-		if (unlikely(xprt->sc_ctxt_head == NULL)) {
-			/* Try to bump my cache. */
-			spin_unlock_bh(&xprt->sc_ctxt_lock);
-
-			if (rdma_bump_context_cache(xprt))
-				continue;
-
-			printk(KERN_INFO "svcrdma: sleeping waiting for "
-			       "context memory on xprt=%p\n",
-			       xprt);
-			schedule_timeout_uninterruptible(msecs_to_jiffies(500));
-			continue;
-		}
-		ctxt = xprt->sc_ctxt_head;
-		xprt->sc_ctxt_head = ctxt->next;
-		spin_unlock_bh(&xprt->sc_ctxt_lock);
-		ctxt->xprt = xprt;
-		INIT_LIST_HEAD(&ctxt->dto_q);
-		ctxt->count = 0;
-		break;
+	struct svcxprt_rdma *xprt = ctxt->xprt;
+	int i;
+	for (i = 0; i < ctxt->count && ctxt->sge[i].length; i++) {
+		atomic_dec(&xprt->sc_dma_used);
+		ib_dma_unmap_single(xprt->sc_cm_id->device,
+				    ctxt->sge[i].addr,
+				    ctxt->sge[i].length,
+				    ctxt->direction);
 	}
-	return ctxt;
 }
 
 void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
@@ -158,15 +128,34 @@ void svc_rdma_put_context(struct svc_rdma_op_ctxt *ctxt, int free_pages)
 	for (i = 0; i < ctxt->count; i++)
 		put_page(ctxt->pages[i]);
 
-	for (i = 0; i < ctxt->count; i++)
-		dma_unmap_single(xprt->sc_cm_id->device->dma_device,
-				 ctxt->sge[i].addr,
-				 ctxt->sge[i].length,
-				 ctxt->direction);
-	spin_lock_bh(&xprt->sc_ctxt_lock);
-	ctxt->next = xprt->sc_ctxt_head;
-	xprt->sc_ctxt_head = ctxt;
-	spin_unlock_bh(&xprt->sc_ctxt_lock);
+	kmem_cache_free(svc_rdma_ctxt_cachep, ctxt);
+	atomic_dec(&xprt->sc_ctxt_used);
+}
+
+/* Temporary NFS request map cache. Created in svc_rdma.c */
+extern struct kmem_cache *svc_rdma_map_cachep;
+
+/*
+ * Temporary NFS req mappings are shared across all transport
+ * instances. These are short lived and should be bounded by the number
+ * of concurrent server threads * depth of the SQ.
+ */
+struct svc_rdma_req_map *svc_rdma_get_req_map(void)
+{
+	struct svc_rdma_req_map *map;
+	while (1) {
+		map = kmem_cache_alloc(svc_rdma_map_cachep, GFP_KERNEL);
+		if (map)
+			break;
+		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
+	}
+	map->count = 0;
+	return map;
+}
+
+void svc_rdma_put_req_map(struct svc_rdma_req_map *map)
+{
+	kmem_cache_free(svc_rdma_map_cachep, map);
 }
 
 /* ib_cq event handler */
@@ -228,23 +217,8 @@ static void dto_tasklet_func(unsigned long data)
 		list_del_init(&xprt->sc_dto_q);
 		spin_unlock_irqrestore(&dto_lock, flags);
 
-		if (test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
-			rq_cq_reap(xprt);
-			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-			/*
-			 * If data arrived before established event,
-			 * don't enqueue. This defers RPC I/O until the
-			 * RDMA connection is complete.
-			 */
-			if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
-				svc_xprt_enqueue(&xprt->sc_xprt);
-		}
-
-		if (test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags)) {
-			ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
-			sq_cq_reap(xprt);
-		}
+		rq_cq_reap(xprt);
+		sq_cq_reap(xprt);
 
 		svc_xprt_put(&xprt->sc_xprt);
 		spin_lock_irqsave(&dto_lock, flags);
@@ -263,11 +237,15 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
 	struct svcxprt_rdma *xprt = cq_context;
 	unsigned long flags;
 
+	/* Guard against unconditional flush call for destroyed QP */
+	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
+		return;
+
 	/*
 	 * Set the bit regardless of whether or not it's on the list
 	 * because it may be on the list already due to an SQ
 	 * completion.
 	 */
 	set_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags);
 
 	/*
@@ -290,6 +268,8 @@ static void rq_comp_handler(struct ib_cq *cq, void *cq_context)
 *
 * Take all completing WC off the CQE and enqueue the associated DTO
 * context on the dto_q for the transport.
+ *
+ * Note that caller must hold a transport reference.
 */
 static void rq_cq_reap(struct svcxprt_rdma *xprt)
 {
@@ -297,29 +277,48 @@ static void rq_cq_reap(struct svcxprt_rdma *xprt)
 	struct ib_wc wc;
 	struct svc_rdma_op_ctxt *ctxt = NULL;
 
+	if (!test_and_clear_bit(RDMAXPRT_RQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_rq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_rq_poll);
 
-	spin_lock_bh(&xprt->sc_rq_dto_lock);
 	while ((ret = ib_poll_cq(xprt->sc_rq_cq, 1, &wc)) > 0) {
 		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
 		ctxt->wc_status = wc.status;
 		ctxt->byte_len = wc.byte_len;
+		svc_rdma_unmap_dma(ctxt);
 		if (wc.status != IB_WC_SUCCESS) {
 			/* Close the transport */
+			dprintk("svcrdma: transport closing putting ctxt %p\n", ctxt);
 			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
 			svc_rdma_put_context(ctxt, 1);
+			svc_xprt_put(&xprt->sc_xprt);
 			continue;
 		}
+		spin_lock_bh(&xprt->sc_rq_dto_lock);
 		list_add_tail(&ctxt->dto_q, &xprt->sc_rq_dto_q);
+		spin_unlock_bh(&xprt->sc_rq_dto_lock);
+		svc_xprt_put(&xprt->sc_xprt);
 	}
-	spin_unlock_bh(&xprt->sc_rq_dto_lock);
 
 	if (ctxt)
 		atomic_inc(&rdma_stat_rq_prod);
+
+	set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
+	/*
+	 * If data arrived before established event,
+	 * don't enqueue. This defers RPC I/O until the
+	 * RDMA connection is complete.
+	 */
+	if (!test_bit(RDMAXPRT_CONN_PENDING, &xprt->sc_flags))
+		svc_xprt_enqueue(&xprt->sc_xprt);
 }
 
 /*
  * Send Queue Completion Handler - potentially called on interrupt context.
+ *
+ * Note that caller must hold a transport reference.
 */
 static void sq_cq_reap(struct svcxprt_rdma *xprt)
 {
@@ -328,11 +327,17 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
 	struct ib_cq *cq = xprt->sc_sq_cq;
 	int ret;
 
+
+	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
+		return;
+
+	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_sq_poll);
 	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
 		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
 		xprt = ctxt->xprt;
 
+		svc_rdma_unmap_dma(ctxt);
 		if (wc.status != IB_WC_SUCCESS)
 			/* Close the transport */
 			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
@@ -343,20 +348,25 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
 
 		switch (ctxt->wr_op) {
 		case IB_WR_SEND:
-		case IB_WR_RDMA_WRITE:
 			svc_rdma_put_context(ctxt, 1);
 			break;
 
+		case IB_WR_RDMA_WRITE:
+			svc_rdma_put_context(ctxt, 0);
+			break;
+
 		case IB_WR_RDMA_READ:
 			if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
+				struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
+				BUG_ON(!read_hdr);
 				set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
-				set_bit(RDMACTXT_F_READ_DONE, &ctxt->flags);
 				spin_lock_bh(&xprt->sc_read_complete_lock);
-				list_add_tail(&ctxt->dto_q,
+				list_add_tail(&read_hdr->dto_q,
 					      &xprt->sc_read_complete_q);
 				spin_unlock_bh(&xprt->sc_read_complete_lock);
 				svc_xprt_enqueue(&xprt->sc_xprt);
 			}
+			svc_rdma_put_context(ctxt, 0);
 			break;
 
 		default:
@@ -365,6 +375,7 @@ static void sq_cq_reap(struct svcxprt_rdma *xprt)
 			       wc.opcode, wc.status);
 			break;
 		}
+		svc_xprt_put(&xprt->sc_xprt);
 	}
 
 	if (ctxt)
@@ -376,11 +387,15 @@ static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
 	struct svcxprt_rdma *xprt = cq_context;
 	unsigned long flags;
 
+	/* Guard against unconditional flush call for destroyed QP */
+	if (atomic_read(&xprt->sc_xprt.xpt_ref.refcount)==0)
+		return;
+
 	/*
 	 * Set the bit regardless of whether or not it's on the list
 	 * because it may be on the list already due to an RQ
 	 * completion.
 	 */
 	set_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags);
 
 	/*
@@ -398,39 +413,6 @@ static void sq_comp_handler(struct ib_cq *cq, void *cq_context)
 	tasklet_schedule(&dto_tasklet);
 }
 
-static void create_context_cache(struct svcxprt_rdma *xprt,
-				 int ctxt_count, int ctxt_bump, int ctxt_max)
-{
-	struct svc_rdma_op_ctxt *ctxt;
-	int i;
-
-	xprt->sc_ctxt_max = ctxt_max;
-	xprt->sc_ctxt_bump = ctxt_bump;
-	xprt->sc_ctxt_cnt = 0;
-	xprt->sc_ctxt_head = NULL;
-	for (i = 0; i < ctxt_count; i++) {
-		ctxt = kmalloc(sizeof(*ctxt), GFP_KERNEL);
-		if (ctxt) {
-			ctxt->next = xprt->sc_ctxt_head;
-			xprt->sc_ctxt_head = ctxt;
-			xprt->sc_ctxt_cnt++;
-		}
-	}
-}
-
-static void destroy_context_cache(struct svc_rdma_op_ctxt *ctxt)
-{
-	struct svc_rdma_op_ctxt *next;
-	if (!ctxt)
-		return;
-
-	do {
-		next = ctxt->next;
-		kfree(ctxt);
-		ctxt = next;
-	} while (next);
-}
-
 static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 					     int listener)
 {
@@ -447,7 +429,6 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 
 	spin_lock_init(&cma_xprt->sc_lock);
 	spin_lock_init(&cma_xprt->sc_read_complete_lock);
-	spin_lock_init(&cma_xprt->sc_ctxt_lock);
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 
 	cma_xprt->sc_ord = svcrdma_ord;
@@ -456,21 +437,9 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 	cma_xprt->sc_max_requests = svcrdma_max_requests;
 	cma_xprt->sc_sq_depth = svcrdma_max_requests * RPCRDMA_SQ_DEPTH_MULT;
 	atomic_set(&cma_xprt->sc_sq_count, 0);
+	atomic_set(&cma_xprt->sc_ctxt_used, 0);
 
-	if (!listener) {
-		int reqs = cma_xprt->sc_max_requests;
-		create_context_cache(cma_xprt,
-				     reqs << 1, /* starting size */
-				     reqs, /* bump amount */
-				     reqs +
-				     cma_xprt->sc_sq_depth +
-				     RPCRDMA_MAX_THREADS + 1); /* max */
-		if (!cma_xprt->sc_ctxt_head) {
-			kfree(cma_xprt);
-			return NULL;
-		}
-		clear_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
-	} else
+	if (listener)
 		set_bit(XPT_LISTENER, &cma_xprt->sc_xprt.xpt_flags);
 
 	return cma_xprt;
@@ -506,6 +475,7 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
 		BUG_ON(sge_no >= xprt->sc_max_sge);
 		page = svc_rdma_get_page();
 		ctxt->pages[sge_no] = page;
+		atomic_inc(&xprt->sc_dma_used);
 		pa = ib_dma_map_page(xprt->sc_cm_id->device,
 				     page, 0, PAGE_SIZE,
 				     DMA_FROM_DEVICE);
@@ -520,7 +490,12 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
520 recv_wr.num_sge = ctxt->count; 490 recv_wr.num_sge = ctxt->count;
521 recv_wr.wr_id = (u64)(unsigned long)ctxt; 491 recv_wr.wr_id = (u64)(unsigned long)ctxt;
522 492
493 svc_xprt_get(&xprt->sc_xprt);
523 ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr); 494 ret = ib_post_recv(xprt->sc_qp, &recv_wr, &bad_recv_wr);
495 if (ret) {
496 svc_xprt_put(&xprt->sc_xprt);
497 svc_rdma_put_context(ctxt, 1);
498 }
524 return ret; 499 return ret;
525} 500}
526 501
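
The receive-post hunk above pins the transport with svc_xprt_get() before ib_post_recv() and drops that reference (plus the context) if the post fails, so a completion can never fire after the last reference is gone; it also counts each DMA mapping in sc_dma_used for the same teardown leak check. A hypothetical userspace sketch of the get-before-post discipline; xprt_get, xprt_put, and post_recv are stand-ins:

#include <stdatomic.h>
#include <stdlib.h>

struct xprt { atomic_int ref; };

static void xprt_get(struct xprt *x) { atomic_fetch_add(&x->ref, 1); }

static void xprt_put(struct xprt *x)
{
        if (atomic_fetch_sub(&x->ref, 1) == 1)
                free(x);                /* last put destroys the object */
}

/* ib_post_recv() stand-in; nonzero means the post was rejected */
static int post_recv(struct xprt *x) { (void)x; return 0; }

static int submit_recv(struct xprt *x)
{
        int ret;

        xprt_get(x);                    /* reference the completion will own */
        ret = post_recv(x);
        if (ret)
                xprt_put(x);            /* nothing was posted; undo the get */
        return ret;
}

int main(void)
{
        struct xprt *x = calloc(1, sizeof(*x));

        if (!x)
                return 1;
        atomic_store(&x->ref, 1);       /* creator's reference */
        submit_recv(x);
        xprt_put(x);                    /* the completion handler's put */
        xprt_put(x);                    /* creator's put; frees x */
        return 0;
}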
@@ -535,10 +510,11 @@ int svc_rdma_post_recv(struct svcxprt_rdma *xprt)
535 * will call the recvfrom method on the listen xprt which will accept the new 510 * will call the recvfrom method on the listen xprt which will accept the new
536 * connection. 511 * connection.
537 */ 512 */
538static void handle_connect_req(struct rdma_cm_id *new_cma_id) 513static void handle_connect_req(struct rdma_cm_id *new_cma_id, size_t client_ird)
539{ 514{
540 struct svcxprt_rdma *listen_xprt = new_cma_id->context; 515 struct svcxprt_rdma *listen_xprt = new_cma_id->context;
541 struct svcxprt_rdma *newxprt; 516 struct svcxprt_rdma *newxprt;
517 struct sockaddr *sa;
542 518
543 /* Create a new transport */ 519 /* Create a new transport */
544 newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0); 520 newxprt = rdma_create_xprt(listen_xprt->sc_xprt.xpt_server, 0);
@@ -551,6 +527,15 @@ static void handle_connect_req(struct rdma_cm_id *new_cma_id)
551 dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n", 527 dprintk("svcrdma: Creating newxprt=%p, cm_id=%p, listenxprt=%p\n",
552 newxprt, newxprt->sc_cm_id, listen_xprt); 528 newxprt, newxprt->sc_cm_id, listen_xprt);
553 529
530 /* Save client advertised inbound read limit for use later in accept. */
531 newxprt->sc_ord = client_ird;
532
533 /* Set the local and remote addresses in the transport */
534 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
535 svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
536 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
537 svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
538
554 /* 539 /*
555 * Enqueue the new transport on the accept queue of the listening 540 * Enqueue the new transport on the accept queue of the listening
556 * transport 541 * transport
@@ -581,7 +566,8 @@ static int rdma_listen_handler(struct rdma_cm_id *cma_id,
581 case RDMA_CM_EVENT_CONNECT_REQUEST: 566 case RDMA_CM_EVENT_CONNECT_REQUEST:
582 dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, " 567 dprintk("svcrdma: Connect request on cma_id=%p, xprt = %p, "
583 "event=%d\n", cma_id, cma_id->context, event->event); 568 "event=%d\n", cma_id, cma_id->context, event->event);
584 handle_connect_req(cma_id); 569 handle_connect_req(cma_id,
570 event->param.conn.responder_resources);
585 break; 571 break;
586 572
587 case RDMA_CM_EVENT_ESTABLISHED: 573 case RDMA_CM_EVENT_ESTABLISHED:
@@ -627,6 +613,7 @@ static int rdma_cma_handler(struct rdma_cm_id *cma_id,
627 if (xprt) { 613 if (xprt) {
628 set_bit(XPT_CLOSE, &xprt->xpt_flags); 614 set_bit(XPT_CLOSE, &xprt->xpt_flags);
629 svc_xprt_enqueue(xprt); 615 svc_xprt_enqueue(xprt);
616 svc_xprt_put(xprt);
630 } 617 }
631 break; 618 break;
632 case RDMA_CM_EVENT_DEVICE_REMOVAL: 619 case RDMA_CM_EVENT_DEVICE_REMOVAL:
@@ -661,31 +648,27 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
661 648
662 cma_xprt = rdma_create_xprt(serv, 1); 649 cma_xprt = rdma_create_xprt(serv, 1);
663 if (!cma_xprt) 650 if (!cma_xprt)
664 return ERR_PTR(ENOMEM); 651 return ERR_PTR(-ENOMEM);
665 xprt = &cma_xprt->sc_xprt; 652 xprt = &cma_xprt->sc_xprt;
666 653
667 listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP); 654 listen_id = rdma_create_id(rdma_listen_handler, cma_xprt, RDMA_PS_TCP);
668 if (IS_ERR(listen_id)) { 655 if (IS_ERR(listen_id)) {
669 svc_xprt_put(&cma_xprt->sc_xprt); 656 ret = PTR_ERR(listen_id);
670 dprintk("svcrdma: rdma_create_id failed = %ld\n", 657 dprintk("svcrdma: rdma_create_id failed = %d\n", ret);
671 PTR_ERR(listen_id)); 658 goto err0;
672 return (void *)listen_id;
673 } 659 }
660
674 ret = rdma_bind_addr(listen_id, sa); 661 ret = rdma_bind_addr(listen_id, sa);
675 if (ret) { 662 if (ret) {
676 rdma_destroy_id(listen_id);
677 svc_xprt_put(&cma_xprt->sc_xprt);
678 dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret); 663 dprintk("svcrdma: rdma_bind_addr failed = %d\n", ret);
679 return ERR_PTR(ret); 664 goto err1;
680 } 665 }
681 cma_xprt->sc_cm_id = listen_id; 666 cma_xprt->sc_cm_id = listen_id;
682 667
683 ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG); 668 ret = rdma_listen(listen_id, RPCRDMA_LISTEN_BACKLOG);
684 if (ret) { 669 if (ret) {
685 rdma_destroy_id(listen_id);
686 svc_xprt_put(&cma_xprt->sc_xprt);
687 dprintk("svcrdma: rdma_listen failed = %d\n", ret); 670 dprintk("svcrdma: rdma_listen failed = %d\n", ret);
688 return ERR_PTR(ret); 671 goto err1;
689 } 672 }
690 673
691 /* 674 /*
@@ -696,6 +679,12 @@ static struct svc_xprt *svc_rdma_create(struct svc_serv *serv,
696 svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen); 679 svc_xprt_set_local(&cma_xprt->sc_xprt, sa, salen);
697 680
698 return &cma_xprt->sc_xprt; 681 return &cma_xprt->sc_xprt;
682
683 err1:
684 rdma_destroy_id(listen_id);
685 err0:
686 kfree(cma_xprt);
687 return ERR_PTR(ret);
699} 688}
700 689
701/* 690/*
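
Two fixes land in svc_rdma_create() above: ERR_PTR() now receives a negative errno (the old code passed a positive ENOMEM), and the duplicated cleanup calls collapse into err0/err1 labels that unwind in reverse order of acquisition. The same shape in self-contained C; create_id, bind_addr, and start_listen are illustrative stand-ins for the rdma_* calls:

#include <errno.h>
#include <stdlib.h>

struct cm_id { int unused; };
struct listener { struct cm_id *cm_id; };

static struct cm_id *create_id(void)      { return malloc(sizeof(struct cm_id)); }
static void destroy_id(struct cm_id *id)  { free(id); }
static int bind_addr(struct cm_id *id)    { (void)id; return 0; }
static int start_listen(struct cm_id *id) { (void)id; return 0; }

static struct listener *create_listener(int *err)
{
        struct listener *l;
        struct cm_id *listen_id;
        int ret;

        l = calloc(1, sizeof(*l));
        if (!l) {
                *err = -ENOMEM;         /* negative errno, as in the fix above */
                return NULL;
        }
        listen_id = create_id();
        if (!listen_id) {
                ret = -ENOMEM;
                goto err0;
        }
        ret = bind_addr(listen_id);
        if (ret)
                goto err1;
        ret = start_listen(listen_id);
        if (ret)
                goto err1;
        l->cm_id = listen_id;
        return l;

 err1:
        destroy_id(listen_id);          /* undo create_id() */
 err0:
        free(l);                        /* undo calloc() */
        *err = ret;
        return NULL;
}

int main(void)
{
        int err = 0;
        struct listener *l = create_listener(&err);

        if (!l)
                return 1;
        destroy_id(l->cm_id);
        free(l);
        return 0;
}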
@@ -716,7 +705,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
716 struct rdma_conn_param conn_param; 705 struct rdma_conn_param conn_param;
717 struct ib_qp_init_attr qp_attr; 706 struct ib_qp_init_attr qp_attr;
718 struct ib_device_attr devattr; 707 struct ib_device_attr devattr;
719 struct sockaddr *sa;
720 int ret; 708 int ret;
721 int i; 709 int i;
722 710
@@ -753,8 +741,12 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
753 (size_t)svcrdma_max_requests); 741 (size_t)svcrdma_max_requests);
754 newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests; 742 newxprt->sc_sq_depth = RPCRDMA_SQ_DEPTH_MULT * newxprt->sc_max_requests;
755 743
756 newxprt->sc_ord = min((size_t)devattr.max_qp_rd_atom, 744 /*
757 (size_t)svcrdma_ord); 745 * Limit ORD based on client limit, local device limit, and
746 * configured svcrdma limit.
747 */
748 newxprt->sc_ord = min_t(size_t, devattr.max_qp_rd_atom, newxprt->sc_ord);
749 newxprt->sc_ord = min_t(size_t, svcrdma_ord, newxprt->sc_ord);
758 750
759 newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device); 751 newxprt->sc_pd = ib_alloc_pd(newxprt->sc_cm_id->device);
760 if (IS_ERR(newxprt->sc_pd)) { 752 if (IS_ERR(newxprt->sc_pd)) {
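
sc_ord now starts from the client's advertised inbound read limit (saved in handle_connect_req() above) and is clamped twice, by the device's max_qp_rd_atom and by the svcrdma_ord module setting. The arithmetic in plain C, with min_sz standing in for min_t and made-up sample values:

#include <stddef.h>
#include <stdio.h>

static size_t min_sz(size_t a, size_t b) { return a < b ? a : b; }

int main(void)
{
        size_t client_ird = 32;         /* from the CONNECT_REQUEST event */
        size_t dev_max_rd_atom = 16;    /* devattr.max_qp_rd_atom */
        size_t svcrdma_ord = 16;        /* module parameter */
        size_t ord = client_ird;

        ord = min_sz(dev_max_rd_atom, ord);
        ord = min_sz(svcrdma_ord, ord);
        printf("negotiated ord = %zu\n", ord);  /* prints 16 */
        return 0;
}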
@@ -826,7 +818,6 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
826 newxprt->sc_sq_depth = qp_attr.cap.max_send_wr; 818 newxprt->sc_sq_depth = qp_attr.cap.max_send_wr;
827 newxprt->sc_max_requests = qp_attr.cap.max_recv_wr; 819 newxprt->sc_max_requests = qp_attr.cap.max_recv_wr;
828 } 820 }
829 svc_xprt_get(&newxprt->sc_xprt);
830 newxprt->sc_qp = newxprt->sc_cm_id->qp; 821 newxprt->sc_qp = newxprt->sc_cm_id->qp;
831 822
832 /* Register all of physical memory */ 823 /* Register all of physical memory */
@@ -850,6 +841,13 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
850 /* Swap out the handler */ 841 /* Swap out the handler */
851 newxprt->sc_cm_id->event_handler = rdma_cma_handler; 842 newxprt->sc_cm_id->event_handler = rdma_cma_handler;
852 843
844 /*
845 * Arm the CQs for the SQ and RQ before accepting so that we
846 * cannot miss the first message.
847 */
848 ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
849 ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
850
853 /* Accept Connection */ 851 /* Accept Connection */
854 set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags); 852 set_bit(RDMAXPRT_CONN_PENDING, &newxprt->sc_flags);
855 memset(&conn_param, 0, sizeof conn_param); 853 memset(&conn_param, 0, sizeof conn_param);
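
Arming both CQs before rdma_accept() closes a window in which the client's first message could complete while notifications were still disabled, leaving the completion sitting in the queue with no callback. A toy userspace model of that ordering; armed, have_event, and cq_event are illustrative, not IB verbs:

#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_bool armed;               /* ib_req_notify_cq() analogue */
static atomic_bool have_event;          /* a completion waiting in the CQ */

static void cq_event(void)              /* the peer's first message lands */
{
        atomic_store(&have_event, true);
        if (atomic_exchange(&armed, false))
                printf("handler scheduled\n");          /* tasklet runs */
        else
                printf("completion sits unnoticed\n");  /* lost-wakeup case */
}

int main(void)
{
        atomic_store(&armed, true);     /* arm first ... */
        cq_event();                     /* ... then accept, allowing traffic */
        return 0;
}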
@@ -886,58 +884,26 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
886 newxprt->sc_max_requests, 884 newxprt->sc_max_requests,
887 newxprt->sc_ord); 885 newxprt->sc_ord);
888 886
889 /* Set the local and remote addresses in the transport */
890 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.dst_addr;
891 svc_xprt_set_remote(&newxprt->sc_xprt, sa, svc_addr_len(sa));
892 sa = (struct sockaddr *)&newxprt->sc_cm_id->route.addr.src_addr;
893 svc_xprt_set_local(&newxprt->sc_xprt, sa, svc_addr_len(sa));
894
895 ib_req_notify_cq(newxprt->sc_sq_cq, IB_CQ_NEXT_COMP);
896 ib_req_notify_cq(newxprt->sc_rq_cq, IB_CQ_NEXT_COMP);
897 return &newxprt->sc_xprt; 887 return &newxprt->sc_xprt;
898 888
899 errout: 889 errout:
900 dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret); 890 dprintk("svcrdma: failure accepting new connection rc=%d.\n", ret);
901 /* Take a reference in case the DTO handler runs */ 891 /* Take a reference in case the DTO handler runs */
902 svc_xprt_get(&newxprt->sc_xprt); 892 svc_xprt_get(&newxprt->sc_xprt);
903 if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp)) { 893 if (newxprt->sc_qp && !IS_ERR(newxprt->sc_qp))
904 ib_destroy_qp(newxprt->sc_qp); 894 ib_destroy_qp(newxprt->sc_qp);
905 svc_xprt_put(&newxprt->sc_xprt);
906 }
907 rdma_destroy_id(newxprt->sc_cm_id); 895 rdma_destroy_id(newxprt->sc_cm_id);
908 /* This call to put will destroy the transport */ 896 /* This call to put will destroy the transport */
909 svc_xprt_put(&newxprt->sc_xprt); 897 svc_xprt_put(&newxprt->sc_xprt);
910 return NULL; 898 return NULL;
911} 899}
912 900
913/*
914 * Post an RQ WQE to the RQ when the rqst is being released. This
915 * effectively returns an RQ credit to the client. The rq_xprt_ctxt
916 * will be null if the request is deferred due to an RDMA_READ or the
917 * transport had no data ready (EAGAIN). Note that an RPC deferred in
918 * svc_process will still return the credit, because the data
919 * has been copied and no longer consumes a WQE/WC.
920 */
921static void svc_rdma_release_rqst(struct svc_rqst *rqstp) 901static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
922{ 902{
923 int err;
924 struct svcxprt_rdma *rdma =
925 container_of(rqstp->rq_xprt, struct svcxprt_rdma, sc_xprt);
926 if (rqstp->rq_xprt_ctxt) {
927 BUG_ON(rqstp->rq_xprt_ctxt != rdma);
928 err = svc_rdma_post_recv(rdma);
929 if (err)
930 dprintk("svcrdma: failed to post an RQ WQE error=%d\n",
931 err);
932 }
933 rqstp->rq_xprt_ctxt = NULL;
934} 903}
935 904
936/* 905/*
937 * When connected, an svc_xprt has at least three references: 906 * When connected, an svc_xprt has at least two references:
938 *
939 * - A reference held by the QP. We still hold that here because this
940 * code deletes the QP and puts the reference.
941 * 907 *
942 * - A reference held by the cm_id between the ESTABLISHED and 908 * - A reference held by the cm_id between the ESTABLISHED and
943 * DISCONNECTED events. If the remote peer disconnected first, this 909 * DISCONNECTED events. If the remote peer disconnected first, this
@@ -946,7 +912,7 @@ static void svc_rdma_release_rqst(struct svc_rqst *rqstp)
946 * - A reference held by the svc_recv code that called this function 912 * - A reference held by the svc_recv code that called this function
947 * as part of close processing. 913 * as part of close processing.
948 * 914 *
949 * At a minimum two references should still be held. 915 * At a minimum one reference should still be held.
950 */ 916 */
951static void svc_rdma_detach(struct svc_xprt *xprt) 917static void svc_rdma_detach(struct svc_xprt *xprt)
952{ 918{
@@ -956,23 +922,50 @@ static void svc_rdma_detach(struct svc_xprt *xprt)
956 922
957 /* Disconnect and flush posted WQE */ 923 /* Disconnect and flush posted WQE */
958 rdma_disconnect(rdma->sc_cm_id); 924 rdma_disconnect(rdma->sc_cm_id);
959
960 /* Destroy the QP if present (not a listener) */
961 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp)) {
962 ib_destroy_qp(rdma->sc_qp);
963 svc_xprt_put(xprt);
964 }
965
966 /* Destroy the CM ID */
967 rdma_destroy_id(rdma->sc_cm_id);
968} 925}
969 926
970static void svc_rdma_free(struct svc_xprt *xprt) 927static void __svc_rdma_free(struct work_struct *work)
971{ 928{
972 struct svcxprt_rdma *rdma = (struct svcxprt_rdma *)xprt; 929 struct svcxprt_rdma *rdma =
930 container_of(work, struct svcxprt_rdma, sc_work);
973 dprintk("svcrdma: svc_rdma_free(%p)\n", rdma); 931 dprintk("svcrdma: svc_rdma_free(%p)\n", rdma);
932
974 /* We should only be called from kref_put */ 933 /* We should only be called from kref_put */
975 BUG_ON(atomic_read(&xprt->xpt_ref.refcount) != 0); 934 BUG_ON(atomic_read(&rdma->sc_xprt.xpt_ref.refcount) != 0);
935
936 /*
937 * Destroy queued, but not yet processed, read completions. Note
938 * that this cleanup has to be done before destroying the
939 * cm_id because the device pointer is needed to unmap the DMA in
940 * svc_rdma_put_context.
941 */
942 while (!list_empty(&rdma->sc_read_complete_q)) {
943 struct svc_rdma_op_ctxt *ctxt;
944 ctxt = list_entry(rdma->sc_read_complete_q.next,
945 struct svc_rdma_op_ctxt,
946 dto_q);
947 list_del_init(&ctxt->dto_q);
948 svc_rdma_put_context(ctxt, 1);
949 }
950
951 /* Destroy queued, but not yet processed, recv completions */
952 while (!list_empty(&rdma->sc_rq_dto_q)) {
953 struct svc_rdma_op_ctxt *ctxt;
954 ctxt = list_entry(rdma->sc_rq_dto_q.next,
955 struct svc_rdma_op_ctxt,
956 dto_q);
957 list_del_init(&ctxt->dto_q);
958 svc_rdma_put_context(ctxt, 1);
959 }
960
961 /* Warn if we leaked a resource or are under-referenced */
962 WARN_ON(atomic_read(&rdma->sc_ctxt_used) != 0);
963 WARN_ON(atomic_read(&rdma->sc_dma_used) != 0);
964
965 /* Destroy the QP if present (not a listener) */
966 if (rdma->sc_qp && !IS_ERR(rdma->sc_qp))
967 ib_destroy_qp(rdma->sc_qp);
968
976 if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq)) 969 if (rdma->sc_sq_cq && !IS_ERR(rdma->sc_sq_cq))
977 ib_destroy_cq(rdma->sc_sq_cq); 970 ib_destroy_cq(rdma->sc_sq_cq);
978 971
@@ -985,10 +978,20 @@ static void svc_rdma_free(struct svc_xprt *xprt)
985 if (rdma->sc_pd && !IS_ERR(rdma->sc_pd)) 978 if (rdma->sc_pd && !IS_ERR(rdma->sc_pd))
986 ib_dealloc_pd(rdma->sc_pd); 979 ib_dealloc_pd(rdma->sc_pd);
987 980
988 destroy_context_cache(rdma->sc_ctxt_head); 981 /* Destroy the CM ID */
982 rdma_destroy_id(rdma->sc_cm_id);
983
989 kfree(rdma); 984 kfree(rdma);
990} 985}
991 986
987static void svc_rdma_free(struct svc_xprt *xprt)
988{
989 struct svcxprt_rdma *rdma =
990 container_of(xprt, struct svcxprt_rdma, sc_xprt);
991 INIT_WORK(&rdma->sc_work, __svc_rdma_free);
992 schedule_work(&rdma->sc_work);
993}
994
992static int svc_rdma_has_wspace(struct svc_xprt *xprt) 995static int svc_rdma_has_wspace(struct svc_xprt *xprt)
993{ 996{
994 struct svcxprt_rdma *rdma = 997 struct svcxprt_rdma *rdma =
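
svc_rdma_free() above no longer tears the transport down inline. The final kref_put can arrive from the CM event handler or from tasklet context, where blocking verbs calls such as ib_destroy_qp() and rdma_destroy_id() are not allowed, so it queues __svc_rdma_free() to a workqueue and lets process context do the work. A userspace model of that hand-off using a pthread worker; every name below is a stand-in:

#include <pthread.h>
#include <stdatomic.h>
#include <stdlib.h>

struct xprt { atomic_int ref; };

static struct xprt *pending;            /* one-slot "workqueue" */
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cv = PTHREAD_COND_INITIALIZER;

static void schedule_free(struct xprt *x)   /* schedule_work() analogue */
{
        pthread_mutex_lock(&lock);
        pending = x;
        pthread_cond_signal(&cv);
        pthread_mutex_unlock(&lock);
}

static void *worker(void *arg)              /* __svc_rdma_free() analogue */
{
        struct xprt *x;

        (void)arg;
        pthread_mutex_lock(&lock);
        while (!pending)
                pthread_cond_wait(&cv, &lock);
        x = pending;
        pending = NULL;
        pthread_mutex_unlock(&lock);
        free(x);        /* blocking teardown is safe in process context */
        return NULL;
}

static void xprt_put(struct xprt *x)        /* may run in atomic context */
{
        if (atomic_fetch_sub(&x->ref, 1) == 1)
                schedule_free(x);           /* never destroy inline */
}

int main(void)
{
        pthread_t t;
        struct xprt *x = calloc(1, sizeof(*x));

        if (!x)
                return 1;
        atomic_store(&x->ref, 1);
        pthread_create(&t, NULL, worker, NULL);
        xprt_put(x);                        /* last put: queues the free */
        pthread_join(t, NULL);
        return 0;
}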
@@ -1018,7 +1021,7 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1018 int ret; 1021 int ret;
1019 1022
1020 if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags)) 1023 if (test_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags))
1021 return 0; 1024 return -ENOTCONN;
1022 1025
1023 BUG_ON(wr->send_flags != IB_SEND_SIGNALED); 1026 BUG_ON(wr->send_flags != IB_SEND_SIGNALED);
1024 BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op != 1027 BUG_ON(((struct svc_rdma_op_ctxt *)(unsigned long)wr->wr_id)->wr_op !=
@@ -1029,7 +1032,8 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1029 if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) { 1032 if (xprt->sc_sq_depth == atomic_read(&xprt->sc_sq_count)) {
1030 spin_unlock_bh(&xprt->sc_lock); 1033 spin_unlock_bh(&xprt->sc_lock);
1031 atomic_inc(&rdma_stat_sq_starve); 1034 atomic_inc(&rdma_stat_sq_starve);
1032 /* See if we can reap some SQ WR */ 1035
1036 /* See if we can opportunistically reap SQ WR to make room */
1033 sq_cq_reap(xprt); 1037 sq_cq_reap(xprt);
1034 1038
1035 /* Wait until SQ WR available if SQ still full */ 1039 /* Wait until SQ WR available if SQ still full */
@@ -1041,22 +1045,25 @@ int svc_rdma_send(struct svcxprt_rdma *xprt, struct ib_send_wr *wr)
1041 continue; 1045 continue;
1042 } 1046 }
1043 /* Bumped used SQ WR count and post */ 1047 /* Bumped used SQ WR count and post */
1048 svc_xprt_get(&xprt->sc_xprt);
1044 ret = ib_post_send(xprt->sc_qp, wr, &bad_wr); 1049 ret = ib_post_send(xprt->sc_qp, wr, &bad_wr);
1045 if (!ret) 1050 if (!ret)
1046 atomic_inc(&xprt->sc_sq_count); 1051 atomic_inc(&xprt->sc_sq_count);
1047 else 1052 else {
1053 svc_xprt_put(&xprt->sc_xprt);
1048 dprintk("svcrdma: failed to post SQ WR rc=%d, " 1054 dprintk("svcrdma: failed to post SQ WR rc=%d, "
1049 "sc_sq_count=%d, sc_sq_depth=%d\n", 1055 "sc_sq_count=%d, sc_sq_depth=%d\n",
1050 ret, atomic_read(&xprt->sc_sq_count), 1056 ret, atomic_read(&xprt->sc_sq_count),
1051 xprt->sc_sq_depth); 1057 xprt->sc_sq_depth);
1058 }
1052 spin_unlock_bh(&xprt->sc_lock); 1059 spin_unlock_bh(&xprt->sc_lock);
1053 break; 1060 break;
1054 } 1061 }
1055 return ret; 1062 return ret;
1056} 1063}
1057 1064
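
svc_rdma_send() changes in two ways above: a closed transport now fails fast with -ENOTCONN instead of reporting success, and each post pins the transport just like the receive path. The surrounding flow control, a bounded count of in-flight SQ WRs with an opportunistic reap before sleeping, can be modeled with a counting semaphore; sq_slots, post_send, and send_complete are stand-ins:

#include <semaphore.h>
#include <stdio.h>

static sem_t sq_slots;          /* free SQ WR slots (sc_sq_depth - sc_sq_count) */

static void send_complete(void) /* sq_cq_reap() analogue: a WR retired */
{
        sem_post(&sq_slots);
}

static int post_send(void)      /* svc_rdma_send() analogue */
{
        if (sem_trywait(&sq_slots)) {
                /* SQ full: the kernel code reaps the CQ here, then waits */
                sem_wait(&sq_slots);
        }
        /* ib_post_send() would go here; on failure, sem_post() to undo the
         * slot claim, mirroring the svc_xprt_put() in the error leg above */
        return 0;
}

int main(void)
{
        sem_init(&sq_slots, 0, 16);     /* 16 stands in for sc_sq_depth */
        if (post_send() == 0)
                send_complete();
        return 0;
}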
1058int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp, 1065void svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
1059 enum rpcrdma_errcode err) 1066 enum rpcrdma_errcode err)
1060{ 1067{
1061 struct ib_send_wr err_wr; 1068 struct ib_send_wr err_wr;
1062 struct ib_sge sge; 1069 struct ib_sge sge;
@@ -1073,6 +1080,7 @@ int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
1073 length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va); 1080 length = svc_rdma_xdr_encode_error(xprt, rmsgp, err, va);
1074 1081
1075 /* Prepare SGE for local address */ 1082 /* Prepare SGE for local address */
1083 atomic_inc(&xprt->sc_dma_used);
1076 sge.addr = ib_dma_map_page(xprt->sc_cm_id->device, 1084 sge.addr = ib_dma_map_page(xprt->sc_cm_id->device,
1077 p, 0, PAGE_SIZE, DMA_FROM_DEVICE); 1085 p, 0, PAGE_SIZE, DMA_FROM_DEVICE);
1078 sge.lkey = xprt->sc_phys_mr->lkey; 1086 sge.lkey = xprt->sc_phys_mr->lkey;
@@ -1094,9 +1102,8 @@ int svc_rdma_send_error(struct svcxprt_rdma *xprt, struct rpcrdma_msg *rmsgp,
1094 /* Post It */ 1102 /* Post It */
1095 ret = svc_rdma_send(xprt, &err_wr); 1103 ret = svc_rdma_send(xprt, &err_wr);
1096 if (ret) { 1104 if (ret) {
1097 dprintk("svcrdma: Error posting send = %d\n", ret); 1105 dprintk("svcrdma: Error %d posting send for protocol error\n",
1106 ret);
1098 svc_rdma_put_context(ctxt, 1); 1107 svc_rdma_put_context(ctxt, 1);
1099 } 1108 }
1100
1101 return ret;
1102} 1109}