about | summary | refs | log | tree | commit | diff | stats
path: root/net/rds
diff options
context:
space:
mode:
author: Andy Grover <andy.grover@oracle.com> 2010-01-20 00:25:26 -0500
committer: Andy Grover <andy.grover@oracle.com> 2010-09-08 21:11:55 -0400
commit: 241eef3e2f51fe4ad50abacd7f79c4e2d468197e (patch)
tree: 020170cb9c3bea79d767b19cd7362a51b8446667 /net/rds
parent: d37c9359056f4f07b37e59810f0ece1031e280b2 (diff)
RDS: Implement silent atomics
Signed-off-by: Andy Grover <andy.grover@oracle.com>
Diffstat (limited to 'net/rds')
-rw-r--r-- net/rds/ib.h 2
-rw-r--r-- net/rds/ib_send.c 62
-rw-r--r-- net/rds/message.c 2
-rw-r--r-- net/rds/rds.h 3
-rw-r--r-- net/rds/send.c 11
5 files changed, 45 insertions, 35 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 148818174a04..96769b86a536 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -336,7 +336,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
336void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted); 336void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
337int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted, 337int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
338 u32 *adv_credits, int need_posted, int max_posted); 338 u32 *adv_credits, int need_posted, int max_posted);
339int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op); 339int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm);
340 340
341/* ib_stats.c */ 341/* ib_stats.c */
342DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats); 342DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index d839b403d46b..e6745d827c3a 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -225,15 +225,12 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
225 /* In the error case, wc.opcode sometimes contains garbage */ 225 /* In the error case, wc.opcode sometimes contains garbage */
226 switch (send->s_wr.opcode) { 226 switch (send->s_wr.opcode) {
227 case IB_WR_SEND: 227 case IB_WR_SEND:
228 if (send->s_rm)
229 rds_ib_send_unmap_rm(ic, send, wc.status);
230 break;
231 case IB_WR_RDMA_WRITE: 228 case IB_WR_RDMA_WRITE:
232 case IB_WR_RDMA_READ: 229 case IB_WR_RDMA_READ:
233 case IB_WR_ATOMIC_FETCH_AND_ADD: 230 case IB_WR_ATOMIC_FETCH_AND_ADD:
234 case IB_WR_ATOMIC_CMP_AND_SWP: 231 case IB_WR_ATOMIC_CMP_AND_SWP:
235 /* Nothing to be done - the SG list will be unmapped 232 if (send->s_rm)
236 * when the SEND completes. */ 233 rds_ib_send_unmap_rm(ic, send, wc.status);
237 break; 234 break;
238 default: 235 default:
239 if (printk_ratelimit()) 236 if (printk_ratelimit())
@@ -425,6 +422,21 @@ void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted)
425 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags); 422 set_bit(IB_ACK_REQUESTED, &ic->i_ack_flags);
426} 423}
427 424
425static inline void rds_ib_set_wr_signal_state(struct rds_ib_connection *ic,
426 struct rds_ib_send_work *send,
427 bool notify)
428{
429 /*
430 * We want to delay signaling completions just enough to get
431 * the batching benefits but not so much that we create dead time
432 * on the wire.
433 */
434 if (ic->i_unsignaled_wrs-- == 0 || notify) {
435 ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
436 send->s_wr.send_flags |= IB_SEND_SIGNALED;
437 }
438}
439
428/* 440/*
429 * This can be called multiple times for a given message. The first time 441 * This can be called multiple times for a given message. The first time
430 * we see a message we map its scatterlist into the IB device so that 442 * we see a message we map its scatterlist into the IB device so that
@@ -517,7 +529,6 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
517 rm->data.m_count = 0; 529 rm->data.m_count = 0;
518 } 530 }
519 531
520 ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
521 rds_message_addref(rm); 532 rds_message_addref(rm);
522 ic->i_rm = rm; 533 ic->i_rm = rm;
523 534
@@ -608,15 +619,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
608 } 619 }
609 } 620 }
610 621
611 /* 622 rds_ib_set_wr_signal_state(ic, send, 0);
612 * We want to delay signaling completions just enough to get
613 * the batching benefits but not so much that we create dead time
614 * on the wire.
615 */
616 if (ic->i_unsignaled_wrs-- == 0) {
617 ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
618 send->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED;
619 }
620 623
621 /* 624 /*
622 * Always signal the last one if we're stopping due to flow control. 625 * Always signal the last one if we're stopping due to flow control.
@@ -656,7 +659,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
656 /* if we finished the message then send completion owns it */ 659 /* if we finished the message then send completion owns it */
657 if (scat == &rm->data.m_sg[rm->data.m_count]) { 660 if (scat == &rm->data.m_sg[rm->data.m_count]) {
658 prev->s_rm = ic->i_rm; 661 prev->s_rm = ic->i_rm;
659 prev->s_wr.send_flags |= IB_SEND_SIGNALED | IB_SEND_SOLICITED; 662 prev->s_wr.send_flags |= IB_SEND_SOLICITED;
660 ic->i_rm = NULL; 663 ic->i_rm = NULL;
661 } 664 }
662 665
@@ -698,9 +701,10 @@ out:
698 * A simplified version of the rdma case, we always map 1 SG, and 701 * A simplified version of the rdma case, we always map 1 SG, and
699 * only 8 bytes, for the return value from the atomic operation. 702 * only 8 bytes, for the return value from the atomic operation.
700 */ 703 */
701int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op) 704int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
702{ 705{
703 struct rds_ib_connection *ic = conn->c_transport_data; 706 struct rds_ib_connection *ic = conn->c_transport_data;
707 struct rm_atomic_op *op = &rm->atomic;
704 struct rds_ib_send_work *send = NULL; 708 struct rds_ib_send_work *send = NULL;
705 struct ib_send_wr *failed_wr; 709 struct ib_send_wr *failed_wr;
706 struct rds_ib_device *rds_ibdev; 710 struct rds_ib_device *rds_ibdev;
@@ -731,12 +735,20 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
731 send->s_wr.wr.atomic.compare_add = op->op_swap_add; 735 send->s_wr.wr.atomic.compare_add = op->op_swap_add;
732 send->s_wr.wr.atomic.swap = 0; 736 send->s_wr.wr.atomic.swap = 0;
733 } 737 }
734 send->s_wr.send_flags = IB_SEND_SIGNALED; 738 rds_ib_set_wr_signal_state(ic, send, op->op_notify);
735 send->s_wr.num_sge = 1; 739 send->s_wr.num_sge = 1;
736 send->s_wr.next = NULL; 740 send->s_wr.next = NULL;
737 send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; 741 send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
738 send->s_wr.wr.atomic.rkey = op->op_rkey; 742 send->s_wr.wr.atomic.rkey = op->op_rkey;
739 743
744 /*
745 * If there is no data or rdma ops in the message, then
746 * we must fill in s_rm ourselves, so we properly clean up
747 * on completion.
748 */
749 if (!rm->rdma.m_rdma_op.r_active && !rm->data.op_active)
750 send->s_rm = rm;
751
740 /* map 8 byte retval buffer to the device */ 752 /* map 8 byte retval buffer to the device */
741 ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE); 753 ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
742 rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret); 754 rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
@@ -836,14 +848,8 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
836 for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) { 848 for (i = 0; i < work_alloc && scat != &op->r_sg[op->r_count]; i++) {
837 send->s_wr.send_flags = 0; 849 send->s_wr.send_flags = 0;
838 send->s_queued = jiffies; 850 send->s_queued = jiffies;
839 /* 851
840 * We want to delay signaling completions just enough to get 852 rds_ib_set_wr_signal_state(ic, send, op->r_notify);
841 * the batching benefits but not so much that we create dead time on the wire.
842 */
843 if (ic->i_unsignaled_wrs-- == 0) {
844 ic->i_unsignaled_wrs = rds_ib_sysctl_max_unsig_wrs;
845 send->s_wr.send_flags = IB_SEND_SIGNALED;
846 }
847 853
848 send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ; 854 send->s_wr.opcode = op->r_write ? IB_WR_RDMA_WRITE : IB_WR_RDMA_READ;
849 send->s_wr.wr.rdma.remote_addr = remote_addr; 855 send->s_wr.wr.rdma.remote_addr = remote_addr;
@@ -884,10 +890,6 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
884 send = ic->i_sends; 890 send = ic->i_sends;
885 } 891 }
886 892
887 /* if we finished the message then send completion owns it */
888 if (scat == &op->r_sg[op->r_count])
889 prev->s_wr.send_flags = IB_SEND_SIGNALED;
890
891 if (i < work_alloc) { 893 if (i < work_alloc) {
892 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i); 894 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc - i);
893 work_alloc = i; 895 work_alloc = i;
diff --git a/net/rds/message.c b/net/rds/message.c
index 3ea05c864cd4..a27e493a63a2 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -325,6 +325,8 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iovec *first_iov,
325 sg++; 325 sg++;
326 } 326 }
327 327
328 rm->data.op_active = 1;
329
328out: 330out:
329 return ret; 331 return ret;
330} 332}
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 0c610a102c20..bf2349da4db7 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -341,6 +341,7 @@ struct rds_message {
341 struct rds_mr *m_rdma_mr; 341 struct rds_mr *m_rdma_mr;
342 } rdma; 342 } rdma;
343 struct rm_data_op { 343 struct rm_data_op {
344 unsigned int op_active:1;
344 unsigned int m_nents; 345 unsigned int m_nents;
345 unsigned int m_count; 346 unsigned int m_count;
346 struct scatterlist *m_sg; 347 struct scatterlist *m_sg;
@@ -418,7 +419,7 @@ struct rds_transport {
418 int (*xmit_cong_map)(struct rds_connection *conn, 419 int (*xmit_cong_map)(struct rds_connection *conn,
419 struct rds_cong_map *map, unsigned long offset); 420 struct rds_cong_map *map, unsigned long offset);
420 int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op); 421 int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op);
421 int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op); 422 int (*xmit_atomic)(struct rds_connection *conn, struct rds_message *rm);
422 int (*recv)(struct rds_connection *conn); 423 int (*recv)(struct rds_connection *conn);
423 int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov, 424 int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
424 size_t size); 425 size_t size);
diff --git a/net/rds/send.c b/net/rds/send.c
index 5bc35d2f40ea..42fb934293be 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -266,7 +266,7 @@ int rds_send_xmit(struct rds_connection *conn)
266 266
267 267
268 if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { 268 if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
269 ret = conn->c_trans->xmit_atomic(conn, &rm->atomic); 269 ret = conn->c_trans->xmit_atomic(conn, rm);
270 if (ret) 270 if (ret)
271 break; 271 break;
272 conn->c_xmit_atomic_sent = 1; 272 conn->c_xmit_atomic_sent = 1;
@@ -285,13 +285,18 @@ int rds_send_xmit(struct rds_connection *conn)
285 if (ret) 285 if (ret)
286 break; 286 break;
287 conn->c_xmit_rdma_sent = 1; 287 conn->c_xmit_rdma_sent = 1;
288
289 /* rdmas need data sent, even if just the header */
290 rm->data.op_active = 1;
291
288 /* The transport owns the mapped memory for now. 292 /* The transport owns the mapped memory for now.
289 * You can't unmap it while it's on the send queue */ 293 * You can't unmap it while it's on the send queue */
290 set_bit(RDS_MSG_MAPPED, &rm->m_flags); 294 set_bit(RDS_MSG_MAPPED, &rm->m_flags);
291 } 295 }
292 296
293 if (conn->c_xmit_hdr_off < sizeof(struct rds_header) || 297 if (rm->data.op_active
294 conn->c_xmit_sg < rm->data.m_nents) { 298 && (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
299 conn->c_xmit_sg < rm->data.m_nents)) {
295 ret = conn->c_trans->xmit(conn, rm, 300 ret = conn->c_trans->xmit(conn, rm,
296 conn->c_xmit_hdr_off, 301 conn->c_xmit_hdr_off,
297 conn->c_xmit_sg, 302 conn->c_xmit_sg,