 net/rds/ib.h       |  28
 net/rds/ib_cm.c    |  70
 net/rds/ib_recv.c  | 136
 net/rds/ib_stats.c |   3
 4 files changed, 132 insertions(+), 105 deletions(-)
diff --git a/net/rds/ib.h b/net/rds/ib.h
index f1fd5ffec4e1..727759b30579 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -24,6 +24,8 @@
 
 #define RDS_IB_RECYCLE_BATCH_COUNT 32
 
+#define RDS_IB_WC_MAX 32
+
 extern struct rw_semaphore rds_ib_devices_lock;
 extern struct list_head rds_ib_devices;
 
@@ -89,6 +91,20 @@ struct rds_ib_work_ring {
 	atomic_t	w_free_ctr;
 };
 
+/* Rings are posted with all the allocations they'll need to queue the
+ * incoming message to the receiving socket so this can't fail.
+ * All fragments start with a header, so we can make sure we're not receiving
+ * garbage, and we can tell a small 8 byte fragment from an ACK frame.
+ */
+struct rds_ib_ack_state {
+	u64		ack_next;
+	u64		ack_recv;
+	unsigned int	ack_required:1;
+	unsigned int	ack_next_valid:1;
+	unsigned int	ack_recv_valid:1;
+};
+
+
 struct rds_ib_device;
 
 struct rds_ib_connection {
@@ -102,6 +118,10 @@ struct rds_ib_connection {
 	struct ib_pd		*i_pd;
 	struct ib_cq		*i_send_cq;
 	struct ib_cq		*i_recv_cq;
+	struct ib_wc		i_recv_wc[RDS_IB_WC_MAX];
+
+	/* interrupt handling */
+	struct tasklet_struct	i_recv_tasklet;
 
 	/* tx */
 	struct rds_ib_work_ring	i_send_ring;
@@ -112,7 +132,6 @@ struct rds_ib_connection {
 	atomic_t		i_signaled_sends;
 
 	/* rx */
-	struct tasklet_struct	i_recv_tasklet;
 	struct mutex		i_recv_mutex;
 	struct rds_ib_work_ring	i_recv_ring;
 	struct rds_ib_incoming	*i_ibinc;
@@ -199,13 +218,14 @@ struct rds_ib_statistics {
 	uint64_t	s_ib_connect_raced;
 	uint64_t	s_ib_listen_closed_stale;
 	uint64_t	s_ib_tx_cq_call;
+	uint64_t	s_ib_evt_handler_call;
+	uint64_t	s_ib_tasklet_call;
 	uint64_t	s_ib_tx_cq_event;
 	uint64_t	s_ib_tx_ring_full;
 	uint64_t	s_ib_tx_throttle;
 	uint64_t	s_ib_tx_sg_mapping_failure;
 	uint64_t	s_ib_tx_stalled;
 	uint64_t	s_ib_tx_credit_updates;
-	uint64_t	s_ib_rx_cq_call;
 	uint64_t	s_ib_rx_cq_event;
 	uint64_t	s_ib_rx_ring_empty;
 	uint64_t	s_ib_rx_refill_from_cq;
@@ -324,7 +344,8 @@ void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
 void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp);
 void rds_ib_inc_free(struct rds_incoming *inc);
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
-void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc,
+			     struct rds_ib_ack_state *state);
 void rds_ib_recv_tasklet_fn(unsigned long data);
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
@@ -332,6 +353,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
 void rds_ib_attempt_ack(struct rds_ib_connection *ic);
 void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
 u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required);
 
 /* ib_ring.c */
 void rds_ib_ring_init(struct rds_ib_work_ring *ring, u32 nr);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 9043f5c04787..28e0979720b2 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -216,6 +216,72 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
 		 event->event, ib_event_msg(event->event), data);
 }
 
+/* Plucking the oldest entry from the ring can be done concurrently with
+ * the thread refilling the ring. Each ring operation is protected by
+ * spinlocks and the transient state of refilling doesn't change the
+ * recording of which entry is oldest.
+ *
+ * This relies on IB only calling one cq comp_handler for each cq so that
+ * there will only be one caller of rds_recv_incoming() per RDS connection.
+ */
+static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
+{
+	struct rds_connection *conn = context;
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	rdsdebug("conn %p cq %p\n", conn, cq);
+
+	rds_ib_stats_inc(s_ib_evt_handler_call);
+
+	tasklet_schedule(&ic->i_recv_tasklet);
+}
+
+static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
+		    struct ib_wc *wcs,
+		    struct rds_ib_ack_state *ack_state)
+{
+	int nr;
+	int i;
+	struct ib_wc *wc;
+
+	while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) {
+		for (i = 0; i < nr; i++) {
+			wc = wcs + i;
+			rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+				 (unsigned long long)wc->wr_id, wc->status,
+				 wc->byte_len, be32_to_cpu(wc->ex.imm_data));
+			rds_ib_recv_cqe_handler(ic, wc, ack_state);
+		}
+	}
+}
+
+static void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+	struct rds_connection *conn = ic->conn;
+	struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
+	struct rds_ib_ack_state state;
+
+	BUG_ON(!rds_ibdev);
+
+	rds_ib_stats_inc(s_ib_tasklet_call);
+
+	memset(&state, 0, sizeof(state));
+	poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+	poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+
+	if (state.ack_next_valid)
+		rds_ib_set_ack(ic, state.ack_next, state.ack_required);
+	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
+		rds_send_drop_acked(conn, state.ack_recv, NULL);
+		ic->i_ack_recv = state.ack_recv;
+	}
+
+	if (rds_conn_up(conn))
+		rds_ib_attempt_ack(ic);
+}
+
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 {
 	struct rds_connection *conn = data;
@@ -282,7 +348,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 	}
 
 	cq_attr.cqe = ic->i_recv_ring.w_nr;
-	ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
+	ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
 				     rds_ib_cq_event_handler, conn,
 				     &cq_attr);
 	if (IS_ERR(ic->i_recv_cq)) {
@@ -743,7 +809,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	}
 
 	INIT_LIST_HEAD(&ic->ib_node);
-	tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
+	tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
 		     (unsigned long) ic);
 	mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index f43831e4186a..96744b75db93 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -596,8 +596,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
  * wr_id and avoids working with the ring in that case.
  */
 #ifndef KERNEL_HAS_ATOMIC64
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
-			   int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
 {
 	unsigned long flags;
 
@@ -622,8 +621,7 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
 	return seq;
 }
 #else
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
-			   int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
 {
 	atomic64_set(&ic->i_ack_next, seq);
 	if (ack_required) {
@@ -830,20 +828,6 @@ static void rds_ib_cong_recv(struct rds_connection *conn,
 	rds_cong_map_updated(map, uncongested);
 }
 
-/*
- * Rings are posted with all the allocations they'll need to queue the
- * incoming message to the receiving socket so this can't fail.
- * All fragments start with a header, so we can make sure we're not receiving
- * garbage, and we can tell a small 8 byte fragment from an ACK frame.
- */
-struct rds_ib_ack_state {
-	u64		ack_next;
-	u64		ack_recv;
-	unsigned int	ack_required:1;
-	unsigned int	ack_next_valid:1;
-	unsigned int	ack_recv_valid:1;
-};
-
 static void rds_ib_process_recv(struct rds_connection *conn,
 				struct rds_ib_recv_work *recv, u32 data_len,
 				struct rds_ib_ack_state *state)
@@ -969,96 +953,50 @@ static void rds_ib_process_recv(struct rds_connection *conn,
 	}
 }
 
-/*
- * Plucking the oldest entry from the ring can be done concurrently with
- * the thread refilling the ring. Each ring operation is protected by
- * spinlocks and the transient state of refilling doesn't change the
- * recording of which entry is oldest.
- *
- * This relies on IB only calling one cq comp_handler for each cq so that
- * there will only be one caller of rds_recv_incoming() per RDS connection.
- */
-void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
-{
-	struct rds_connection *conn = context;
-	struct rds_ib_connection *ic = conn->c_transport_data;
-
-	rdsdebug("conn %p cq %p\n", conn, cq);
-
-	rds_ib_stats_inc(s_ib_rx_cq_call);
-
-	tasklet_schedule(&ic->i_recv_tasklet);
-}
-
-static inline void rds_poll_cq(struct rds_ib_connection *ic,
-			       struct rds_ib_ack_state *state)
+void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
+			     struct ib_wc *wc,
+			     struct rds_ib_ack_state *state)
 {
 	struct rds_connection *conn = ic->conn;
-	struct ib_wc wc;
 	struct rds_ib_recv_work *recv;
 
-	while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
-		rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
-			 (unsigned long long)wc.wr_id, wc.status,
-			 ib_wc_status_msg(wc.status), wc.byte_len,
-			 be32_to_cpu(wc.ex.imm_data));
-		rds_ib_stats_inc(s_ib_rx_cq_event);
+	rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+		 (unsigned long long)wc->wr_id, wc->status,
+		 ib_wc_status_msg(wc->status), wc->byte_len,
+		 be32_to_cpu(wc->ex.imm_data));
 
-		recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
-
-		ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
-
-		/*
-		 * Also process recvs in connecting state because it is possible
-		 * to get a recv completion _before_ the rdmacm ESTABLISHED
-		 * event is processed.
-		 */
-		if (wc.status == IB_WC_SUCCESS) {
-			rds_ib_process_recv(conn, recv, wc.byte_len, state);
-		} else {
-			/* We expect errors as the qp is drained during shutdown */
-			if (rds_conn_up(conn) || rds_conn_connecting(conn))
-				rds_ib_conn_error(conn, "recv completion on %pI4 had "
-						  "status %u (%s), disconnecting and "
-						  "reconnecting\n", &conn->c_faddr,
-						  wc.status,
-						  ib_wc_status_msg(wc.status));
-		}
+	rds_ib_stats_inc(s_ib_rx_cq_event);
+	recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
+	ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1,
+			DMA_FROM_DEVICE);
 
-		/*
-		 * rds_ib_process_recv() doesn't always consume the frag, and
-		 * we might not have called it at all if the wc didn't indicate
-		 * success. We already unmapped the frag's pages, though, and
-		 * the following rds_ib_ring_free() call tells the refill path
-		 * that it will not find an allocated frag here. Make sure we
-		 * keep that promise by freeing a frag that's still on the ring.
-		 */
-		if (recv->r_frag) {
-			rds_ib_frag_free(ic, recv->r_frag);
-			recv->r_frag = NULL;
-		}
-		rds_ib_ring_free(&ic->i_recv_ring, 1);
+	/* Also process recvs in connecting state because it is possible
+	 * to get a recv completion _before_ the rdmacm ESTABLISHED
+	 * event is processed.
+	 */
+	if (wc->status == IB_WC_SUCCESS) {
+		rds_ib_process_recv(conn, recv, wc->byte_len, state);
+	} else {
+		/* We expect errors as the qp is drained during shutdown */
+		if (rds_conn_up(conn) || rds_conn_connecting(conn))
+			rds_ib_conn_error(conn, "recv completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+					  &conn->c_faddr,
+					  wc->status,
+					  ib_wc_status_msg(wc->status));
 	}
-}
 
-void rds_ib_recv_tasklet_fn(unsigned long data)
-{
-	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
-	struct rds_connection *conn = ic->conn;
-	struct rds_ib_ack_state state = { 0, };
-
-	rds_poll_cq(ic, &state);
-	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
-	rds_poll_cq(ic, &state);
-
-	if (state.ack_next_valid)
-		rds_ib_set_ack(ic, state.ack_next, state.ack_required);
-	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
-		rds_send_drop_acked(conn, state.ack_recv, NULL);
-		ic->i_ack_recv = state.ack_recv;
+	/* rds_ib_process_recv() doesn't always consume the frag, and
+	 * we might not have called it at all if the wc didn't indicate
+	 * success. We already unmapped the frag's pages, though, and
+	 * the following rds_ib_ring_free() call tells the refill path
+	 * that it will not find an allocated frag here. Make sure we
+	 * keep that promise by freeing a frag that's still on the ring.
+	 */
+	if (recv->r_frag) {
+		rds_ib_frag_free(ic, recv->r_frag);
+		recv->r_frag = NULL;
 	}
-	if (rds_conn_up(conn))
-		rds_ib_attempt_ack(ic);
+	rds_ib_ring_free(&ic->i_recv_ring, 1);
 
 	/* If we ever end up with a really empty receive ring, we're
 	 * in deep trouble, as the sender will definitely see RNR
diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c
index 2d5965d6e97c..bdf6115ef6e1 100644
--- a/net/rds/ib_stats.c
+++ b/net/rds/ib_stats.c
@@ -42,14 +42,15 @@ DEFINE_PER_CPU_SHARED_ALIGNED(struct rds_ib_statistics, rds_ib_stats);
 static const char *const rds_ib_stat_names[] = {
 	"ib_connect_raced",
 	"ib_listen_closed_stale",
+	"s_ib_evt_handler_call",
 	"ib_tx_cq_call",
+	"ib_tasklet_call",
 	"ib_tx_cq_event",
 	"ib_tx_ring_full",
 	"ib_tx_throttle",
 	"ib_tx_sg_mapping_failure",
 	"ib_tx_stalled",
 	"ib_tx_credit_updates",
-	"ib_rx_cq_call",
 	"ib_rx_cq_event",
 	"ib_rx_ring_empty",
 	"ib_rx_refill_from_cq",