author		Santosh Shilimkar <santosh.shilimkar@oracle.com>	2015-09-06 02:18:51 -0400
committer	Santosh Shilimkar <santosh.shilimkar@oracle.com>	2015-10-05 14:19:01 -0400
commit		f4f943c958a2869b0601092857c1cf0e485d3ce8 (patch)
tree		883a059f0bbd2473c921ef29de131c32b83c711a
parent		db6526dcb51b054961a2d96ba43dec23e38818b3 (diff)
RDS: IB: ack more receive completions to improve performance
For better performance, we split the receive completion IRQ handler so that several WCEs can be acknowledged in a single call. The number of WCs polled per call is capped at 32 (RDS_IB_WC_MAX) to keep latency bounded. Acknowledging several completions in one call instead of one call per completion performs better because fewer lock acquisitions are needed.

In the next patch the send completion path is split the same way; it re-uses poll_cq(), which is why that code is moved to ib_cm.c.

Signed-off-by: Santosh Shilimkar <ssantosh@kernel.org>
Signed-off-by: Santosh Shilimkar <santosh.shilimkar@oracle.com>
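For orientation, here is a minimal sketch (not part of the patch) of the batched poll-and-re-arm pattern the new receive tasklet follows: drain the CQ in chunks of up to RDS_IB_WC_MAX entries, re-arm notification with ib_req_notify_cq(), then poll once more so completions that arrived between the last poll and the re-arm are not stranded until the next interrupt. The handle_wc() callback and function names below are hypothetical stand-ins; the patch itself uses rds_ib_recv_cqe_handler() and rds_ib_tasklet_fn_recv().

#include <rdma/ib_verbs.h>

#define RDS_IB_WC_MAX	32

/* Hypothetical per-completion callback; the patch plugs in
 * rds_ib_recv_cqe_handler() at this point.
 */
static void handle_wc(struct ib_wc *wc);

/* Drain the CQ in batches of up to RDS_IB_WC_MAX completions, so one
 * ib_poll_cq() call amortizes the CQ locking over many WCs.
 */
static void drain_cq(struct ib_cq *cq, struct ib_wc *wcs)
{
	int nr, i;

	while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0)
		for (i = 0; i < nr; i++)
			handle_wc(&wcs[i]);
}

/* Tasklet body: poll, re-arm the CQ, then poll again to close the race
 * with completions that landed before the re-arm took effect.
 */
static void recv_tasklet_body(struct ib_cq *cq, struct ib_wc *wcs)
{
	drain_cq(cq, wcs);
	ib_req_notify_cq(cq, IB_CQ_SOLICITED);
	drain_cq(cq, wcs);
}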
-rw-r--r--	net/rds/ib.h		 28
-rw-r--r--	net/rds/ib_cm.c		 70
-rw-r--r--	net/rds/ib_recv.c	136
-rw-r--r--	net/rds/ib_stats.c	  3
4 files changed, 132 insertions(+), 105 deletions(-)
diff --git a/net/rds/ib.h b/net/rds/ib.h
index f1fd5ffec4e1..727759b30579 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -24,6 +24,8 @@
 
 #define RDS_IB_RECYCLE_BATCH_COUNT 32
 
+#define RDS_IB_WC_MAX			32
+
 extern struct rw_semaphore rds_ib_devices_lock;
 extern struct list_head rds_ib_devices;
 
@@ -89,6 +91,20 @@ struct rds_ib_work_ring {
 	atomic_t	w_free_ctr;
 };
 
+/* Rings are posted with all the allocations they'll need to queue the
+ * incoming message to the receiving socket so this can't fail.
+ * All fragments start with a header, so we can make sure we're not receiving
+ * garbage, and we can tell a small 8 byte fragment from an ACK frame.
+ */
+struct rds_ib_ack_state {
+	u64		ack_next;
+	u64		ack_recv;
+	unsigned int	ack_required:1;
+	unsigned int	ack_next_valid:1;
+	unsigned int	ack_recv_valid:1;
+};
+
+
 struct rds_ib_device;
 
 struct rds_ib_connection {
@@ -102,6 +118,10 @@ struct rds_ib_connection {
 	struct ib_pd		*i_pd;
 	struct ib_cq		*i_send_cq;
 	struct ib_cq		*i_recv_cq;
+	struct ib_wc		i_recv_wc[RDS_IB_WC_MAX];
+
+	/* interrupt handling */
+	struct tasklet_struct	i_recv_tasklet;
 
 	/* tx */
 	struct rds_ib_work_ring	i_send_ring;
@@ -112,7 +132,6 @@ struct rds_ib_connection {
 	atomic_t		i_signaled_sends;
 
 	/* rx */
-	struct tasklet_struct	i_recv_tasklet;
 	struct mutex		i_recv_mutex;
 	struct rds_ib_work_ring	i_recv_ring;
 	struct rds_ib_incoming	*i_ibinc;
@@ -199,13 +218,14 @@ struct rds_ib_statistics {
 	uint64_t	s_ib_connect_raced;
 	uint64_t	s_ib_listen_closed_stale;
 	uint64_t	s_ib_tx_cq_call;
+	uint64_t	s_ib_evt_handler_call;
+	uint64_t	s_ib_tasklet_call;
 	uint64_t	s_ib_tx_cq_event;
 	uint64_t	s_ib_tx_ring_full;
 	uint64_t	s_ib_tx_throttle;
 	uint64_t	s_ib_tx_sg_mapping_failure;
 	uint64_t	s_ib_tx_stalled;
 	uint64_t	s_ib_tx_credit_updates;
-	uint64_t	s_ib_rx_cq_call;
 	uint64_t	s_ib_rx_cq_event;
 	uint64_t	s_ib_rx_ring_empty;
 	uint64_t	s_ib_rx_refill_from_cq;
@@ -324,7 +344,8 @@ void rds_ib_recv_free_caches(struct rds_ib_connection *ic);
 void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp);
 void rds_ib_inc_free(struct rds_incoming *inc);
 int rds_ib_inc_copy_to_user(struct rds_incoming *inc, struct iov_iter *to);
-void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context);
+void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic, struct ib_wc *wc,
+			     struct rds_ib_ack_state *state);
 void rds_ib_recv_tasklet_fn(unsigned long data);
 void rds_ib_recv_init_ring(struct rds_ib_connection *ic);
 void rds_ib_recv_clear_ring(struct rds_ib_connection *ic);
@@ -332,6 +353,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic);
 void rds_ib_attempt_ack(struct rds_ib_connection *ic);
 void rds_ib_ack_send_complete(struct rds_ib_connection *ic);
 u64 rds_ib_piggyb_ack(struct rds_ib_connection *ic);
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required);
 
 /* ib_ring.c */
 void rds_ib_ring_init(struct rds_ib_work_ring *ring, u32 nr);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 9043f5c04787..28e0979720b2 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -216,6 +216,72 @@ static void rds_ib_cq_event_handler(struct ib_event *event, void *data)
 		 event->event, ib_event_msg(event->event), data);
 }
 
+/* Plucking the oldest entry from the ring can be done concurrently with
+ * the thread refilling the ring.  Each ring operation is protected by
+ * spinlocks and the transient state of refilling doesn't change the
+ * recording of which entry is oldest.
+ *
+ * This relies on IB only calling one cq comp_handler for each cq so that
+ * there will only be one caller of rds_recv_incoming() per RDS connection.
+ */
+static void rds_ib_cq_comp_handler_recv(struct ib_cq *cq, void *context)
+{
+	struct rds_connection *conn = context;
+	struct rds_ib_connection *ic = conn->c_transport_data;
+
+	rdsdebug("conn %p cq %p\n", conn, cq);
+
+	rds_ib_stats_inc(s_ib_evt_handler_call);
+
+	tasklet_schedule(&ic->i_recv_tasklet);
+}
+
+static void poll_cq(struct rds_ib_connection *ic, struct ib_cq *cq,
+		    struct ib_wc *wcs,
+		    struct rds_ib_ack_state *ack_state)
+{
+	int nr;
+	int i;
+	struct ib_wc *wc;
+
+	while ((nr = ib_poll_cq(cq, RDS_IB_WC_MAX, wcs)) > 0) {
+		for (i = 0; i < nr; i++) {
+			wc = wcs + i;
+			rdsdebug("wc wr_id 0x%llx status %u byte_len %u imm_data %u\n",
+				 (unsigned long long)wc->wr_id, wc->status,
+				 wc->byte_len, be32_to_cpu(wc->ex.imm_data));
+			rds_ib_recv_cqe_handler(ic, wc, ack_state);
+		}
+	}
+}
+
+static void rds_ib_tasklet_fn_recv(unsigned long data)
+{
+	struct rds_ib_connection *ic = (struct rds_ib_connection *)data;
+	struct rds_connection *conn = ic->conn;
+	struct rds_ib_device *rds_ibdev = ic->rds_ibdev;
+	struct rds_ib_ack_state state;
+
+	BUG_ON(!rds_ibdev);
+
+	rds_ib_stats_inc(s_ib_tasklet_call);
+
+	memset(&state, 0, sizeof(state));
+	poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
+	poll_cq(ic, ic->i_recv_cq, ic->i_recv_wc, &state);
+
+	if (state.ack_next_valid)
+		rds_ib_set_ack(ic, state.ack_next, state.ack_required);
+	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
+		rds_send_drop_acked(conn, state.ack_recv, NULL);
+		ic->i_ack_recv = state.ack_recv;
+	}
+
+	if (rds_conn_up(conn))
+		rds_ib_attempt_ack(ic);
+}
+
 static void rds_ib_qp_event_handler(struct ib_event *event, void *data)
 {
 	struct rds_connection *conn = data;
@@ -282,7 +348,7 @@ static int rds_ib_setup_qp(struct rds_connection *conn)
 	}
 
 	cq_attr.cqe = ic->i_recv_ring.w_nr;
-	ic->i_recv_cq = ib_create_cq(dev, rds_ib_recv_cq_comp_handler,
+	ic->i_recv_cq = ib_create_cq(dev, rds_ib_cq_comp_handler_recv,
 				     rds_ib_cq_event_handler, conn,
 				     &cq_attr);
 	if (IS_ERR(ic->i_recv_cq)) {
@@ -743,7 +809,7 @@ int rds_ib_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 	}
 
 	INIT_LIST_HEAD(&ic->ib_node);
-	tasklet_init(&ic->i_recv_tasklet, rds_ib_recv_tasklet_fn,
+	tasklet_init(&ic->i_recv_tasklet, rds_ib_tasklet_fn_recv,
 		     (unsigned long) ic);
 	mutex_init(&ic->i_recv_mutex);
 #ifndef KERNEL_HAS_ATOMIC64
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index f43831e4186a..96744b75db93 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -596,8 +596,7 @@ void rds_ib_recv_init_ack(struct rds_ib_connection *ic)
  * wr_id and avoids working with the ring in that case.
  */
 #ifndef KERNEL_HAS_ATOMIC64
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
-				int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
 {
 	unsigned long flags;
 
@@ -622,8 +621,7 @@ static u64 rds_ib_get_ack(struct rds_ib_connection *ic)
 	return seq;
 }
 #else
-static void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq,
-				int ack_required)
+void rds_ib_set_ack(struct rds_ib_connection *ic, u64 seq, int ack_required)
 {
 	atomic64_set(&ic->i_ack_next, seq);
 	if (ack_required) {
@@ -830,20 +828,6 @@ static void rds_ib_cong_recv(struct rds_connection *conn,
 	rds_cong_map_updated(map, uncongested);
 }
 
-/*
- * Rings are posted with all the allocations they'll need to queue the
- * incoming message to the receiving socket so this can't fail.
- * All fragments start with a header, so we can make sure we're not receiving
- * garbage, and we can tell a small 8 byte fragment from an ACK frame.
- */
-struct rds_ib_ack_state {
-	u64		ack_next;
-	u64		ack_recv;
-	unsigned int	ack_required:1;
-	unsigned int	ack_next_valid:1;
-	unsigned int	ack_recv_valid:1;
-};
-
 static void rds_ib_process_recv(struct rds_connection *conn,
 				struct rds_ib_recv_work *recv, u32 data_len,
 				struct rds_ib_ack_state *state)
@@ -969,96 +953,50 @@ static void rds_ib_process_recv(struct rds_connection *conn,
 	}
 }
 
-/*
- * Plucking the oldest entry from the ring can be done concurrently with
- * the thread refilling the ring.  Each ring operation is protected by
- * spinlocks and the transient state of refilling doesn't change the
- * recording of which entry is oldest.
- *
- * This relies on IB only calling one cq comp_handler for each cq so that
- * there will only be one caller of rds_recv_incoming() per RDS connection.
- */
-void rds_ib_recv_cq_comp_handler(struct ib_cq *cq, void *context)
-{
-	struct rds_connection *conn = context;
-	struct rds_ib_connection *ic = conn->c_transport_data;
-
-	rdsdebug("conn %p cq %p\n", conn, cq);
-
-	rds_ib_stats_inc(s_ib_rx_cq_call);
-
-	tasklet_schedule(&ic->i_recv_tasklet);
-}
-
-static inline void rds_poll_cq(struct rds_ib_connection *ic,
-			       struct rds_ib_ack_state *state)
+void rds_ib_recv_cqe_handler(struct rds_ib_connection *ic,
+			     struct ib_wc *wc,
+			     struct rds_ib_ack_state *state)
 {
 	struct rds_connection *conn = ic->conn;
-	struct ib_wc wc;
 	struct rds_ib_recv_work *recv;
 
-	while (ib_poll_cq(ic->i_recv_cq, 1, &wc) > 0) {
-		rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
-			 (unsigned long long)wc.wr_id, wc.status,
-			 ib_wc_status_msg(wc.status), wc.byte_len,
-			 be32_to_cpu(wc.ex.imm_data));
-		rds_ib_stats_inc(s_ib_rx_cq_event);
+	rdsdebug("wc wr_id 0x%llx status %u (%s) byte_len %u imm_data %u\n",
+		 (unsigned long long)wc->wr_id, wc->status,
+		 ib_wc_status_msg(wc->status), wc->byte_len,
+		 be32_to_cpu(wc->ex.imm_data));
 
-		recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
-
-		ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1, DMA_FROM_DEVICE);
-
-		/*
-		 * Also process recvs in connecting state because it is possible
-		 * to get a recv completion _before_ the rdmacm ESTABLISHED
-		 * event is processed.
-		 */
-		if (wc.status == IB_WC_SUCCESS) {
-			rds_ib_process_recv(conn, recv, wc.byte_len, state);
-		} else {
-			/* We expect errors as the qp is drained during shutdown */
-			if (rds_conn_up(conn) || rds_conn_connecting(conn))
-				rds_ib_conn_error(conn, "recv completion on %pI4 had "
-						  "status %u (%s), disconnecting and "
-						  "reconnecting\n", &conn->c_faddr,
-						  wc.status,
-						  ib_wc_status_msg(wc.status));
-		}
+	rds_ib_stats_inc(s_ib_rx_cq_event);
+	recv = &ic->i_recvs[rds_ib_ring_oldest(&ic->i_recv_ring)];
+	ib_dma_unmap_sg(ic->i_cm_id->device, &recv->r_frag->f_sg, 1,
+			DMA_FROM_DEVICE);
 
-		/*
-		 * rds_ib_process_recv() doesn't always consume the frag, and
-		 * we might not have called it at all if the wc didn't indicate
-		 * success. We already unmapped the frag's pages, though, and
-		 * the following rds_ib_ring_free() call tells the refill path
-		 * that it will not find an allocated frag here. Make sure we
-		 * keep that promise by freeing a frag that's still on the ring.
-		 */
-		if (recv->r_frag) {
-			rds_ib_frag_free(ic, recv->r_frag);
-			recv->r_frag = NULL;
-		}
-		rds_ib_ring_free(&ic->i_recv_ring, 1);
-	}
-}
+	/* Also process recvs in connecting state because it is possible
+	 * to get a recv completion _before_ the rdmacm ESTABLISHED
+	 * event is processed.
+	 */
+	if (wc->status == IB_WC_SUCCESS) {
+		rds_ib_process_recv(conn, recv, wc->byte_len, state);
+	} else {
+		/* We expect errors as the qp is drained during shutdown */
+		if (rds_conn_up(conn) || rds_conn_connecting(conn))
+			rds_ib_conn_error(conn, "recv completion on %pI4 had status %u (%s), disconnecting and reconnecting\n",
+					  &conn->c_faddr,
+					  wc->status,
+					  ib_wc_status_msg(wc->status));
+	}
 
-void rds_ib_recv_tasklet_fn(unsigned long data)
-{
-	struct rds_ib_connection *ic = (struct rds_ib_connection *) data;
-	struct rds_connection *conn = ic->conn;
-	struct rds_ib_ack_state state = { 0, };
-
-	rds_poll_cq(ic, &state);
-	ib_req_notify_cq(ic->i_recv_cq, IB_CQ_SOLICITED);
-	rds_poll_cq(ic, &state);
-
-	if (state.ack_next_valid)
-		rds_ib_set_ack(ic, state.ack_next, state.ack_required);
-	if (state.ack_recv_valid && state.ack_recv > ic->i_ack_recv) {
-		rds_send_drop_acked(conn, state.ack_recv, NULL);
-		ic->i_ack_recv = state.ack_recv;
+	/* rds_ib_process_recv() doesn't always consume the frag, and
+	 * we might not have called it at all if the wc didn't indicate
+	 * success. We already unmapped the frag's pages, though, and
+	 * the following rds_ib_ring_free() call tells the refill path
+	 * that it will not find an allocated frag here. Make sure we
+	 * keep that promise by freeing a frag that's still on the ring.
+	 */
+	if (recv->r_frag) {
+		rds_ib_frag_free(ic, recv->r_frag);
+		recv->r_frag = NULL;
 	}
-	if (rds_conn_up(conn))
-		rds_ib_attempt_ack(ic);
+	rds_ib_ring_free(&ic->i_recv_ring, 1);
 
 	/* If we ever end up with a really empty receive ring, we're
 	 * in deep trouble, as the sender will definitely see RNR
diff --git a/net/rds/ib_stats.c b/net/rds/ib_stats.c
index 2d5965d6e97c..bdf6115ef6e1 100644
--- a/net/rds/ib_stats.c
+++ b/net/rds/ib_stats.c
@@ -42,14 +42,15 @@ DEFINE_PER_CPU_SHARED_ALIGNED(struct rds_ib_statistics, rds_ib_stats);
 static const char *const rds_ib_stat_names[] = {
 	"ib_connect_raced",
 	"ib_listen_closed_stale",
+	"s_ib_evt_handler_call",
 	"ib_tx_cq_call",
+	"ib_tasklet_call",
 	"ib_tx_cq_event",
 	"ib_tx_ring_full",
 	"ib_tx_throttle",
 	"ib_tx_sg_mapping_failure",
 	"ib_tx_stalled",
 	"ib_tx_credit_updates",
-	"ib_rx_cq_call",
 	"ib_rx_cq_event",
 	"ib_rx_ring_empty",
 	"ib_rx_refill_from_cq",