author	Sowmini Varadhan <sowmini.varadhan@oracle.com>	2016-06-13 12:44:34 -0400
committer	David S. Miller <davem@davemloft.net>	2016-06-15 02:50:43 -0400
commit	1f9ecd7eacfd9ee52a114b87292bfe885aafdb1f (patch)
tree	021e855ec65bfcddc393f00faa9b0c40eae3e98f /net/rds
parent	780a6d9e16d1827eb97c2497d7814fe34d280c15 (diff)
RDS: Pass rds_conn_path to rds_send_xmit()
Pass a struct rds_conn_path to rds_send_xmit so that MP capable
transports can transmit packets on something other than c_path[0].
The eventual goal for MP capable transports is to hash the rds socket
to a path based on the bound local address/port, and use this path as
the argument to rds_send_xmit().

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
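As background for the commit message above, here is a minimal, self-contained sketch of the kind of bound-address/port based path selection it describes as the eventual goal. The helper name rds_path_hash, the RDS_MPATH_WORKERS count, and the hash mix are illustrative assumptions only; they are not part of this patch, which keeps non-MP-capable callers on c_path[0] (see the ib_cm.c hunk below).

#include <stdint.h>
#include <stdio.h>

#define RDS_MPATH_WORKERS 8	/* illustrative path count, not defined by this patch */

/* Hypothetical helper: derive a c_path[] index from an rds socket's bound
 * local address and port, so every message from that socket is handed to
 * the same rds_conn_path. */
static unsigned int rds_path_hash(uint32_t bound_addr, uint16_t bound_port)
{
	uint32_t h = bound_addr ^ ((uint32_t)bound_port << 16);

	/* simple integer mix; a kernel implementation would likely use jhash */
	h ^= h >> 16;
	h *= 0x45d9f3bu;
	h ^= h >> 16;
	return h % RDS_MPATH_WORKERS;
}

int main(void)
{
	/* 192.168.1.10:4791 maps to some path index in [0, RDS_MPATH_WORKERS) */
	uint32_t addr = (192u << 24) | (168u << 16) | (1u << 8) | 10u;

	printf("path index: %u\n", rds_path_hash(addr, 4791));
	return 0;
}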
Diffstat (limited to 'net/rds')
-rw-r--r--	net/rds/ib_cm.c	2
-rw-r--r--	net/rds/rds.h	4
-rw-r--r--	net/rds/send.c	149
-rw-r--r--	net/rds/threads.c	2
4 files changed, 87 insertions, 70 deletions
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 4de5a35f5c40..334287602b78 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -274,7 +274,7 @@ static void rds_ib_tasklet_fn_send(unsigned long data)
 	if (rds_conn_up(conn) &&
 	    (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags) ||
 	    test_bit(0, &conn->c_map_queued)))
-		rds_send_xmit(ic->conn);
+		rds_send_xmit(&ic->conn->c_path[0]);
 }
 
 static void poll_rcq(struct rds_ib_connection *ic, struct ib_cq *cq,
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 2cffd37a550f..b6072eb05fb6 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -457,7 +457,9 @@ struct rds_transport {
 	int (*conn_connect)(struct rds_connection *conn);
 	void (*conn_shutdown)(struct rds_connection *conn);
 	void (*xmit_prepare)(struct rds_connection *conn);
+	void (*xmit_path_prepare)(struct rds_conn_path *cp);
 	void (*xmit_complete)(struct rds_connection *conn);
+	void (*xmit_path_complete)(struct rds_conn_path *cp);
 	int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
 		    unsigned int hdr_off, unsigned int sg, unsigned int off);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
@@ -780,7 +782,7 @@ void rds_inc_info_copy(struct rds_incoming *inc,
 /* send.c */
 int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len);
 void rds_send_reset(struct rds_connection *conn);
-int rds_send_xmit(struct rds_connection *conn);
+int rds_send_xmit(struct rds_conn_path *cp);
 struct sockaddr_in;
 void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest);
 typedef int (*is_acked_func)(struct rds_message *rm, uint64_t ack);
diff --git a/net/rds/send.c b/net/rds/send.c
index 076ee413d21c..966311d135af 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -107,14 +107,14 @@ void rds_send_reset(struct rds_connection *conn)
 }
 EXPORT_SYMBOL_GPL(rds_send_reset);
 
-static int acquire_in_xmit(struct rds_connection *conn)
+static int acquire_in_xmit(struct rds_conn_path *cp)
 {
-	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+	return test_and_set_bit(RDS_IN_XMIT, &cp->cp_flags) == 0;
 }
 
-static void release_in_xmit(struct rds_connection *conn)
+static void release_in_xmit(struct rds_conn_path *cp)
 {
-	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	clear_bit(RDS_IN_XMIT, &cp->cp_flags);
 	smp_mb__after_atomic();
 	/*
 	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
@@ -122,8 +122,8 @@ static void release_in_xmit(struct rds_connection *conn)
 	 * the system-wide hashed waitqueue buckets in the fast path only to
 	 * almost never find waiters.
 	 */
-	if (waitqueue_active(&conn->c_waitq))
-		wake_up_all(&conn->c_waitq);
+	if (waitqueue_active(&cp->cp_waitq))
+		wake_up_all(&cp->cp_waitq);
 }
 
 /*
@@ -140,8 +140,9 @@ static void release_in_xmit(struct rds_connection *conn)
  * - small message latency is higher behind queued large messages
  * - large message latency isn't starved by intervening small sends
  */
-int rds_send_xmit(struct rds_connection *conn)
+int rds_send_xmit(struct rds_conn_path *cp)
 {
+	struct rds_connection *conn = cp->cp_conn;
 	struct rds_message *rm;
 	unsigned long flags;
 	unsigned int tmp;
@@ -161,7 +162,7 @@ restart:
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
 	 */
-	if (!acquire_in_xmit(conn)) {
+	if (!acquire_in_xmit(cp)) {
 		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
@@ -175,21 +176,25 @@ restart:
 	 * The acquire_in_xmit() check above ensures that only one
 	 * caller can increment c_send_gen at any time.
 	 */
-	conn->c_send_gen++;
-	send_gen = conn->c_send_gen;
+	cp->cp_send_gen++;
+	send_gen = cp->cp_send_gen;
 
 	/*
 	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
 	 * we do the opposite to avoid races.
 	 */
-	if (!rds_conn_up(conn)) {
-		release_in_xmit(conn);
+	if (!rds_conn_path_up(cp)) {
+		release_in_xmit(cp);
 		ret = 0;
 		goto out;
 	}
 
-	if (conn->c_trans->xmit_prepare)
+	if (conn->c_trans->t_mp_capable) {
+		if (conn->c_trans->xmit_path_prepare)
+			conn->c_trans->xmit_path_prepare(cp);
+	} else if (conn->c_trans->xmit_prepare) {
 		conn->c_trans->xmit_prepare(conn);
+	}
 
 	/*
 	 * spin trying to push headers and data down the connection until
@@ -197,7 +202,7 @@ restart:
 	 */
 	while (1) {
 
-		rm = conn->c_xmit_rm;
+		rm = cp->cp_xmit_rm;
 
 		/*
 		 * If between sending messages, we can send a pending congestion
@@ -210,14 +215,16 @@ restart:
 				break;
 			}
 			rm->data.op_active = 1;
+			rm->m_inc.i_conn_path = cp;
+			rm->m_inc.i_conn = cp->cp_conn;
 
-			conn->c_xmit_rm = rm;
+			cp->cp_xmit_rm = rm;
 		}
 
 		/*
 		 * If not already working on one, grab the next message.
 		 *
-		 * c_xmit_rm holds a ref while we're sending this message down
+		 * cp_xmit_rm holds a ref while we're sending this message down
 		 * the connction. We can use this ref while holding the
 		 * send_sem.. rds_send_reset() is serialized with it.
 		 */
@@ -234,10 +241,10 @@ restart:
 			if (batch_count >= send_batch_count)
 				goto over_batch;
 
-			spin_lock_irqsave(&conn->c_lock, flags);
+			spin_lock_irqsave(&cp->cp_lock, flags);
 
-			if (!list_empty(&conn->c_send_queue)) {
-				rm = list_entry(conn->c_send_queue.next,
+			if (!list_empty(&cp->cp_send_queue)) {
+				rm = list_entry(cp->cp_send_queue.next,
 						struct rds_message,
 						m_conn_item);
 				rds_message_addref(rm);
@@ -246,10 +253,11 @@ restart:
 				 * Move the message from the send queue to the retransmit
 				 * list right away.
 				 */
-				list_move_tail(&rm->m_conn_item, &conn->c_retrans);
+				list_move_tail(&rm->m_conn_item,
+					       &cp->cp_retrans);
 			}
 
-			spin_unlock_irqrestore(&conn->c_lock, flags);
+			spin_unlock_irqrestore(&cp->cp_lock, flags);
 
 			if (!rm)
 				break;
@@ -263,32 +271,34 @@ restart:
 			 */
 			if (rm->rdma.op_active &&
 			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
-				spin_lock_irqsave(&conn->c_lock, flags);
+				spin_lock_irqsave(&cp->cp_lock, flags);
 				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 					list_move(&rm->m_conn_item, &to_be_dropped);
-				spin_unlock_irqrestore(&conn->c_lock, flags);
+				spin_unlock_irqrestore(&cp->cp_lock, flags);
 				continue;
 			}
 
 			/* Require an ACK every once in a while */
 			len = ntohl(rm->m_inc.i_hdr.h_len);
-			if (conn->c_unacked_packets == 0 ||
-			    conn->c_unacked_bytes < len) {
+			if (cp->cp_unacked_packets == 0 ||
+			    cp->cp_unacked_bytes < len) {
 				__set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
 
-				conn->c_unacked_packets = rds_sysctl_max_unacked_packets;
-				conn->c_unacked_bytes = rds_sysctl_max_unacked_bytes;
+				cp->cp_unacked_packets =
+					rds_sysctl_max_unacked_packets;
+				cp->cp_unacked_bytes =
+					rds_sysctl_max_unacked_bytes;
 				rds_stats_inc(s_send_ack_required);
 			} else {
-				conn->c_unacked_bytes -= len;
-				conn->c_unacked_packets--;
+				cp->cp_unacked_bytes -= len;
+				cp->cp_unacked_packets--;
 			}
 
-			conn->c_xmit_rm = rm;
+			cp->cp_xmit_rm = rm;
 		}
 
 		/* The transport either sends the whole rdma or none of it */
-		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+		if (rm->rdma.op_active && !cp->cp_xmit_rdma_sent) {
 			rm->m_final_op = &rm->rdma;
 			/* The transport owns the mapped memory for now.
 			 * You can't unmap it while it's on the send queue
@@ -300,11 +310,11 @@ restart:
 				wake_up_interruptible(&rm->m_flush_wait);
 				break;
 			}
-			conn->c_xmit_rdma_sent = 1;
+			cp->cp_xmit_rdma_sent = 1;
 
 		}
 
-		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+		if (rm->atomic.op_active && !cp->cp_xmit_atomic_sent) {
 			rm->m_final_op = &rm->atomic;
 			/* The transport owns the mapped memory for now.
 			 * You can't unmap it while it's on the send queue
@@ -316,7 +326,7 @@ restart:
 				wake_up_interruptible(&rm->m_flush_wait);
 				break;
 			}
-			conn->c_xmit_atomic_sent = 1;
+			cp->cp_xmit_atomic_sent = 1;
 
 		}
 
@@ -342,41 +352,42 @@ restart:
 				rm->data.op_active = 0;
 		}
 
-		if (rm->data.op_active && !conn->c_xmit_data_sent) {
+		if (rm->data.op_active && !cp->cp_xmit_data_sent) {
 			rm->m_final_op = &rm->data;
+
 			ret = conn->c_trans->xmit(conn, rm,
-						  conn->c_xmit_hdr_off,
-						  conn->c_xmit_sg,
-						  conn->c_xmit_data_off);
+						  cp->cp_xmit_hdr_off,
+						  cp->cp_xmit_sg,
+						  cp->cp_xmit_data_off);
 			if (ret <= 0)
 				break;
 
-			if (conn->c_xmit_hdr_off < sizeof(struct rds_header)) {
+			if (cp->cp_xmit_hdr_off < sizeof(struct rds_header)) {
 				tmp = min_t(int, ret,
 					    sizeof(struct rds_header) -
-					    conn->c_xmit_hdr_off);
-				conn->c_xmit_hdr_off += tmp;
+					    cp->cp_xmit_hdr_off);
+				cp->cp_xmit_hdr_off += tmp;
 				ret -= tmp;
 			}
 
-			sg = &rm->data.op_sg[conn->c_xmit_sg];
+			sg = &rm->data.op_sg[cp->cp_xmit_sg];
 			while (ret) {
 				tmp = min_t(int, ret, sg->length -
-						      conn->c_xmit_data_off);
-				conn->c_xmit_data_off += tmp;
+						      cp->cp_xmit_data_off);
+				cp->cp_xmit_data_off += tmp;
 				ret -= tmp;
-				if (conn->c_xmit_data_off == sg->length) {
-					conn->c_xmit_data_off = 0;
+				if (cp->cp_xmit_data_off == sg->length) {
+					cp->cp_xmit_data_off = 0;
 					sg++;
-					conn->c_xmit_sg++;
-					BUG_ON(ret != 0 &&
-					       conn->c_xmit_sg == rm->data.op_nents);
+					cp->cp_xmit_sg++;
+					BUG_ON(ret != 0 && cp->cp_xmit_sg ==
+					       rm->data.op_nents);
 				}
 			}
 
-			if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
-			    (conn->c_xmit_sg == rm->data.op_nents))
-				conn->c_xmit_data_sent = 1;
+			if (cp->cp_xmit_hdr_off == sizeof(struct rds_header) &&
+			    (cp->cp_xmit_sg == rm->data.op_nents))
+				cp->cp_xmit_data_sent = 1;
 		}
 
 		/*
@@ -384,23 +395,27 @@ restart:
 		 * if there is a data op. Thus, if the data is sent (or there was
 		 * none), then we're done with the rm.
 		 */
-		if (!rm->data.op_active || conn->c_xmit_data_sent) {
-			conn->c_xmit_rm = NULL;
-			conn->c_xmit_sg = 0;
-			conn->c_xmit_hdr_off = 0;
-			conn->c_xmit_data_off = 0;
-			conn->c_xmit_rdma_sent = 0;
-			conn->c_xmit_atomic_sent = 0;
-			conn->c_xmit_data_sent = 0;
+		if (!rm->data.op_active || cp->cp_xmit_data_sent) {
+			cp->cp_xmit_rm = NULL;
+			cp->cp_xmit_sg = 0;
+			cp->cp_xmit_hdr_off = 0;
+			cp->cp_xmit_data_off = 0;
+			cp->cp_xmit_rdma_sent = 0;
+			cp->cp_xmit_atomic_sent = 0;
+			cp->cp_xmit_data_sent = 0;
 
 			rds_message_put(rm);
 		}
 	}
 
 over_batch:
-	if (conn->c_trans->xmit_complete)
+	if (conn->c_trans->t_mp_capable) {
+		if (conn->c_trans->xmit_path_complete)
+			conn->c_trans->xmit_path_complete(cp);
+	} else if (conn->c_trans->xmit_complete) {
 		conn->c_trans->xmit_complete(conn);
-	release_in_xmit(conn);
+	}
+	release_in_xmit(cp);
 
 	/* Nuke any messages we decided not to retransmit. */
 	if (!list_empty(&to_be_dropped)) {
@@ -428,12 +443,12 @@ over_batch:
 	if (ret == 0) {
 		smp_mb();
 		if ((test_bit(0, &conn->c_map_queued) ||
-		     !list_empty(&conn->c_send_queue)) &&
-		    send_gen == conn->c_send_gen) {
+		     !list_empty(&cp->cp_send_queue)) &&
+		    send_gen == cp->cp_send_gen) {
 			rds_stats_inc(s_send_lock_queue_raced);
 			if (batch_count < send_batch_count)
 				goto restart;
-			queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+			queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
 		}
 	}
 out:
@@ -1110,9 +1125,9 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 	 */
 	rds_stats_inc(s_send_queued);
 
-	ret = rds_send_xmit(conn);
+	ret = rds_send_xmit(cpath);
 	if (ret == -ENOMEM || ret == -EAGAIN)
-		queue_delayed_work(rds_wq, &conn->c_send_w, 1);
+		queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
 
 	rds_message_put(rm);
 	return payload_len;
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 6d0979b8dc63..50d26576dee7 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -177,7 +177,7 @@ void rds_send_worker(struct work_struct *work)
 
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
 		clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
-		ret = rds_send_xmit(cp->cp_conn);
+		ret = rds_send_xmit(cp);
 		cond_resched();
 		rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
 		switch (ret) {