aboutsummaryrefslogtreecommitdiffstats
path: root/net/rds/iw_recv.c
diff options
context:
space:
mode:
authorAndy Grover <andy.grover@oracle.com>2009-02-24 10:30:36 -0500
committerDavid S. Miller <davem@davemloft.net>2009-02-27 02:39:33 -0500
commitfcd8b7c0ecf792dd824b2edcd63cb2c08563c340 (patch)
tree9ba10fe626cc9129f9138bc81b1fa61c74097e4f /net/rds/iw_recv.c
parente6babe4cc4ce48577d743cc0de19a214f2527956 (diff)
RDS: Add iWARP support
Support for iWARP NICs is implemented as a separate RDS transport from IB. The code, however, is very similar to IB (it was forked, basically.) so let's keep it in one changeset. The reason for this duplication is that despite its similarity to IB, there are a number of places where it has different semantics. iwarp zcopy support is still under development, and giving it its own sandbox ensures that IB code isn't disrupted while iwarp changes. Over time these transports will re-converge. Signed-off-by: Andy Grover <andy.grover@oracle.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/rds/iw_recv.c')
-rw-r--r--net/rds/iw_recv.c869
1 files changed, 869 insertions, 0 deletions
diff --git a/net/rds/iw_recv.c b/net/rds/iw_recv.c
new file mode 100644
index 000000000000..a1931f0027a2
--- /dev/null
+++ b/net/rds/iw_recv.c
@@ -0,0 +1,869 @@
1/*
2 * Copyright (c) 2006 Oracle. All rights reserved.
3 *
4 * This software is available to you under a choice of one of two
5 * licenses. You may choose to be licensed under the terms of the GNU
6 * General Public License (GPL) Version 2, available from the file
7 * COPYING in the main directory of this source tree, or the
8 * OpenIB.org BSD license below:
9 *
10 * Redistribution and use in source and binary forms, with or
11 * without modification, are permitted provided that the following
12 * conditions are met:
13 *
14 * - Redistributions of source code must retain the above
15 * copyright notice, this list of conditions and the following
16 * disclaimer.
17 *
18 * - Redistributions in binary form must reproduce the above
19 * copyright notice, this list of conditions and the following
20 * disclaimer in the documentation and/or other materials
21 * provided with the distribution.
22 *
23 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
24 * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
25 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
26 * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
27 * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
28 * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
29 * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
30 * SOFTWARE.
31 *
32 */
33#include <linux/kernel.h>
34#include <linux/pci.h>
35#include <linux/dma-mapping.h>
36#include <rdma/rdma_cm.h>
37
38#include "rds.h"
39#include "iw.h"
40
41static struct kmem_cache *rds_iw_incoming_slab;
42static struct kmem_cache *rds_iw_frag_slab;
43static atomic_t rds_iw_allocation = ATOMIC_INIT(0);
44
45static void rds_iw_frag_drop_page(struct rds_page_frag *frag)
46{
47 rdsdebug("frag %p page %p\n", frag, frag->f_page);
48 __free_page(frag->f_page);
49 frag->f_page = NULL;
50}
51
52static void rds_iw_frag_free(struct rds_page_frag *frag)
53{
54 rdsdebug("frag %p page %p\n", frag, frag->f_page);
55 BUG_ON(frag->f_page != NULL);
56 kmem_cache_free(rds_iw_frag_slab, frag);
57}
58
59/*
60 * We map a page at a time. Its fragments are posted in order. This
61 * is called in fragment order as the fragments get send completion events.
62 * Only the last frag in the page performs the unmapping.
63 *
64 * It's OK for ring cleanup to call this in whatever order it likes because
65 * DMA is not in flight and so we can unmap while other ring entries still
66 * hold page references in their frags.
67 */
68static void rds_iw_recv_unmap_page(struct rds_iw_connection *ic,
69 struct rds_iw_recv_work *recv)
70{
71 struct rds_page_frag *frag = recv->r_frag;
72
73 rdsdebug("recv %p frag %p page %p\n", recv, frag, frag->f_page);
74 if (frag->f_mapped)
75 ib_dma_unmap_page(ic->i_cm_id->device,
76 frag->f_mapped,
77 RDS_FRAG_SIZE, DMA_FROM_DEVICE);
78 frag->f_mapped = 0;
79}
80
81void rds_iw_recv_init_ring(struct rds_iw_connection *ic)
82{
83 struct rds_iw_recv_work *recv;
84 u32 i;
85
86 for (i = 0, recv = ic->i_recvs; i < ic->i_recv_ring.w_nr; i++, recv++) {
87 struct ib_sge *sge;
88
89 recv->r_iwinc = NULL;
90 recv->r_frag = NULL;
91
92 recv->r_wr.next = NULL;
93 recv->r_wr.wr_id = i;
94 recv->r_wr.sg_list = recv->r_sge;
95 recv->r_wr.num_sge = RDS_IW_RECV_SGE;
96
97 sge = rds_iw_data_sge(ic, recv->r_sge);
98 sge->addr = 0;
99 sge->length = RDS_FRAG_SIZE;
100 sge->lkey = 0;
101
102 sge = rds_iw_header_sge(ic, recv->r_sge);
103 sge->addr = ic->i_recv_hdrs_dma + (i * sizeof(struct rds_header));
104 sge->length = sizeof(struct rds_header);
105 sge->lkey = 0;
106 }
107}
108
109static void rds_iw_recv_clear_one(struct rds_iw_connection *ic,
110 struct rds_iw_recv_work *recv)
111{
112 if (recv->r_iwinc) {
113 rds_inc_put(&recv->r_iwinc->ii_inc);
114 recv->r_iwinc = NULL;
115 }
116 if (recv->r_frag) {
117 rds_iw_recv_unmap_page(ic, recv);
118 if (recv->r_frag->f_page)
119 rds_iw_frag_drop_page(recv->r_frag);
120 rds_iw_frag_free(recv->r_frag);
121 recv->r_frag = NULL;
122 }
123}
124
125void rds_iw_recv_clear_ring(struct rds_iw_connection *ic)
126{
127 u32 i;
128
129 for (i = 0; i < ic->i_recv_ring.w_nr; i++)
130 rds_iw_recv_clear_one(ic, &ic->i_recvs[i]);
131
132 if (ic->i_frag.f_page)
133 rds_iw_frag_drop_page(&ic->i_frag);
134}
135
136static int rds_iw_recv_refill_one(struct rds_connection *conn,
137 struct rds_iw_recv_work *recv,
138 gfp_t kptr_gfp, gfp_t page_gfp)
139{
140 struct rds_iw_connection *ic = conn->c_transport_data;
141 dma_addr_t dma_addr;
142 struct ib_sge *sge;
143 int ret = -ENOMEM;
144
145 if (recv->r_iwinc == NULL) {
146 if (atomic_read(&rds_iw_allocation) >= rds_iw_sysctl_max_recv_allocation) {
147 rds_iw_stats_inc(s_iw_rx_alloc_limit);
148 goto out;
149 }
150 recv->r_iwinc = kmem_cache_alloc(rds_iw_incoming_slab,
151 kptr_gfp);
152 if (recv->r_iwinc == NULL)
153 goto out;
154 atomic_inc(&rds_iw_allocation);
155 INIT_LIST_HEAD(&recv->r_iwinc->ii_frags);
156 rds_inc_init(&recv->r_iwinc->ii_inc, conn, conn->c_faddr);
157 }
158
159 if (recv->r_frag == NULL) {
160 recv->r_frag = kmem_cache_alloc(rds_iw_frag_slab, kptr_gfp);
161 if (recv->r_frag == NULL)
162 goto out;
163 INIT_LIST_HEAD(&recv->r_frag->f_item);
164 recv->r_frag->f_page = NULL;
165 }
166
167 if (ic->i_frag.f_page == NULL) {
168 ic->i_frag.f_page = alloc_page(page_gfp);
169 if (ic->i_frag.f_page == NULL)
170 goto out;
171 ic->i_frag.f_offset = 0;
172 }
173
174 dma_addr = ib_dma_map_page(ic->i_cm_id->device,
175 ic->i_frag.f_page,
176 ic->i_frag.f_offset,
177 RDS_FRAG_SIZE,
178 DMA_FROM_DEVICE);
179 if (ib_dma_mapping_error(ic->i_cm_id->device, dma_addr))
180 goto out;
181
182 /*
183 * Once we get the RDS_PAGE_LAST_OFF frag then rds_iw_frag_unmap()
184 * must be called on this recv. This happens as completions hit
185 * in order or on connection shutdown.
186 */
187 recv->r_frag->f_page = ic->i_frag.f_page;
188 recv->r_frag->f_offset = ic->i_frag.f_offset;
189 recv->r_frag->f_mapped = dma_addr;
190
191 sge = rds_iw_data_sge(ic, recv->r_sge);
192 sge->addr = dma_addr;
193 sge->length = RDS_FRAG_SIZE;
194
195 sge = rds_iw_header_sge(ic, recv->r_sge);
196 sge->addr = ic->i_recv_hdrs_dma + (recv - ic->i_recvs) * sizeof(struct rds_header);
197 sge->length = sizeof(struct rds_header);
198
199 get_page(recv->r_frag->f_page);
200
201 if (ic->i_frag.f_offset < RDS_PAGE_LAST_OFF) {
202 ic->i_frag.f_offset += RDS_FRAG_SIZE;
203 } else {
204 put_page(ic->i_frag.f_page);
205 ic->i_frag.f_page = NULL;
206 ic->i_frag.f_offset = 0;
207 }
208
209 ret = 0;
210out:
211 return ret;
212}
213
214/*
215 * This tries to allocate and post unused work requests after making sure that
216 * they have all the allocations they need to queue received fragments into
217 * sockets. The i_recv_mutex is held here so that ring_alloc and _unalloc
218 * pairs don't go unmatched.
219 *
220 * -1 is returned if posting fails due to temporary resource exhaustion.
221 */
222int rds_iw_recv_refill(struct rds_connection *conn, gfp_t kptr_gfp,
223 gfp_t page_gfp, int prefill)
224{
225 struct rds_iw_connection *ic = conn->c_transport_data;
226 struct rds_iw_recv_work *recv;
227 struct ib_recv_wr *failed_wr;
228 unsigned int posted = 0;
229 int ret = 0;
230 u32 pos;
231
232 while ((prefill || rds_conn_up(conn))
233 && rds_iw_ring_alloc(&ic->i_recv_ring, 1, &pos)) {
234 if (pos >= ic->i_recv_ring.w_nr) {
235 printk(KERN_NOTICE "Argh - ring alloc returned pos=%u\n",
236 pos);
237 ret = -EINVAL;
238 break;
239 }
240
241 recv = &ic->i_recvs[pos];
242 ret = rds_iw_recv_refill_one(conn, recv, kptr_gfp, page_gfp);
243 if (ret) {
244 ret = -1;
245 break;
246 }
247
248 /* XXX when can this fail? */
249 ret = ib_post_recv(ic->i_cm_id->qp, &recv->r_wr, &failed_wr);
250 rdsdebug("recv %p iwinc %p page %p addr %lu ret %d\n", recv,
251 recv->r_iwinc, recv->r_frag->f_page,
252 (long) recv->r_frag->f_mapped, ret);
253 if (ret) {
254 rds_iw_conn_error(conn, "recv post on "
255 "%pI4 returned %d, disconnecting and "
256 "reconnecting\n", &conn->c_faddr,
257 ret);
258 ret = -1;
259 break;
260 }
261
262 posted++;
263 }
264
265 /* We're doing flow control - update the window. */
266 if (ic->i_flowctl && posted)
267 rds_iw_advertise_credits(conn, posted);
268
269 if (ret)
270 rds_iw_ring_unalloc(&ic->i_recv_ring, 1);
271 return ret;
272}
273
274void rds_iw_inc_purge(struct rds_incoming *inc)
275{
276 struct rds_iw_incoming *iwinc;
277 struct rds_page_frag *frag;
278 struct rds_page_frag *pos;
279
280 iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
281 rdsdebug("purging iwinc %p inc %p\n", iwinc, inc);
282
283 list_for_each_entry_safe(frag, pos, &iwinc->ii_frags, f_item) {
284 list_del_init(&frag->f_item);
285 rds_iw_frag_drop_page(frag);
286 rds_iw_frag_free(frag);
287 }
288}
289
290void rds_iw_inc_free(struct rds_incoming *inc)
291{
292 struct rds_iw_incoming *iwinc;
293
294 iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
295
296 rds_iw_inc_purge(inc);
297 rdsdebug("freeing iwinc %p inc %p\n", iwinc, inc);
298 BUG_ON(!list_empty(&iwinc->ii_frags));
299 kmem_cache_free(rds_iw_incoming_slab, iwinc);
300 atomic_dec(&rds_iw_allocation);
301 BUG_ON(atomic_read(&rds_iw_allocation) < 0);
302}
303
304int rds_iw_inc_copy_to_user(struct rds_incoming *inc, struct iovec *first_iov,
305 size_t size)
306{
307 struct rds_iw_incoming *iwinc;
308 struct rds_page_frag *frag;
309 struct iovec *iov = first_iov;
310 unsigned long to_copy;
311 unsigned long frag_off = 0;
312 unsigned long iov_off = 0;
313 int copied = 0;
314 int ret;
315 u32 len;
316
317 iwinc = container_of(inc, struct rds_iw_incoming, ii_inc);
318 frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
319 len = be32_to_cpu(inc->i_hdr.h_len);
320
321 while (copied < size && copied < len) {
322 if (frag_off == RDS_FRAG_SIZE) {
323 frag = list_entry(frag->f_item.next,
324 struct rds_page_frag, f_item);
325 frag_off = 0;
326 }
327 while (iov_off == iov->iov_len) {
328 iov_off = 0;
329 iov++;
330 }
331
332 to_copy = min(iov->iov_len - iov_off, RDS_FRAG_SIZE - frag_off);
333 to_copy = min_t(size_t, to_copy, size - copied);
334 to_copy = min_t(unsigned long, to_copy, len - copied);
335
336 rdsdebug("%lu bytes to user [%p, %zu] + %lu from frag "
337 "[%p, %lu] + %lu\n",
338 to_copy, iov->iov_base, iov->iov_len, iov_off,
339 frag->f_page, frag->f_offset, frag_off);
340
341 /* XXX needs + offset for multiple recvs per page */
342 ret = rds_page_copy_to_user(frag->f_page,
343 frag->f_offset + frag_off,
344 iov->iov_base + iov_off,
345 to_copy);
346 if (ret) {
347 copied = ret;
348 break;
349 }
350
351 iov_off += to_copy;
352 frag_off += to_copy;
353 copied += to_copy;
354 }
355
356 return copied;
357}
358
359/* ic starts out kzalloc()ed */
360void rds_iw_recv_init_ack(struct rds_iw_connection *ic)
361{
362 struct ib_send_wr *wr = &ic->i_ack_wr;
363 struct ib_sge *sge = &ic->i_ack_sge;
364
365 sge->addr = ic->i_ack_dma;
366 sge->length = sizeof(struct rds_header);
367 sge->lkey = rds_iw_local_dma_lkey(ic);
368
369 wr->sg_list = sge;
370 wr->num_sge = 1;
371 wr->opcode = IB_WR_SEND;
372 wr->wr_id = RDS_IW_ACK_WR_ID;
373 wr->send_flags = IB_SEND_SIGNALED | IB_SEND_SOLICITED;
374}
375
376/*
377 * You'd think that with reliable IB connections you wouldn't need to ack
378 * messages that have been received. The problem is that IB hardware generates
379 * an ack message before it has DMAed the message into memory. This creates a
380 * potential message loss if the HCA is disabled for any reason between when it
381 * sends the ack and before the message is DMAed and processed. This is only a
382 * potential issue if another HCA is available for fail-over.
383 *
384 * When the remote host receives our ack they'll free the sent message from
385 * their send queue. To decrease the latency of this we always send an ack
386 * immediately after we've received messages.
387 *
388 * For simplicity, we only have one ack in flight at a time. This puts
389 * pressure on senders to have deep enough send queues to absorb the latency of
390 * a single ack frame being in flight. This might not be good enough.
391 *
392 * This is implemented by have a long-lived send_wr and sge which point to a
393 * statically allocated ack frame. This ack wr does not fall under the ring
394 * accounting that the tx and rx wrs do. The QP attribute specifically makes
395 * room for it beyond the ring size. Send completion notices its special
396 * wr_id and avoids working with the ring in that case.
397 */
398static void rds_iw_set_ack(struct rds_iw_connection *ic, u64 seq,
399 int ack_required)
400{
401 rds_iw_set_64bit(&ic->i_ack_next, seq);
402 if (ack_required) {
403 smp_mb__before_clear_bit();
404 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
405 }
406}
407
408static u64 rds_iw_get_ack(struct rds_iw_connection *ic)
409{
410 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
411 smp_mb__after_clear_bit();
412
413 return ic->i_ack_next;
414}
415
416static void rds_iw_send_ack(struct rds_iw_connection *ic, unsigned int adv_credits)
417{
418 struct rds_header *hdr = ic->i_ack;
419 struct ib_send_wr *failed_wr;
420 u64 seq;
421 int ret;
422
423 seq = rds_iw_get_ack(ic);
424
425 rdsdebug("send_ack: ic %p ack %llu\n", ic, (unsigned long long) seq);
426 rds_message_populate_header(hdr, 0, 0, 0);
427 hdr->h_ack = cpu_to_be64(seq);
428 hdr->h_credit = adv_credits;
429 rds_message_make_checksum(hdr);
430 ic->i_ack_queued = jiffies;
431
432 ret = ib_post_send(ic->i_cm_id->qp, &ic->i_ack_wr, &failed_wr);
433 if (unlikely(ret)) {
434 /* Failed to send. Release the WR, and
435 * force another ACK.
436 */
437 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
438 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
439
440 rds_iw_stats_inc(s_iw_ack_send_failure);
441 /* Need to finesse this later. */
442 BUG();
443 } else
444 rds_iw_stats_inc(s_iw_ack_sent);
445}
446
447/*
448 * There are 3 ways of getting acknowledgements to the peer:
449 * 1. We call rds_iw_attempt_ack from the recv completion handler
450 * to send an ACK-only frame.
451 * However, there can be only one such frame in the send queue
452 * at any time, so we may have to postpone it.
453 * 2. When another (data) packet is transmitted while there's
454 * an ACK in the queue, we piggyback the ACK sequence number
455 * on the data packet.
456 * 3. If the ACK WR is done sending, we get called from the
457 * send queue completion handler, and check whether there's
458 * another ACK pending (postponed because the WR was on the
459 * queue). If so, we transmit it.
460 *
461 * We maintain 2 variables:
462 * - i_ack_flags, which keeps track of whether the ACK WR
463 * is currently in the send queue or not (IB_ACK_IN_FLIGHT)
464 * - i_ack_next, which is the last sequence number we received
465 *
466 * Potentially, send queue and receive queue handlers can run concurrently.
467 *
468 * Reconnecting complicates this picture just slightly. When we
469 * reconnect, we may be seeing duplicate packets. The peer
470 * is retransmitting them, because it hasn't seen an ACK for
471 * them. It is important that we ACK these.
472 *
473 * ACK mitigation adds a header flag "ACK_REQUIRED"; any packet with
474 * this flag set *MUST* be acknowledged immediately.
475 */
476
477/*
478 * When we get here, we're called from the recv queue handler.
479 * Check whether we ought to transmit an ACK.
480 */
481void rds_iw_attempt_ack(struct rds_iw_connection *ic)
482{
483 unsigned int adv_credits;
484
485 if (!test_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
486 return;
487
488 if (test_and_set_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags)) {
489 rds_iw_stats_inc(s_iw_ack_send_delayed);
490 return;
491 }
492
493 /* Can we get a send credit? */
494 if (!rds_iw_send_grab_credits(ic, 1, &adv_credits, 0)) {
495 rds_iw_stats_inc(s_iw_tx_throttle);
496 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
497 return;
498 }
499
500 clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
501 rds_iw_send_ack(ic, adv_credits);
502}
503
504/*
505 * We get here from the send completion handler, when the
506 * adapter tells us the ACK frame was sent.
507 */
508void rds_iw_ack_send_complete(struct rds_iw_connection *ic)
509{
510 clear_bit(IB_ACK_IN_FLIGHT, &ic->i_ack_flags);
511 rds_iw_attempt_ack(ic);
512}
513
514/*
515 * This is called by the regular xmit code when it wants to piggyback
516 * an ACK on an outgoing frame.
517 */
518u64 rds_iw_piggyb_ack(struct rds_iw_connection *ic)
519{
520 if (test_and_clear_bit(IB_ACK_REQUESTED, &ic->i_ack_flags))
521 rds_iw_stats_inc(s_iw_ack_send_piggybacked);
522 return rds_iw_get_ack(ic);
523}
524
525/*
526 * It's kind of lame that we're copying from the posted receive pages into
527 * long-lived bitmaps. We could have posted the bitmaps and rdma written into
528 * them. But receiving new congestion bitmaps should be a *rare* event, so
529 * hopefully we won't need to invest that complexity in making it more
530 * efficient. By copying we can share a simpler core with TCP which has to
531 * copy.
532 */
533static void rds_iw_cong_recv(struct rds_connection *conn,
534 struct rds_iw_incoming *iwinc)
535{
536 struct rds_cong_map *map;
537 unsigned int map_off;
538 unsigned int map_page;
539 struct rds_page_frag *frag;
540 unsigned long frag_off;
541 unsigned long to_copy;
542 unsigned long copied;
543 uint64_t uncongested = 0;
544 void *addr;
545
546 /* catch completely corrupt packets */
547 if (be32_to_cpu(iwinc->ii_inc.i_hdr.h_len) != RDS_CONG_MAP_BYTES)
548 return;
549
550 map = conn->c_fcong;
551 map_page = 0;
552 map_off = 0;
553
554 frag = list_entry(iwinc->ii_frags.next, struct rds_page_frag, f_item);
555 frag_off = 0;
556
557 copied = 0;
558
559 while (copied < RDS_CONG_MAP_BYTES) {
560 uint64_t *src, *dst;
561 unsigned int k;
562
563 to_copy = min(RDS_FRAG_SIZE - frag_off, PAGE_SIZE - map_off);
564 BUG_ON(to_copy & 7); /* Must be 64bit aligned. */
565
566 addr = kmap_atomic(frag->f_page, KM_SOFTIRQ0);
567
568 src = addr + frag_off;
569 dst = (void *)map->m_page_addrs[map_page] + map_off;
570 for (k = 0; k < to_copy; k += 8) {
571 /* Record ports that became uncongested, ie
572 * bits that changed from 0 to 1. */
573 uncongested |= ~(*src) & *dst;
574 *dst++ = *src++;
575 }
576 kunmap_atomic(addr, KM_SOFTIRQ0);
577
578 copied += to_copy;
579
580 map_off += to_copy;
581 if (map_off == PAGE_SIZE) {
582 map_off = 0;
583 map_page++;
584 }
585
586 frag_off += to_copy;
587 if (frag_off == RDS_FRAG_SIZE) {
588 frag = list_entry(frag->f_item.next,
589 struct rds_page_frag, f_item);
590 frag_off = 0;
591 }
592 }
593
594 /* the congestion map is in little endian order */
595 uncongested = le64_to_cpu(uncongested);
596
597 rds_cong_map_updated(map, uncongested);
598}
599
600/*
601 * Rings are posted with all the allocations they'll need to queue the
602 * incoming message to the receiving socket so this can't fail.
603 * All fragments start with a header, so we can make sure we're not receiving
604 * garbage, and we can tell a small 8 byte fragment from an ACK frame.
605 */
606struct rds_iw_ack_state {
607 u64 ack_next;
608 u64 ack_recv;
609 unsigned int ack_required:1;
610 unsigned int ack_next_valid:1;
611 unsigned int ack_recv_valid:1;
612};
613
614static void rds_iw_process_recv(struct rds_connection *conn,
615 struct rds_iw_recv_work *recv, u32 byte_len,
616 struct rds_iw_ack_state *state)
617{
618 struct rds_iw_connection *ic = conn->c_transport_data;
619 struct rds_iw_incoming *iwinc = ic->i_iwinc;
620 struct rds_header *ihdr, *hdr;
621
622 /* XXX shut down the connection if port 0,0 are seen? */
623
624 rdsdebug("ic %p iwinc %p recv %p byte len %u\n", ic, iwinc, recv,
625 byte_len);
626
627 if (byte_len < sizeof(struct rds_header)) {
628 rds_iw_conn_error(conn, "incoming message "
629 "from %pI4 didn't inclue a "
630 "header, disconnecting and "
631 "reconnecting\n",
632 &conn->c_faddr);
633 return;
634 }
635 byte_len -= sizeof(struct rds_header);
636
637 ihdr = &ic->i_recv_hdrs[recv - ic->i_recvs];
638
639 /* Validate the checksum. */
640 if (!rds_message_verify_checksum(ihdr)) {
641 rds_iw_conn_error(conn, "incoming message "
642 "from %pI4 has corrupted header - "
643 "forcing a reconnect\n",
644 &conn->c_faddr);
645 rds_stats_inc(s_recv_drop_bad_checksum);
646 return;
647 }
648
649 /* Process the ACK sequence which comes with every packet */
650 state->ack_recv = be64_to_cpu(ihdr->h_ack);
651 state->ack_recv_valid = 1;
652
653 /* Process the credits update if there was one */
654 if (ihdr->h_credit)
655 rds_iw_send_add_credits(conn, ihdr->h_credit);
656
657 if (ihdr->h_sport == 0 && ihdr->h_dport == 0 && byte_len == 0) {
658 /* This is an ACK-only packet. The fact that it gets
659 * special treatment here is that historically, ACKs
660 * were rather special beasts.
661 */
662 rds_iw_stats_inc(s_iw_ack_received);
663
664 /*
665 * Usually the frags make their way on to incs and are then freed as
666 * the inc is freed. We don't go that route, so we have to drop the
667 * page ref ourselves. We can't just leave the page on the recv
668 * because that confuses the dma mapping of pages and each recv's use
669 * of a partial page. We can leave the frag, though, it will be
670 * reused.
671 *
672 * FIXME: Fold this into the code path below.
673 */
674 rds_iw_frag_drop_page(recv->r_frag);
675 return;
676 }
677
678 /*
679 * If we don't already have an inc on the connection then this
680 * fragment has a header and starts a message.. copy its header
681 * into the inc and save the inc so we can hang upcoming fragments
682 * off its list.
683 */
684 if (iwinc == NULL) {
685 iwinc = recv->r_iwinc;
686 recv->r_iwinc = NULL;
687 ic->i_iwinc = iwinc;
688
689 hdr = &iwinc->ii_inc.i_hdr;
690 memcpy(hdr, ihdr, sizeof(*hdr));
691 ic->i_recv_data_rem = be32_to_cpu(hdr->h_len);
692
693 rdsdebug("ic %p iwinc %p rem %u flag 0x%x\n", ic, iwinc,
694 ic->i_recv_data_rem, hdr->h_flags);
695 } else {
696 hdr = &iwinc->ii_inc.i_hdr;
697 /* We can't just use memcmp here; fragments of a
698 * single message may carry different ACKs */
699 if (hdr->h_sequence != ihdr->h_sequence
700 || hdr->h_len != ihdr->h_len
701 || hdr->h_sport != ihdr->h_sport
702 || hdr->h_dport != ihdr->h_dport) {
703 rds_iw_conn_error(conn,
704 "fragment header mismatch; forcing reconnect\n");
705 return;
706 }
707 }
708
709 list_add_tail(&recv->r_frag->f_item, &iwinc->ii_frags);
710 recv->r_frag = NULL;
711
712 if (ic->i_recv_data_rem > RDS_FRAG_SIZE)
713 ic->i_recv_data_rem -= RDS_FRAG_SIZE;
714 else {
715 ic->i_recv_data_rem = 0;
716 ic->i_iwinc = NULL;
717
718 if (iwinc->ii_inc.i_hdr.h_flags == RDS_FLAG_CONG_BITMAP)
719 rds_iw_cong_recv(conn, iwinc);
720 else {
721 rds_recv_incoming(conn, conn->c_faddr, conn->c_laddr,
722 &iwinc->ii_inc, GFP_ATOMIC,
723 KM_SOFTIRQ0);
724 state->ack_next = be64_to_cpu(hdr->h_sequence);
725 state->ack_next_valid = 1;
726 }
727
728 /* Evaluate the ACK_REQUIRED flag *after* we received
729 * the complete frame, and after bumping the next_rx
730 * sequence. */
731 if (hdr->h_flags & RDS_FLAG_ACK_REQUIRED) {
732 rds_stats_inc(s_recv_ack_required);
733 state->ack_required = 1;
734 }
735
736 rds_inc_put(&iwinc->ii_inc);
737 }
738}
739
740/*
741 * Plucking the oldest entry from the ring can be done concurrently with
742 * the thread refilling the ring. Each ring operation is protected by
743 * spinlocks and the transient state of refilling doesn't change the
744 * recording of which entry is oldest.
745 *
746 * This relies on IB only calling one cq comp_handler for each cq so that
747 * there will only be one caller of rds_recv_incoming() per RDS connection.
748 */
749void rds_iw_recv_cq_comp_handler(struct ib_cq *cq, void *context)
750{
751 struct rds_connection *conn = context;
752 struct rds_iw_connection *ic = conn->c_transport_data;
753 struct ib_wc wc;
754 struct rds_iw_ack_state state = { 0, };
755 struct rds_iw_recv_work *recv;
756
757 rdsdebug("conn %p cq %p\n", conn, cq);
758
759 rds_iw_stats_inc(s_iw_rx_cq_call);
760
761 ib_req_notify_cq(cq, IB_CQ_SOLICITED);
762
763 while (ib_poll_cq(cq, 1, &wc) > 0) {
764 rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
765 (unsigned long long)wc.wr_id, wc.status, wc.byte_len,
766 be32_to_cpu(wc.ex.imm_data));
767 rds_iw_stats_inc(s_iw_rx_cq_event);
768
769 recv = &ic->i_recvs[rds_iw_ring_oldest(&ic->i_recv_ring)];
770
771 rds_iw_recv_unmap_page(ic, recv);
772
773 /*
774 * Also process recvs in connecting state because it is possible
775 * to get a recv completion _before_ the rdmacm ESTABLISHED
776 * event is processed.
777 */
778 if (rds_conn_up(conn) || rds_conn_connecting(conn)) {
779 /* We expect errors as the qp is drained during shutdown */
780 if (wc.status == IB_WC_SUCCESS) {
781 rds_iw_process_recv(conn, recv, wc.byte_len, &state);
782 } else {
783 rds_iw_conn_error(conn, "recv completion on "
784 "%pI4 had status %u, disconnecting and "
785 "reconnecting\n", &conn->c_faddr,
786 wc.status);
787 }
788 }
789
790 rds_iw_ring_free(&ic->i_recv_ring, 1);
791 }
792
793 if (state.ack_next_valid)
794 rds_iw_set_ack(ic, state.ack_next, state.ack_required);
795 if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
796 rds_send_drop_acked(conn, state.ack_recv, NULL);
797 ic->i_ack_recv = state.ack_recv;
798 }
799 if (rds_conn_up(conn))
800 rds_iw_attempt_ack(ic);
801
802 /* If we ever end up with a really empty receive ring, we're
803 * in deep trouble, as the sender will definitely see RNR
804 * timeouts. */
805 if (rds_iw_ring_empty(&ic->i_recv_ring))
806 rds_iw_stats_inc(s_iw_rx_ring_empty);
807
808 /*
809 * If the ring is running low, then schedule the thread to refill.
810 */
811 if (rds_iw_ring_low(&ic->i_recv_ring))
812 queue_delayed_work(rds_wq, &conn->c_recv_w, 0);
813}
814
815int rds_iw_recv(struct rds_connection *conn)
816{
817 struct rds_iw_connection *ic = conn->c_transport_data;
818 int ret = 0;
819
820 rdsdebug("conn %p\n", conn);
821
822 /*
823 * If we get a temporary posting failure in this context then
824 * we're really low and we want the caller to back off for a bit.
825 */
826 mutex_lock(&ic->i_recv_mutex);
827 if (rds_iw_recv_refill(conn, GFP_KERNEL, GFP_HIGHUSER, 0))
828 ret = -ENOMEM;
829 else
830 rds_iw_stats_inc(s_iw_rx_refill_from_thread);
831 mutex_unlock(&ic->i_recv_mutex);
832
833 if (rds_conn_up(conn))
834 rds_iw_attempt_ack(ic);
835
836 return ret;
837}
838
839int __init rds_iw_recv_init(void)
840{
841 struct sysinfo si;
842 int ret = -ENOMEM;
843
844 /* Default to 30% of all available RAM for recv memory */
845 si_meminfo(&si);
846 rds_iw_sysctl_max_recv_allocation = si.totalram / 3 * PAGE_SIZE / RDS_FRAG_SIZE;
847
848 rds_iw_incoming_slab = kmem_cache_create("rds_iw_incoming",
849 sizeof(struct rds_iw_incoming),
850 0, 0, NULL);
851 if (rds_iw_incoming_slab == NULL)
852 goto out;
853
854 rds_iw_frag_slab = kmem_cache_create("rds_iw_frag",
855 sizeof(struct rds_page_frag),
856 0, 0, NULL);
857 if (rds_iw_frag_slab == NULL)
858 kmem_cache_destroy(rds_iw_incoming_slab);
859 else
860 ret = 0;
861out:
862 return ret;
863}
864
865void rds_iw_recv_exit(void)
866{
867 kmem_cache_destroy(rds_iw_incoming_slab);
868 kmem_cache_destroy(rds_iw_frag_slab);
869}