aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/rds/ib.h7
-rw-r--r--net/rds/ib_cm.c9
-rw-r--r--net/rds/ib_send.c242
-rw-r--r--net/rds/rds.h4
-rw-r--r--net/rds/send.c6
5 files changed, 142 insertions, 126 deletions
diff --git a/net/rds/ib.h b/net/rds/ib.h
index d64b5087eefe..202140a84f0c 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -53,8 +53,7 @@ struct rds_ib_connect_private {
53}; 53};
54 54
55struct rds_ib_send_work { 55struct rds_ib_send_work {
56 struct rds_message *s_rm; 56 void *s_op;
57 struct rm_rdma_op *s_op;
58 struct ib_send_wr s_wr; 57 struct ib_send_wr s_wr;
59 struct ib_sge s_sge[RDS_IB_MAX_SGE]; 58 struct ib_sge s_sge[RDS_IB_MAX_SGE];
60 unsigned long s_queued; 59 unsigned long s_queued;
@@ -92,7 +91,7 @@ struct rds_ib_connection {
92 91
93 /* tx */ 92 /* tx */
94 struct rds_ib_work_ring i_send_ring; 93 struct rds_ib_work_ring i_send_ring;
95 struct rds_message *i_rm; 94 struct rm_data_op *i_data_op;
96 struct rds_header *i_send_hdrs; 95 struct rds_header *i_send_hdrs;
97 u64 i_send_hdrs_dma; 96 u64 i_send_hdrs_dma;
98 struct rds_ib_send_work *i_sends; 97 struct rds_ib_send_work *i_sends;
@@ -336,7 +335,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
336void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted); 335void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
337int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted, 336int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
338 u32 *adv_credits, int need_posted, int max_posted); 337 u32 *adv_credits, int need_posted, int max_posted);
339int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm); 338int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
340 339
341/* ib_stats.c */ 340/* ib_stats.c */
342DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats); 341DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
diff --git a/net/rds/ib_cm.c b/net/rds/ib_cm.c
index 8b0c743c0900..b5b5ebbc0bb6 100644
--- a/net/rds/ib_cm.c
+++ b/net/rds/ib_cm.c
@@ -673,9 +673,12 @@ void rds_ib_conn_shutdown(struct rds_connection *conn)
673 BUG_ON(ic->rds_ibdev); 673 BUG_ON(ic->rds_ibdev);
674 674
675 /* Clear pending transmit */ 675 /* Clear pending transmit */
676 if (ic->i_rm) { 676 if (ic->i_data_op) {
677 rds_message_put(ic->i_rm); 677 struct rds_message *rm;
678 ic->i_rm = NULL; 678
679 rm = container_of(ic->i_data_op, struct rds_message, data);
680 rds_message_put(rm);
681 ic->i_data_op = NULL;
679 } 682 }
680 683
681 /* Clear the ACK state */ 684 /* Clear the ACK state */
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 95f15247acd7..6461a152bd5b 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -67,80 +67,122 @@ static void rds_ib_send_complete(struct rds_message *rm,
67 complete(rm, notify_status); 67 complete(rm, notify_status);
68} 68}
69 69
70static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic, 70static void rds_ib_send_unmap_data(struct rds_ib_connection *ic,
71 struct rds_ib_send_work *send, 71 struct rm_data_op *op,
72 int wc_status) 72 int wc_status)
73{ 73{
74 struct rds_message *rm = send->s_rm; 74 if (op->op_nents)
75 75 ib_dma_unmap_sg(ic->i_cm_id->device,
76 rdsdebug("ic %p send %p rm %p\n", ic, send, rm); 76 op->op_sg, op->op_nents,
77 77 DMA_TO_DEVICE);
78 ib_dma_unmap_sg(ic->i_cm_id->device, 78}
79 rm->data.op_sg, rm->data.op_nents,
80 DMA_TO_DEVICE);
81 79
82 if (rm->rdma.op_active) { 80static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
83 struct rm_rdma_op *op = &rm->rdma; 81 struct rm_rdma_op *op,
82 int wc_status)
83{
84 if (op->op_mapped) {
85 ib_dma_unmap_sg(ic->i_cm_id->device,
86 op->op_sg, op->op_nents,
87 op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
88 op->op_mapped = 0;
89 }
84 90
85 if (op->op_mapped) { 91 /* If the user asked for a completion notification on this
86 ib_dma_unmap_sg(ic->i_cm_id->device, 92 * message, we can implement three different semantics:
87 op->op_sg, op->op_nents, 93 * 1. Notify when we received the ACK on the RDS message
88 op->op_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE); 94 * that was queued with the RDMA. This provides reliable
89 op->op_mapped = 0; 95 * notification of RDMA status at the expense of a one-way
90 } 96 * packet delay.
97 * 2. Notify when the IB stack gives us the completion event for
98 * the RDMA operation.
99 * 3. Notify when the IB stack gives us the completion event for
100 * the accompanying RDS messages.
101 * Here, we implement approach #3. To implement approach #2,
102 * we would need to take an event for the rdma WR. To implement #1,
103 * don't call rds_rdma_send_complete at all, and fall back to the notify
104 * handling in the ACK processing code.
105 *
106 * Note: There's no need to explicitly sync any RDMA buffers using
107 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
108 * operation itself unmapped the RDMA buffers, which takes care
109 * of synching.
110 */
111 rds_ib_send_complete(container_of(op, struct rds_message, rdma),
112 wc_status, rds_rdma_send_complete);
91 113
92 /* If the user asked for a completion notification on this 114 if (op->op_write)
93 * message, we can implement three different semantics: 115 rds_stats_add(s_send_rdma_bytes, op->op_bytes);
94 * 1. Notify when we received the ACK on the RDS message 116 else
95 * that was queued with the RDMA. This provides reliable 117 rds_stats_add(s_recv_rdma_bytes, op->op_bytes);
96 * notification of RDMA status at the expense of a one-way 118}
97 * packet delay.
98 * 2. Notify when the IB stack gives us the completion event for
99 * the RDMA operation.
100 * 3. Notify when the IB stack gives us the completion event for
101 * the accompanying RDS messages.
102 * Here, we implement approach #3. To implement approach #2,
103 * call rds_rdma_send_complete from the cq_handler. To implement #1,
104 * don't call rds_rdma_send_complete at all, and fall back to the notify
105 * handling in the ACK processing code.
106 *
107 * Note: There's no need to explicitly sync any RDMA buffers using
108 * ib_dma_sync_sg_for_cpu - the completion for the RDMA
109 * operation itself unmapped the RDMA buffers, which takes care
110 * of synching.
111 */
112 rds_ib_send_complete(rm, wc_status, rds_rdma_send_complete);
113 119
114 if (rm->rdma.op_write) 120static void rds_ib_send_unmap_atomic(struct rds_ib_connection *ic,
115 rds_stats_add(s_send_rdma_bytes, rm->rdma.op_bytes); 121 struct rm_atomic_op *op,
116 else 122 int wc_status)
117 rds_stats_add(s_recv_rdma_bytes, rm->rdma.op_bytes); 123{
124 /* unmap atomic recvbuf */
125 if (op->op_mapped) {
126 ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
127 DMA_FROM_DEVICE);
128 op->op_mapped = 0;
118 } 129 }
119 130
120 if (rm->atomic.op_active) { 131 rds_ib_send_complete(container_of(op, struct rds_message, atomic),
121 struct rm_atomic_op *op = &rm->atomic; 132 wc_status, rds_atomic_send_complete);
122
123 /* unmap atomic recvbuf */
124 if (op->op_mapped) {
125 ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
126 DMA_FROM_DEVICE);
127 op->op_mapped = 0;
128 }
129 133
130 rds_ib_send_complete(rm, wc_status, rds_atomic_send_complete); 134 if (op->op_type == RDS_ATOMIC_TYPE_CSWP)
135 rds_stats_inc(s_atomic_cswp);
136 else
137 rds_stats_inc(s_atomic_fadd);
138}
131 139
132 if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP) 140/*
133 rds_stats_inc(s_atomic_cswp); 141 * Unmap the resources associated with a struct send_work.
134 else 142 *
135 rds_stats_inc(s_atomic_fadd); 143 * Returns the rm for no good reason other than it is unobtainable
144 * other than by switching on wr.opcode, currently, and the caller,
145 * the event handler, needs it.
146 */
147static struct rds_message *rds_ib_send_unmap_op(struct rds_ib_connection *ic,
148 struct rds_ib_send_work *send,
149 int wc_status)
150{
151 struct rds_message *rm = NULL;
152
153 /* In the error case, wc.opcode sometimes contains garbage */
154 switch (send->s_wr.opcode) {
155 case IB_WR_SEND:
156 if (send->s_op) {
157 rm = container_of(send->s_op, struct rds_message, data);
158 rds_ib_send_unmap_data(ic, send->s_op, wc_status);
159 }
160 break;
161 case IB_WR_RDMA_WRITE:
162 case IB_WR_RDMA_READ:
163 if (send->s_op) {
164 rm = container_of(send->s_op, struct rds_message, rdma);
165 rds_ib_send_unmap_rdma(ic, send->s_op, wc_status);
166 }
167 break;
168 case IB_WR_ATOMIC_FETCH_AND_ADD:
169 case IB_WR_ATOMIC_CMP_AND_SWP:
170 if (send->s_op) {
171 rm = container_of(send->s_op, struct rds_message, atomic);
172 rds_ib_send_unmap_atomic(ic, send->s_op, wc_status);
173 }
174 break;
175 default:
176 if (printk_ratelimit())
177 printk(KERN_NOTICE
178 "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
179 __func__, send->s_wr.opcode);
180 break;
136 } 181 }
137 182
138 /* If anyone waited for this message to get flushed out, wake 183 send->s_wr.opcode = 0xdead;
139 * them up now */
140 rds_message_unmapped(rm);
141 184
142 rds_message_put(rm); 185 return rm;
143 send->s_rm = NULL;
144} 186}
145 187
146void rds_ib_send_init_ring(struct rds_ib_connection *ic) 188void rds_ib_send_init_ring(struct rds_ib_connection *ic)
@@ -151,7 +193,6 @@ void rds_ib_send_init_ring(struct rds_ib_connection *ic)
151 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { 193 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
152 struct ib_sge *sge; 194 struct ib_sge *sge;
153 195
154 send->s_rm = NULL;
155 send->s_op = NULL; 196 send->s_op = NULL;
156 197
157 send->s_wr.wr_id = i; 198 send->s_wr.wr_id = i;
@@ -173,9 +214,8 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
173 u32 i; 214 u32 i;
174 215
175 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) { 216 for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
176 if (!send->s_rm || send->s_wr.opcode == 0xdead) 217 if (send->s_op && send->s_wr.opcode != 0xdead)
177 continue; 218 rds_ib_send_unmap_op(ic, send, IB_WC_WR_FLUSH_ERR);
178 rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
179 } 219 }
180} 220}
181 221
@@ -189,6 +229,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
189{ 229{
190 struct rds_connection *conn = context; 230 struct rds_connection *conn = context;
191 struct rds_ib_connection *ic = conn->c_transport_data; 231 struct rds_ib_connection *ic = conn->c_transport_data;
232 struct rds_message *rm = NULL;
192 struct ib_wc wc; 233 struct ib_wc wc;
193 struct rds_ib_send_work *send; 234 struct rds_ib_send_work *send;
194 u32 completed; 235 u32 completed;
@@ -222,42 +263,18 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
222 for (i = 0; i < completed; i++) { 263 for (i = 0; i < completed; i++) {
223 send = &ic->i_sends[oldest]; 264 send = &ic->i_sends[oldest];
224 265
225 /* In the error case, wc.opcode sometimes contains garbage */ 266 rm = rds_ib_send_unmap_op(ic, send, wc.status);
226 switch (send->s_wr.opcode) {
227 case IB_WR_SEND:
228 case IB_WR_RDMA_WRITE:
229 case IB_WR_RDMA_READ:
230 case IB_WR_ATOMIC_FETCH_AND_ADD:
231 case IB_WR_ATOMIC_CMP_AND_SWP:
232 if (send->s_rm)
233 rds_ib_send_unmap_rm(ic, send, wc.status);
234 break;
235 default:
236 if (printk_ratelimit())
237 printk(KERN_NOTICE
238 "RDS/IB: %s: unexpected opcode 0x%x in WR!\n",
239 __func__, send->s_wr.opcode);
240 break;
241 }
242 267
243 send->s_wr.opcode = 0xdead;
244 send->s_wr.num_sge = 1;
245 if (send->s_queued + HZ/2 < jiffies) 268 if (send->s_queued + HZ/2 < jiffies)
246 rds_ib_stats_inc(s_ib_tx_stalled); 269 rds_ib_stats_inc(s_ib_tx_stalled);
247 270
248 /* If a RDMA operation produced an error, signal this right 271 if (&send->s_op == &rm->m_final_op) {
249 * away. If we don't, the subsequent SEND that goes with this 272 /* If anyone waited for this message to get flushed out, wake
250 * RDMA will be canceled with ERR_WFLUSH, and the application 273 * them up now */
251 * never learn that the RDMA failed. */ 274 rds_message_unmapped(rm);
252 if (unlikely(wc.status == IB_WC_REM_ACCESS_ERR && send->s_op)) { 275
253 struct rds_message *rm; 276 rds_message_put(rm);
254 277 send->s_op = NULL;
255 rm = rds_send_get_message(conn, send->s_op);
256 if (rm) {
257 rds_ib_send_unmap_rm(ic, send, wc.status);
258 rds_ib_send_complete(rm, wc.status, rds_rdma_send_complete);
259 rds_message_put(rm);
260 }
261 } 278 }
262 279
263 oldest = (oldest + 1) % ic->i_send_ring.w_nr; 280 oldest = (oldest + 1) % ic->i_send_ring.w_nr;
@@ -512,7 +529,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
512 } 529 }
513 530
514 /* map the message the first time we see it */ 531 /* map the message the first time we see it */
515 if (!ic->i_rm) { 532 if (!ic->i_data_op) {
516 if (rm->data.op_nents) { 533 if (rm->data.op_nents) {
517 rm->data.op_count = ib_dma_map_sg(dev, 534 rm->data.op_count = ib_dma_map_sg(dev,
518 rm->data.op_sg, 535 rm->data.op_sg,
@@ -530,7 +547,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
530 } 547 }
531 548
532 rds_message_addref(rm); 549 rds_message_addref(rm);
533 ic->i_rm = rm; 550 ic->i_data_op = &rm->data;
534 551
535 /* Finalize the header */ 552 /* Finalize the header */
536 if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags)) 553 if (test_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags))
@@ -583,7 +600,7 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
583 send = &ic->i_sends[pos]; 600 send = &ic->i_sends[pos];
584 first = send; 601 first = send;
585 prev = NULL; 602 prev = NULL;
586 scat = &rm->data.op_sg[sg]; 603 scat = &ic->i_data_op->op_sg[sg];
587 i = 0; 604 i = 0;
588 do { 605 do {
589 unsigned int len = 0; 606 unsigned int len = 0;
@@ -658,9 +675,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
658 675
659 /* if we finished the message then send completion owns it */ 676 /* if we finished the message then send completion owns it */
660 if (scat == &rm->data.op_sg[rm->data.op_count]) { 677 if (scat == &rm->data.op_sg[rm->data.op_count]) {
661 prev->s_rm = ic->i_rm; 678 prev->s_op = ic->i_data_op;
662 prev->s_wr.send_flags |= IB_SEND_SOLICITED; 679 prev->s_wr.send_flags |= IB_SEND_SOLICITED;
663 ic->i_rm = NULL; 680 ic->i_data_op = NULL;
664 } 681 }
665 682
666 /* Put back wrs & credits we didn't use */ 683 /* Put back wrs & credits we didn't use */
@@ -681,9 +698,9 @@ int rds_ib_xmit(struct rds_connection *conn, struct rds_message *rm,
681 printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 " 698 printk(KERN_WARNING "RDS/IB: ib_post_send to %pI4 "
682 "returned %d\n", &conn->c_faddr, ret); 699 "returned %d\n", &conn->c_faddr, ret);
683 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc); 700 rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
684 if (prev->s_rm) { 701 if (prev->s_op) {
685 ic->i_rm = prev->s_rm; 702 ic->i_data_op = prev->s_op;
686 prev->s_rm = NULL; 703 prev->s_op = NULL;
687 } 704 }
688 705
689 rds_ib_conn_error(ic->conn, "ib_post_send failed\n"); 706 rds_ib_conn_error(ic->conn, "ib_post_send failed\n");
@@ -701,10 +718,9 @@ out:
701 * A simplified version of the rdma case, we always map 1 SG, and 718 * A simplified version of the rdma case, we always map 1 SG, and
702 * only 8 bytes, for the return value from the atomic operation. 719 * only 8 bytes, for the return value from the atomic operation.
703 */ 720 */
704int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm) 721int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
705{ 722{
706 struct rds_ib_connection *ic = conn->c_transport_data; 723 struct rds_ib_connection *ic = conn->c_transport_data;
707 struct rm_atomic_op *op = &rm->atomic;
708 struct rds_ib_send_work *send = NULL; 724 struct rds_ib_send_work *send = NULL;
709 struct ib_send_wr *failed_wr; 725 struct ib_send_wr *failed_wr;
710 struct rds_ib_device *rds_ibdev; 726 struct rds_ib_device *rds_ibdev;
@@ -741,14 +757,6 @@ int rds_ib_xmit_atomic(struct rds_connection *conn, struct rds_message *rm)
741 send->s_wr.wr.atomic.remote_addr = op->op_remote_addr; 757 send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
742 send->s_wr.wr.atomic.rkey = op->op_rkey; 758 send->s_wr.wr.atomic.rkey = op->op_rkey;
743 759
744 /*
745 * If there is no data or rdma ops in the message, then
746 * we must fill in s_rm ourselves, so we properly clean up
747 * on completion.
748 */
749 if (!rm->rdma.op_active && !rm->data.op_active)
750 send->s_rm = rm;
751
752 /* map 8 byte retval buffer to the device */ 760 /* map 8 byte retval buffer to the device */
753 ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE); 761 ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
754 rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret); 762 rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
@@ -809,7 +817,7 @@ int rds_ib_xmit_rdma(struct rds_connection *conn, struct rm_rdma_op *op)
809 817
810 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client); 818 rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
811 819
812 /* map the message the first time we see it */ 820 /* map the op the first time we see it */
813 if (!op->op_mapped) { 821 if (!op->op_mapped) {
814 op->op_count = ib_dma_map_sg(ic->i_cm_id->device, 822 op->op_count = ib_dma_map_sg(ic->i_cm_id->device,
815 op->op_sg, op->op_nents, (op->op_write) ? 823 op->op_sg, op->op_nents, (op->op_write) ?
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 23b921000e74..7291f006f364 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -308,6 +308,8 @@ struct rds_message {
308 unsigned int m_used_sgs; 308 unsigned int m_used_sgs;
309 unsigned int m_total_sgs; 309 unsigned int m_total_sgs;
310 310
311 void *m_final_op;
312
311 struct { 313 struct {
312 struct rm_atomic_op { 314 struct rm_atomic_op {
313 int op_type; 315 int op_type;
@@ -421,7 +423,7 @@ struct rds_transport {
421 int (*xmit_cong_map)(struct rds_connection *conn, 423 int (*xmit_cong_map)(struct rds_connection *conn,
422 struct rds_cong_map *map, unsigned long offset); 424 struct rds_cong_map *map, unsigned long offset);
423 int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op); 425 int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
424 int (*xmit_atomic)(struct rds_connection *conn, struct rds_message *rm); 426 int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
425 int (*recv)(struct rds_connection *conn); 427 int (*recv)(struct rds_connection *conn);
426 int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov, 428 int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
427 size_t size); 429 size_t size);
diff --git a/net/rds/send.c b/net/rds/send.c
index 69ab1040d02d..d1f364e44e36 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -252,6 +252,7 @@ int rds_send_xmit(struct rds_connection *conn)
252 252
253 /* The transport either sends the whole rdma or none of it */ 253 /* The transport either sends the whole rdma or none of it */
254 if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) { 254 if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
255 rm->m_final_op = &rm->rdma;
255 ret = conn->c_trans->xmit_rdma(conn, &rm->rdma); 256 ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
256 if (ret) 257 if (ret)
257 break; 258 break;
@@ -263,10 +264,12 @@ int rds_send_xmit(struct rds_connection *conn)
263 } 264 }
264 265
265 if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) { 266 if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
266 ret = conn->c_trans->xmit_atomic(conn, rm); 267 rm->m_final_op = &rm->atomic;
268 ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
267 if (ret) 269 if (ret)
268 break; 270 break;
269 conn->c_xmit_atomic_sent = 1; 271 conn->c_xmit_atomic_sent = 1;
272
270 /* The transport owns the mapped memory for now. 273 /* The transport owns the mapped memory for now.
271 * You can't unmap it while it's on the send queue */ 274 * You can't unmap it while it's on the send queue */
272 set_bit(RDS_MSG_MAPPED, &rm->m_flags); 275 set_bit(RDS_MSG_MAPPED, &rm->m_flags);
@@ -295,6 +298,7 @@ int rds_send_xmit(struct rds_connection *conn)
295 } 298 }
296 299
297 if (rm->data.op_active && !conn->c_xmit_data_sent) { 300 if (rm->data.op_active && !conn->c_xmit_data_sent) {
301 rm->m_final_op = &rm->data;
298 ret = conn->c_trans->xmit(conn, rm, 302 ret = conn->c_trans->xmit(conn, rm,
299 conn->c_xmit_hdr_off, 303 conn->c_xmit_hdr_off,
300 conn->c_xmit_sg, 304 conn->c_xmit_sg,