author     Chuck Lever <chuck.lever@oracle.com>    2017-04-09 13:06:16 -0400
committer  J. Bruce Fields <bfields@redhat.com>    2017-04-25 17:25:55 -0400
commit     f13193f50b64e2e0c87706b838d6b9895626a892 (patch)
tree       189018c10910c8ffbb253829936dcc7ba7eee233
parent     c238c4c034f857d12d7efbf9934d96b8bb68fbc7 (diff)
svcrdma: Introduce local rdma_rw API helpers
The plan is to replace the local bespoke code that constructs and posts
RDMA Read and Write Work Requests with calls to the rdma_rw API. This
shares, with other RDMA-enabled ULPs, common code that manages the gory
details of buffer registration and Work Request posting.

Some design notes:

 o The structure of RPC-over-RDMA transport headers is flexible,
   allowing multiple segments per Reply with arbitrary alignment,
   each with a unique R_key. Write and Send WRs continue to be
   built and posted in separate code paths. However, one whole
   chunk (with one or more RDMA segments apiece) gets exactly
   one ib_post_send and one work completion.

 o svc_xprt reference counting is modified, since a chain of
   rdma_rw_ctx structs generates one completion, no matter how
   many Write WRs are posted.

 o The current code builds the transport header as it is
   constructing Write WRs. I've replaced that with marshaling of
   transport header data items in a separate step. This is
   because the exact structure of client-provided segments may
   not align with the components of the server's reply xdr_buf,
   or the pages in the page list. Thus parts of each client-
   provided segment may be written at different points in the
   send path.

Signed-off-by: Chuck Lever <chuck.lever@oracle.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
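For orientation only: the declarations added to svc_rdma.h below suggest how a send path would drive these helpers once it is converted. The sketch that follows is not part of this patch; the function name, the way the Write list and Reply chunk pointers are obtained, and the error handling are illustrative assumptions.

/* Illustrative sketch only -- not from this patch.  Assumes the caller
 * has already located the client's first Write chunk (wr_ch) and Reply
 * chunk (rp_ch) in the received transport header, and that "xdr" is
 * the outgoing RPC Reply (for example, &rqstp->rq_res).
 */
static int example_send_reply_payload(struct svcxprt_rdma *rdma,
                                      __be32 *wr_ch, __be32 *rp_ch,
                                      struct xdr_buf *xdr)
{
        int ret;

        if (wr_ch) {
                /* Push the payload pages via the client's Write chunk. */
                ret = svc_rdma_send_write_chunk(rdma, wr_ch, xdr);
                if (ret < 0)
                        return ret;
        }

        if (rp_ch) {
                /* Push the whole Reply via the Reply chunk; the helper
                 * skips the page list when a Write chunk already
                 * carried it.
                 */
                ret = svc_rdma_send_reply_chunk(rdma, rp_ch,
                                                wr_ch != NULL, xdr);
                if (ret < 0)
                        return ret;
        }

        /* The RDMA Send carrying the transport header itself is still
         * built and posted by the existing send path.
         */
        return 0;
}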
-rw-r--r--  include/linux/sunrpc/svc_rdma.h             11
-rw-r--r--  net/sunrpc/Kconfig                           1
-rw-r--r--  net/sunrpc/xprtrdma/Makefile                 2
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_rw.c          512
-rw-r--r--  net/sunrpc/xprtrdma/svc_rdma_transport.c     4
5 files changed, 529 insertions, 1 deletions
diff --git a/include/linux/sunrpc/svc_rdma.h b/include/linux/sunrpc/svc_rdma.h
index 11d5aa123f17..ca08671fb7e2 100644
--- a/include/linux/sunrpc/svc_rdma.h
+++ b/include/linux/sunrpc/svc_rdma.h
@@ -145,12 +145,15 @@ struct svcxprt_rdma {
 	u32		     sc_max_requests;	/* Max requests */
 	u32		     sc_max_bc_requests;/* Backward credits */
 	int		     sc_max_req_size;	/* Size of each RQ WR buf */
+	u8		     sc_port_num;
 
 	struct ib_pd	     *sc_pd;
 
 	spinlock_t	     sc_ctxt_lock;
 	struct list_head     sc_ctxts;
 	int		     sc_ctxt_used;
+	spinlock_t	     sc_rw_ctxt_lock;
+	struct list_head     sc_rw_ctxts;
 	spinlock_t	     sc_map_lock;
 	struct list_head     sc_maps;
 
@@ -224,6 +227,14 @@ extern int rdma_read_chunk_frmr(struct svcxprt_rdma *, struct svc_rqst *,
 				    struct svc_rdma_op_ctxt *, int *, u32 *,
 				    u32, u32, u64, bool);
 
+/* svc_rdma_rw.c */
+extern void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma);
+extern int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma,
+				     __be32 *wr_ch, struct xdr_buf *xdr);
+extern int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma,
+				     __be32 *rp_ch, bool writelist,
+				     struct xdr_buf *xdr);
+
 /* svc_rdma_sendto.c */
 extern int svc_rdma_map_xdr(struct svcxprt_rdma *, struct xdr_buf *,
 			    struct svc_rdma_req_map *, bool);
diff --git a/net/sunrpc/Kconfig b/net/sunrpc/Kconfig
index 04ce2c0b660e..ac09ca803296 100644
--- a/net/sunrpc/Kconfig
+++ b/net/sunrpc/Kconfig
@@ -52,6 +52,7 @@ config SUNRPC_XPRT_RDMA
 	tristate "RPC-over-RDMA transport"
 	depends on SUNRPC && INFINIBAND && INFINIBAND_ADDR_TRANS
 	default SUNRPC && INFINIBAND
+	select SG_POOL
 	help
 	  This option allows the NFS client and server to use RDMA
 	  transports (InfiniBand, iWARP, or RoCE).
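The SG_POOL selection is needed by the new svc_rdma_rw.c added below: it builds each context's scatterlist with sg_alloc_table_chained() and releases it with sg_free_table_chained(), and those helpers are only compiled in when CONFIG_SG_POOL is enabled.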
diff --git a/net/sunrpc/xprtrdma/Makefile b/net/sunrpc/xprtrdma/Makefile
index ef19fa42c50f..c1ae8142ab73 100644
--- a/net/sunrpc/xprtrdma/Makefile
+++ b/net/sunrpc/xprtrdma/Makefile
@@ -4,5 +4,5 @@ rpcrdma-y := transport.o rpc_rdma.o verbs.o \
 	fmr_ops.o frwr_ops.o \
 	svc_rdma.o svc_rdma_backchannel.o svc_rdma_transport.o \
 	svc_rdma_marshal.o svc_rdma_sendto.o svc_rdma_recvfrom.o \
-	module.o
+	svc_rdma_rw.o module.o
 rpcrdma-$(CONFIG_SUNRPC_BACKCHANNEL) += backchannel.o
diff --git a/net/sunrpc/xprtrdma/svc_rdma_rw.c b/net/sunrpc/xprtrdma/svc_rdma_rw.c
new file mode 100644
index 000000000000..0cf620277693
--- /dev/null
+++ b/net/sunrpc/xprtrdma/svc_rdma_rw.c
@@ -0,0 +1,512 @@
+/*
+ * Copyright (c) 2016 Oracle. All rights reserved.
+ *
+ * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
+ */
+
+#include <linux/sunrpc/rpc_rdma.h>
+#include <linux/sunrpc/svc_rdma.h>
+#include <linux/sunrpc/debug.h>
+
+#include <rdma/rw.h>
+
+#define RPCDBG_FACILITY         RPCDBG_SVCXPRT
+
+/* Each R/W context contains state for one chain of RDMA Read or
+ * Write Work Requests.
+ *
+ * Each WR chain handles a single contiguous server-side buffer,
+ * because scatterlist entries after the first have to start on
+ * page alignment. xdr_buf iovecs cannot guarantee alignment.
+ *
+ * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
+ * from a client may contain a unique R_key, so each WR chain moves
+ * up to one segment at a time.
+ *
+ * The scatterlist makes this data structure over 4KB in size. To
+ * make it less likely to fail, and to handle the allocation for
+ * smaller I/O requests without disabling bottom-halves, these
+ * contexts are created on demand, but cached and reused until the
+ * controlling svcxprt_rdma is destroyed.
+ */
+struct svc_rdma_rw_ctxt {
+        struct list_head        rw_list;
+        struct rdma_rw_ctx      rw_ctx;
+        int                     rw_nents;
+        struct sg_table         rw_sg_table;
+        struct scatterlist      rw_first_sgl[0];
+};
+
+static inline struct svc_rdma_rw_ctxt *
+svc_rdma_next_ctxt(struct list_head *list)
+{
+        return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
+                                        rw_list);
+}
+
+static struct svc_rdma_rw_ctxt *
+svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
+{
+        struct svc_rdma_rw_ctxt *ctxt;
+
+        spin_lock(&rdma->sc_rw_ctxt_lock);
+
+        ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts);
+        if (ctxt) {
+                list_del(&ctxt->rw_list);
+                spin_unlock(&rdma->sc_rw_ctxt_lock);
+        } else {
+                spin_unlock(&rdma->sc_rw_ctxt_lock);
+                ctxt = kmalloc(sizeof(*ctxt) +
+                               SG_CHUNK_SIZE * sizeof(struct scatterlist),
+                               GFP_KERNEL);
+                if (!ctxt)
+                        goto out;
+                INIT_LIST_HEAD(&ctxt->rw_list);
+        }
+
+        ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
+        if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
+                                   ctxt->rw_sg_table.sgl)) {
+                kfree(ctxt);
+                ctxt = NULL;
+        }
+out:
+        return ctxt;
+}
+
+static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
+                                 struct svc_rdma_rw_ctxt *ctxt)
+{
+        sg_free_table_chained(&ctxt->rw_sg_table, true);
+
+        spin_lock(&rdma->sc_rw_ctxt_lock);
+        list_add(&ctxt->rw_list, &rdma->sc_rw_ctxts);
+        spin_unlock(&rdma->sc_rw_ctxt_lock);
+}
+
+/**
+ * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
+ * @rdma: transport about to be destroyed
+ *
+ */
+void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
+{
+        struct svc_rdma_rw_ctxt *ctxt;
+
+        while ((ctxt = svc_rdma_next_ctxt(&rdma->sc_rw_ctxts)) != NULL) {
+                list_del(&ctxt->rw_list);
+                kfree(ctxt);
+        }
+}
+
+/* A chunk context tracks all I/O for moving one Read or Write
+ * chunk. This is a set of rdma_rw's that handle data movement
+ * for all segments of one chunk.
+ *
+ * These are small, acquired with a single allocator call, and
+ * no more than one is needed per chunk. They are allocated on
+ * demand, and not cached.
+ */
+struct svc_rdma_chunk_ctxt {
+        struct ib_cqe           cc_cqe;
+        struct svcxprt_rdma     *cc_rdma;
+        struct list_head        cc_rwctxts;
+        int                     cc_sqecount;
+        enum dma_data_direction cc_dir;
+};
+
+static void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
+                             struct svc_rdma_chunk_ctxt *cc,
+                             enum dma_data_direction dir)
+{
+        cc->cc_rdma = rdma;
+        svc_xprt_get(&rdma->sc_xprt);
+
+        INIT_LIST_HEAD(&cc->cc_rwctxts);
+        cc->cc_sqecount = 0;
+        cc->cc_dir = dir;
+}
+
+static void svc_rdma_cc_release(struct svc_rdma_chunk_ctxt *cc)
+{
+        struct svcxprt_rdma *rdma = cc->cc_rdma;
+        struct svc_rdma_rw_ctxt *ctxt;
+
+        while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
+                list_del(&ctxt->rw_list);
+
+                rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
+                                    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
+                                    ctxt->rw_nents, cc->cc_dir);
+                svc_rdma_put_rw_ctxt(rdma, ctxt);
+        }
+        svc_xprt_put(&rdma->sc_xprt);
+}
+
+/* State for sending a Write or Reply chunk.
+ * - Tracks progress of writing one chunk over all its segments
+ * - Stores arguments for the SGL constructor functions
+ */
+struct svc_rdma_write_info {
+        /* write state of this chunk */
+        unsigned int            wi_seg_off;
+        unsigned int            wi_seg_no;
+        unsigned int            wi_nsegs;
+        __be32                  *wi_segs;
+
+        /* SGL constructor arguments */
+        struct xdr_buf          *wi_xdr;
+        unsigned char           *wi_base;
+        unsigned int            wi_next_off;
+
+        struct svc_rdma_chunk_ctxt      wi_cc;
+};
+
+static struct svc_rdma_write_info *
+svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma, __be32 *chunk)
+{
+        struct svc_rdma_write_info *info;
+
+        info = kmalloc(sizeof(*info), GFP_KERNEL);
+        if (!info)
+                return info;
+
+        info->wi_seg_off = 0;
+        info->wi_seg_no = 0;
+        info->wi_nsegs = be32_to_cpup(++chunk);
+        info->wi_segs = ++chunk;
+        svc_rdma_cc_init(rdma, &info->wi_cc, DMA_TO_DEVICE);
+        return info;
+}
+
+static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
+{
+        svc_rdma_cc_release(&info->wi_cc);
+        kfree(info);
+}
+
+/**
+ * svc_rdma_write_done - Write chunk completion
+ * @cq: controlling Completion Queue
+ * @wc: Work Completion
+ *
+ * Pages under I/O are freed by a subsequent Send completion.
+ */
+static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
+{
+        struct ib_cqe *cqe = wc->wr_cqe;
+        struct svc_rdma_chunk_ctxt *cc =
+                        container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
+        struct svcxprt_rdma *rdma = cc->cc_rdma;
+        struct svc_rdma_write_info *info =
+                        container_of(cc, struct svc_rdma_write_info, wi_cc);
+
+        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+        wake_up(&rdma->sc_send_wait);
+
+        if (unlikely(wc->status != IB_WC_SUCCESS)) {
+                set_bit(XPT_CLOSE, &rdma->sc_xprt.xpt_flags);
+                if (wc->status != IB_WC_WR_FLUSH_ERR)
+                        pr_err("svcrdma: write ctx: %s (%u/0x%x)\n",
+                               ib_wc_status_msg(wc->status),
+                               wc->status, wc->vendor_err);
+        }
+
+        svc_rdma_write_info_free(info);
+}
+
+/* This function sleeps when the transport's Send Queue is congested.
+ *
+ * Assumptions:
+ * - If ib_post_send() succeeds, only one completion is expected,
+ *   even if one or more WRs are flushed. This is true when posting
+ *   an rdma_rw_ctx or when posting a single signaled WR.
+ */
+static int svc_rdma_post_chunk_ctxt(struct svc_rdma_chunk_ctxt *cc)
+{
+        struct svcxprt_rdma *rdma = cc->cc_rdma;
+        struct svc_xprt *xprt = &rdma->sc_xprt;
+        struct ib_send_wr *first_wr, *bad_wr;
+        struct list_head *tmp;
+        struct ib_cqe *cqe;
+        int ret;
+
+        first_wr = NULL;
+        cqe = &cc->cc_cqe;
+        list_for_each(tmp, &cc->cc_rwctxts) {
+                struct svc_rdma_rw_ctxt *ctxt;
+
+                ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
+                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
+                                           rdma->sc_port_num, cqe, first_wr);
+                cqe = NULL;
+        }
+
+        do {
+                if (atomic_sub_return(cc->cc_sqecount,
+                                      &rdma->sc_sq_avail) > 0) {
+                        ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
+                        if (ret)
+                                break;
+                        return 0;
+                }
+
+                atomic_inc(&rdma_stat_sq_starve);
+                atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+                wait_event(rdma->sc_send_wait,
+                           atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
+        } while (1);
+
+        pr_err("svcrdma: ib_post_send failed (%d)\n", ret);
+        set_bit(XPT_CLOSE, &xprt->xpt_flags);
+
+        /* If even one was posted, there will be a completion. */
+        if (bad_wr != first_wr)
+                return 0;
+
+        atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
+        wake_up(&rdma->sc_send_wait);
+        return -ENOTCONN;
+}
+
+/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
+ */
+static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
+                               unsigned int len,
+                               struct svc_rdma_rw_ctxt *ctxt)
+{
+        struct scatterlist *sg = ctxt->rw_sg_table.sgl;
+
+        sg_set_buf(&sg[0], info->wi_base, len);
+        info->wi_base += len;
+
+        ctxt->rw_nents = 1;
+}
+
+/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
+ */
+static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
+                                    unsigned int remaining,
+                                    struct svc_rdma_rw_ctxt *ctxt)
+{
+        unsigned int sge_no, sge_bytes, page_off, page_no;
+        struct xdr_buf *xdr = info->wi_xdr;
+        struct scatterlist *sg;
+        struct page **page;
+
+        page_off = (info->wi_next_off + xdr->page_base) & ~PAGE_MASK;
+        page_no = (info->wi_next_off + xdr->page_base) >> PAGE_SHIFT;
+        page = xdr->pages + page_no;
+        info->wi_next_off += remaining;
+        sg = ctxt->rw_sg_table.sgl;
+        sge_no = 0;
+        do {
+                sge_bytes = min_t(unsigned int, remaining,
+                                  PAGE_SIZE - page_off);
+                sg_set_page(sg, *page, sge_bytes, page_off);
+
+                remaining -= sge_bytes;
+                sg = sg_next(sg);
+                page_off = 0;
+                sge_no++;
+                page++;
+        } while (remaining);
+
+        ctxt->rw_nents = sge_no;
+}
+
+/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
+ * an RPC Reply.
+ */
+static int
+svc_rdma_build_writes(struct svc_rdma_write_info *info,
+                      void (*constructor)(struct svc_rdma_write_info *info,
+                                          unsigned int len,
+                                          struct svc_rdma_rw_ctxt *ctxt),
+                      unsigned int remaining)
+{
+        struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
+        struct svcxprt_rdma *rdma = cc->cc_rdma;
+        struct svc_rdma_rw_ctxt *ctxt;
+        __be32 *seg;
+        int ret;
+
+        cc->cc_cqe.done = svc_rdma_write_done;
+        seg = info->wi_segs + info->wi_seg_no * rpcrdma_segment_maxsz;
+        do {
+                unsigned int write_len;
+                u32 seg_length, seg_handle;
+                u64 seg_offset;
+
+                if (info->wi_seg_no >= info->wi_nsegs)
+                        goto out_overflow;
+
+                seg_handle = be32_to_cpup(seg);
+                seg_length = be32_to_cpup(seg + 1);
+                xdr_decode_hyper(seg + 2, &seg_offset);
+                seg_offset += info->wi_seg_off;
+
+                write_len = min(remaining, seg_length - info->wi_seg_off);
+                ctxt = svc_rdma_get_rw_ctxt(rdma,
+                                            (write_len >> PAGE_SHIFT) + 2);
+                if (!ctxt)
+                        goto out_noctx;
+
+                constructor(info, write_len, ctxt);
+                ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp,
+                                       rdma->sc_port_num, ctxt->rw_sg_table.sgl,
+                                       ctxt->rw_nents, 0, seg_offset,
+                                       seg_handle, DMA_TO_DEVICE);
+                if (ret < 0)
+                        goto out_initerr;
+
+                list_add(&ctxt->rw_list, &cc->cc_rwctxts);
+                cc->cc_sqecount += ret;
+                if (write_len == seg_length - info->wi_seg_off) {
+                        seg += 4;
+                        info->wi_seg_no++;
+                        info->wi_seg_off = 0;
+                } else {
+                        info->wi_seg_off += write_len;
+                }
+                remaining -= write_len;
+        } while (remaining);
+
+        return 0;
+
+out_overflow:
+        dprintk("svcrdma: inadequate space in Write chunk (%u)\n",
+                info->wi_nsegs);
+        return -E2BIG;
+
+out_noctx:
+        dprintk("svcrdma: no R/W ctxs available\n");
+        return -ENOMEM;
+
+out_initerr:
+        svc_rdma_put_rw_ctxt(rdma, ctxt);
+        pr_err("svcrdma: failed to map pagelist (%d)\n", ret);
+        return -EIO;
+}
+
+/* Send one of an xdr_buf's kvecs by itself. To send a Reply
+ * chunk, the whole RPC Reply is written back to the client.
+ * This function writes either the head or tail of the xdr_buf
+ * containing the Reply.
+ */
+static int svc_rdma_send_xdr_kvec(struct svc_rdma_write_info *info,
+                                  struct kvec *vec)
+{
+        info->wi_base = vec->iov_base;
+        return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
+                                     vec->iov_len);
+}
+
+/* Send an xdr_buf's page list by itself. A Write chunk is
+ * just the page list. A Reply chunk is the head, page list,
+ * and tail. This function is shared between the two types
+ * of chunk.
+ */
+static int svc_rdma_send_xdr_pagelist(struct svc_rdma_write_info *info,
+                                      struct xdr_buf *xdr)
+{
+        info->wi_xdr = xdr;
+        info->wi_next_off = 0;
+        return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
+                                     xdr->page_len);
+}
+
+/**
+ * svc_rdma_send_write_chunk - Write all segments in a Write chunk
+ * @rdma: controlling RDMA transport
+ * @wr_ch: Write chunk provided by client
+ * @xdr: xdr_buf containing the data payload
+ *
+ * Returns a non-negative number of bytes the chunk consumed, or
+ *      %-E2BIG if the payload was larger than the Write chunk,
+ *      %-ENOMEM if rdma_rw context pool was exhausted,
+ *      %-ENOTCONN if posting failed (connection is lost),
+ *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ */
+int svc_rdma_send_write_chunk(struct svcxprt_rdma *rdma, __be32 *wr_ch,
+                              struct xdr_buf *xdr)
+{
+        struct svc_rdma_write_info *info;
+        int ret;
+
+        if (!xdr->page_len)
+                return 0;
+
+        info = svc_rdma_write_info_alloc(rdma, wr_ch);
+        if (!info)
+                return -ENOMEM;
+
+        ret = svc_rdma_send_xdr_pagelist(info, xdr);
+        if (ret < 0)
+                goto out_err;
+
+        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+        if (ret < 0)
+                goto out_err;
+        return xdr->page_len;
+
+out_err:
+        svc_rdma_write_info_free(info);
+        return ret;
+}
+
+/**
+ * svc_rdma_send_reply_chunk - Write all segments in the Reply chunk
+ * @rdma: controlling RDMA transport
+ * @rp_ch: Reply chunk provided by client
+ * @writelist: true if client provided a Write list
+ * @xdr: xdr_buf containing an RPC Reply
+ *
+ * Returns a non-negative number of bytes the chunk consumed, or
+ *      %-E2BIG if the payload was larger than the Reply chunk,
+ *      %-ENOMEM if rdma_rw context pool was exhausted,
+ *      %-ENOTCONN if posting failed (connection is lost),
+ *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
+ */
+int svc_rdma_send_reply_chunk(struct svcxprt_rdma *rdma, __be32 *rp_ch,
+                              bool writelist, struct xdr_buf *xdr)
+{
+        struct svc_rdma_write_info *info;
+        int consumed, ret;
+
+        info = svc_rdma_write_info_alloc(rdma, rp_ch);
+        if (!info)
+                return -ENOMEM;
+
+        ret = svc_rdma_send_xdr_kvec(info, &xdr->head[0]);
+        if (ret < 0)
+                goto out_err;
+        consumed = xdr->head[0].iov_len;
+
+        /* Send the page list in the Reply chunk only if the
+         * client did not provide Write chunks.
+         */
+        if (!writelist && xdr->page_len) {
+                ret = svc_rdma_send_xdr_pagelist(info, xdr);
+                if (ret < 0)
+                        goto out_err;
+                consumed += xdr->page_len;
+        }
+
+        if (xdr->tail[0].iov_len) {
+                ret = svc_rdma_send_xdr_kvec(info, &xdr->tail[0]);
+                if (ret < 0)
+                        goto out_err;
+                consumed += xdr->tail[0].iov_len;
+        }
+
+        ret = svc_rdma_post_chunk_ctxt(&info->wi_cc);
+        if (ret < 0)
+                goto out_err;
+        return consumed;
+
+out_err:
+        svc_rdma_write_info_free(info);
+        return ret;
+}
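To restate the flow implemented above in condensed form: svc_rdma_build_writes() maps each segment with rdma_rw_ctx_init() and accumulates the returned WR counts in cc_sqecount; svc_rdma_post_chunk_ctxt() chains every context's WRs and posts them with a single ib_post_send(); the one resulting completion (svc_rdma_write_done) returns the Send Queue entries and frees the write_info, after which svc_rdma_cc_release() unmaps each context with rdma_rw_ctx_destroy(). The sketch below is a simplified restatement of svc_rdma_post_chunk_ctxt(), with the SQ-space accounting and error unwinding removed so the one-completion-per-chunk design stands out; it is not additional API.

/* Simplified restatement of svc_rdma_post_chunk_ctxt() above; the real
 * function also reserves cc_sqecount Send Queue entries and sleeps on
 * sc_send_wait when the SQ is congested.
 */
static int example_post_one_chunk(struct svcxprt_rdma *rdma,
                                  struct svc_rdma_chunk_ctxt *cc)
{
        struct ib_send_wr *first_wr = NULL, *bad_wr;
        struct ib_cqe *cqe = &cc->cc_cqe;
        struct svc_rdma_rw_ctxt *ctxt;

        /* Chain the WRs of every rdma_rw_ctx in this chunk.  The chunk's
         * CQE is attached only once; the remaining WRs are unsignaled,
         * so the whole chain produces exactly one completion.
         */
        list_for_each_entry(ctxt, &cc->cc_rwctxts, rw_list) {
                first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
                                           rdma->sc_port_num, cqe, first_wr);
                cqe = NULL;
        }

        /* One ib_post_send() per chunk, however many segments and
         * Write WRs the chunk expanded into.
         */
        return ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
}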
diff --git a/net/sunrpc/xprtrdma/svc_rdma_transport.c b/net/sunrpc/xprtrdma/svc_rdma_transport.c
index e1097cc6d1eb..b25c50992a95 100644
--- a/net/sunrpc/xprtrdma/svc_rdma_transport.c
+++ b/net/sunrpc/xprtrdma/svc_rdma_transport.c
@@ -561,6 +561,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 	INIT_LIST_HEAD(&cma_xprt->sc_read_complete_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_frmr_q);
 	INIT_LIST_HEAD(&cma_xprt->sc_ctxts);
+	INIT_LIST_HEAD(&cma_xprt->sc_rw_ctxts);
 	INIT_LIST_HEAD(&cma_xprt->sc_maps);
 	init_waitqueue_head(&cma_xprt->sc_send_wait);
 
@@ -568,6 +569,7 @@ static struct svcxprt_rdma *rdma_create_xprt(struct svc_serv *serv,
 	spin_lock_init(&cma_xprt->sc_rq_dto_lock);
 	spin_lock_init(&cma_xprt->sc_frmr_q_lock);
 	spin_lock_init(&cma_xprt->sc_ctxt_lock);
+	spin_lock_init(&cma_xprt->sc_rw_ctxt_lock);
 	spin_lock_init(&cma_xprt->sc_map_lock);
 
 	/*
@@ -999,6 +1001,7 @@ static struct svc_xprt *svc_rdma_accept(struct svc_xprt *xprt)
 		newxprt, newxprt->sc_cm_id);
 
 	dev = newxprt->sc_cm_id->device;
+	newxprt->sc_port_num = newxprt->sc_cm_id->port_num;
 
 	/* Qualify the transport resource defaults with the
 	 * capabilities of this particular device */
@@ -1248,6 +1251,7 @@ static void __svc_rdma_free(struct work_struct *work)
 	}
 
 	rdma_dealloc_frmr_q(rdma);
+	svc_rdma_destroy_rw_ctxts(rdma);
 	svc_rdma_destroy_ctxts(rdma);
 	svc_rdma_destroy_maps(rdma);
 