path: root/net
author		Andy Grover <andy.grover@oracle.com>	2010-03-01 17:03:09 -0500
committer	Andy Grover <andy.grover@oracle.com>	2010-09-08 21:12:08 -0400
commit		ff3d7d36134ef7138803734fdbf91cc986ea7976 (patch)
tree		77ef55e071f84f1b97550dbc6d6755cb3089339b /net
parent		aa0a4ef4ac3a3c5ffa35e32520bfbc0922ef3630 (diff)
RDS: Perform unmapping ops in stages
Previously, RDS would wait until the final send WR had completed and then handle cleanup. With silent ops, we do not know whether an atomic, rdma, or data op will be last. This patch handles any of these cases by keeping a pointer to the last op in the message in m_final_op. When the TX completion event fires, RDS dispatches to per-op-type cleanup functions, and then does whole-message cleanup if the completed op matches m_final_op.

This patch also moves towards having op-specific functions take the op struct instead of the overall rm struct.

rds_ib_connection has a pointer to keep track of a partially-completed data send operation. This patch changes it from an rds_message pointer to the narrower rm_data_op pointer, and modifies places that use this pointer as needed.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
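The cleanup dispatch in this patch leans on the kernel's container_of() idiom: a completion only hands back a pointer to one embedded op (data, rdma, or atomic), and the owning rds_message is recovered from it. Below is a minimal userspace sketch of that pattern, not RDS kernel code; the struct definitions are hypothetical reductions of the real ones in net/rds/rds.h, kept only to the fields the pattern needs.

/*
 * Minimal userspace sketch (not RDS kernel code) of the container_of()
 * idiom this patch relies on: given a pointer to an op embedded in a
 * message, recover the enclosing rds_message.  The structs below are
 * deliberately reduced; they are not the real definitions.
 */
#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct rm_data_op   { int op_nents; };
struct rm_rdma_op   { int op_mapped; };
struct rm_atomic_op { int op_type; };

struct rds_message {
	void			*m_final_op;	/* whichever op is sent last */
	struct rm_atomic_op	atomic;
	struct rm_rdma_op	rdma;
	struct rm_data_op	data;
};

/* Conceptually what the new unmap path does: op pointer -> owning message */
static struct rds_message *rm_from_data_op(struct rm_data_op *op)
{
	return container_of(op, struct rds_message, data);
}

int main(void)
{
	struct rds_message rm = { .m_final_op = &rm.data };
	struct rm_data_op *op = &rm.data;	/* what ic->i_data_op would hold */
	struct rds_message *owner = rm_from_data_op(op);

	printf("recovered owner: %s\n", owner == &rm ? "yes" : "no");
	printf("final op:        %s\n", (void *)op == rm.m_final_op ? "yes" : "no");
	return 0;
}

Because each op is embedded in the message (not allocated separately), the narrower op pointer loses no information: the enclosing message is always one offsetof() away, which is what lets s_op shrink to a void * in rds_ib_send_work.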
Diffstat (limited to 'net')
-rw-r--r--	net/rds/ib.h		7
-rw-r--r--	net/rds/ib_cm.c		9
-rw-r--r--	net/rds/ib_send.c	242
-rw-r--r--	net/rds/rds.h		4
-rw-r--r--	net/rds/send.c		6
5 files changed, 142 insertions, 126 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h
index d64b5087eefe..202140a84f0c 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -53,8 +53,7 @@ struct rds_ib_connect_private {
 };
 
 struct rds_ib_send_work {
-	struct rds_message	*s_rm;
-	struct rm_rdma_op	*s_op;
+	void			*s_op;
 	struct ib_send_wr	s_wr;
 	struct ib_sge		s_sge[RDS_IB_MAX_SGE];
 	unsigned long		s_queued;
@@ -92,7 +91,7 @@ struct rds_ib_connection {
 
 	/* tx */
 	struct rds_ib_work_ring	i_send_ring;
-	struct rds_message	*i_rm;
+	struct rm_data_op	*i_data_op;
 	struct rds_header	*i_send_hdrs;
 	u64			i_send_hdrs_dma;
 	struct rds_ib_send_work *i_sends;
@@ -336,7 +335,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
 int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
 			     u32 *adv_credits, int need_posted, int max_posted);
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm);
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
 
 /* ib_stats.c */
 DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 8b0c743c0900..b5b5ebbc0bb6 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -673,9 +673,12 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
 		BUG_ON(ic->rds_ibdev);
 
 		/* Clear pending transmit */
-		if (ic->i_rm) {
-			rds_message_put(ic->i_rm);
-			ic->i_rm = NULL;
+		if (ic->i_data_op) {
+			struct rds_message *rm;
+
+			rm = container_of(ic->i_data_op, struct rds_message, data);
+			rds_message_put(rm);
+			ic->i_data_op = NULL;
 		}
 
 		/* Clear the ACK state */
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 95f15247acd7..6461a152bd5b 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -67,80 +67,122 @@ static void rds_ib_send_complete(struct rds_message *rm,
 	complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
-			  struct rds_ib_send_work *send,
-			  int wc_status)
-{
-	struct rds_message *rm = send->s_rm;
-
-	rdsdebug("ic %p send %p rm %p\n", ic, send, rm);
-
-	ib_dma_unmap_sg(ic->i_cm_id->device,
-			rm->data.op_sg, rm->data.op_nents,
-			DMA_TO_DEVICE);
-
-	if (rm->rdma.op_active) {
-		struct rm_rdma_op *op = &rm->rdma;
-
-		if (op->op_mapped) {
-			ib_dma_unmap_sg(ic->i_cm_id->device,
-					op->op_sg, op->op_nents,
-					op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
-			op->op_mapped = 0;
-		}
-
-		/* If the user asked for a completion notification on this
-		 * message, we can implement three different semantics:
-		 * 1. Notify when we received the ACK on the RDS message
-		 *    that was queued with the RDMA. This provides reliable
-		 *    notification of RDMA status at the expense of a one-way
-		 *    packet delay.
-		 * 2. Notify when the IB stack gives us the completion event for
-		 *    the RDMA operation.
-		 * 3. Notify when the IB stack gives us the completion event for
-		 *    the accompanying RDS messages.
-		 * Here, we implement approach #3. To implement approach #2,
-		 * call rds_rdma_send_complete from the cq_handler. To implement #1,
-		 * don't call rds_rdma_send_complete at all, and fall back to the notify
-		 * handling in the ACK processing code.
-		 *
-		 * Note: There's no need to explicitly sync any RDMA buffers using
-		 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
-		 * operation itself unmapped the RDMA buffers, which takes care
-		 * of synching.
-		 */
-		rds_ib_send_complete(rm, wc_status, rds_rdma_send_complete);
-
-		if (rm->rdma.op_write)
-			rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes);
-		else
-			rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes);
-	}
-
-	if (rm->atomic.op_active) {
-		struct rm_atomic_op *op = &rm->atomic;
-
-		/* unmap atomic recvbuf */
-		if (op->op_mapped) {
-			ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
-					DMA_FROM_DEVICE);
-			op->op_mapped = 0;
-		}
-
-		rds_ib_send_complete(rm, wc_status, rds_atomic_send_complete);
-
-		if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP)
-			rds_stats_inc(s_atomic_cswp);
-		else
-			rds_stats_inc(s_atomic_fadd);
-	}
-
-	/* If anyone waited for this message to get flushed out, wake
-	 * them up now */
-	rds_message_unmapped(rm);
-
-	rds_message_put(rm);
-	send->s_rm = NULL;
-}
+static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
+				   struct rm_data_op *op,
+				   int wc_status)
+{
+	if (op->op_nents)
+		ib_dma_unmap_sg(ic->i_cm_id->device,
+				op->op_sg, op->op_nents,
+				DMA_TO_DEVICE);
+}
+
+static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
+				   struct rm_rdma_op *op,
+				   int wc_status)
+{
+	if (op->op_mapped) {
+		ib_dma_unmap_sg(ic->i_cm_id->device,
+				op->op_sg, op->op_nents,
+				op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+		op->op_mapped = 0;
+	}
+
+	/* If the user asked for a completion notification on this
+	 * message, we can implement three different semantics:
+	 * 1. Notify when we received the ACK on the RDS message
+	 *    that was queued with the RDMA. This provides reliable
+	 *    notification of RDMA status at the expense of a one-way
+	 *    packet delay.
+	 * 2. Notify when the IB stack gives us the completion event for
+	 *    the RDMA operation.
+	 * 3. Notify when the IB stack gives us the completion event for
+	 *    the accompanying RDS messages.
+	 * Here, we implement approach #3. To implement approach #2,
+	 * we would need to take an event for the rdma WR. To implement #1,
+	 * don't call rds_rdma_send_complete at all, and fall back to the notify
+	 * handling in the ACK processing code.
+	 *
+	 * Note: There's no need to explicitly sync any RDMA buffers using
+	 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
+	 * operation itself unmapped the RDMA buffers, which takes care
+	 * of synching.
+	 */
+	rds_ib_send_complete(container_of(op, struct rds_message, rdma),
+			     wc_status, rds_rdma_send_complete);
+
+	if (op->op_write)
+		rds_stats_add(s_send_rdma_bytes, op->op_bytes);
+	else
+		rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
+}
+
+static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
+				     struct rm_atomic_op *op,
+				     int wc_status)
+{
+	/* unmap atomic recvbuf */
+	if (op->op_mapped) {
+		ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
+				DMA_FROM_DEVICE);
+		op->op_mapped = 0;
+	}
+
+	rds_ib_send_complete(container_of(op, struct rds_message, atomic),
+			     wc_status, rds_atomic_send_complete);
+
+	if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
+		rds_stats_inc(s_atomic_cswp);
+	else
+		rds_stats_inc(s_atomic_fadd);
+}
+
+/*
+ * Unmap the resources associated with a struct send_work.
+ *
+ * Returns the rm for no good reason other than it is unobtainable
+ * other than by switching on wr.opcode, currently, and the caller,
+ * the event handler, needs it.
+ */
+static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
+						struct rds_ib_send_work *send,
+						int wc_status)
+{
+	struct rds_message *rm = NULL;
+
+	/* In the error case, wc.opcode sometimes contains garbage */
+	switch (send->s_wr.opcode) {
+	case IB_WR_SEND:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, data);
+			rds_ib_send_unmap_data(ic, send->s_op, wc_status);
+		}
+		break;
+	case IB_WR_RDMA_WRITE:
+	case IB_WR_RDMA_READ:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, rdma);
+			rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
+		}
+		break;
+	case IB_WR_ATOMIC_FETCH_AND_ADD:
+	case IB_WR_ATOMIC_CMP_AND_SWP:
+		if (send->s_op) {
+			rm = container_of(send->s_op, struct rds_message, atomic);
+			rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
+		}
+		break;
+	default:
+		if (printk_ratelimit())
+			printk(KERN_NOTICE
+			       "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
+			       __func__, send->s_wr.opcode);
+		break;
+	}
+
+	send->s_wr.opcode = 0xdead;
+
+	return rm;
+}
 
 void rds_ib_send_init_ring(struct rds_ib_connection *ic)
@@ -151,7 +193,6 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
 	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
 		struct ib_sge *sge;
 
-		send->s_rm = NULL;
 		send->s_op = NULL;
 
 		send->s_wr.wr_id = i;
@@ -173,9 +214,8 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
 	u32 i;
 
 	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
-		if (!send->s_rm || send->s_wr.opcode == 0xdead)
-			continue;
-		rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
+		if (send->s_op && send->s_wr.opcode != 0xdead)
+			rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
 	}
 }
 
@@ -189,6 +229,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 {
 	struct rds_connection *conn = context;
 	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_message *rm = NULL;
 	struct ib_wc wc;
 	struct rds_ib_send_work *send;
 	u32 completed;
@@ -222,42 +263,18 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 		for (i = 0; i < completed; i++) {
 			send = &ic->i_sends[oldest];
 
-			/* In the error case, wc.opcode sometimes contains garbage */
-			switch (send->s_wr.opcode) {
-			case IB_WR_SEND:
-			case IB_WR_RDMA_WRITE:
-			case IB_WR_RDMA_READ:
-			case IB_WR_ATOMIC_FETCH_AND_ADD:
-			case IB_WR_ATOMIC_CMP_AND_SWP:
-				if (send->s_rm)
-					rds_ib_send_unmap_rm(ic, send, wc.status);
-				break;
-			default:
-				if (printk_ratelimit())
-					printk(KERN_NOTICE
-					       "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
-					       __func__, send->s_wr.opcode);
-				break;
-			}
+			rm = rds_ib_send_unmap_op(ic, send, wc.status);
 
-			send->s_wr.opcode = 0xdead;
-			send->s_wr.num_sge = 1;
 			if (send->s_queued + HZ/2 < jiffies)
 				rds_ib_stats_inc(s_ib_tx_stalled);
 
-			/* If a RDMA operation produced an error, signal this right
-			 * away. If we don't, the subsequent SEND that goes with this
-			 * RDMA will be canceled with ERR_WFLUSH, and the application
-			 * never learn that the RDMA failed. */
-			if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) {
-				struct rds_message *rm;
-
-				rm = rds_send_get_message(conn, send->s_op);
-				if (rm) {
-					rds_ib_send_unmap_rm(ic, send, wc.status);
-					rds_ib_send_complete(rm, wc.status, rds_rdma_send_complete);
-					rds_message_put(rm);
-				}
+			if (&send->s_op == &rm->m_final_op) {
+				/* If anyone waited for this message to get flushed out, wake
+				 * them up now */
+				rds_message_unmapped(rm);
+
+				rds_message_put(rm);
+				send->s_op = NULL;
 			}
 
 			oldest = (oldest + 1) % ic->i_send_ring.w_nr;
@@ -512,7 +529,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	}
 
 	/* map the message the first time we see it */
-	if (!ic->i_rm) {
+	if (!ic->i_data_op) {
 		if (rm->data.op_nents) {
 			rm->data.op_count = ib_dma_map_sg(dev,
 							  rm->data.op_sg,
@@ -530,7 +547,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		}
 
 		rds_message_addref(rm);
-		ic->i_rm = rm;
+		ic->i_data_op = &rm->data;
 
 		/* Finalize the header */
 		if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
@@ -583,7 +600,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 	send = &ic->i_sends[pos];
 	first = send;
 	prev = NULL;
-	scat = &rm->data.op_sg[sg];
+	scat = &ic->i_data_op->op_sg[sg];
 	i = 0;
 	do {
 		unsigned int len = 0;
@@ -658,9 +675,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 
 	/* if we finished the message then send completion owns it */
 	if (scat == &rm->data.op_sg[rm->data.op_count]) {
-		prev->s_rm = ic->i_rm;
+		prev->s_op = ic->i_data_op;
 		prev->s_wr.send_flags |= IB_SEND_SOLICITED;
-		ic->i_rm = NULL;
+		ic->i_data_op = NULL;
 	}
 
 	/* Put back wrs & credits we didn't use */
@@ -681,9 +698,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
 		printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
 		       "returned %d\n", &conn->c_faddr, ret);
 		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
-		if (prev->s_rm) {
-			ic->i_rm = prev->s_rm;
-			prev->s_rm = NULL;
+		if (prev->s_op) {
+			ic->i_data_op = prev->s_op;
+			prev->s_op = NULL;
 		}
 
 		rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
@@ -701,10 +718,9 @@ out:
  * A simplified version of the rdma case, we always map 1 SG, and
  * only 8 bytes, for the return value from the atomic operation.
  */
-int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
-	struct rm_atomic_op *op = &rm->atomic;
 	struct rds_ib_send_work *send = NULL;
 	struct ib_send_wr *failed_wr;
 	struct rds_ib_device *rds_ibdev;
@@ -741,14 +757,6 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
 	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
 	send->s_wr.wr.atomic.rkey = op->op_rkey;
 
-	/*
-	 * If there is no data or rdma ops in the message, then
-	 * we must fill in s_rm ourselves, so we properly clean up
-	 * on completion.
-	 */
-	if (!rm->rdma.op_active && !rm->data.op_active)
-		send->s_rm = rm;
-
 	/* map 8 byte retval buffer to the device */
 	ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
 	rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
@@ -809,7 +817,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
 
 	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
 
-	/* map the message the first time we see it */
+	/* map the op the first time we see it */
 	if (!op->op_mapped) {
 		op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
 					     op->op_sg, op->op_nents, (op->op_write) ?
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 23b921000e74..7291f006f364 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -308,6 +308,8 @@ struct rds_message {
 	unsigned int		m_used_sgs;
 	unsigned int		m_total_sgs;
 
+	void			*m_final_op;
+
 	struct {
 		struct rm_atomic_op {
 			int			op_type;
@@ -421,7 +423,7 @@ struct rds_transport {
 	int (*xmit_cong_map)(struct rds_connection *conn,
 			     struct rds_cong_map *map, unsigned long offset);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
-	int (*xmit_atomic)(struct rds_connection *conn, struct rds_message *rm);
+	int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
 	int (*recv)(struct rds_connection *conn);
 	int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
 				size_t size);
diff --git a/net/rds/send.c b/net/rds/send.c
index 69ab1040d02d..d1f364e44e36 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -252,6 +252,7 @@ int rds_send_xmit(struct rds_connection *conn)
 
 		/* The transport either sends the whole rdma or none of it */
 		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+			rm->m_final_op = &rm->rdma;
 			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
 			if (ret)
 				break;
@@ -263,10 +264,12 @@ int rds_send_xmit(struct rds_connection *conn)
 		}
 
 		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
-			ret = conn->c_trans->xmit_atomic(conn, rm);
+			rm->m_final_op = &rm->atomic;
+			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
 			if (ret)
 				break;
 			conn->c_xmit_atomic_sent = 1;
+
 			/* The transport owns the mapped memory for now.
 			 * You can't unmap it while it's on the send queue */
 			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
@@ -295,6 +298,7 @@ int rds_send_xmit(struct rds_connection *conn)
 		}
 
 		if (rm->data.op_active && !conn->c_xmit_data_sent) {
+			rm->m_final_op = &rm->data;
 			ret = conn->c_trans->xmit(conn, rm,
 						  conn->c_xmit_hdr_off,
 						  conn->c_xmit_sg,
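
Taken together, the send.c and ib_send.c changes form a simple protocol: the send path records whichever op it queues last in rm->m_final_op, and the completion path unmaps each op as its WR completes but only performs whole-message cleanup when the completed op is the recorded final one. The following is a condensed, compilable userspace sketch of that control flow, not kernel code; the struct definitions are simplified and the helpers xmit_op, unmap_one_op, and whole_message_cleanup are placeholders, not real RDS symbols — only the m_final_op bookkeeping mirrors the patch.

/*
 * Condensed sketch of the staged-cleanup flow introduced by this patch.
 * Stub helpers stand in for the transport xmit and per-op unmap paths.
 */
#include <stdbool.h>
#include <stdio.h>

struct rm_rdma_op   { bool op_active; };
struct rm_atomic_op { bool op_active; };
struct rm_data_op   { bool op_active; };

struct rds_message {
	void			*m_final_op;
	struct rm_atomic_op	atomic;
	struct rm_rdma_op	rdma;
	struct rm_data_op	data;
};

/* stubs standing in for the transport xmit and per-op unmap routines */
static void xmit_op(const char *what)      { printf("xmit %s\n", what); }
static void unmap_one_op(const char *what) { printf("unmap %s\n", what); }
static void whole_message_cleanup(struct rds_message *rm)
{
	(void)rm;
	printf("whole-message cleanup (unmapped wakeup + put)\n");
}

/* Send side: whichever op is queued last becomes m_final_op. */
static void send_message(struct rds_message *rm)
{
	if (rm->rdma.op_active) {
		rm->m_final_op = &rm->rdma;
		xmit_op("rdma");
	}
	if (rm->atomic.op_active) {
		rm->m_final_op = &rm->atomic;
		xmit_op("atomic");
	}
	if (rm->data.op_active) {
		rm->m_final_op = &rm->data;
		xmit_op("data");
	}
}

/* Completion side: per-op unmap always; whole-message cleanup only when
 * the completed op is the one recorded as final. */
static void tx_completion(struct rds_message *rm, void *completed_op,
			  const char *what)
{
	unmap_one_op(what);
	if (completed_op == rm->m_final_op)
		whole_message_cleanup(rm);
}

int main(void)
{
	struct rds_message rm = { .rdma = { true }, .data = { true } };

	send_message(&rm);
	tx_completion(&rm, &rm.rdma, "rdma");	/* not final: per-op cleanup only */
	tx_completion(&rm, &rm.data, "data");	/* final op: full cleanup */
	return 0;
}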