author     Andy Grover <andy.grover@oracle.com>    2010-03-01 19:10:40 -0500
committer  Andy Grover <andy.grover@oracle.com>    2010-09-08 21:12:06 -0400
commit     2c3a5f9abb1dc5efdab8ba9a568b1661c65fd1e3
tree       72b6a4c5298fecf46637987fd8ae42296dc90a97 /net/rds
parent     7e3bd65ebfd5d6cd76b8b979920c632d6e6b4b2a
RDS: Add flag for silent ops. Do atomic op before RDMA
Add a flag to the API so users can indicate they want silent operations.
This is needed because silent ops cannot be used with USE_ONCE MRs, so
we can't just assume silent.

Also, change send_xmit to do atomic op before rdma op if both are present,
and centralize the hairy logic to determine if we want to attempt silent,
or not.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
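For reference, here is a minimal userspace sketch of how an application might request a silent RDMA write with the new flag. It is not part of this patch: the helper name add_silent_rdma_cmsg is made up, the struct rds_rdma_args, RDS_CMSG_RDMA_ARGS, and SOL_RDS names are assumed from the RDS uapi header, and MR registration, iovec setup, and error handling are omitted.

/* Hypothetical sketch only; assumes a bound/connected PF_RDS socket and a
 * registered MR.  Per the commit message, RDS_RDMA_SILENT cannot be
 * combined with an RDS_RDMA_USE_ONCE MR. */
#include <string.h>
#include <sys/socket.h>
#include <linux/rds.h>

static void add_silent_rdma_cmsg(struct msghdr *msg, struct rds_rdma_args *args)
{
        struct cmsghdr *cmsg = CMSG_FIRSTHDR(msg);   /* msg_control prepared by caller */

        /* Ask for a silent RDMA write: the payload is placed remotely, but no
         * RDS header/data message is generated for this operation. */
        args->flags = RDS_RDMA_READWRITE | RDS_RDMA_SILENT;

        cmsg->cmsg_level = SOL_RDS;
        cmsg->cmsg_type  = RDS_CMSG_RDMA_ARGS;
        cmsg->cmsg_len   = CMSG_LEN(sizeof(*args));
        memcpy(CMSG_DATA(cmsg), args, sizeof(*args));
}

The message would then go out via sendmsg() on the RDS socket; with a zero-length payload and only silent ops attached (and no RDMA cookie to deliver), the reworked send_xmit logic below skips the header/data send entirely.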
Diffstat (limited to 'net/rds')
-rw-r--r--   net/rds/rdma.c  |  2
-rw-r--r--   net/rds/rds.h   |  2
-rw-r--r--   net/rds/send.c  | 55
3 files changed, 35 insertions, 24 deletions
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 5ba514684431..48781fe4431c 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -559,6 +559,7 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
         op->op_write = !!(args->flags & RDS_RDMA_READWRITE);
         op->op_fence = !!(args->flags & RDS_RDMA_FENCE);
         op->op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+        op->op_silent = !!(args->flags & RDS_RDMA_SILENT);
         op->op_active = 1;
         op->op_recverr = rs->rs_recverr;
         WARN_ON(!nr_pages);
@@ -747,6 +748,7 @@ int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
         }
 
         rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+        rm->atomic.op_silent = !!(args->flags & RDS_RDMA_SILENT);
         rm->atomic.op_active = 1;
         rm->atomic.op_recverr = rs->rs_recverr;
         rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 46d190d08549..23b921000e74 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -319,6 +319,7 @@ struct rds_message {
                         unsigned int op_notify:1;
                         unsigned int op_recverr:1;
                         unsigned int op_mapped:1;
+                        unsigned int op_silent:1;
                         unsigned int op_active:1;
                         struct scatterlist *op_sg;
                         struct rds_notifier *op_notifier;
@@ -333,6 +334,7 @@ struct rds_message {
                         unsigned int op_notify:1;
                         unsigned int op_recverr:1;
                         unsigned int op_mapped:1;
+                        unsigned int op_silent:1;
                         unsigned int op_active:1;
                         unsigned int op_bytes;
                         unsigned int op_nents;
diff --git a/net/rds/send.c b/net/rds/send.c
index cdca9747fcbc..38567f3ee7e8 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -250,42 +250,50 @@ int rds_send_xmit(struct rds_connection *conn)
                         conn->c_xmit_rm = rm;
                 }
 
-                if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
-                        ret = conn->c_trans->xmit_atomic(conn, rm);
+                /* The transport either sends the whole rdma or none of it */
+                if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
+                        ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
                         if (ret)
                                 break;
-                        conn->c_xmit_atomic_sent = 1;
+                        conn->c_xmit_rdma_sent = 1;
+
                         /* The transport owns the mapped memory for now.
                          * You can't unmap it while it's on the send queue */
                         set_bit(RDS_MSG_MAPPED, &rm->m_flags);
-
-                        /*
-                         * This is evil, muahaha.
-                         * We permit 0-byte sends. (rds-ping depends on this.)
-                         * BUT if there is an atomic op and no sent data,
-                         * we turn off sending the header, to achieve
-                         * "silent" atomics.
-                         * But see below; RDMA op might toggle this back on!
-                         */
-                        if (rm->data.op_nents == 0)
-                                rm->data.op_active = 0;
                 }
 
-                /* The transport either sends the whole rdma or none of it */
-                if (rm->rdma.op_active && !conn->c_xmit_rdma_sent) {
-                        ret = conn->c_trans->xmit_rdma(conn, &rm->rdma);
+                if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+                        ret = conn->c_trans->xmit_atomic(conn, rm);
                         if (ret)
                                 break;
-                        conn->c_xmit_rdma_sent = 1;
-
-                        /* rdmas need data sent, even if just the header */
-                        rm->data.op_active = 1;
-
+                        conn->c_xmit_atomic_sent = 1;
                         /* The transport owns the mapped memory for now.
                          * You can't unmap it while it's on the send queue */
                         set_bit(RDS_MSG_MAPPED, &rm->m_flags);
                 }
 
+                /*
+                 * A number of cases require an RDS header to be sent
+                 * even if there is no data.
+                 * We permit 0-byte sends; rds-ping depends on this.
+                 * However, if there are exclusively attached silent ops,
+                 * we skip the hdr/data send, to enable silent operation.
+                 */
+                if (rm->data.op_nents == 0) {
+                        int ops_present;
+                        int all_ops_are_silent = 1;
+
+                        ops_present = (rm->atomic.op_active || rm->rdma.op_active);
+                        if (rm->atomic.op_active && !rm->atomic.op_silent)
+                                all_ops_are_silent = 0;
+                        if (rm->rdma.op_active && !rm->rdma.op_silent)
+                                all_ops_are_silent = 0;
+
+                        if (ops_present && all_ops_are_silent
+                            && !rm->m_rdma_cookie)
+                                rm->data.op_active = 0;
+                }
+
                 if (rm->data.op_active && !conn->c_xmit_data_sent) {
                         ret = conn->c_trans->xmit(conn, rm,
                                                   conn->c_xmit_hdr_off,
@@ -1009,8 +1017,7 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
         if (ret)
                 goto out;
 
-        if ((rm->m_rdma_cookie || rm->rdma.op_active) &&
-            !conn->c_trans->xmit_rdma) {
+        if (rm->rdma.op_active && !conn->c_trans->xmit_rdma) {
                 if (printk_ratelimit())
                         printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
                                &rm->rdma, conn->c_trans->xmit_rdma);