path: root/net/sunrpc/xprtrdma
author		Steve Wise <swise@opengridcomputing.com>	2014-05-28 16:12:01 -0400
committer	J. Bruce Fields <bfields@redhat.com>	2014-06-06 19:22:50 -0400
commit		0bf4828983dff062cd502f27ab8644b32774e72e (patch)
tree		80006e8385b5aeb75b7d0af80160800b9f16b946 /net/sunrpc/xprtrdma
parent		1b19453d1c6abcfa7c312ba6c9f11a277568fc94 (diff)
svcrdma: refactor marshalling logic
This patch refactors the NFSRDMA server marshalling logic to remove the intermediary map structures. It also fixes an existing bug where the NFSRDMA server was not minding the device fast register page list length limitations.

Signed-off-by: Tom Tucker <tom@opengridcomputing.com>
Signed-off-by: Steve Wise <swise@opengridcomputing.com>
Diffstat (limited to 'net/sunrpc/xprtrdma')
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_recvfrom.c		| 643
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_sendto.c		| 230
-rw-r--r--	net/sunrpc/xprtrdma/svc_rdma_transport.c	| 62
3 files changed, 331 insertions(+), 604 deletions(-)
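The refactor below replaces the intermediary map structures with per-chunk reader callbacks: rdma_read_chunks() walks the read list and, for each chunk, calls either rdma_read_chunk_frmr() or rdma_read_chunk_lcl(), and each call clamps the number of pages it maps to the device's fast-register page-list length (or SGE limit) before posting the RDMA_READ. The stand-alone sketch that follows is not the kernel code; it only illustrates that dispatch-and-clamp pattern with simplified, hypothetical types and names.

#include <stdio.h>
#include <stddef.h>

#define PAGE_SIZE 4096
#define MIN(a, b) ((a) < (b) ? (a) : (b))

/* Simplified, hypothetical stand-ins for the transport state. */
struct toy_xprt {
	int supports_fast_reg;   /* analogous to SVCRDMA_DEVCAP_FAST_REG */
	int frmr_pg_list_len;    /* device fast-register page-list limit */
	int max_sge;             /* device scatter/gather limit */
};

/* Reader callback: returns how many bytes of the chunk one read covers. */
typedef size_t (*reader_fn)(struct toy_xprt *xprt, size_t offset, size_t length);

static size_t read_chunk_lcl(struct toy_xprt *xprt, size_t offset, size_t length)
{
	size_t pages = (offset % PAGE_SIZE + length + PAGE_SIZE - 1) / PAGE_SIZE;

	pages = MIN(pages, (size_t)xprt->max_sge);   /* clamp to the SGE limit */
	return MIN(pages * PAGE_SIZE, length);       /* bytes this read covers */
}

static size_t read_chunk_frmr(struct toy_xprt *xprt, size_t offset, size_t length)
{
	size_t pages = (offset % PAGE_SIZE + length + PAGE_SIZE - 1) / PAGE_SIZE;

	pages = MIN(pages, (size_t)xprt->frmr_pg_list_len); /* mind the FRMR limit */
	return MIN(pages * PAGE_SIZE, length);
}

int main(void)
{
	struct toy_xprt xprt = { .supports_fast_reg = 1,
				 .frmr_pg_list_len = 8, .max_sge = 32 };
	reader_fn reader = xprt.supports_fast_reg ? read_chunk_frmr : read_chunk_lcl;
	size_t remaining = 100000, offset = 0;

	/* Pull the whole chunk over, one bounded read at a time. */
	while (remaining > 0) {
		size_t done = reader(&xprt, offset, remaining);

		printf("posted read of %zu bytes\n", done);
		offset += done;
		remaining -= done;
	}
	return 0;
}

Built as an ordinary user-space program, the sketch prints one line per bounded read until the 100000-byte chunk is consumed, mirroring how the patched server issues several limited RDMA_READs per chunk instead of one oversized work request.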
diff --git a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
index 8d904e4eef15..52d9f2ce20b0 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_recvfrom.c
@@ -1,4 +1,5 @@
 /*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
  * This software is available to you under a choice of one of two
@@ -69,7 +70,8 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
 
 	/* Set up the XDR head */
 	rqstp->rq_arg.head[0].iov_base = page_address(page);
-	rqstp->rq_arg.head[0].iov_len = min(byte_count, ctxt->sge[0].length);
+	rqstp->rq_arg.head[0].iov_len =
+		min_t(size_t, byte_count, ctxt->sge[0].length);
 	rqstp->rq_arg.len = byte_count;
 	rqstp->rq_arg.buflen = byte_count;
 
@@ -85,7 +87,7 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
 		page = ctxt->pages[sge_no];
 		put_page(rqstp->rq_pages[sge_no]);
 		rqstp->rq_pages[sge_no] = page;
-		bc -= min(bc, ctxt->sge[sge_no].length);
+		bc -= min_t(u32, bc, ctxt->sge[sge_no].length);
 		rqstp->rq_arg.buflen += ctxt->sge[sge_no].length;
 		sge_no++;
 	}
@@ -113,291 +115,265 @@ static void rdma_build_arg_xdr(struct svc_rqst *rqstp,
113 rqstp->rq_arg.tail[0].iov_len = 0; 115 rqstp->rq_arg.tail[0].iov_len = 0;
114} 116}
115 117
116/* Encode a read-chunk-list as an array of IB SGE 118static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count)
117 *
118 * Assumptions:
119 * - chunk[0]->position points to pages[0] at an offset of 0
120 * - pages[] is not physically or virtually contiguous and consists of
121 * PAGE_SIZE elements.
122 *
123 * Output:
124 * - sge array pointing into pages[] array.
125 * - chunk_sge array specifying sge index and count for each
126 * chunk in the read list
127 *
128 */
129static int map_read_chunks(struct svcxprt_rdma *xprt,
130 struct svc_rqst *rqstp,
131 struct svc_rdma_op_ctxt *head,
132 struct rpcrdma_msg *rmsgp,
133 struct svc_rdma_req_map *rpl_map,
134 struct svc_rdma_req_map *chl_map,
135 int ch_count,
136 int byte_count)
137{ 119{
138 int sge_no; 120 if (rdma_node_get_transport(xprt->sc_cm_id->device->node_type) ==
139 int sge_bytes; 121 RDMA_TRANSPORT_IWARP)
140 int page_off; 122 return 1;
141 int page_no; 123 else
142 int ch_bytes; 124 return min_t(int, sge_count, xprt->sc_max_sge);
143 int ch_no; 125}
144 struct rpcrdma_read_chunk *ch;
145 126
146 sge_no = 0; 127typedef int (*rdma_reader_fn)(struct svcxprt_rdma *xprt,
147 page_no = 0; 128 struct svc_rqst *rqstp,
148 page_off = 0; 129 struct svc_rdma_op_ctxt *head,
149 ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; 130 int *page_no,
150 ch_no = 0; 131 u32 *page_offset,
151 ch_bytes = ntohl(ch->rc_target.rs_length); 132 u32 rs_handle,
152 head->arg.head[0] = rqstp->rq_arg.head[0]; 133 u32 rs_length,
153 head->arg.tail[0] = rqstp->rq_arg.tail[0]; 134 u64 rs_offset,
154 head->arg.pages = &head->pages[head->count]; 135 int last);
155 head->hdr_count = head->count; /* save count of hdr pages */ 136
156 head->arg.page_base = 0; 137/* Issue an RDMA_READ using the local lkey to map the data sink */
157 head->arg.page_len = ch_bytes; 138static int rdma_read_chunk_lcl(struct svcxprt_rdma *xprt,
158 head->arg.len = rqstp->rq_arg.len + ch_bytes; 139 struct svc_rqst *rqstp,
159 head->arg.buflen = rqstp->rq_arg.buflen + ch_bytes; 140 struct svc_rdma_op_ctxt *head,
160 head->count++; 141 int *page_no,
161 chl_map->ch[0].start = 0; 142 u32 *page_offset,
162 while (byte_count) { 143 u32 rs_handle,
163 rpl_map->sge[sge_no].iov_base = 144 u32 rs_length,
164 page_address(rqstp->rq_arg.pages[page_no]) + page_off; 145 u64 rs_offset,
165 sge_bytes = min_t(int, PAGE_SIZE-page_off, ch_bytes); 146 int last)
166 rpl_map->sge[sge_no].iov_len = sge_bytes; 147{
167 /* 148 struct ib_send_wr read_wr;
168 * Don't bump head->count here because the same page 149 int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
169 * may be used by multiple SGE. 150 struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
170 */ 151 int ret, read, pno;
171 head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no]; 152 u32 pg_off = *page_offset;
172 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no+1]; 153 u32 pg_no = *page_no;
154
155 ctxt->direction = DMA_FROM_DEVICE;
156 ctxt->read_hdr = head;
157 pages_needed =
158 min_t(int, pages_needed, rdma_read_max_sge(xprt, pages_needed));
159 read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
160
161 for (pno = 0; pno < pages_needed; pno++) {
162 int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
163
164 head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
165 head->arg.page_len += len;
166 head->arg.len += len;
167 if (!pg_off)
168 head->count++;
169 rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
173 rqstp->rq_next_page = rqstp->rq_respages + 1; 170 rqstp->rq_next_page = rqstp->rq_respages + 1;
171 ctxt->sge[pno].addr =
172 ib_dma_map_page(xprt->sc_cm_id->device,
173 head->arg.pages[pg_no], pg_off,
174 PAGE_SIZE - pg_off,
175 DMA_FROM_DEVICE);
176 ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
177 ctxt->sge[pno].addr);
178 if (ret)
179 goto err;
180 atomic_inc(&xprt->sc_dma_used);
174 181
175 byte_count -= sge_bytes; 182 /* The lkey here is either a local dma lkey or a dma_mr lkey */
176 ch_bytes -= sge_bytes; 183 ctxt->sge[pno].lkey = xprt->sc_dma_lkey;
177 sge_no++; 184 ctxt->sge[pno].length = len;
178 /* 185 ctxt->count++;
179 * If all bytes for this chunk have been mapped to an 186
180 * SGE, move to the next SGE 187 /* adjust offset and wrap to next page if needed */
181 */ 188 pg_off += len;
182 if (ch_bytes == 0) { 189 if (pg_off == PAGE_SIZE) {
183 chl_map->ch[ch_no].count = 190 pg_off = 0;
184 sge_no - chl_map->ch[ch_no].start; 191 pg_no++;
185 ch_no++;
186 ch++;
187 chl_map->ch[ch_no].start = sge_no;
188 ch_bytes = ntohl(ch->rc_target.rs_length);
189 /* If bytes remaining account for next chunk */
190 if (byte_count) {
191 head->arg.page_len += ch_bytes;
192 head->arg.len += ch_bytes;
193 head->arg.buflen += ch_bytes;
194 }
195 } 192 }
196 /* 193 rs_length -= len;
197 * If this SGE consumed all of the page, move to the
198 * next page
199 */
200 if ((sge_bytes + page_off) == PAGE_SIZE) {
201 page_no++;
202 page_off = 0;
203 /*
204 * If there are still bytes left to map, bump
205 * the page count
206 */
207 if (byte_count)
208 head->count++;
209 } else
210 page_off += sge_bytes;
211 } 194 }
212 BUG_ON(byte_count != 0); 195
213 return sge_no; 196 if (last && rs_length == 0)
197 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
198 else
199 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
200
201 memset(&read_wr, 0, sizeof(read_wr));
202 read_wr.wr_id = (unsigned long)ctxt;
203 read_wr.opcode = IB_WR_RDMA_READ;
204 ctxt->wr_op = read_wr.opcode;
205 read_wr.send_flags = IB_SEND_SIGNALED;
206 read_wr.wr.rdma.rkey = rs_handle;
207 read_wr.wr.rdma.remote_addr = rs_offset;
208 read_wr.sg_list = ctxt->sge;
209 read_wr.num_sge = pages_needed;
210
211 ret = svc_rdma_send(xprt, &read_wr);
212 if (ret) {
213 pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
214 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
215 goto err;
216 }
217
218 /* return current location in page array */
219 *page_no = pg_no;
220 *page_offset = pg_off;
221 ret = read;
222 atomic_inc(&rdma_stat_read);
223 return ret;
224 err:
225 svc_rdma_unmap_dma(ctxt);
226 svc_rdma_put_context(ctxt, 0);
227 return ret;
214} 228}
215 229
216/* Map a read-chunk-list to an XDR and fast register the page-list. 230/* Issue an RDMA_READ using an FRMR to map the data sink */
217 * 231static int rdma_read_chunk_frmr(struct svcxprt_rdma *xprt,
218 * Assumptions:
219 * - chunk[0] position points to pages[0] at an offset of 0
220 * - pages[] will be made physically contiguous by creating a one-off memory
221 * region using the fastreg verb.
222 * - byte_count is # of bytes in read-chunk-list
223 * - ch_count is # of chunks in read-chunk-list
224 *
225 * Output:
226 * - sge array pointing into pages[] array.
227 * - chunk_sge array specifying sge index and count for each
228 * chunk in the read list
229 */
230static int fast_reg_read_chunks(struct svcxprt_rdma *xprt,
231 struct svc_rqst *rqstp, 232 struct svc_rqst *rqstp,
232 struct svc_rdma_op_ctxt *head, 233 struct svc_rdma_op_ctxt *head,
233 struct rpcrdma_msg *rmsgp, 234 int *page_no,
234 struct svc_rdma_req_map *rpl_map, 235 u32 *page_offset,
235 struct svc_rdma_req_map *chl_map, 236 u32 rs_handle,
236 int ch_count, 237 u32 rs_length,
237 int byte_count) 238 u64 rs_offset,
239 int last)
238{ 240{
239 int page_no; 241 struct ib_send_wr read_wr;
240 int ch_no; 242 struct ib_send_wr inv_wr;
241 u32 offset; 243 struct ib_send_wr fastreg_wr;
242 struct rpcrdma_read_chunk *ch; 244 u8 key;
243 struct svc_rdma_fastreg_mr *frmr; 245 int pages_needed = PAGE_ALIGN(*page_offset + rs_length) >> PAGE_SHIFT;
244 int ret = 0; 246 struct svc_rdma_op_ctxt *ctxt = svc_rdma_get_context(xprt);
247 struct svc_rdma_fastreg_mr *frmr = svc_rdma_get_frmr(xprt);
248 int ret, read, pno;
249 u32 pg_off = *page_offset;
250 u32 pg_no = *page_no;
245 251
246 frmr = svc_rdma_get_frmr(xprt);
247 if (IS_ERR(frmr)) 252 if (IS_ERR(frmr))
248 return -ENOMEM; 253 return -ENOMEM;
249 254
250 head->frmr = frmr; 255 ctxt->direction = DMA_FROM_DEVICE;
251 head->arg.head[0] = rqstp->rq_arg.head[0]; 256 ctxt->frmr = frmr;
252 head->arg.tail[0] = rqstp->rq_arg.tail[0]; 257 pages_needed = min_t(int, pages_needed, xprt->sc_frmr_pg_list_len);
253 head->arg.pages = &head->pages[head->count]; 258 read = min_t(int, pages_needed << PAGE_SHIFT, rs_length);
254 head->hdr_count = head->count; /* save count of hdr pages */
255 head->arg.page_base = 0;
256 head->arg.page_len = byte_count;
257 head->arg.len = rqstp->rq_arg.len + byte_count;
258 head->arg.buflen = rqstp->rq_arg.buflen + byte_count;
259 259
260 /* Fast register the page list */ 260 frmr->kva = page_address(rqstp->rq_arg.pages[pg_no]);
261 frmr->kva = page_address(rqstp->rq_arg.pages[0]);
262 frmr->direction = DMA_FROM_DEVICE; 261 frmr->direction = DMA_FROM_DEVICE;
263 frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE); 262 frmr->access_flags = (IB_ACCESS_LOCAL_WRITE|IB_ACCESS_REMOTE_WRITE);
264 frmr->map_len = byte_count; 263 frmr->map_len = pages_needed << PAGE_SHIFT;
265 frmr->page_list_len = PAGE_ALIGN(byte_count) >> PAGE_SHIFT; 264 frmr->page_list_len = pages_needed;
266 for (page_no = 0; page_no < frmr->page_list_len; page_no++) { 265
267 frmr->page_list->page_list[page_no] = 266 for (pno = 0; pno < pages_needed; pno++) {
267 int len = min_t(int, rs_length, PAGE_SIZE - pg_off);
268
269 head->arg.pages[pg_no] = rqstp->rq_arg.pages[pg_no];
270 head->arg.page_len += len;
271 head->arg.len += len;
272 if (!pg_off)
273 head->count++;
274 rqstp->rq_respages = &rqstp->rq_arg.pages[pg_no+1];
275 rqstp->rq_next_page = rqstp->rq_respages + 1;
276 frmr->page_list->page_list[pno] =
268 ib_dma_map_page(xprt->sc_cm_id->device, 277 ib_dma_map_page(xprt->sc_cm_id->device,
269 rqstp->rq_arg.pages[page_no], 0, 278 head->arg.pages[pg_no], 0,
270 PAGE_SIZE, DMA_FROM_DEVICE); 279 PAGE_SIZE, DMA_FROM_DEVICE);
271 if (ib_dma_mapping_error(xprt->sc_cm_id->device, 280 ret = ib_dma_mapping_error(xprt->sc_cm_id->device,
272 frmr->page_list->page_list[page_no])) 281 frmr->page_list->page_list[pno]);
273 goto fatal_err; 282 if (ret)
283 goto err;
274 atomic_inc(&xprt->sc_dma_used); 284 atomic_inc(&xprt->sc_dma_used);
275 head->arg.pages[page_no] = rqstp->rq_arg.pages[page_no];
276 }
277 head->count += page_no;
278
279 /* rq_respages points one past arg pages */
280 rqstp->rq_respages = &rqstp->rq_arg.pages[page_no];
281 rqstp->rq_next_page = rqstp->rq_respages + 1;
282 285
283 /* Create the reply and chunk maps */ 286 /* adjust offset and wrap to next page if needed */
284 offset = 0; 287 pg_off += len;
285 ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; 288 if (pg_off == PAGE_SIZE) {
286 for (ch_no = 0; ch_no < ch_count; ch_no++) { 289 pg_off = 0;
287 int len = ntohl(ch->rc_target.rs_length); 290 pg_no++;
288 rpl_map->sge[ch_no].iov_base = frmr->kva + offset; 291 }
289 rpl_map->sge[ch_no].iov_len = len; 292 rs_length -= len;
290 chl_map->ch[ch_no].count = 1;
291 chl_map->ch[ch_no].start = ch_no;
292 offset += len;
293 ch++;
294 } 293 }
295 294
296 ret = svc_rdma_fastreg(xprt, frmr); 295 if (last && rs_length == 0)
297 if (ret) 296 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
298 goto fatal_err; 297 else
299 298 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
300 return ch_no;
301
302 fatal_err:
303 printk("svcrdma: error fast registering xdr for xprt %p", xprt);
304 svc_rdma_put_frmr(xprt, frmr);
305 return -EIO;
306}
307
308static int rdma_set_ctxt_sge(struct svcxprt_rdma *xprt,
309 struct svc_rdma_op_ctxt *ctxt,
310 struct svc_rdma_fastreg_mr *frmr,
311 struct kvec *vec,
312 u64 *sgl_offset,
313 int count)
314{
315 int i;
316 unsigned long off;
317 299
318 ctxt->count = count; 300 /* Bump the key */
319 ctxt->direction = DMA_FROM_DEVICE; 301 key = (u8)(frmr->mr->lkey & 0x000000FF);
320 for (i = 0; i < count; i++) { 302 ib_update_fast_reg_key(frmr->mr, ++key);
321 ctxt->sge[i].length = 0; /* in case map fails */ 303
322 if (!frmr) { 304 ctxt->sge[0].addr = (unsigned long)frmr->kva + *page_offset;
323 BUG_ON(!virt_to_page(vec[i].iov_base)); 305 ctxt->sge[0].lkey = frmr->mr->lkey;
324 off = (unsigned long)vec[i].iov_base & ~PAGE_MASK; 306 ctxt->sge[0].length = read;
325 ctxt->sge[i].addr = 307 ctxt->count = 1;
326 ib_dma_map_page(xprt->sc_cm_id->device, 308 ctxt->read_hdr = head;
327 virt_to_page(vec[i].iov_base), 309
328 off, 310 /* Prepare FASTREG WR */
329 vec[i].iov_len, 311 memset(&fastreg_wr, 0, sizeof(fastreg_wr));
330 DMA_FROM_DEVICE); 312 fastreg_wr.opcode = IB_WR_FAST_REG_MR;
331 if (ib_dma_mapping_error(xprt->sc_cm_id->device, 313 fastreg_wr.send_flags = IB_SEND_SIGNALED;
332 ctxt->sge[i].addr)) 314 fastreg_wr.wr.fast_reg.iova_start = (unsigned long)frmr->kva;
333 return -EINVAL; 315 fastreg_wr.wr.fast_reg.page_list = frmr->page_list;
334 ctxt->sge[i].lkey = xprt->sc_dma_lkey; 316 fastreg_wr.wr.fast_reg.page_list_len = frmr->page_list_len;
335 atomic_inc(&xprt->sc_dma_used); 317 fastreg_wr.wr.fast_reg.page_shift = PAGE_SHIFT;
336 } else { 318 fastreg_wr.wr.fast_reg.length = frmr->map_len;
337 ctxt->sge[i].addr = (unsigned long)vec[i].iov_base; 319 fastreg_wr.wr.fast_reg.access_flags = frmr->access_flags;
338 ctxt->sge[i].lkey = frmr->mr->lkey; 320 fastreg_wr.wr.fast_reg.rkey = frmr->mr->lkey;
339 } 321 fastreg_wr.next = &read_wr;
340 ctxt->sge[i].length = vec[i].iov_len; 322
341 *sgl_offset = *sgl_offset + vec[i].iov_len; 323 /* Prepare RDMA_READ */
324 memset(&read_wr, 0, sizeof(read_wr));
325 read_wr.send_flags = IB_SEND_SIGNALED;
326 read_wr.wr.rdma.rkey = rs_handle;
327 read_wr.wr.rdma.remote_addr = rs_offset;
328 read_wr.sg_list = ctxt->sge;
329 read_wr.num_sge = 1;
330 if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_READ_W_INV) {
331 read_wr.opcode = IB_WR_RDMA_READ_WITH_INV;
332 read_wr.wr_id = (unsigned long)ctxt;
333 read_wr.ex.invalidate_rkey = ctxt->frmr->mr->lkey;
334 } else {
335 read_wr.opcode = IB_WR_RDMA_READ;
336 read_wr.next = &inv_wr;
337 /* Prepare invalidate */
338 memset(&inv_wr, 0, sizeof(inv_wr));
339 inv_wr.wr_id = (unsigned long)ctxt;
340 inv_wr.opcode = IB_WR_LOCAL_INV;
341 inv_wr.send_flags = IB_SEND_SIGNALED;
342 inv_wr.ex.invalidate_rkey = frmr->mr->lkey;
343 }
344 ctxt->wr_op = read_wr.opcode;
345
346 /* Post the chain */
347 ret = svc_rdma_send(xprt, &fastreg_wr);
348 if (ret) {
349 pr_err("svcrdma: Error %d posting RDMA_READ\n", ret);
350 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
351 goto err;
342 } 352 }
343 return 0;
344}
345 353
346static int rdma_read_max_sge(struct svcxprt_rdma *xprt, int sge_count) 354 /* return current location in page array */
347{ 355 *page_no = pg_no;
348 if ((rdma_node_get_transport(xprt->sc_cm_id->device->node_type) == 356 *page_offset = pg_off;
349 RDMA_TRANSPORT_IWARP) && 357 ret = read;
350 sge_count > 1) 358 atomic_inc(&rdma_stat_read);
351 return 1; 359 return ret;
352 else 360 err:
353 return min_t(int, sge_count, xprt->sc_max_sge); 361 svc_rdma_unmap_dma(ctxt);
362 svc_rdma_put_context(ctxt, 0);
363 svc_rdma_put_frmr(xprt, frmr);
364 return ret;
354} 365}
355 366
356/* 367static int rdma_read_chunks(struct svcxprt_rdma *xprt,
357 * Use RDMA_READ to read data from the advertised client buffer into the 368 struct rpcrdma_msg *rmsgp,
358 * XDR stream starting at rq_arg.head[0].iov_base. 369 struct svc_rqst *rqstp,
359 * Each chunk in the array 370 struct svc_rdma_op_ctxt *head)
360 * contains the following fields:
361 * discrim - '1', This isn't used for data placement
362 * position - The xdr stream offset (the same for every chunk)
363 * handle - RMR for client memory region
364 * length - data transfer length
365 * offset - 64 bit tagged offset in remote memory region
366 *
367 * On our side, we need to read into a pagelist. The first page immediately
368 * follows the RPC header.
369 *
370 * This function returns:
371 * 0 - No error and no read-list found.
372 *
373 * 1 - Successful read-list processing. The data is not yet in
374 * the pagelist and therefore the RPC request must be deferred. The
375 * I/O completion will enqueue the transport again and
376 * svc_rdma_recvfrom will complete the request.
377 *
378 * <0 - Error processing/posting read-list.
379 *
380 * NOTE: The ctxt must not be touched after the last WR has been posted
381 * because the I/O completion processing may occur on another
382 * processor and free / modify the context. Ne touche pas!
383 */
384static int rdma_read_xdr(struct svcxprt_rdma *xprt,
385 struct rpcrdma_msg *rmsgp,
386 struct svc_rqst *rqstp,
387 struct svc_rdma_op_ctxt *hdr_ctxt)
388{ 371{
389 struct ib_send_wr read_wr; 372 int page_no, ch_count, ret;
390 struct ib_send_wr inv_wr;
391 int err = 0;
392 int ch_no;
393 int ch_count;
394 int byte_count;
395 int sge_count;
396 u64 sgl_offset;
397 struct rpcrdma_read_chunk *ch; 373 struct rpcrdma_read_chunk *ch;
398 struct svc_rdma_op_ctxt *ctxt = NULL; 374 u32 page_offset, byte_count;
399 struct svc_rdma_req_map *rpl_map; 375 u64 rs_offset;
400 struct svc_rdma_req_map *chl_map; 376 rdma_reader_fn reader;
401 377
402 /* If no read list is present, return 0 */ 378 /* If no read list is present, return 0 */
403 ch = svc_rdma_get_read_chunk(rmsgp); 379 ch = svc_rdma_get_read_chunk(rmsgp);
@@ -408,122 +384,55 @@ static int rdma_read_xdr(struct svcxprt_rdma *xprt,
408 if (ch_count > RPCSVC_MAXPAGES) 384 if (ch_count > RPCSVC_MAXPAGES)
409 return -EINVAL; 385 return -EINVAL;
410 386
411 /* Allocate temporary reply and chunk maps */ 387 /* The request is completed when the RDMA_READs complete. The
412 rpl_map = svc_rdma_get_req_map(); 388 * head context keeps all the pages that comprise the
413 chl_map = svc_rdma_get_req_map(); 389 * request.
390 */
391 head->arg.head[0] = rqstp->rq_arg.head[0];
392 head->arg.tail[0] = rqstp->rq_arg.tail[0];
393 head->arg.pages = &head->pages[head->count];
394 head->hdr_count = head->count;
395 head->arg.page_base = 0;
396 head->arg.page_len = 0;
397 head->arg.len = rqstp->rq_arg.len;
398 head->arg.buflen = rqstp->rq_arg.buflen;
414 399
415 if (!xprt->sc_frmr_pg_list_len) 400 /* Use FRMR if supported */
416 sge_count = map_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp, 401 if (xprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)
417 rpl_map, chl_map, ch_count, 402 reader = rdma_read_chunk_frmr;
418 byte_count);
419 else 403 else
420 sge_count = fast_reg_read_chunks(xprt, rqstp, hdr_ctxt, rmsgp, 404 reader = rdma_read_chunk_lcl;
421 rpl_map, chl_map, ch_count,
422 byte_count);
423 if (sge_count < 0) {
424 err = -EIO;
425 goto out;
426 }
427
428 sgl_offset = 0;
429 ch_no = 0;
430 405
406 page_no = 0; page_offset = 0;
431 for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0]; 407 for (ch = (struct rpcrdma_read_chunk *)&rmsgp->rm_body.rm_chunks[0];
432 ch->rc_discrim != 0; ch++, ch_no++) { 408 ch->rc_discrim != 0; ch++) {
433 u64 rs_offset;
434next_sge:
435 ctxt = svc_rdma_get_context(xprt);
436 ctxt->direction = DMA_FROM_DEVICE;
437 ctxt->frmr = hdr_ctxt->frmr;
438 ctxt->read_hdr = NULL;
439 clear_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
440 clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
441 409
442 /* Prepare READ WR */
443 memset(&read_wr, 0, sizeof read_wr);
444 read_wr.wr_id = (unsigned long)ctxt;
445 read_wr.opcode = IB_WR_RDMA_READ;
446 ctxt->wr_op = read_wr.opcode;
447 read_wr.send_flags = IB_SEND_SIGNALED;
448 read_wr.wr.rdma.rkey = ntohl(ch->rc_target.rs_handle);
449 xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset, 410 xdr_decode_hyper((__be32 *)&ch->rc_target.rs_offset,
450 &rs_offset); 411 &rs_offset);
451 read_wr.wr.rdma.remote_addr = rs_offset + sgl_offset; 412 byte_count = ntohl(ch->rc_target.rs_length);
452 read_wr.sg_list = ctxt->sge; 413
453 read_wr.num_sge = 414 while (byte_count > 0) {
454 rdma_read_max_sge(xprt, chl_map->ch[ch_no].count); 415 ret = reader(xprt, rqstp, head,
455 err = rdma_set_ctxt_sge(xprt, ctxt, hdr_ctxt->frmr, 416 &page_no, &page_offset,
456 &rpl_map->sge[chl_map->ch[ch_no].start], 417 ntohl(ch->rc_target.rs_handle),
457 &sgl_offset, 418 byte_count, rs_offset,
458 read_wr.num_sge); 419 ((ch+1)->rc_discrim == 0) /* last */
459 if (err) { 420 );
460 svc_rdma_unmap_dma(ctxt); 421 if (ret < 0)
461 svc_rdma_put_context(ctxt, 0); 422 goto err;
462 goto out; 423 byte_count -= ret;
463 } 424 rs_offset += ret;
464 if (((ch+1)->rc_discrim == 0) && 425 head->arg.buflen += ret;
465 (read_wr.num_sge == chl_map->ch[ch_no].count)) {
466 /*
467 * Mark the last RDMA_READ with a bit to
468 * indicate all RPC data has been fetched from
469 * the client and the RPC needs to be enqueued.
470 */
471 set_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags);
472 if (hdr_ctxt->frmr) {
473 set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
474 /*
475 * Invalidate the local MR used to map the data
476 * sink.
477 */
478 if (xprt->sc_dev_caps &
479 SVCRDMA_DEVCAP_READ_W_INV) {
480 read_wr.opcode =
481 IB_WR_RDMA_READ_WITH_INV;
482 ctxt->wr_op = read_wr.opcode;
483 read_wr.ex.invalidate_rkey =
484 ctxt->frmr->mr->lkey;
485 } else {
486 /* Prepare INVALIDATE WR */
487 memset(&inv_wr, 0, sizeof inv_wr);
488 inv_wr.opcode = IB_WR_LOCAL_INV;
489 inv_wr.send_flags = IB_SEND_SIGNALED;
490 inv_wr.ex.invalidate_rkey =
491 hdr_ctxt->frmr->mr->lkey;
492 read_wr.next = &inv_wr;
493 }
494 }
495 ctxt->read_hdr = hdr_ctxt;
496 }
497 /* Post the read */
498 err = svc_rdma_send(xprt, &read_wr);
499 if (err) {
500 printk(KERN_ERR "svcrdma: Error %d posting RDMA_READ\n",
501 err);
502 set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
503 svc_rdma_unmap_dma(ctxt);
504 svc_rdma_put_context(ctxt, 0);
505 goto out;
506 } 426 }
507 atomic_inc(&rdma_stat_read);
508
509 if (read_wr.num_sge < chl_map->ch[ch_no].count) {
510 chl_map->ch[ch_no].count -= read_wr.num_sge;
511 chl_map->ch[ch_no].start += read_wr.num_sge;
512 goto next_sge;
513 }
514 sgl_offset = 0;
515 err = 1;
516 } 427 }
517 428 ret = 1;
518 out: 429 err:
519 svc_rdma_put_req_map(rpl_map);
520 svc_rdma_put_req_map(chl_map);
521
522 /* Detach arg pages. svc_recv will replenish them */ 430 /* Detach arg pages. svc_recv will replenish them */
523 for (ch_no = 0; &rqstp->rq_pages[ch_no] < rqstp->rq_respages; ch_no++) 431 for (page_no = 0;
524 rqstp->rq_pages[ch_no] = NULL; 432 &rqstp->rq_pages[page_no] < rqstp->rq_respages; page_no++)
433 rqstp->rq_pages[page_no] = NULL;
525 434
526 return err; 435 return ret;
527} 436}
528 437
529static int rdma_read_complete(struct svc_rqst *rqstp, 438static int rdma_read_complete(struct svc_rqst *rqstp,
@@ -595,13 +504,9 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 				  struct svc_rdma_op_ctxt,
 				  dto_q);
 		list_del_init(&ctxt->dto_q);
-	}
-	if (ctxt) {
 		spin_unlock_bh(&rdma_xprt->sc_rq_dto_lock);
 		return rdma_read_complete(rqstp, ctxt);
-	}
-
-	if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
+	} else if (!list_empty(&rdma_xprt->sc_rq_dto_q)) {
 		ctxt = list_entry(rdma_xprt->sc_rq_dto_q.next,
 				  struct svc_rdma_op_ctxt,
 				  dto_q);
@@ -621,7 +526,6 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 		if (test_bit(XPT_CLOSE, &xprt->xpt_flags))
 			goto close_out;
 
-		BUG_ON(ret);
 		goto out;
 	}
 	dprintk("svcrdma: processing ctxt=%p on xprt=%p, rqstp=%p, status=%d\n",
@@ -644,12 +548,11 @@ int svc_rdma_recvfrom(struct svc_rqst *rqstp)
 	}
 
 	/* Read read-list data. */
-	ret = rdma_read_xdr(rdma_xprt, rmsgp, rqstp, ctxt);
+	ret = rdma_read_chunks(rdma_xprt, rmsgp, rqstp, ctxt);
 	if (ret > 0) {
 		/* read-list posted, defer until data received from client. */
 		goto defer;
-	}
-	if (ret < 0) {
+	} else if (ret < 0) {
 		/* Post of read-list failed, free context. */
 		svc_rdma_put_context(ctxt, 1);
 		return 0;
diff --git a/net/sunrpc/xprtrdma/svc_rdma_sendto.c b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
index 7e024a51617e..49fd21a5c215 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_sendto.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_sendto.c
@@ -1,4 +1,5 @@
 /*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2006 Network Appliance, Inc. All rights reserved.
  *
  * This software is available to you under a choice of one of two
@@ -49,152 +50,6 @@
49 50
50#define RPCDBG_FACILITY RPCDBG_SVCXPRT 51#define RPCDBG_FACILITY RPCDBG_SVCXPRT
51 52
52/* Encode an XDR as an array of IB SGE
53 *
54 * Assumptions:
55 * - head[0] is physically contiguous.
56 * - tail[0] is physically contiguous.
57 * - pages[] is not physically or virtually contiguous and consists of
58 * PAGE_SIZE elements.
59 *
60 * Output:
61 * SGE[0] reserved for RCPRDMA header
62 * SGE[1] data from xdr->head[]
63 * SGE[2..sge_count-2] data from xdr->pages[]
64 * SGE[sge_count-1] data from xdr->tail.
65 *
66 * The max SGE we need is the length of the XDR / pagesize + one for
67 * head + one for tail + one for RPCRDMA header. Since RPCSVC_MAXPAGES
68 * reserves a page for both the request and the reply header, and this
69 * array is only concerned with the reply we are assured that we have
70 * on extra page for the RPCRMDA header.
71 */
72static int fast_reg_xdr(struct svcxprt_rdma *xprt,
73 struct xdr_buf *xdr,
74 struct svc_rdma_req_map *vec)
75{
76 int sge_no;
77 u32 sge_bytes;
78 u32 page_bytes;
79 u32 page_off;
80 int page_no = 0;
81 u8 *frva;
82 struct svc_rdma_fastreg_mr *frmr;
83
84 frmr = svc_rdma_get_frmr(xprt);
85 if (IS_ERR(frmr))
86 return -ENOMEM;
87 vec->frmr = frmr;
88
89 /* Skip the RPCRDMA header */
90 sge_no = 1;
91
92 /* Map the head. */
93 frva = (void *)((unsigned long)(xdr->head[0].iov_base) & PAGE_MASK);
94 vec->sge[sge_no].iov_base = xdr->head[0].iov_base;
95 vec->sge[sge_no].iov_len = xdr->head[0].iov_len;
96 vec->count = 2;
97 sge_no++;
98
99 /* Map the XDR head */
100 frmr->kva = frva;
101 frmr->direction = DMA_TO_DEVICE;
102 frmr->access_flags = 0;
103 frmr->map_len = PAGE_SIZE;
104 frmr->page_list_len = 1;
105 page_off = (unsigned long)xdr->head[0].iov_base & ~PAGE_MASK;
106 frmr->page_list->page_list[page_no] =
107 ib_dma_map_page(xprt->sc_cm_id->device,
108 virt_to_page(xdr->head[0].iov_base),
109 page_off,
110 PAGE_SIZE - page_off,
111 DMA_TO_DEVICE);
112 if (ib_dma_mapping_error(xprt->sc_cm_id->device,
113 frmr->page_list->page_list[page_no]))
114 goto fatal_err;
115 atomic_inc(&xprt->sc_dma_used);
116
117 /* Map the XDR page list */
118 page_off = xdr->page_base;
119 page_bytes = xdr->page_len + page_off;
120 if (!page_bytes)
121 goto encode_tail;
122
123 /* Map the pages */
124 vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
125 vec->sge[sge_no].iov_len = page_bytes;
126 sge_no++;
127 while (page_bytes) {
128 struct page *page;
129
130 page = xdr->pages[page_no++];
131 sge_bytes = min_t(u32, page_bytes, (PAGE_SIZE - page_off));
132 page_bytes -= sge_bytes;
133
134 frmr->page_list->page_list[page_no] =
135 ib_dma_map_page(xprt->sc_cm_id->device,
136 page, page_off,
137 sge_bytes, DMA_TO_DEVICE);
138 if (ib_dma_mapping_error(xprt->sc_cm_id->device,
139 frmr->page_list->page_list[page_no]))
140 goto fatal_err;
141
142 atomic_inc(&xprt->sc_dma_used);
143 page_off = 0; /* reset for next time through loop */
144 frmr->map_len += PAGE_SIZE;
145 frmr->page_list_len++;
146 }
147 vec->count++;
148
149 encode_tail:
150 /* Map tail */
151 if (0 == xdr->tail[0].iov_len)
152 goto done;
153
154 vec->count++;
155 vec->sge[sge_no].iov_len = xdr->tail[0].iov_len;
156
157 if (((unsigned long)xdr->tail[0].iov_base & PAGE_MASK) ==
158 ((unsigned long)xdr->head[0].iov_base & PAGE_MASK)) {
159 /*
160 * If head and tail use the same page, we don't need
161 * to map it again.
162 */
163 vec->sge[sge_no].iov_base = xdr->tail[0].iov_base;
164 } else {
165 void *va;
166
167 /* Map another page for the tail */
168 page_off = (unsigned long)xdr->tail[0].iov_base & ~PAGE_MASK;
169 va = (void *)((unsigned long)xdr->tail[0].iov_base & PAGE_MASK);
170 vec->sge[sge_no].iov_base = frva + frmr->map_len + page_off;
171
172 frmr->page_list->page_list[page_no] =
173 ib_dma_map_page(xprt->sc_cm_id->device, virt_to_page(va),
174 page_off,
175 PAGE_SIZE,
176 DMA_TO_DEVICE);
177 if (ib_dma_mapping_error(xprt->sc_cm_id->device,
178 frmr->page_list->page_list[page_no]))
179 goto fatal_err;
180 atomic_inc(&xprt->sc_dma_used);
181 frmr->map_len += PAGE_SIZE;
182 frmr->page_list_len++;
183 }
184
185 done:
186 if (svc_rdma_fastreg(xprt, frmr))
187 goto fatal_err;
188
189 return 0;
190
191 fatal_err:
192 printk("svcrdma: Error fast registering memory for xprt %p\n", xprt);
193 vec->frmr = NULL;
194 svc_rdma_put_frmr(xprt, frmr);
195 return -EIO;
196}
197
198static int map_xdr(struct svcxprt_rdma *xprt, 53static int map_xdr(struct svcxprt_rdma *xprt,
199 struct xdr_buf *xdr, 54 struct xdr_buf *xdr,
200 struct svc_rdma_req_map *vec) 55 struct svc_rdma_req_map *vec)
@@ -208,9 +63,6 @@ static int map_xdr(struct svcxprt_rdma *xprt,
 	BUG_ON(xdr->len !=
 	       (xdr->head[0].iov_len + xdr->page_len + xdr->tail[0].iov_len));
 
-	if (xprt->sc_frmr_pg_list_len)
-		return fast_reg_xdr(xprt, xdr, vec);
-
 	/* Skip the first sge, this is for the RPCRDMA header */
 	sge_no = 1;
 
@@ -282,8 +134,6 @@ static dma_addr_t dma_map_xdr(struct svcxprt_rdma *xprt,
 }
 
 /* Assumptions:
- * - We are using FRMR
- *     - or -
  * - The specified write_len can be represented in sc_max_sge * PAGE_SIZE
  */
 static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
@@ -327,23 +177,16 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 		sge_bytes = min_t(size_t,
 			  bc, vec->sge[xdr_sge_no].iov_len-sge_off);
 		sge[sge_no].length = sge_bytes;
-		if (!vec->frmr) {
-			sge[sge_no].addr =
-				dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
-					    sge_bytes, DMA_TO_DEVICE);
-			xdr_off += sge_bytes;
-			if (ib_dma_mapping_error(xprt->sc_cm_id->device,
-						 sge[sge_no].addr))
-				goto err;
-			atomic_inc(&xprt->sc_dma_used);
-			sge[sge_no].lkey = xprt->sc_dma_lkey;
-		} else {
-			sge[sge_no].addr = (unsigned long)
-				vec->sge[xdr_sge_no].iov_base + sge_off;
-			sge[sge_no].lkey = vec->frmr->mr->lkey;
-		}
+		sge[sge_no].addr =
+			dma_map_xdr(xprt, &rqstp->rq_res, xdr_off,
+				    sge_bytes, DMA_TO_DEVICE);
+		xdr_off += sge_bytes;
+		if (ib_dma_mapping_error(xprt->sc_cm_id->device,
+					 sge[sge_no].addr))
+			goto err;
+		atomic_inc(&xprt->sc_dma_used);
+		sge[sge_no].lkey = xprt->sc_dma_lkey;
 		ctxt->count++;
-		ctxt->frmr = vec->frmr;
 		sge_off = 0;
 		sge_no++;
 		xdr_sge_no++;
@@ -369,7 +212,6 @@ static int send_write(struct svcxprt_rdma *xprt, struct svc_rqst *rqstp,
 	return 0;
  err:
 	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_frmr(xprt, vec->frmr);
 	svc_rdma_put_context(ctxt, 0);
 	/* Fatal error, close transport */
 	return -EIO;
@@ -397,10 +239,7 @@ static int send_write_chunks(struct svcxprt_rdma *xprt,
 	res_ary = (struct rpcrdma_write_array *)
 		&rdma_resp->rm_body.rm_chunks[1];
 
-	if (vec->frmr)
-		max_write = vec->frmr->map_len;
-	else
-		max_write = xprt->sc_max_sge * PAGE_SIZE;
+	max_write = xprt->sc_max_sge * PAGE_SIZE;
 
 	/* Write chunks start at the pagelist */
 	for (xdr_off = rqstp->rq_res.head[0].iov_len, chunk_no = 0;
@@ -472,10 +311,7 @@ static int send_reply_chunks(struct svcxprt_rdma *xprt,
 	res_ary = (struct rpcrdma_write_array *)
 		&rdma_resp->rm_body.rm_chunks[2];
 
-	if (vec->frmr)
-		max_write = vec->frmr->map_len;
-	else
-		max_write = xprt->sc_max_sge * PAGE_SIZE;
+	max_write = xprt->sc_max_sge * PAGE_SIZE;
 
 	/* xdr offset starts at RPC message */
 	nchunks = ntohl(arg_ary->wc_nchunks);
@@ -545,7 +381,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		      int byte_count)
 {
 	struct ib_send_wr send_wr;
-	struct ib_send_wr inv_wr;
 	int sge_no;
 	int sge_bytes;
 	int page_no;
@@ -559,7 +394,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		       "svcrdma: could not post a receive buffer, err=%d."
 		       "Closing transport %p.\n", ret, rdma);
 		set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
-		svc_rdma_put_frmr(rdma, vec->frmr);
 		svc_rdma_put_context(ctxt, 0);
 		return -ENOTCONN;
 	}
@@ -567,11 +401,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
 	/* Prepare the context */
 	ctxt->pages[0] = page;
 	ctxt->count = 1;
-	ctxt->frmr = vec->frmr;
-	if (vec->frmr)
-		set_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
-	else
-		clear_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags);
 
 	/* Prepare the SGE for the RPCRDMA Header */
 	ctxt->sge[0].lkey = rdma->sc_dma_lkey;
@@ -590,21 +419,15 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		int xdr_off = 0;
 		sge_bytes = min_t(size_t, vec->sge[sge_no].iov_len, byte_count);
 		byte_count -= sge_bytes;
-		if (!vec->frmr) {
-			ctxt->sge[sge_no].addr =
-				dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
-					    sge_bytes, DMA_TO_DEVICE);
-			xdr_off += sge_bytes;
-			if (ib_dma_mapping_error(rdma->sc_cm_id->device,
-						 ctxt->sge[sge_no].addr))
-				goto err;
-			atomic_inc(&rdma->sc_dma_used);
-			ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
-		} else {
-			ctxt->sge[sge_no].addr = (unsigned long)
-				vec->sge[sge_no].iov_base;
-			ctxt->sge[sge_no].lkey = vec->frmr->mr->lkey;
-		}
+		ctxt->sge[sge_no].addr =
+			dma_map_xdr(rdma, &rqstp->rq_res, xdr_off,
+				    sge_bytes, DMA_TO_DEVICE);
+		xdr_off += sge_bytes;
+		if (ib_dma_mapping_error(rdma->sc_cm_id->device,
+					 ctxt->sge[sge_no].addr))
+			goto err;
+		atomic_inc(&rdma->sc_dma_used);
+		ctxt->sge[sge_no].lkey = rdma->sc_dma_lkey;
 		ctxt->sge[sge_no].length = sge_bytes;
 	}
 	BUG_ON(byte_count != 0);
@@ -627,6 +450,7 @@ static int send_reply(struct svcxprt_rdma *rdma,
 		ctxt->sge[page_no+1].length = 0;
 	}
 	rqstp->rq_next_page = rqstp->rq_respages + 1;
+
 	BUG_ON(sge_no > rdma->sc_max_sge);
 	memset(&send_wr, 0, sizeof send_wr);
 	ctxt->wr_op = IB_WR_SEND;
@@ -635,15 +459,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
 	send_wr.num_sge = sge_no;
 	send_wr.opcode = IB_WR_SEND;
 	send_wr.send_flags = IB_SEND_SIGNALED;
-	if (vec->frmr) {
-		/* Prepare INVALIDATE WR */
-		memset(&inv_wr, 0, sizeof inv_wr);
-		inv_wr.opcode = IB_WR_LOCAL_INV;
-		inv_wr.send_flags = IB_SEND_SIGNALED;
-		inv_wr.ex.invalidate_rkey =
-			vec->frmr->mr->lkey;
-		send_wr.next = &inv_wr;
-	}
 
 	ret = svc_rdma_send(rdma, &send_wr);
 	if (ret)
@@ -653,7 +468,6 @@ static int send_reply(struct svcxprt_rdma *rdma,
 
  err:
 	svc_rdma_unmap_dma(ctxt);
-	svc_rdma_put_frmr(rdma, vec->frmr);
 	svc_rdma_put_context(ctxt, 1);
 	return -EIO;
 }
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index 02db8d9cc994..e7323fbbd348 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -1,4 +1,5 @@
 /*
+ * Copyright (c) 2014 Open Grid Computing, Inc. All rights reserved.
  * Copyright (c) 2005-2007 Network Appliance, Inc. All rights reserved.
  *
  * This software is available to you under a choice of one of two
@@ -162,7 +163,6 @@ struct svc_rdma_req_map *svc_rdma_get_req_map(void)
 		schedule_timeout_uninterruptible(msecs_to_jiffies(500));
 	}
 	map->count = 0;
-	map->frmr = NULL;
 	return map;
 }
 
@@ -338,22 +338,21 @@ static void process_context(struct svcxprt_rdma *xprt,
 
 	switch (ctxt->wr_op) {
 	case IB_WR_SEND:
-		if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
-			svc_rdma_put_frmr(xprt, ctxt->frmr);
+		BUG_ON(ctxt->frmr);
 		svc_rdma_put_context(ctxt, 1);
 		break;
 
 	case IB_WR_RDMA_WRITE:
+		BUG_ON(ctxt->frmr);
 		svc_rdma_put_context(ctxt, 0);
 		break;
 
 	case IB_WR_RDMA_READ:
 	case IB_WR_RDMA_READ_WITH_INV:
+		svc_rdma_put_frmr(xprt, ctxt->frmr);
 		if (test_bit(RDMACTXT_F_LAST_CTXT, &ctxt->flags)) {
 			struct svc_rdma_op_ctxt *read_hdr = ctxt->read_hdr;
 			BUG_ON(!read_hdr);
-			if (test_bit(RDMACTXT_F_FAST_UNREG, &ctxt->flags))
-				svc_rdma_put_frmr(xprt, ctxt->frmr);
 			spin_lock_bh(&xprt->sc_rq_dto_lock);
 			set_bit(XPT_DATA, &xprt->sc_xprt.xpt_flags);
 			list_add_tail(&read_hdr->dto_q,
@@ -365,6 +364,7 @@ static void process_context(struct svcxprt_rdma *xprt,
 		break;
 
 	default:
+		BUG_ON(1);
 		printk(KERN_ERR "svcrdma: unexpected completion type, "
 		       "opcode=%d\n",
 		       ctxt->wr_op);
@@ -380,29 +380,42 @@ static void process_context(struct svcxprt_rdma *xprt,
 static void sq_cq_reap(struct svcxprt_rdma *xprt)
 {
 	struct svc_rdma_op_ctxt *ctxt = NULL;
-	struct ib_wc wc;
+	struct ib_wc wc_a[6];
+	struct ib_wc *wc;
 	struct ib_cq *cq = xprt->sc_sq_cq;
 	int ret;
 
+	memset(wc_a, 0, sizeof(wc_a));
+
 	if (!test_and_clear_bit(RDMAXPRT_SQ_PENDING, &xprt->sc_flags))
 		return;
 
 	ib_req_notify_cq(xprt->sc_sq_cq, IB_CQ_NEXT_COMP);
 	atomic_inc(&rdma_stat_sq_poll);
-	while ((ret = ib_poll_cq(cq, 1, &wc)) > 0) {
-		if (wc.status != IB_WC_SUCCESS)
-			/* Close the transport */
-			set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+	while ((ret = ib_poll_cq(cq, ARRAY_SIZE(wc_a), wc_a)) > 0) {
+		int i;
 
-		/* Decrement used SQ WR count */
-		atomic_dec(&xprt->sc_sq_count);
-		wake_up(&xprt->sc_send_wait);
+		for (i = 0; i < ret; i++) {
+			wc = &wc_a[i];
+			if (wc->status != IB_WC_SUCCESS) {
+				dprintk("svcrdma: sq wc err status %d\n",
+					wc->status);
 
-		ctxt = (struct svc_rdma_op_ctxt *)(unsigned long)wc.wr_id;
-		if (ctxt)
-			process_context(xprt, ctxt);
+				/* Close the transport */
+				set_bit(XPT_CLOSE, &xprt->sc_xprt.xpt_flags);
+			}
 
-		svc_xprt_put(&xprt->sc_xprt);
+			/* Decrement used SQ WR count */
+			atomic_dec(&xprt->sc_sq_count);
+			wake_up(&xprt->sc_send_wait);
+
+			ctxt = (struct svc_rdma_op_ctxt *)
+				(unsigned long)wc->wr_id;
+			if (ctxt)
+				process_context(xprt, ctxt);
+
+			svc_xprt_put(&xprt->sc_xprt);
+		}
 	}
 
 	if (ctxt)
@@ -995,7 +1008,11 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		need_dma_mr = 0;
 		break;
 	case RDMA_TRANSPORT_IB:
-		if (!(devattr.device_cap_flags & IB_DEVICE_LOCAL_DMA_LKEY)) {
+		if (!(newxprt->sc_dev_caps & SVCRDMA_DEVCAP_FAST_REG)) {
+			need_dma_mr = 1;
+			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
+		} else if (!(devattr.device_cap_flags &
+			     IB_DEVICE_LOCAL_DMA_LKEY)) {
 			need_dma_mr = 1;
 			dma_mr_acc = IB_ACCESS_LOCAL_WRITE;
 		} else
@@ -1192,14 +1209,7 @@ static int svc_rdma_has_wspace(struct svc_xprt *xprt)
 		container_of(xprt, struct svcxprt_rdma, sc_xprt);
 
 	/*
-	 * If there are fewer SQ WR available than required to send a
-	 * simple response, return false.
-	 */
-	if ((rdma->sc_sq_depth - atomic_read(&rdma->sc_sq_count) < 3))
-		return 0;
-
-	/*
-	 * ...or there are already waiters on the SQ,
+	 * If there are already waiters on the SQ,
 	 * return false.
 	 */
 	if (waitqueue_active(&rdma->sc_send_wait))