Diffstat (limited to 'net/rds/send.c')
-rw-r--r--  net/rds/send.c  554
1 file changed, 339 insertions(+), 215 deletions(-)
diff --git a/net/rds/send.c b/net/rds/send.c
index 9c1c6bcaa6c9..d58ae5f9339e 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -37,7 +37,6 @@
 #include <linux/list.h>
 
 #include "rds.h"
-#include "rdma.h"
 
 /* When transmitting messages in rds_send_xmit, we need to emerge from
  * time to time and briefly release the CPU. Otherwise the softlock watchdog
@@ -53,8 +52,11 @@ static int send_batch_count = 64;
 module_param(send_batch_count, int, 0444);
 MODULE_PARM_DESC(send_batch_count, " batch factor when working the send queue");
 
+static void rds_send_remove_from_sock(struct list_head *messages, int status);
+
 /*
- * Reset the send state. Caller must hold c_send_lock when calling here.
+ * Reset the send state. Callers must ensure that this doesn't race with
+ * rds_send_xmit().
  */
 void rds_send_reset(struct rds_connection *conn)
 {
@@ -62,18 +64,22 @@ void rds_send_reset(struct rds_connection *conn)
 	unsigned long flags;
 
 	if (conn->c_xmit_rm) {
+		rm = conn->c_xmit_rm;
+		conn->c_xmit_rm = NULL;
 		/* Tell the user the RDMA op is no longer mapped by the
 		 * transport. This isn't entirely true (it's flushed out
 		 * independently) but as the connection is down, there's
 		 * no ongoing RDMA to/from that memory */
-		rds_message_unmapped(conn->c_xmit_rm);
-		rds_message_put(conn->c_xmit_rm);
-		conn->c_xmit_rm = NULL;
+		rds_message_unmapped(rm);
+		rds_message_put(rm);
 	}
+
 	conn->c_xmit_sg = 0;
 	conn->c_xmit_hdr_off = 0;
 	conn->c_xmit_data_off = 0;
+	conn->c_xmit_atomic_sent = 0;
 	conn->c_xmit_rdma_sent = 0;
+	conn->c_xmit_data_sent = 0;
 
 	conn->c_map_queued = 0;
 
@@ -90,8 +96,27 @@ void rds_send_reset(struct rds_connection *conn)
 	spin_unlock_irqrestore(&conn->c_lock, flags);
 }
 
+static int acquire_in_xmit(struct rds_connection *conn)
+{
+	return test_and_set_bit(RDS_IN_XMIT, &conn->c_flags) == 0;
+}
+
+static void release_in_xmit(struct rds_connection *conn)
+{
+	clear_bit(RDS_IN_XMIT, &conn->c_flags);
+	smp_mb__after_clear_bit();
+	/*
+	 * We don't use wait_on_bit()/wake_up_bit() because our waking is in a
+	 * hot path and finding waiters is very rare. We don't want to walk
+	 * the system-wide hashed waitqueue buckets in the fast path only to
+	 * almost never find waiters.
+	 */
+	if (waitqueue_active(&conn->c_waitq))
+		wake_up_all(&conn->c_waitq);
+}
+
 /*
- * We're making the concious trade-off here to only send one message
+ * We're making the conscious trade-off here to only send one message
  * down the connection at a time.
  * Pro:
  *  - tx queueing is a simple fifo list
@@ -109,102 +134,69 @@ int rds_send_xmit(struct rds_connection *conn)
 	struct rds_message *rm;
 	unsigned long flags;
 	unsigned int tmp;
-	unsigned int send_quota = send_batch_count;
 	struct scatterlist *sg;
 	int ret = 0;
-	int was_empty = 0;
 	LIST_HEAD(to_be_dropped);
 
+restart:
+
 	/*
 	 * sendmsg calls here after having queued its message on the send
 	 * queue. We only have one task feeding the connection at a time. If
 	 * another thread is already feeding the queue then we back off. This
 	 * avoids blocking the caller and trading per-connection data between
 	 * caches per message.
-	 *
-	 * The sem holder will issue a retry if they notice that someone queued
-	 * a message after they stopped walking the send queue but before they
-	 * dropped the sem.
 	 */
-	if (!mutex_trylock(&conn->c_send_lock)) {
-		rds_stats_inc(s_send_sem_contention);
+	if (!acquire_in_xmit(conn)) {
+		rds_stats_inc(s_send_lock_contention);
 		ret = -ENOMEM;
 		goto out;
 	}
 
+	/*
+	 * rds_conn_shutdown() sets the conn state and then tests RDS_IN_XMIT,
+	 * we do the opposite to avoid races.
+	 */
+	if (!rds_conn_up(conn)) {
+		release_in_xmit(conn);
+		ret = 0;
+		goto out;
+	}
+
 	if (conn->c_trans->xmit_prepare)
 		conn->c_trans->xmit_prepare(conn);
 
 	/*
 	 * spin trying to push headers and data down the connection until
-	 * the connection doens't make forward progress.
+	 * the connection doesn't make forward progress.
 	 */
-	while (--send_quota) {
-		/*
-		 * See if need to send a congestion map update if we're
-		 * between sending messages. The send_sem protects our sole
-		 * use of c_map_offset and _bytes.
-		 * Note this is used only by transports that define a special
-		 * xmit_cong_map function. For all others, we create allocate
-		 * a cong_map message and treat it just like any other send.
-		 */
-		if (conn->c_map_bytes) {
-			ret = conn->c_trans->xmit_cong_map(conn, conn->c_lcong,
-							   conn->c_map_offset);
-			if (ret <= 0)
-				break;
+	while (1) {
 
-			conn->c_map_offset += ret;
-			conn->c_map_bytes -= ret;
-			if (conn->c_map_bytes)
-				continue;
-		}
-
-		/* If we're done sending the current message, clear the
-		 * offset and S/G temporaries.
-		 */
 		rm = conn->c_xmit_rm;
-		if (rm != NULL &&
-		    conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
-		    conn->c_xmit_sg == rm->m_nents) {
-			conn->c_xmit_rm = NULL;
-			conn->c_xmit_sg = 0;
-			conn->c_xmit_hdr_off = 0;
-			conn->c_xmit_data_off = 0;
-			conn->c_xmit_rdma_sent = 0;
-
-			/* Release the reference to the previous message. */
-			rds_message_put(rm);
-			rm = NULL;
-		}
 
-		/* If we're asked to send a cong map update, do so.
+		/*
+		 * If between sending messages, we can send a pending congestion
+		 * map update.
 		 */
-		if (rm == NULL && test_and_clear_bit(0, &conn->c_map_queued)) {
-			if (conn->c_trans->xmit_cong_map != NULL) {
-				conn->c_map_offset = 0;
-				conn->c_map_bytes = sizeof(struct rds_header) +
-					RDS_CONG_MAP_BYTES;
-				continue;
-			}
-
+		if (!rm && test_and_clear_bit(0, &conn->c_map_queued)) {
 			rm = rds_cong_update_alloc(conn);
 			if (IS_ERR(rm)) {
 				ret = PTR_ERR(rm);
 				break;
 			}
+			rm->data.op_active = 1;
 
 			conn->c_xmit_rm = rm;
 		}
 
 		/*
-		 * Grab the next message from the send queue, if there is one.
+		 * If not already working on one, grab the next message.
 		 *
 		 * c_xmit_rm holds a ref while we're sending this message down
 		 * the connction. We can use this ref while holding the
 		 * send_sem.. rds_send_reset() is serialized with it.
 		 */
-		if (rm == NULL) {
+		if (!rm) {
 			unsigned int len;
 
 			spin_lock_irqsave(&conn->c_lock, flags);
@@ -224,10 +216,8 @@ int rds_send_xmit(struct rds_connection *conn)
 
 			spin_unlock_irqrestore(&conn->c_lock, flags);
 
-			if (rm == NULL) {
-				was_empty = 1;
+			if (!rm)
 				break;
-			}
 
 			/* Unfortunately, the way Infiniband deals with
 			 * RDMA to a bad MR key is by moving the entire
@@ -236,13 +226,12 @@ int rds_send_xmit(struct rds_connection *conn)
 			 * connection.
 			 * Therefore, we never retransmit messages with RDMA ops.
 			 */
-			if (rm->m_rdma_op &&
+			if (rm->rdma.op_active &&
 			    test_bit(RDS_MSG_RETRANSMITTED, &rm->m_flags)) {
 				spin_lock_irqsave(&conn->c_lock, flags);
 				if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags))
 					list_move(&rm->m_conn_item, &to_be_dropped);
 				spin_unlock_irqrestore(&conn->c_lock, flags);
-				rds_message_put(rm);
 				continue;
 			}
 
@@ -263,23 +252,55 @@ int rds_send_xmit(struct rds_connection *conn)
 			conn->c_xmit_rm = rm;
 		}
 
-		/*
-		 * Try and send an rdma message. Let's see if we can
-		 * keep this simple and require that the transport either
-		 * send the whole rdma or none of it.
-		 */
-		if (rm->m_rdma_op && !conn->c_xmit_rdma_sent) {
-			ret = conn->c_trans->xmit_rdma(conn, rm->m_rdma_op);
+		/* The transport either sends the whole rdma or none of it */
+		if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+			rm->m_final_op = &rm->rdma;
+			ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
 			if (ret)
 				break;
 			conn->c_xmit_rdma_sent = 1;
+
+			/* The transport owns the mapped memory for now.
+			 * You can't unmap it while it's on the send queue */
+			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
+		}
+
+		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+			rm->m_final_op = &rm->atomic;
+			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
+			if (ret)
+				break;
+			conn->c_xmit_atomic_sent = 1;
+
 			/* The transport owns the mapped memory for now.
 			 * You can't unmap it while it's on the send queue */
 			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
 		}
 
-		if (conn->c_xmit_hdr_off < sizeof(struct rds_header) ||
-		    conn->c_xmit_sg < rm->m_nents) {
+		/*
+		 * A number of cases require an RDS header to be sent
+		 * even if there is no data.
+		 * We permit 0-byte sends; rds-ping depends on this.
+		 * However, if there are exclusively attached silent ops,
+		 * we skip the hdr/data send, to enable silent operation.
+		 */
+		if (rm->data.op_nents == 0) {
+			int ops_present;
+			int all_ops_are_silent = 1;
+
+			ops_present = (rm->atomic.op_active || rm->rdma.op_active);
+			if (rm->atomic.op_active && !rm->atomic.op_silent)
+				all_ops_are_silent = 0;
+			if (rm->rdma.op_active && !rm->rdma.op_silent)
+				all_ops_are_silent = 0;
+
+			if (ops_present && all_ops_are_silent
+			    && !rm->m_rdma_cookie)
+				rm->data.op_active = 0;
+		}
+
+		if (rm->data.op_active && !conn->c_xmit_data_sent) {
+			rm->m_final_op = &rm->data;
 			ret = conn->c_trans->xmit(conn, rm,
 						  conn->c_xmit_hdr_off,
 						  conn->c_xmit_sg,
@@ -295,7 +316,7 @@ int rds_send_xmit(struct rds_connection *conn)
 				ret -= tmp;
 			}
 
-			sg = &rm->m_sg[conn->c_xmit_sg];
+			sg = &rm->data.op_sg[conn->c_xmit_sg];
 			while (ret) {
 				tmp = min_t(int, ret, sg->length -
 						      conn->c_xmit_data_off);
@@ -306,49 +327,63 @@ int rds_send_xmit(struct rds_connection *conn)
 					sg++;
 					conn->c_xmit_sg++;
 					BUG_ON(ret != 0 &&
-					       conn->c_xmit_sg == rm->m_nents);
+					       conn->c_xmit_sg == rm->data.op_nents);
 				}
 			}
+
+			if (conn->c_xmit_hdr_off == sizeof(struct rds_header) &&
+			    (conn->c_xmit_sg == rm->data.op_nents))
+				conn->c_xmit_data_sent = 1;
 		}
-	}
 
-	/* Nuke any messages we decided not to retransmit. */
-	if (!list_empty(&to_be_dropped))
-		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
+		/*
+		 * A rm will only take multiple times through this loop
+		 * if there is a data op. Thus, if the data is sent (or there was
+		 * none), then we're done with the rm.
+		 */
+		if (!rm->data.op_active || conn->c_xmit_data_sent) {
+			conn->c_xmit_rm = NULL;
+			conn->c_xmit_sg = 0;
+			conn->c_xmit_hdr_off = 0;
+			conn->c_xmit_data_off = 0;
+			conn->c_xmit_rdma_sent = 0;
+			conn->c_xmit_atomic_sent = 0;
+			conn->c_xmit_data_sent = 0;
+
+			rds_message_put(rm);
+		}
+	}
 
 	if (conn->c_trans->xmit_complete)
 		conn->c_trans->xmit_complete(conn);
 
-	/*
-	 * We might be racing with another sender who queued a message but
-	 * backed off on noticing that we held the c_send_lock. If we check
-	 * for queued messages after dropping the sem then either we'll
-	 * see the queued message or the queuer will get the sem. If we
-	 * notice the queued message then we trigger an immediate retry.
-	 *
-	 * We need to be careful only to do this when we stopped processing
-	 * the send queue because it was empty. It's the only way we
-	 * stop processing the loop when the transport hasn't taken
-	 * responsibility for forward progress.
-	 */
-	mutex_unlock(&conn->c_send_lock);
+	release_in_xmit(conn);
 
-	if (conn->c_map_bytes || (send_quota == 0 && !was_empty)) {
-		/* We exhausted the send quota, but there's work left to
-		 * do. Return and (re-)schedule the send worker.
-		 */
-		ret = -EAGAIN;
+	/* Nuke any messages we decided not to retransmit. */
+	if (!list_empty(&to_be_dropped)) {
+		/* irqs on here, so we can put(), unlike above */
+		list_for_each_entry(rm, &to_be_dropped, m_conn_item)
+			rds_message_put(rm);
+		rds_send_remove_from_sock(&to_be_dropped, RDS_RDMA_DROPPED);
 	}
 
-	if (ret == 0 && was_empty) {
-		/* A simple bit test would be way faster than taking the
-		 * spin lock */
-		spin_lock_irqsave(&conn->c_lock, flags);
+	/*
+	 * Other senders can queue a message after we last test the send queue
+	 * but before we clear RDS_IN_XMIT. In that case they'd back off and
+	 * not try and send their newly queued message. We need to check the
+	 * send queue after having cleared RDS_IN_XMIT so that their message
+	 * doesn't get stuck on the send queue.
+	 *
+	 * If the transport cannot continue (i.e ret != 0), then it must
+	 * call us when more room is available, such as from the tx
+	 * completion handler.
+	 */
+	if (ret == 0) {
+		smp_mb();
 		if (!list_empty(&conn->c_send_queue)) {
-			rds_stats_inc(s_send_sem_queue_raced);
-			ret = -EAGAIN;
+			rds_stats_inc(s_send_lock_queue_raced);
+			goto restart;
 		}
-		spin_unlock_irqrestore(&conn->c_lock, flags);
 	}
 out:
 	return ret;
@@ -376,52 +411,60 @@ static inline int rds_send_is_acked(struct rds_message *rm, u64 ack,
 }
 
 /*
- * Returns true if there are no messages on the send and retransmit queues
- * which have a sequence number greater than or equal to the given sequence
- * number.
+ * This is pretty similar to what happens below in the ACK
+ * handling code - except that we call here as soon as we get
+ * the IB send completion on the RDMA op and the accompanying
+ * message.
  */
-int rds_send_acked_before(struct rds_connection *conn, u64 seq)
+void rds_rdma_send_complete(struct rds_message *rm, int status)
 {
-	struct rds_message *rm, *tmp;
-	int ret = 1;
+	struct rds_sock *rs = NULL;
+	struct rm_rdma_op *ro;
+	struct rds_notifier *notifier;
+	unsigned long flags;
 
-	spin_lock(&conn->c_lock);
+	spin_lock_irqsave(&rm->m_rs_lock, flags);
 
-	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
-		if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
-			ret = 0;
-		break;
-	}
+	ro = &rm->rdma;
+	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
+	    ro->op_active && ro->op_notify && ro->op_notifier) {
+		notifier = ro->op_notifier;
+		rs = rm->m_rs;
+		sock_hold(rds_rs_to_sk(rs));
 
-	list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
-		if (be64_to_cpu(rm->m_inc.i_hdr.h_sequence) < seq)
-			ret = 0;
-		break;
+		notifier->n_status = status;
+		spin_lock(&rs->rs_lock);
+		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+		spin_unlock(&rs->rs_lock);
+
+		ro->op_notifier = NULL;
 	}
 
-	spin_unlock(&conn->c_lock);
+	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 
-	return ret;
+	if (rs) {
+		rds_wake_sk_sleep(rs);
+		sock_put(rds_rs_to_sk(rs));
+	}
 }
+EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
 
 /*
- * This is pretty similar to what happens below in the ACK
- * handling code - except that we call here as soon as we get
- * the IB send completion on the RDMA op and the accompanying
- * message.
+ * Just like above, except looks at atomic op
  */
-void rds_rdma_send_complete(struct rds_message *rm, int status)
+void rds_atomic_send_complete(struct rds_message *rm, int status)
 {
 	struct rds_sock *rs = NULL;
-	struct rds_rdma_op *ro;
+	struct rm_atomic_op *ao;
 	struct rds_notifier *notifier;
+	unsigned long flags;
 
-	spin_lock(&rm->m_rs_lock);
+	spin_lock_irqsave(&rm->m_rs_lock, flags);
 
-	ro = rm->m_rdma_op;
-	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags) &&
-	    ro && ro->r_notify && ro->r_notifier) {
-		notifier = ro->r_notifier;
+	ao = &rm->atomic;
+	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
+	    && ao->op_active && ao->op_notify && ao->op_notifier) {
+		notifier = ao->op_notifier;
 		rs = rm->m_rs;
 		sock_hold(rds_rs_to_sk(rs));
 
@@ -430,17 +473,17 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
 		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
 		spin_unlock(&rs->rs_lock);
 
-		ro->r_notifier = NULL;
+		ao->op_notifier = NULL;
 	}
 
-	spin_unlock(&rm->m_rs_lock);
+	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 
 	if (rs) {
 		rds_wake_sk_sleep(rs);
 		sock_put(rds_rs_to_sk(rs));
 	}
 }
-EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
+EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
 
 /*
  * This is the same as rds_rdma_send_complete except we
@@ -448,15 +491,23 @@ EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
  * socket, socket lock) and can just move the notifier.
  */
 static inline void
-__rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
+__rds_send_complete(struct rds_sock *rs, struct rds_message *rm, int status)
 {
-	struct rds_rdma_op *ro;
+	struct rm_rdma_op *ro;
+	struct rm_atomic_op *ao;
+
+	ro = &rm->rdma;
+	if (ro->op_active && ro->op_notify && ro->op_notifier) {
+		ro->op_notifier->n_status = status;
+		list_add_tail(&ro->op_notifier->n_list, &rs->rs_notify_queue);
+		ro->op_notifier = NULL;
+	}
 
-	ro = rm->m_rdma_op;
-	if (ro && ro->r_notify && ro->r_notifier) {
-		ro->r_notifier->n_status = status;
-		list_add_tail(&ro->r_notifier->n_list, &rs->rs_notify_queue);
-		ro->r_notifier = NULL;
+	ao = &rm->atomic;
+	if (ao->op_active && ao->op_notify && ao->op_notifier) {
+		ao->op_notifier->n_status = status;
+		list_add_tail(&ao->op_notifier->n_list, &rs->rs_notify_queue);
+		ao->op_notifier = NULL;
 	}
 
 	/* No need to wake the app - caller does this */
@@ -468,7 +519,7 @@ __rds_rdma_send_complete(struct rds_sock *rs, struct rds_message *rm, int status
  * So speed is not an issue here.
  */
 struct rds_message *rds_send_get_message(struct rds_connection *conn,
-					 struct rds_rdma_op *op)
+					 struct rm_rdma_op *op)
 {
 	struct rds_message *rm, *tmp, *found = NULL;
 	unsigned long flags;
@@ -476,7 +527,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn,
 	spin_lock_irqsave(&conn->c_lock, flags);
 
 	list_for_each_entry_safe(rm, tmp, &conn->c_retrans, m_conn_item) {
-		if (rm->m_rdma_op == op) {
+		if (&rm->rdma == op) {
 			atomic_inc(&rm->m_refcount);
 			found = rm;
 			goto out;
@@ -484,7 +535,7 @@ struct rds_message *rds_send_get_message(struct rds_connection *conn,
 	}
 
 	list_for_each_entry_safe(rm, tmp, &conn->c_send_queue, m_conn_item) {
-		if (rm->m_rdma_op == op) {
+		if (&rm->rdma == op) {
 			atomic_inc(&rm->m_refcount);
 			found = rm;
 			break;
@@ -506,7 +557,7 @@ EXPORT_SYMBOL_GPL(rds_send_get_message);
  * removing the messages from the 'messages' list regardless of if it found
  * the messages on the socket list or not.
  */
-void rds_send_remove_from_sock(struct list_head *messages, int status)
+static void rds_send_remove_from_sock(struct list_head *messages, int status)
 {
 	unsigned long flags;
 	struct rds_sock *rs = NULL;
@@ -544,19 +595,20 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
 		spin_lock(&rs->rs_lock);
 
 		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
-			struct rds_rdma_op *ro = rm->m_rdma_op;
+			struct rm_rdma_op *ro = &rm->rdma;
 			struct rds_notifier *notifier;
 
 			list_del_init(&rm->m_sock_item);
 			rds_send_sndbuf_remove(rs, rm);
 
-			if (ro && ro->r_notifier && (status || ro->r_notify)) {
-				notifier = ro->r_notifier;
+			if (ro->op_active && ro->op_notifier &&
+			   (ro->op_notify || (ro->op_recverr && status))) {
+				notifier = ro->op_notifier;
 				list_add_tail(&notifier->n_list,
 						&rs->rs_notify_queue);
 				if (!notifier->n_status)
 					notifier->n_status = status;
-				rm->m_rdma_op->r_notifier = NULL;
+				rm->rdma.op_notifier = NULL;
 			}
 			was_on_sock = 1;
 			rm->m_rs = NULL;
@@ -619,9 +671,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
 {
 	struct rds_message *rm, *tmp;
 	struct rds_connection *conn;
-	unsigned long flags, flags2;
+	unsigned long flags;
 	LIST_HEAD(list);
-	int wake = 0;
 
 	/* get all the messages we're dropping under the rs lock */
 	spin_lock_irqsave(&rs->rs_lock, flags);
@@ -631,59 +682,54 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in *dest)
 		    dest->sin_port != rm->m_inc.i_hdr.h_dport))
 			continue;
 
-		wake = 1;
 		list_move(&rm->m_sock_item, &list);
 		rds_send_sndbuf_remove(rs, rm);
 		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
 	}
 
 	/* order flag updates with the rs lock */
-	if (wake)
-		smp_mb__after_clear_bit();
+	smp_mb__after_clear_bit();
 
 	spin_unlock_irqrestore(&rs->rs_lock, flags);
 
-	conn = NULL;
+	if (list_empty(&list))
+		return;
 
-	/* now remove the messages from the conn list as needed */
+	/* Remove the messages from the conn */
 	list_for_each_entry(rm, &list, m_sock_item) {
-		/* We do this here rather than in the loop above, so that
-		 * we don't have to nest m_rs_lock under rs->rs_lock */
-		spin_lock_irqsave(&rm->m_rs_lock, flags2);
-		/* If this is a RDMA operation, notify the app. */
-		spin_lock(&rs->rs_lock);
-		__rds_rdma_send_complete(rs, rm, RDS_RDMA_CANCELED);
-		spin_unlock(&rs->rs_lock);
-		rm->m_rs = NULL;
-		spin_unlock_irqrestore(&rm->m_rs_lock, flags2);
 
+		conn = rm->m_inc.i_conn;
+
+		spin_lock_irqsave(&conn->c_lock, flags);
 		/*
-		 * If we see this flag cleared then we're *sure* that someone
-		 * else beat us to removing it from the conn. If we race
-		 * with their flag update we'll get the lock and then really
-		 * see that the flag has been cleared.
+		 * Maybe someone else beat us to removing rm from the conn.
+		 * If we race with their flag update we'll get the lock and
+		 * then really see that the flag has been cleared.
 		 */
-		if (!test_bit(RDS_MSG_ON_CONN, &rm->m_flags))
+		if (!test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
+			spin_unlock_irqrestore(&conn->c_lock, flags);
 			continue;
-
-		if (conn != rm->m_inc.i_conn) {
-			if (conn)
-				spin_unlock_irqrestore(&conn->c_lock, flags);
-			conn = rm->m_inc.i_conn;
-			spin_lock_irqsave(&conn->c_lock, flags);
 		}
+		list_del_init(&rm->m_conn_item);
+		spin_unlock_irqrestore(&conn->c_lock, flags);
 
-		if (test_and_clear_bit(RDS_MSG_ON_CONN, &rm->m_flags)) {
-			list_del_init(&rm->m_conn_item);
-			rds_message_put(rm);
-		}
-	}
+		/*
+		 * Couldn't grab m_rs_lock in top loop (lock ordering),
+		 * but we can now.
+		 */
+		spin_lock_irqsave(&rm->m_rs_lock, flags);
 
-	if (conn)
-		spin_unlock_irqrestore(&conn->c_lock, flags);
+		spin_lock(&rs->rs_lock);
+		__rds_send_complete(rs, rm, RDS_RDMA_CANCELED);
+		spin_unlock(&rs->rs_lock);
 
-	if (wake)
-		rds_wake_sk_sleep(rs);
+		rm->m_rs = NULL;
+		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+
+		rds_message_put(rm);
+	}
+
+	rds_wake_sk_sleep(rs);
 
 	while (!list_empty(&list)) {
 		rm = list_entry(list.next, struct rds_message, m_sock_item);
@@ -763,6 +809,63 @@ out:
 	return *queued;
 }
 
+/*
+ * rds_message is getting to be quite complicated, and we'd like to allocate
+ * it all in one go. This figures out how big it needs to be up front.
+ */
+static int rds_rm_size(struct msghdr *msg, int data_len)
+{
+	struct cmsghdr *cmsg;
+	int size = 0;
+	int cmsg_groups = 0;
+	int retval;
+
+	for (cmsg = CMSG_FIRSTHDR(msg); cmsg; cmsg = CMSG_NXTHDR(msg, cmsg)) {
+		if (!CMSG_OK(msg, cmsg))
+			return -EINVAL;
+
+		if (cmsg->cmsg_level != SOL_RDS)
+			continue;
+
+		switch (cmsg->cmsg_type) {
+		case RDS_CMSG_RDMA_ARGS:
+			cmsg_groups |= 1;
+			retval = rds_rdma_extra_size(CMSG_DATA(cmsg));
+			if (retval < 0)
+				return retval;
+			size += retval;
+
+			break;
+
+		case RDS_CMSG_RDMA_DEST:
+		case RDS_CMSG_RDMA_MAP:
+			cmsg_groups |= 2;
+			/* these are valid but do no add any size */
+			break;
+
+		case RDS_CMSG_ATOMIC_CSWP:
+		case RDS_CMSG_ATOMIC_FADD:
+		case RDS_CMSG_MASKED_ATOMIC_CSWP:
+		case RDS_CMSG_MASKED_ATOMIC_FADD:
+			cmsg_groups |= 1;
+			size += sizeof(struct scatterlist);
+			break;
+
+		default:
+			return -EINVAL;
+		}
+
+	}
+
+	size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist);
+
+	/* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
+	if (cmsg_groups == 3)
+		return -EINVAL;
+
+	return size;
+}
+
 static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			  struct msghdr *msg, int *allocated_mr)
 {
@@ -777,7 +880,7 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			continue;
 
 		/* As a side effect, RDMA_DEST and RDMA_MAP will set
-		 * rm->m_rdma_cookie and rm->m_rdma_mr.
+		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
 		 */
 		switch (cmsg->cmsg_type) {
 		case RDS_CMSG_RDMA_ARGS:
@@ -793,6 +896,12 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			if (!ret)
 				*allocated_mr = 1;
 			break;
+		case RDS_CMSG_ATOMIC_CSWP:
+		case RDS_CMSG_ATOMIC_FADD:
+		case RDS_CMSG_MASKED_ATOMIC_CSWP:
+		case RDS_CMSG_MASKED_ATOMIC_FADD:
+			ret = rds_cmsg_atomic(rs, rm, cmsg);
+			break;
 
 		default:
 			return -EINVAL;
@@ -850,13 +959,30 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 		goto out;
 	}
 
-	rm = rds_message_copy_from_user(msg->msg_iov, payload_len);
-	if (IS_ERR(rm)) {
-		ret = PTR_ERR(rm);
-		rm = NULL;
+	/* size of rm including all sgs */
+	ret = rds_rm_size(msg, payload_len);
+	if (ret < 0)
+		goto out;
+
+	rm = rds_message_alloc(ret, GFP_KERNEL);
+	if (!rm) {
+		ret = -ENOMEM;
 		goto out;
 	}
 
+	/* Attach data to the rm */
+	if (payload_len) {
+		rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE));
+		if (!rm->data.op_sg) {
+			ret = -ENOMEM;
+			goto out;
+		}
+		ret = rds_message_copy_from_user(rm, msg->msg_iov, payload_len);
+		if (ret)
+			goto out;
+	}
+	rm->data.op_active = 1;
+
 	rm->m_daddr = daddr;
 
 	/* rds_conn_create has a spinlock that runs with IRQ off.
@@ -879,22 +1005,23 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 	if (ret)
 		goto out;
 
-	if ((rm->m_rdma_cookie || rm->m_rdma_op) &&
-	    conn->c_trans->xmit_rdma == NULL) {
+	if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
 		if (printk_ratelimit())
 			printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
-			       rm->m_rdma_op, conn->c_trans->xmit_rdma);
+			       &rm->rdma, conn->c_trans->xmit_rdma);
 		ret = -EOPNOTSUPP;
 		goto out;
 	}
 
-	/* If the connection is down, trigger a connect. We may
-	 * have scheduled a delayed reconnect however - in this case
-	 * we should not interfere.
-	 */
-	if (rds_conn_state(conn) == RDS_CONN_DOWN &&
-	    !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
+		if (printk_ratelimit())
+			printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
+			       &rm->atomic, conn->c_trans->xmit_atomic);
+		ret = -EOPNOTSUPP;
+		goto out;
+	}
+
+	rds_conn_connect_if_down(conn);
 
 	ret = rds_cong_wait(conn->c_fcong, dport, nonblock, rs);
 	if (ret) {
@@ -938,7 +1065,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 	rds_stats_inc(s_send_queued);
 
 	if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-		rds_send_worker(&conn->c_send_w.work);
+		rds_send_xmit(conn);
 
 	rds_message_put(rm);
 	return payload_len;
@@ -966,20 +1093,15 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
 	int ret = 0;
 
 	rm = rds_message_alloc(0, GFP_ATOMIC);
-	if (rm == NULL) {
+	if (!rm) {
 		ret = -ENOMEM;
 		goto out;
 	}
 
 	rm->m_daddr = conn->c_faddr;
+	rm->data.op_active = 1;
 
-	/* If the connection is down, trigger a connect. We may
-	 * have scheduled a delayed reconnect however - in this case
-	 * we should not interfere.
-	 */
-	if (rds_conn_state(conn) == RDS_CONN_DOWN &&
-	    !test_and_set_bit(RDS_RECONNECT_PENDING, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_conn_w, 0);
+	rds_conn_connect_if_down(conn);
 
 	ret = rds_cong_wait(conn->c_fcong, dport, 1, NULL);
 	if (ret)
@@ -999,7 +1121,9 @@ rds_send_pong(struct rds_connection *conn, __be16 dport)
 	rds_stats_inc(s_send_queued);
 	rds_stats_inc(s_send_pong);
 
-	queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+	if (!test_bit(RDS_LL_SEND_FULL, &conn->c_flags))
+		rds_send_xmit(conn);
+
 	rds_message_put(rm);
 	return 0;
 