Diffstat (limited to 'net/rds/send.c')
-rw-r--r--  net/rds/send.c | 544
1 file changed, 331 insertions(+), 213 deletions(-)
diff --git a/net/rds/send.c b/net/rds/send.c
index 9c1c6bcaa6c9..9b951a0ab6b7 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -37,7 +37,6 @@
 #include <linux/list.h>
 
 #include "rds.h"
-#include "rdma.h"
 
 /* When transmitting messages in rds_send_xmit, we need to emerge from
  * time to time and briefly release the CPU. Otherwise the softlock watchdog
@@ -54,7 +53,8 @@ module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
@@ -62,18 +62,22 @@ void rds_send_reset(struct rds_connection *conn)
 	unsigned long flags;
 
 	if (conn->c_xmit_rm) {
+		rm = conn->c_xmit_rm;
+		conn->c_xmit_rm = NULL;
 		/* Tell the user the RDMA op is no longer mapped by the
 		 * transport. This isn't entirely true (it's flushed out
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
-		rds_message_unmapped(conn->c_xmit_rm);
-		rds_message_put(conn->c_xmit_rm);
-		conn->c_xmit_rm = NULL;
+		rds_message_unmapped(rm);
+		rds_message_put(rm);
 	}
+
 	conn->c_xmit_sg = 0;
 	conn->c_xmit_hdr_off = 0;
 	conn->c_xmit_data_off = 0;
+	conn->c_xmit_atomic_sent = 0;
 	conn->c_xmit_rdma_sent = 0;
+	conn->c_xmit_data_sent = 0;
 
 	conn->c_map_queued = 0;
 
@@ -90,6 +94,25 @@ void rds_send_reset(struct rds_connection *conn)
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	smp_mb__after_clear_bit();
+	/*
+	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare. We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
  * We're making the concious trade-off here to only send one message
  * down the connection at a time.
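The two helpers added above replace the old c_send_lock mutex with a single RDS_IN_XMIT bit: whoever wins test_and_set_bit() feeds the connection, everyone else backs off instead of blocking. Below is a minimal userspace sketch of that gate, written with C11 atomics rather than the kernel bitops; all names in it are illustrative, not part of the patch.

/*
 * Userspace sketch of the RDS_IN_XMIT gate (not kernel code).
 * Only the thread that sets the bit walks the send queue; others
 * return immediately instead of sleeping on a mutex.
 */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdio.h>

static atomic_uint conn_flags;          /* stand-in for conn->c_flags   */
#define IN_XMIT 0x1u                    /* stand-in for RDS_IN_XMIT bit */

static bool acquire_in_xmit(void)
{
	/* true only for the thread that actually set the bit */
	return (atomic_fetch_or(&conn_flags, IN_XMIT) & IN_XMIT) == 0;
}

static void release_in_xmit(void)
{
	atomic_fetch_and(&conn_flags, ~IN_XMIT);
	atomic_thread_fence(memory_order_seq_cst);
	/* the kernel code additionally wakes c_waitq if anyone is waiting */
}

int main(void)
{
	if (acquire_in_xmit()) {
		puts("feeding the connection");
		release_in_xmit();
	} else {
		puts("another sender owns the path, backing off");
	}
	return 0;
}
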
@@ -109,102 +132,69 @@ int rds_send_xmit(struct rds_connection *conn)
 	struct rds_message *rm;
 	unsigned long flags;
 	unsigned int tmp;
-	unsigned int send_quota = send_batch_count;
 	struct scatterlist *sg;
 	int ret = 0;
-	int was_empty = 0;
 	LIST_HEAD(to_be_dropped);
 
+restart:
+
 	/*
 	 * sendmsg calls here after having queued its message on the send
 	 * queue. We only have one task feeding the connection at a time. If
 	 * another thread is already feeding the queue then we back off. This
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
-	 *
-	 * The sem holder will issue a retry if they notice that someone queued
-	 * a message after they stopped walking the send queue but before they
-	 * dropped the sem.
 	 */
-	if (!mutex_trylock(&conn->c_send_lock)) {
-		rds_stats_inc(s_send_sem_contention);
+	if (!acquire_in_xmit(conn)) {
+		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
 
+	/*
+	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+	 * we do the opposite to avoid races.
+	 */
+	if (!rds_conn_up(conn)) {
+		release_in_xmit(conn);
+		ret = 0;
+		goto out;
+	}
+
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
 
 	/*
 	 * spin trying to push headers and data down the connection until
-	 * the connection doens't make forward progress.
+	 * the connection doesn't make forward progress.
 	 */
-	while (--send_quota) {
-		/*
-		 * See if need to send a congestion map update if we're
-		 * between sending messages. The send_sem protects our sole
-		 * use of c_map_offset and _bytes.
-		 * Note this is used only by transports that define a special
-		 * xmit_cong_map function. For all others, we create allocate
-		 * a cong_map message and treat it just like any other send.
-		 */
-		if (conn->c_map_bytes) {
-			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
-							conn->c_map_offset);
-			if (ret <= 0)
-				break;
+	while (1) {
 
-			conn->c_map_offset += ret;
-			conn->c_map_bytes -= ret;
-			if (conn->c_map_bytes)
-				continue;
-		}
-
-		/* If we're done sending the current message, clear the
-		 * offset and S/G temporaries.
-		 */
 		rm = conn->c_xmit_rm;
-		if (rm != NULL &&
-		    conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
-		    conn->c_xmit_sg == rm->m_nents) {
-			conn->c_xmit_rm = NULL;
-			conn->c_xmit_sg = 0;
-			conn->c_xmit_hdr_off = 0;
-			conn->c_xmit_data_off = 0;
-			conn->c_xmit_rdma_sent = 0;
 
-			/* Release the reference to the previous message. */
-			rds_message_put(rm);
-			rm = NULL;
-		}
-
-		/* If we're asked to send a cong map update, do so.
+		/*
+		 * If between sending messages, we can send a pending congestion
+		 * map update.
 		 */
-		if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
-			if (conn->c_trans->xmit_cong_map != NULL) {
-				conn->c_map_offset = 0;
-				conn->c_map_bytes = sizeof(struct rds_header) +
-					RDS_CONG_MAP_BYTES;
-				continue;
-			}
-
+		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
 			rm = rds_cong_update_alloc(conn);
 			if (IS_ERR(rm)) {
 				ret = PTR_ERR(rm);
 				break;
 			}
+			rm->data.op_active = 1;
 
 			conn->c_xmit_rm = rm;
 		}
 
 		/*
-		 * Grab the next message from the send queue, if there is one.
+		 * If not already working on one, grab the next message.
 		 *
 		 * c_xmit_rm holds a ref while we're sending this message down
 		 * the connction. We can use this ref while holding the
 		 * send_sem.. rds_send_reset() is serialized with it.
 		 */
-		if (rm == NULL) {
+		if (!rm) {
 			unsigned int len;
 
 			spin_lock_irqsave(&conn->c_lock, flags);
@@ -224,10 +214,8 @@ int rds_send_xmit(struct rds_connection *conn)
 
 			spin_unlock_irqrestore(&conn->c_lock, flags);
 
-			if (rm == NULL) {
-				was_empty = 1;
+			if (!rm)
 				break;
-			}
 
 			/* Unfortunately, the way Infiniband deals with
 			 * RDMA to a bad MR key is by moving the entire
@@ -236,13 +224,12 @@ int rds_send_xmit(struct rds_connection *conn)
 			 * connection.
 			 * Therefore, we never retransmit messages with RDMA ops.
 			 */
-			if (rm->m_rdma_op &&
+			if (rm->rdma.op_active &&
 			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
 				spin_lock_irqsave(&conn->c_lock, flags);
 				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 					list_move(&rm->m_conn_item, &to_be_dropped);
 				spin_unlock_irqrestore(&conn->c_lock, flags);
-				rds_message_put(rm);
 				continue;
 			}
 
@@ -263,23 +250,55 @@ int rds_send_xmit(struct rds_connection *conn)
 			conn->c_xmit_rm = rm;
 		}
 
-		/*
-		 * Try and send an rdma message. Let's see if we can
-		 * keep this simple and require that the transport either
-		 * send the whole rdma or none of it.
-		 */
-		if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
-			ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
+		/* The transport either sends the whole rdma or none of it */
+		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+			rm->m_final_op = &rm->rdma;
+			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
 			if (ret)
 				break;
 			conn->c_xmit_rdma_sent = 1;
+
 			/* The transport owns the mapped memory for now.
 			 * You can't unmap it while it's on the send queue */
 			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 		}
 
-		if (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
-		    conn->c_xmit_sg < rm->m_nents) {
+		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+			rm->m_final_op = &rm->atomic;
+			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
+			if (ret)
+				break;
+			conn->c_xmit_atomic_sent = 1;
+
+			/* The transport owns the mapped memory for now.
+			 * You can't unmap it while it's on the send queue */
+			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
+		}
+
+		/*
+		 * A number of cases require an RDS header to be sent
+		 * even if there is no data.
+		 * We permit 0-byte sends; rds-ping depends on this.
+		 * However, if there are exclusively attached silent ops,
+		 * we skip the hdr/data send, to enable silent operation.
+		 */
+		if (rm->data.op_nents == 0) {
+			int ops_present;
+			int all_ops_are_silent = 1;
+
+			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
+			if (rm->atomic.op_active && !rm->atomic.op_silent)
+				all_ops_are_silent = 0;
+			if (rm->rdma.op_active && !rm->rdma.op_silent)
+				all_ops_are_silent = 0;
+
+			if (ops_present && all_ops_are_silent
+			    && !rm->m_rdma_cookie)
+				rm->data.op_active = 0;
+		}
+
+		if (rm->data.op_active && !conn->c_xmit_data_sent) {
+			rm->m_final_op = &rm->data;
 			ret = conn->c_trans->xmit(conn, rm,
 						  conn->c_xmit_hdr_off,
 						  conn->c_xmit_sg,
@@ -295,7 +314,7 @@ int rds_send_xmit(struct rds_connection *conn)
 				ret -= tmp;
 			}
 
-			sg = &rm->m_sg[conn->c_xmit_sg];
+			sg = &rm->data.op_sg[conn->c_xmit_sg];
 			while (ret) {
 				tmp = min_t(int, ret, sg->length -
 						      conn->c_xmit_data_off);
@@ -306,49 +325,63 @@ int rds_send_xmit(struct rds_connection *conn)
 					sg++;
 					conn->c_xmit_sg++;
 					BUG_ON(ret != 0 &&
-					       conn->c_xmit_sg == rm->m_nents);
+					       conn->c_xmit_sg == rm->data.op_nents);
 				}
 			}
+
+			if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
+			    (conn->c_xmit_sg == rm->data.op_nents))
+				conn->c_xmit_data_sent = 1;
 		}
-	}
 
-	/* Nuke any messages we decided not to retransmit. */
-	if (!list_empty(&to_be_dropped))
-		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
+		/*
+		 * A rm will only take multiple times through this loop
+		 * if there is a data op. Thus, if the data is sent (or there was
+		 * none), then we're done with the rm.
+		 */
+		if (!rm->data.op_active || conn->c_xmit_data_sent) {
+			conn->c_xmit_rm = NULL;
+			conn->c_xmit_sg = 0;
+			conn->c_xmit_hdr_off = 0;
+			conn->c_xmit_data_off = 0;
+			conn->c_xmit_rdma_sent = 0;
+			conn->c_xmit_atomic_sent = 0;
+			conn->c_xmit_data_sent = 0;
+
+			rds_message_put(rm);
+		}
+	}
 
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
 
-	/*
-	 * We might be racing with another sender who queued a message but
-	 * backed off on noticing that we held the c_send_lock. If we check
-	 * for queued messages after dropping the sem then either we'll
-	 * see the queued message or the queuer will get the sem. If we
-	 * notice the queued message then we trigger an immediate retry.
-	 *
-	 * We need to be careful only to do this when we stopped processing
-	 * the send queue because it was empty. It's the only way we
-	 * stop processing the loop when the transport hasn't taken
-	 * responsibility for forward progress.
-	 */
-	mutex_unlock(&conn->c_send_lock);
+	release_in_xmit(conn);
 
-	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
-		/* We exhausted the send quota, but there's work left to
-		 * do. Return and (re-)schedule the send worker.
-		 */
-		ret = -EAGAIN;
+	/* Nuke any messages we decided not to retransmit. */
+	if (!list_empty(&to_be_dropped)) {
+		/* irqs on here, so we can put(), unlike above */
+		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
+			rds_message_put(rm);
+		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}
 
-	if (ret == 0 && was_empty) {
-		/* A simple bit test would be way faster than taking the
-		 * spin lock */
-		spin_lock_irqsave(&conn->c_lock, flags);
+	/*
+	 * Other senders can queue a message after we last test the send queue
+	 * but before we clear RDS_IN_XMIT. In that case they'd back off and
+	 * not try and send their newly queued message. We need to check the
+	 * send queue after having cleared RDS_IN_XMIT so that their message
+	 * doesn't get stuck on the send queue.
+	 *
+	 * If the transport cannot continue (i.e ret != 0), then it must
+	 * call us when more room is available, such as from the tx
+	 * completion handler.
+	 */
+	if (ret == 0) {
+		smp_mb();
 		if (!list_empty(&conn->c_send_queue)) {
-			rds_stats_inc(s_send_sem_queue_raced);
-			ret = -EAGAIN;
+			rds_stats_inc(s_send_lock_queue_raced);
+			goto restart;
 		}
-		spin_unlock_irqrestore(&conn->c_lock, flags);
 	}
 out:
 	return ret;
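The ret == 0 recheck above closes a lost-wakeup window: another sender can enqueue a message after this thread last looked at c_send_queue but before RDS_IN_XMIT is cleared, and that sender will have backed off. Below is a small standalone sketch of the clear-then-recheck pattern in C11 atomics; the atomic counter standing in for the send queue and all names are illustrative assumptions, not the kernel code.

/*
 * Userspace sketch of the recheck-after-release pattern: drop the
 * IN_XMIT flag, then look at the queue again so a late enqueue is
 * never stranded with nobody transmitting it.
 */
#include <stdatomic.h>
#include <stdio.h>

static atomic_uint flags;               /* stand-in for conn->c_flags      */
static atomic_int  queued;              /* stand-in for conn->c_send_queue */
#define IN_XMIT 0x1u

static void xmit_path(void)
{
restart:
	if (atomic_fetch_or(&flags, IN_XMIT) & IN_XMIT)
		return;                             /* another sender owns the path */

	while (atomic_load(&queued) > 0)            /* drain what we can see        */
		atomic_fetch_sub(&queued, 1);

	atomic_fetch_and(&flags, ~IN_XMIT);         /* release_in_xmit()            */
	atomic_thread_fence(memory_order_seq_cst);  /* order the clear vs. recheck  */
	if (atomic_load(&queued) > 0)               /* late enqueue? start over     */
		goto restart;
}

int main(void)
{
	atomic_store(&queued, 3);                   /* pretend sendmsg queued three */
	xmit_path();
	printf("left on queue: %d\n", atomic_load(&queued));
	return 0;
}
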
@@ -376,52 +409,60 @@ static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
 }
 
 /*
- * Returns true if there are no messages on the send and retransmit queues
- * which have a sequence number greater than or equal to the given sequence
- * number.
+ * This is pretty similar to what happens below in the ACK
+ * handling code - except that we call here as soon as we get
+ * the IB send completion on the RDMA op and the accompanying
+ * message.
  */
-int rds_send_acked_before(struct rds_connection *conn, u64 seq)
+void rds_rdma_send_complete(struct rds_message *rm, int status)
 {
-	struct rds_message *rm, *tmp;
-	int ret = 1;
+	struct rds_sock *rs = NULL;
+	struct rm_rdma_op *ro;
+	struct rds_notifier *notifier;
+	unsigned long flags;
 
-	spin_lock(&conn->c_lock);
+	spin_lock_irqsave(&rm->m_rs_lock, flags);
 
-	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
-		if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
-			ret = 0;
-		break;
-	}
+	ro = &rm->rdma;
+	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+	    ro->op_active && ro->op_notify && ro->op_notifier) {
+		notifier = ro->op_notifier;
+		rs = rm->m_rs;
+		sock_hold(rds_rs_to_sk(rs));
 
-	list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
-		if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
-			ret = 0;
-		break;
+		notifier->n_status = status;
+		spin_lock(&rs->rs_lock);
+		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+		spin_unlock(&rs->rs_lock);
+
+		ro->op_notifier = NULL;
 	}
 
-	spin_unlock(&conn->c_lock);
+	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 
-	return ret;
+	if (rs) {
+		rds_wake_sk_sleep(rs);
+		sock_put(rds_rs_to_sk(rs));
+	}
 }
+EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
 
 /*
- * This is pretty similar to what happens below in the ACK
- * handling code - except that we call here as soon as we get
- * the IB send completion on the RDMA op and the accompanying
- * message.
+ * Just like above, except looks at atomic op
  */
-void rds_rdma_send_complete(struct rds_message *rm, int status)
+void rds_atomic_send_complete(struct rds_message *rm, int status)
 {
 	struct rds_sock *rs = NULL;
-	struct rds_rdma_op *ro;
+	struct rm_atomic_op *ao;
 	struct rds_notifier *notifier;
+	unsigned long flags;
 
-	spin_lock(&rm->m_rs_lock);
+	spin_lock_irqsave(&rm->m_rs_lock, flags);
 
-	ro = rm->m_rdma_op;
-	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
-	    ro && ro->r_notify && ro->r_notifier) {
-		notifier = ro->r_notifier;
+	ao = &rm->atomic;
+	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
+	    && ao->op_active && ao->op_notify && ao->op_notifier) {
+		notifier = ao->op_notifier;
 		rs = rm->m_rs;
 		sock_hold(rds_rs_to_sk(rs));
 
@@ -430,17 +471,17 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
 		notifier->n_status = status;
 		spin_lock(&rs->rs_lock);
 		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 		spin_unlock(&rs->rs_lock);
 
-		ro->r_notifier = NULL;
+		ao->op_notifier = NULL;
 	}
 
-	spin_unlock(&rm->m_rs_lock);
+	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 
 	if (rs) {
 		rds_wake_sk_sleep(rs);
 		sock_put(rds_rs_to_sk(rs));
 	}
 }
-EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
+EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
 
 /*
  * This is the same as rds_rdma_send_complete except we
@@ -448,15 +489,23 @@ EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
  * socket, socket lock) and can just move the notifier.
  */
 static inline void
-__rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
+__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
 {
-	struct rds_rdma_op *ro;
+	struct rm_rdma_op *ro;
+	struct rm_atomic_op *ao;
+
+	ro = &rm->rdma;
+	if (ro->op_active && ro->op_notify && ro->op_notifier) {
+		ro->op_notifier->n_status = status;
+		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
+		ro->op_notifier = NULL;
+	}
 
-	ro = rm->m_rdma_op;
-	if (ro && ro->r_notify && ro->r_notifier) {
-		ro->r_notifier->n_status = status;
-		list_add_tail(&ro->r_notifier->n_list, &rs->rs_notify_queue);
-		ro->r_notifier = NULL;
+	ao = &rm->atomic;
+	if (ao->op_active && ao->op_notify && ao->op_notifier) {
+		ao->op_notifier->n_status = status;
+		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
+		ao->op_notifier = NULL;
 	}
 
 	/* No need to wake the app - caller does this */
@@ -468,7 +517,7 @@ __rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status
  * So speed is not an issue here.
  */
 struct rds_message *rds_send_get_message(struct rds_connection *conn,
-					 struct rds_rdma_op *op)
+					 struct rm_rdma_op *op)
 {
 	struct rds_message *rm, *tmp, *found = NULL;
 	unsigned long flags;
@@ -476,7 +525,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn,
 	spin_lock_irqsave(&conn->c_lock, flags);
 
 	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
-		if (rm->m_rdma_op == op) {
+		if (&rm->rdma == op) {
 			atomic_inc(&rm->m_refcount);
 			found = rm;
 			goto out;
@@ -484,7 +533,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn,
 	}
 
 	list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
-		if (rm->m_rdma_op == op) {
+		if (&rm->rdma == op) {
 			atomic_inc(&rm->m_refcount);
 			found = rm;
 			break;
@@ -544,19 +593,20 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
 		spin_lock(&rs->rs_lock);
 
 		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
-			struct rds_rdma_op *ro = rm->m_rdma_op;
+			struct rm_rdma_op *ro = &rm->rdma;
 			struct rds_notifier *notifier;
 
 			list_del_init(&rm->m_sock_item);
 			rds_send_sndbuf_remove(rs, rm);
 
-			if (ro && ro->r_notifier && (status || ro->r_notify)) {
-				notifier = ro->r_notifier;
+			if (ro->op_active && ro->op_notifier &&
+			    (ro->op_notify || (ro->op_recverr && status))) {
+				notifier = ro->op_notifier;
 				list_add_tail(&notifier->n_list,
 						&rs->rs_notify_queue);
 				if (!notifier->n_status)
 					notifier->n_status = status;
-				rm->m_rdma_op->r_notifier = NULL;
+				rm->rdma.op_notifier = NULL;
 			}
 			was_on_sock = 1;
 			rm->m_rs = NULL;
@@ -619,9 +669,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
 {
 	struct rds_message *rm, *tmp;
 	struct rds_connection *conn;
-	unsigned long flags, flags2;
+	unsigned long flags;
 	LIST_HEAD(list);
-	int wake = 0;
 
 	/* get all the messages we're dropping under the rs lock */
 	spin_lock_irqsave(&rs->rs_lock, flags);
@@ -631,59 +680,54 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
 		    dest->sin_port != rm->m_inc.i_hdr.h_dport))
 			continue;
 
-		wake = 1;
 		list_move(&rm->m_sock_item, &list);
 		rds_send_sndbuf_remove(rs, rm);
 		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 	}
 
 	/* order flag updates with the rs lock */
-	if (wake)
-		smp_mb__after_clear_bit();
+	smp_mb__after_clear_bit();
 
 	spin_unlock_irqrestore(&rs->rs_lock, flags);
 
-	conn = NULL;
+	if (list_empty(&list))
+		return;
 
-	/* now remove the messages from the conn list as needed */
+	/* Remove the messages from the conn */
 	list_for_each_entry(rm, &list, m_sock_item) {
-		/* We do this here rather than in the loop above, so that
-		 * we don't have to nest m_rs_lock under rs->rs_lock */
-		spin_lock_irqsave(&rm->m_rs_lock, flags2);
-		/* If this is a RDMA operation, notify the app. */
-		spin_lock(&rs->rs_lock);
-		__rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
-		spin_unlock(&rs->rs_lock);
-		rm->m_rs = NULL;
-		spin_unlock_irqrestore(&rm->m_rs_lock, flags2);
 
+		conn = rm->m_inc.i_conn;
+
+		spin_lock_irqsave(&conn->c_lock, flags);
 		/*
-		 * If we see this flag cleared then we're *sure* that someone
-		 * else beat us to removing it from the conn. If we race
-		 * with their flag update we'll get the lock and then really
-		 * see that the flag has been cleared.
+		 * Maybe someone else beat us to removing rm from the conn.
+		 * If we race with their flag update we'll get the lock and
+		 * then really see that the flag has been cleared.
 		 */
-		if (!test_bit(RDS_MSG_ON_CONN, &rm->m_flags))
+		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 			continue;
-
-		if (conn != rm->m_inc.i_conn) {
-			if (conn)
-				spin_unlock_irqrestore(&conn->c_lock, flags);
-			conn = rm->m_inc.i_conn;
-			spin_lock_irqsave(&conn->c_lock, flags);
 		}
+		list_del_init(&rm->m_conn_item);
+		spin_unlock_irqrestore(&conn->c_lock, flags);
 
-		if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
-			list_del_init(&rm->m_conn_item);
-			rds_message_put(rm);
-		}
-	}
+		/*
+		 * Couldn't grab m_rs_lock in top loop (lock ordering),
+		 * but we can now.
+		 */
+		spin_lock_irqsave(&rm->m_rs_lock, flags);
 
-	if (conn)
-		spin_unlock_irqrestore(&conn->c_lock, flags);
+		spin_lock(&rs->rs_lock);
+		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
+		spin_unlock(&rs->rs_lock);
 
-	if (wake)
-		rds_wake_sk_sleep(rs);
+		rm->m_rs = NULL;
+		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+
+		rds_message_put(rm);
+	}
+
+	rds_wake_sk_sleep(rs);
 
 	while (!list_empty(&list)) {
 		rm = list_entry(list.next, struct rds_message, m_sock_item);
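The rewritten rds_send_drop_to() above defers the RDS_RDMA_CANCELED notification to this second loop because, elsewhere in this file (see rds_rdma_send_complete), m_rs_lock is taken first and rs->rs_lock is nested inside it; the first loop already holds rs->rs_lock, so it cannot take m_rs_lock without inverting that order. A toy pthreads sketch of keeping the same ordering everywhere; the names and locks are purely illustrative.

/*
 * Userspace sketch of a consistent lock order: the "message" lock is
 * always the outer lock and the "socket" lock is always the inner one,
 * so no path ever acquires them in the opposite order.
 */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t m_rs_lock = PTHREAD_MUTEX_INITIALIZER; /* outer */
static pthread_mutex_t rs_lock   = PTHREAD_MUTEX_INITIALIZER; /* inner */

static void notify_cancel(int msg)
{
	pthread_mutex_lock(&m_rs_lock);   /* outer lock first       */
	pthread_mutex_lock(&rs_lock);     /* then the socket lock   */
	printf("msg %d: queued cancel notification\n", msg);
	pthread_mutex_unlock(&rs_lock);
	pthread_mutex_unlock(&m_rs_lock);
}

int main(void)
{
	for (int msg = 0; msg < 3; msg++)
		notify_cancel(msg);
	return 0;
}
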
@@ -763,6 +807,63 @@ out:
 	return *queued;
 }
 
+/*
+ * rds_message is getting to be quite complicated, and we'd like to allocate
+ * it all in one go. This figures out how big it needs to be up front.
+ */
+static int rds_rm_size(struct msghdr *msg, int data_len)
+{
+	struct cmsghdr *cmsg;
+	int size = 0;
+	int cmsg_groups = 0;
+	int retval;
+
+	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+		if (!CMSG_OK(msg, cmsg))
+			return -EINVAL;
+
+		if (cmsg->cmsg_level != SOL_RDS)
+			continue;
+
+		switch (cmsg->cmsg_type) {
+		case RDS_CMSG_RDMA_ARGS:
+			cmsg_groups |= 1;
+			retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
+			if (retval < 0)
+				return retval;
+			size += retval;
+
+			break;
+
+		case RDS_CMSG_RDMA_DEST:
+		case RDS_CMSG_RDMA_MAP:
+			cmsg_groups |= 2;
+			/* these are valid but do no add any size */
+			break;
+
+		case RDS_CMSG_ATOMIC_CSWP:
+		case RDS_CMSG_ATOMIC_FADD:
+		case RDS_CMSG_MASKED_ATOMIC_CSWP:
+		case RDS_CMSG_MASKED_ATOMIC_FADD:
+			cmsg_groups |= 1;
+			size += sizeof(struct scatterlist);
+			break;
+
+		default:
+			return -EINVAL;
+		}
+
+	}
+
+	size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
+
+	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
+	if (cmsg_groups == 3)
+		return -EINVAL;
+
+	return size;
+}
+
 static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			  struct msghdr *msg, int *allocated_mr)
 {
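The new rds_rm_size() above turns the control messages plus the payload length into a single up-front allocation size: one scatterlist entry per page the payload can span, plus whatever each RDMA or atomic op contributes. A standalone sketch of that arithmetic follows; the 4 KiB page size and 32-byte scatterlist size are assumptions chosen only for illustration, and CEIL_DIV stands in for the ceil() helper the RDS code uses.

/* Standalone sketch of the rds_rm_size() sizing arithmetic. */
#include <stdio.h>

#define CEIL_DIV(x, y)  (((x) + (y) - 1) / (y))

int main(void)
{
	const unsigned long page_size = 4096;   /* assumed PAGE_SIZE               */
	const unsigned long sg_size   = 32;     /* assumed sizeof(struct scatterlist) */
	unsigned long data_len = 9000;          /* payload handed to sendmsg        */
	unsigned long size = 0;

	/* one RDS_CMSG_ATOMIC_* control message contributes one sg entry */
	size += sg_size;

	/* the payload needs one sg entry per page it can span */
	size += CEIL_DIV(data_len, page_size) * sg_size;

	printf("extra allocation for sgs: %lu bytes (%lu entries)\n",
	       size, size / sg_size);
	return 0;
}
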
@@ -777,7 +878,7 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			continue;
 
 		/* As a side effect, RDMA_DEST and RDMA_MAP will set
-		 * rm->m_rdma_cookie and rm->m_rdma_mr.
+		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
 		 */
 		switch (cmsg->cmsg_type) {
 		case RDS_CMSG_RDMA_ARGS:
@@ -793,6 +894,12 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			if (!ret)
 				*allocated_mr = 1;
 			break;
+		case RDS_CMSG_ATOMIC_CSWP:
+		case RDS_CMSG_ATOMIC_FADD:
+		case RDS_CMSG_MASKED_ATOMIC_CSWP:
+		case RDS_CMSG_MASKED_ATOMIC_FADD:
+			ret = rds_cmsg_atomic(rs, rm, cmsg);
+			break;
 
 		default:
 			return -EINVAL;
@@ -850,13 +957,26 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 		goto out;
 	}
 
-	rm = rds_message_copy_from_user(msg->msg_iov, payload_len);
-	if (IS_ERR(rm)) {
-		ret = PTR_ERR(rm);
-		rm = NULL;
+	/* size of rm including all sgs */
+	ret = rds_rm_size(msg, payload_len);
+	if (ret < 0)
+		goto out;
+
+	rm = rds_message_alloc(ret, GFP_KERNEL);
+	if (!rm) {
+		ret = -ENOMEM;
 		goto out;
 	}
 
+	/* Attach data to the rm */
+	if (payload_len) {
+		rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
+		ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len);
+		if (ret)
+			goto out;
+	}
+	rm->data.op_active = 1;
+
 	rm->m_daddr = daddr;
 
 	/* rds_conn_create has a spinlock that runs with IRQ off.
@@ -879,22 +999,23 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 	if (ret)
 		goto out;
 
-	if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
-	    conn->c_trans->xmit_rdma == NULL) {
+	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
 		if (printk_ratelimit())
 			printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
-			       rm->m_rdma_op, conn->c_trans->xmit_rdma);
+			       &rm->rdma, conn->c_trans->xmit_rdma);
 		ret = -EOPNOTSUPP;
 		goto out;
 	}
 
-	/* If the connection is down, trigger a connect. We may
-	 * have scheduled a delayed reconnect however - in this case
-	 * we should not interfere.
-	 */
-	if (rds_conn_state(conn) == RDS_CONN_DOWN &&
-	    !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
+		if (printk_ratelimit())
+			printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
+			       &rm->atomic, conn->c_trans->xmit_atomic);
+		ret = -EOPNOTSUPP;
+		goto out;
+	}
+
+	rds_conn_connect_if_down(conn);
 
 	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
 	if (ret) {
@@ -938,7 +1059,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 	rds_stats_inc(s_send_queued);
 
 	if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-		rds_send_worker(&conn->c_send_w.work);
+		rds_send_xmit(conn);
 
 	rds_message_put(rm);
 	return payload_len;
@@ -966,20 +1087,15 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
 	int ret = 0;
 
 	rm = rds_message_alloc(0, GFP_ATOMIC);
-	if (rm == NULL) {
+	if (!rm) {
 		ret = -ENOMEM;
 		goto out;
 	}
 
 	rm->m_daddr = conn->c_faddr;
+	rm->data.op_active = 1;
 
-	/* If the connection is down, trigger a connect. We may
-	 * have scheduled a delayed reconnect however - in this case
-	 * we should not interfere.
-	 */
-	if (rds_conn_state(conn) == RDS_CONN_DOWN &&
-	    !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+	rds_conn_connect_if_down(conn);
 
 	ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL);
 	if (ret)
@@ -999,7 +1115,9 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
 	rds_stats_inc(s_send_queued);
 	rds_stats_inc(s_send_pong);
 
-	queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+	if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
+		rds_send_xmit(conn);
+
 	rds_message_put(rm);
 	return 0;
 