author     Andy Grover <andy.grover@oracle.com>    2010-03-01 17:03:09 -0500
committer  Andy Grover <andy.grover@oracle.com>    2010-09-08 21:12:08 -0400
commit     ff3d7d36134ef7138803734fdbf91cc986ea7976 (patch)
tree       77ef55e071f84f1b97550dbc6d6755cb3089339b /net/rds/ib_send.c
parent     aa0a4ef4ac3a3c5ffa35e32520bfbc0922ef3630 (diff)
RDS: Perform unmapping ops in stages
Previously, RDS would wait until the final send WR had completed and then handle cleanup. With silent ops, we do not know whether an atomic, rdma, or data op will be last. This patch handles any of these cases by keeping a pointer to the last op in the message in m_final_op. When the TX completion event fires, rds dispatches to per-op-type cleanup functions, and then does whole-message cleanup if the completed op equalled m_final_op.

This patch also moves towards having op-specific functions take the op struct, instead of the overall rm struct.

rds_ib_connection has a pointer to keep track of a partially-completed data send operation. This patch changes it from an rds_message pointer to the narrower rm_data_op pointer, and modifies the places that use this pointer as needed.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
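In outline, the completion path described above works like the minimal C sketch below. This is a hypothetical, simplified illustration only: the struct names, fields, and helpers here (msg, rm_op, send_work, unmap_op, tx_complete) stand in for the real rds_message, rds_ib_send_work, and per-op unmap functions, and the dispatch is keyed on a small type tag instead of the IB work-request opcode used in the actual code. The authoritative implementation is the diff that follows.

#include <stddef.h>

enum rm_op_type { RM_OP_DATA, RM_OP_RDMA, RM_OP_ATOMIC };

struct rm_op {
	enum rm_op_type type;	/* stand-in for dispatching on the WR opcode */
	int mapped;
};

struct msg {
	struct rm_op data, rdma, atomic;
	struct rm_op *m_final_op;	/* last op posted for this message */
};

struct send_work {
	struct rm_op *s_op;	/* op this work request belongs to, if any */
};

/* Per-op cleanup: unmap, then recover the owning message for the caller. */
static struct msg *unmap_op(struct send_work *send)
{
	struct rm_op *op = send->s_op;
	size_t off;

	if (!op)
		return NULL;

	op->mapped = 0;		/* stand-in for the per-op-type unmap helpers */

	switch (op->type) {
	case RM_OP_DATA:   off = offsetof(struct msg, data);   break;
	case RM_OP_RDMA:   off = offsetof(struct msg, rdma);   break;
	default:           off = offsetof(struct msg, atomic); break;
	}
	return (struct msg *)((char *)op - off);
}

/* TX completion: whole-message cleanup only when the final op completes. */
static void tx_complete(struct send_work *send)
{
	struct msg *rm = unmap_op(send);

	if (rm && send->s_op == rm->m_final_op) {
		/* wake anyone waiting for the message to be unmapped,
		 * then drop the send's reference on the message */
		send->s_op = NULL;
	}
}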
Diffstat (limited to 'net/rds/ib_send.c')
-rw-r--r--  net/rds/ib_send.c  242
1 file changed, 125 insertions, 117 deletions
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 95f15247acd7..6461a152bd5b 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -67,80 +67,122 @@ static void rds_ib_send_complete(struct rds_message *rm,
 	complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
-			  struct rds_ib_send_work *send,
-			  int wc_status)
+static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
+				   struct rm_data_op *op,
+				   int wc_status)
 {
-	struct rds_message *rm = send->s_rm;
-
-	rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
-
-	ib_dma_unmap_sg(ic->i_cm_id->device,
-		    rm->data.op_sg, rm->data.op_nents,
-		    DMA_TO_DEVICE);
+	if (op->op_nents)
+		ib_dma_unmap_sg(ic->i_cm_id->device,
+				op->op_sg, op->op_nents,
+				DMA_TO_DEVICE);
+}
 
-	if (rm->rdma.op_active) {
-		struct rm_rdma_op *op = &rm->rdma;
-
-		if (op->op_mapped) {
-			ib_dma_unmap_sg(ic->i_cm_id->device,
-				op->op_sg, op->op_nents,
-				op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
-			op->op_mapped = 0;
-		}
-
-		/* If the user asked for a completion notification on this
-		 * message, we can implement three different semantics:
-		 * 1. Notify when we received the ACK on the RDS message
-		 *    that was queued with the RDMA. This provides reliable
-		 *    notification of RDMA status at the expense of a one-way
-		 *    packet delay.
-		 * 2. Notify when the IB stack gives us the completion event for
-		 *    the RDMA operation.
-		 * 3. Notify when the IB stack gives us the completion event for
-		 *    the accompanying RDS messages.
-		 * Here, we implement approach #3. To implement approach #2,
-		 * call rds_rdma_send_complete from the cq_handler. To implement #1,
-		 * don't call rds_rdma_send_complete at all, and fall back to the notify
-		 * handling in the ACK processing code.
-		 *
-		 * Note: There's no need to explicitly sync any RDMA buffers using
-		 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
-		 * operation itself unmapped the RDMA buffers, which takes care
-		 * of synching.
-		 */
-		rds_ib_send_complete(rm, wc_status, rds_rdma_send_complete);
+static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
+				   struct rm_rdma_op *op,
+				   int wc_status)
+{
+	if (op->op_mapped) {
+		ib_dma_unmap_sg(ic->i_cm_id->device,
+				op->op_sg, op->op_nents,
+				op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+		op->op_mapped = 0;
+	}
+
+	/* If the user asked for a completion notification on this
+	 * message, we can implement three different semantics:
+	 * 1. Notify when we received the ACK on the RDS message
+	 *    that was queued with the RDMA. This provides reliable
+	 *    notification of RDMA status at the expense of a one-way
+	 *    packet delay.
+	 * 2. Notify when the IB stack gives us the completion event for
+	 *    the RDMA operation.
+	 * 3. Notify when the IB stack gives us the completion event for
+	 *    the accompanying RDS messages.
+	 * Here, we implement approach #3. To implement approach #2,
+	 * we would need to take an event for the rdma WR. To implement #1,
+	 * don't call rds_rdma_send_complete at all, and fall back to the notify
+	 * handling in the ACK processing code.
+	 *
+	 * Note: There's no need to explicitly sync any RDMA buffers using
+	 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
+	 * operation itself unmapped the RDMA buffers, which takes care
+	 * of synching.
+	 */
+	rds_ib_send_complete(container_of(op, struct rds_message, rdma),
+			     wc_status, rds_rdma_send_complete);
 
-		if (rm->rdma.op_write)
-			rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes);
-		else
-			rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes);
-	}
+	if (op->op_write)
+		rds_stats_add(s_send_rdma_bytes, op->op_bytes);
+	else
+		rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
+}
 
-	if (rm->atomic.op_active) {
-		struct rm_atomic_op *op = &rm->atomic;
-
-		/* unmap atomic recvbuf */
-		if (op->op_mapped) {
-			ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
-					DMA_FROM_DEVICE);
-			op->op_mapped = 0;
-		}
-
-		rds_ib_send_complete(rm, wc_status, rds_atomic_send_complete);
-
-		if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP)
-			rds_stats_inc(s_atomic_cswp);
-		else
-			rds_stats_inc(s_atomic_fadd);
-	}
+static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
+				     struct rm_atomic_op *op,
+				     int wc_status)
+{
+	/* unmap atomic recvbuf */
+	if (op->op_mapped) {
+		ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
+				DMA_FROM_DEVICE);
+		op->op_mapped = 0;
+	}
+
+	rds_ib_send_complete(container_of(op, struct rds_message, atomic),
+			     wc_status, rds_atomic_send_complete);
+
+	if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
+		rds_stats_inc(s_atomic_cswp);
+	else
+		rds_stats_inc(s_atomic_fadd);
+}
 
-	/* If anyone waited for this message to get flushed out, wake
-	 * them up now */
-	rds_message_unmapped(rm);
-
-	rds_message_put(rm);
-	send->s_rm = NULL;
+/*
+ * Unmap the resources associated with a struct send_work.
+ *
+ * Returns the rm for no good reason other than it is unobtainable
+ * other than by switching on wr.opcode, currently, and the caller,
+ * the event handler, needs it.
+ */
+static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
+						struct rds_ib_send_work *send,
+						int wc_status)
+{
+	struct rds_message *rm = NULL;
+
+	/* In the error case, wc.opcode sometimes contains garbage */
+	switch (send->s_wr.opcode) {
+	case IB_WR_SEND:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, data);
+			rds_ib_send_unmap_data(ic, send->s_op, wc_status);
+		}
+		break;
+	case IB_WR_RDMA_WRITE:
+	case IB_WR_RDMA_READ:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, rdma);
+			rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
+		}
+		break;
+	case IB_WR_ATOMIC_FETCH_AND_ADD:
+	case IB_WR_ATOMIC_CMP_AND_SWP:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, atomic);
+			rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
+		}
+		break;
+	default:
+		if (printk_ratelimit())
+			printk(KERN_NOTICE
+			       "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
+			       __func__, send->s_wr.opcode);
+		break;
+	}
+
+	send->s_wr.opcode = 0xdead;
+
+	return rm;
 }
 
 void rds_ib_send_init_ring(struct rds_ib_connection *ic)
@@ -151,7 +193,6 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 		struct ib_sge *sge;
 
-		send->s_rm = NULL;
 		send->s_op = NULL;
 
 		send->s_wr.wr_id = i;
@@ -173,9 +214,8 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
 	u32 i;
 
 	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
-		if (!send->s_rm || send->s_wr.opcode == 0xdead)
-			continue;
-		rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
+		if (send->s_op && send->s_wr.opcode != 0xdead)
+			rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
 	}
 }
 
@@ -189,6 +229,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 {
 	struct rds_connection *conn = context;
 	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_message *rm = NULL;
 	struct ib_wc wc;
 	struct rds_ib_send_work *send;
 	u32 completed;
@@ -222,42 +263,18 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 	for (i = 0; i < completed; i++) {
 		send = &ic->i_sends[oldest];
 
-		/* In the error case, wc.opcode sometimes contains garbage */
-		switch (send->s_wr.opcode) {
-		case IB_WR_SEND:
-		case IB_WR_RDMA_WRITE:
-		case IB_WR_RDMA_READ:
-		case IB_WR_ATOMIC_FETCH_AND_ADD:
-		case IB_WR_ATOMIC_CMP_AND_SWP:
-			if (send->s_rm)
-				rds_ib_send_unmap_rm(ic, send, wc.status);
-			break;
-		default:
-			if (printk_ratelimit())
-				printk(KERN_NOTICE
-				       "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
-				       __func__, send->s_wr.opcode);
-			break;
-		}
+		rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
-		send->s_wr.opcode = 0xdead;
-		send->s_wr.num_sge = 1;
 		if (send->s_queued + HZ/2 < jiffies)
 			rds_ib_stats_inc(s_ib_tx_stalled);
 
-		/* If a RDMA operation produced an error, signal this right
-		 * away. If we don't, the subsequent SEND that goes with this
-		 * RDMA will be canceled with ERR_WFLUSH, and the application
-		 * never learn that the RDMA failed. */
-		if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
-			struct rds_message *rm;
-
-			rm = rds_send_get_message(conn, send->s_op);
-			if (rm) {
-				rds_ib_send_unmap_rm(ic, send, wc.status);
-				rds_ib_send_complete(rm, wc.status, rds_rdma_send_complete);
-				rds_message_put(rm);
-			}
+		if (&send->s_op == &rm->m_final_op) {
+			/* If anyone waited for this message to get flushed out, wake
+			 * them up now */
+			rds_message_unmapped(rm);
+
+			rds_message_put(rm);
+			send->s_op = NULL;
 		}
 
 		oldest = (oldest + 1) % ic->i_send_ring.w_nr;
@@ -512,7 +529,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	}
 
 	/* map the message the first time we see it */
-	if (!ic->i_rm) {
+	if (!ic->i_data_op) {
 		if (rm->data.op_nents) {
 			rm->data.op_count = ib_dma_map_sg(dev,
 							  rm->data.op_sg,
@@ -530,7 +547,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		}
 
 		rds_message_addref(rm);
-		ic->i_rm = rm;
+		ic->i_data_op = &rm->data;
 
 		/* Finalize the header */
 		if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
@@ -583,7 +600,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	send = &ic->i_sends[pos];
 	first = send;
 	prev = NULL;
-	scat = &rm->data.op_sg[sg];
+	scat = &ic->i_data_op->op_sg[sg];
 	i = 0;
 	do {
 		unsigned int len = 0;
@@ -658,9 +675,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 
 	/* if we finished the message then send completion owns it */
 	if (scat == &rm->data.op_sg[rm->data.op_count]) {
-		prev->s_rm = ic->i_rm;
+		prev->s_op = ic->i_data_op;
 		prev->s_wr.send_flags |= IB_SEND_SOLICITED;
-		ic->i_rm = NULL;
+		ic->i_data_op = NULL;
 	}
 
 	/* Put back wrs & credits we didn't use */
@@ -681,9 +698,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
-		if (prev->s_rm) {
-			ic->i_rm = prev->s_rm;
-			prev->s_rm = NULL;
+		if (prev->s_op) {
+			ic->i_data_op = prev->s_op;
+			prev->s_op = NULL;
 		}
 
 		rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
@@ -701,10 +718,9 @@ out:
  * A simplified version of the rdma case, we always map 1 SG, and
  * only 8 bytes, for the return value from the atomic operation.
  */
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
-	struct rm_atomic_op *op = &rm->atomic;
 	struct rds_ib_send_work *send = NULL;
 	struct ib_send_wr *failed_wr;
 	struct rds_ib_device *rds_ibdev;
@@ -741,14 +757,6 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
 	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
 	send->s_wr.wr.atomic.rkey = op->op_rkey;
 
-	/*
-	 * If there is no data or rdma ops in the message, then
-	 * we must fill in s_rm ourselves, so we properly clean up
-	 * on completion.
-	 */
-	if (!rm->rdma.op_active && !rm->data.op_active)
-		send->s_rm = rm;
-
 	/* map 8 byte retval buffer to the device */
 	ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
 	rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
@@ -809,7 +817,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
-	/* map the message the first time we see it */
+	/* map the op the first time we see it */
 	if (!op->op_mapped) {
 		op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
 					     op->op_sg, op->op_nents, (op->op_write) ?