author		Andy Grover <andy.grover@oracle.com>	2010-01-12 17:33:38 -0500
committer	Andy Grover <andy.grover@oracle.com>	2010-09-08 21:11:41 -0400
commit		15133f6e67d8d646d0744336b4daa3135452cb0d (patch)
tree		e5675d5a3ab240edc9a66af6b891dd75fa9eabae /net/rds
parent		a63273d4992603979ddb181b6a8f07082839b39f (diff)
RDS: Implement atomic operations
Implement a CMSG-based interface to do FADD and CSWP ops.
Alter send routines to handle atomic ops.
Add atomic counters to stats.
Add xmit_atomic() to struct rds_transport.
Inline rds_ib_send_unmap_rdma into unmap_rm.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
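The interface is driven entirely through control messages on an RDS socket: userspace fills in a struct rds_atomic_args and attaches it as an RDS_CMSG_ATOMIC_CSWP or RDS_CMSG_ATOMIC_FADD cmsg on sendmsg(). The sketch below is illustrative only, not part of this patch: it assumes the uapi definitions of struct rds_atomic_args, SOL_RDS, and the RDS_CMSG_ATOMIC_* constants from the companion include/linux/rds.h change (outside the net/rds diffstat shown here), and a cookie already obtained via RDS_CMSG_RDMA_MAP of the remote memory. Field names match those consumed by rds_cmsg_atomic() in the diff below; note that the local result buffer must be 8-byte aligned, as enforced there.

/*
 * Illustrative userspace sketch (hypothetical helper, not from this
 * patch): issue a fetch-and-add on a remote 8-byte word over an
 * already-bound RDS socket. The old remote value lands in *result
 * once the operation completes.
 */
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <linux/rds.h>

static ssize_t rds_do_fadd(int sock, struct sockaddr_in *dest,
			   uint64_t *result,	/* must be 8-byte aligned */
			   uint64_t remote_addr, uint64_t add_val,
			   rds_rdma_cookie_t cookie)
{
	struct rds_atomic_args args;
	char cbuf[CMSG_SPACE(sizeof(args))];
	struct msghdr msg;
	struct cmsghdr *cmsg;

	memset(&args, 0, sizeof(args));
	args.cookie	 = cookie;	/* carries the rkey from RDS_CMSG_RDMA_MAP */
	args.local_addr	 = (uint64_t)(uintptr_t)result;
	args.remote_addr = remote_addr;
	args.fadd.add	 = add_val;	/* for CSWP: fill args.cswp.compare/.swap */
	args.flags	 = RDS_RDMA_NOTIFY_ME;	/* queue a completion notifier */
	args.user_token	 = (uint64_t)(uintptr_t)result;

	memset(&msg, 0, sizeof(msg));
	msg.msg_name	   = dest;
	msg.msg_namelen	   = sizeof(*dest);
	msg.msg_control	   = cbuf;
	msg.msg_controllen = sizeof(cbuf);

	cmsg = CMSG_FIRSTHDR(&msg);
	cmsg->cmsg_level = SOL_RDS;
	cmsg->cmsg_type	 = RDS_CMSG_ATOMIC_FADD;	/* or RDS_CMSG_ATOMIC_CSWP */
	cmsg->cmsg_len	 = CMSG_LEN(sizeof(args));
	memcpy(CMSG_DATA(cmsg), &args, sizeof(args));

	return sendmsg(sock, &msg, 0);
}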
Diffstat (limited to 'net/rds')
-rw-r--r--	net/rds/ib.c		  1
-rw-r--r--	net/rds/ib.h		  1
-rw-r--r--	net/rds/ib_rdma.c	  4
-rw-r--r--	net/rds/ib_send.c	140
-rw-r--r--	net/rds/rdma.c		 73
-rw-r--r--	net/rds/rds.h		 33
-rw-r--r--	net/rds/send.c		 71
-rw-r--r--	net/rds/stats.c		  2
8 files changed, 302 insertions, 23 deletions
diff --git a/net/rds/ib.c b/net/rds/ib.c
index 8f2d6dd7700a..f0d29656baff 100644
--- a/net/rds/ib.c
+++ b/net/rds/ib.c
@@ -264,6 +264,7 @@ struct rds_transport rds_ib_transport = {
 	.xmit			= rds_ib_xmit,
 	.xmit_cong_map		= NULL,
 	.xmit_rdma		= rds_ib_xmit_rdma,
+	.xmit_atomic		= rds_ib_xmit_atomic,
 	.recv			= rds_ib_recv,
 	.conn_alloc		= rds_ib_conn_alloc,
 	.conn_free		= rds_ib_conn_free,
diff --git a/net/rds/ib.h b/net/rds/ib.h
index 64df4e79b29f..d2fd0aa4fde7 100644
--- a/net/rds/ib.h
+++ b/net/rds/ib.h
@@ -336,6 +336,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits);
 void rds_ib_advertise_credits(struct rds_connection *conn, unsigned int posted);
 int rds_ib_send_grab_credits(struct rds_ib_connection *ic, u32 wanted,
 			     u32 *adv_credits, int need_posted, int max_posted);
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op);
 
 /* ib_stats.c */
 DECLARE_PER_CPU(struct rds_ib_statistics, rds_ib_stats);
diff --git a/net/rds/ib_rdma.c b/net/rds/ib_rdma.c
index 0f3b5a2f3fe0..242231f09464 100644
--- a/net/rds/ib_rdma.c
+++ b/net/rds/ib_rdma.c
@@ -298,7 +298,9 @@ static struct rds_ib_mr *rds_ib_alloc_fmr(struct rds_ib_device *rds_ibdev)
 	ibmr->fmr = ib_alloc_fmr(rds_ibdev->pd,
 			(IB_ACCESS_LOCAL_WRITE |
 			IB_ACCESS_REMOTE_READ |
-			IB_ACCESS_REMOTE_WRITE),
+			IB_ACCESS_REMOTE_WRITE|
+			IB_ACCESS_REMOTE_ATOMIC),
+
 			&pool->fmr_attr);
 	if (IS_ERR(ibmr->fmr)) {
 		err = PTR_ERR(ibmr->fmr);
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index f0edfdb2866c..b2bd164434ad 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -62,15 +62,17 @@ static void rds_ib_send_rdma_complete(struct rds_message *rm,
 	rds_rdma_send_complete(rm, notify_status);
 }
 
-static void rds_ib_send_unmap_rdma(struct rds_ib_connection *ic,
-				   struct rds_rdma_op *op)
-{
-	if (op->r_mapped) {
-		ib_dma_unmap_sg(ic->i_cm_id->device,
-				op->r_sg, op->r_nents,
-				op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
-		op->r_mapped = 0;
-	}
+static void rds_ib_send_atomic_complete(struct rds_message *rm,
+					int wc_status)
+{
+	int notify_status;
+
+	if (wc_status != IB_WC_SUCCESS)
+		notify_status = RDS_RDMA_OTHER_ERROR;
+	else
+		notify_status = RDS_RDMA_SUCCESS;
+
+	rds_atomic_send_complete(rm, notify_status);
 }
 
 static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
@@ -86,7 +88,14 @@ static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
 		    DMA_TO_DEVICE);
 
 	if (rm->rdma.m_rdma_op.r_active) {
-		rds_ib_send_unmap_rdma(ic, &rm->rdma.m_rdma_op);
+		struct rds_rdma_op *op = &rm->rdma.m_rdma_op;
+
+		if (op->r_mapped) {
+			ib_dma_unmap_sg(ic->i_cm_id->device,
+					op->r_sg, op->r_nents,
+					op->r_write ? DMA_TO_DEVICE : DMA_FROM_DEVICE);
+			op->r_mapped = 0;
+		}
 
 		/* If the user asked for a completion notification on this
 		 * message, we can implement three different semantics:
@@ -116,6 +125,24 @@ static void rds_ib_send_unmap_rm(struct rds_ib_connection *ic,
 		rds_stats_add(s_recv_rdma_bytes, rm->rdma.m_rdma_op.r_bytes);
 	}
 
+	if (rm->atomic.op_active) {
+		struct rm_atomic_op *op = &rm->atomic;
+
+		/* unmap atomic recvbuf */
+		if (op->op_mapped) {
+			ib_dma_unmap_sg(ic->i_cm_id->device, op->op_sg, 1,
+					DMA_FROM_DEVICE);
+			op->op_mapped = 0;
+		}
+
+		rds_ib_send_atomic_complete(rm, wc_status);
+
+		if (rm->atomic.op_type == RDS_ATOMIC_TYPE_CSWP)
+			rds_stats_inc(s_atomic_cswp);
+		else
+			rds_stats_inc(s_atomic_fadd);
+	}
+
 	/* If anyone waited for this message to get flushed out, wake
 	 * them up now */
 	rds_message_unmapped(rm);
@@ -158,12 +185,9 @@ void rds_ib_send_clear_ring(struct rds_ib_connection *ic)
 	u32 i;
 
 	for (i = 0, send = ic->i_sends; i < ic->i_send_ring.w_nr; i++, send++) {
-		if (send->s_wr.opcode == 0xdead)
+		if (!send->s_rm || send->s_wr.opcode == 0xdead)
 			continue;
-		if (send->s_rm)
-			rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
-		if (send->s_op)
-			rds_ib_send_unmap_rdma(ic, send->s_op);
+		rds_ib_send_unmap_rm(ic, send, IB_WC_WR_FLUSH_ERR);
 	}
 }
 
@@ -218,6 +242,8 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 			break;
 		case IB_WR_RDMA_WRITE:
 		case IB_WR_RDMA_READ:
+		case IB_WR_ATOMIC_FETCH_AND_ADD:
+		case IB_WR_ATOMIC_CMP_AND_SWP:
 			/* Nothing to be done - the SG list will be unmapped
 			 * when the SEND completes. */
 			break;
@@ -243,8 +269,7 @@ void rds_ib_send_cq_comp_handler(struct ib_cq *cq, void *context)
 
 		rm = rds_send_get_message(conn, send->s_op);
 		if (rm) {
-			if (rm->rdma.m_rdma_op.r_active)
-				rds_ib_send_unmap_rdma(ic, &rm->rdma.m_rdma_op);
+			rds_ib_send_unmap_rm(ic, send, wc.status);
 			rds_ib_send_rdma_complete(rm, wc.status);
 			rds_message_put(rm);
 		}
@@ -736,6 +761,89 @@ out:
 	return ret;
 }
 
+/*
+ * Issue atomic operation.
+ * A simplified version of the rdma case, we always map 1 SG, and
+ * only 8 bytes, for the return value from the atomic operation.
+ */
+int rds_ib_xmit_atomic(struct rds_connection *conn, struct rm_atomic_op *op)
+{
+	struct rds_ib_connection *ic = conn->c_transport_data;
+	struct rds_ib_send_work *send = NULL;
+	struct ib_send_wr *failed_wr;
+	struct rds_ib_device *rds_ibdev;
+	u32 pos;
+	u32 work_alloc;
+	int ret;
+
+	rds_ibdev = ib_get_client_data(ic->i_cm_id->device, &rds_ib_client);
+
+	work_alloc = rds_ib_ring_alloc(&ic->i_send_ring, 1, &pos);
+	if (work_alloc != 1) {
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_stats_inc(s_ib_tx_ring_full);
+		ret = -ENOMEM;
+		goto out;
+	}
+
+	/* address of send request in ring */
+	send = &ic->i_sends[pos];
+	send->s_queued = jiffies;
+
+	if (op->op_type == RDS_ATOMIC_TYPE_CSWP) {
+		send->s_wr.opcode = IB_WR_ATOMIC_CMP_AND_SWP;
+		send->s_wr.wr.atomic.compare_add = op->op_compare;
+		send->s_wr.wr.atomic.swap = op->op_swap_add;
+	} else { /* FADD */
+		send->s_wr.opcode = IB_WR_ATOMIC_FETCH_AND_ADD;
+		send->s_wr.wr.atomic.compare_add = op->op_swap_add;
+		send->s_wr.wr.atomic.swap = 0;
+	}
+	send->s_wr.send_flags = IB_SEND_SIGNALED;
+	send->s_wr.num_sge = 1;
+	send->s_wr.next = NULL;
+	send->s_wr.wr.atomic.remote_addr = op->op_remote_addr;
+	send->s_wr.wr.atomic.rkey = op->op_rkey;
+
+	/* map 8 byte retval buffer to the device */
+	ret = ib_dma_map_sg(ic->i_cm_id->device, op->op_sg, 1, DMA_FROM_DEVICE);
+	rdsdebug("ic %p mapping atomic op %p. mapped %d pg\n", ic, op, ret);
+	if (ret != 1) {
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		rds_ib_stats_inc(s_ib_tx_sg_mapping_failure);
+		ret = -ENOMEM; /* XXX ? */
+		goto out;
+	}
+
+	/* Convert our struct scatterlist to struct ib_sge */
+	send->s_sge[0].addr = ib_sg_dma_address(ic->i_cm_id->device, op->op_sg);
+	send->s_sge[0].length = ib_sg_dma_len(ic->i_cm_id->device, op->op_sg);
+	send->s_sge[0].lkey = ic->i_mr->lkey;
+
+	rdsdebug("rva %Lx rpa %Lx len %u\n", op->op_remote_addr,
+		 send->s_sge[0].addr, send->s_sge[0].length);
+
+	failed_wr = &send->s_wr;
+	ret = ib_post_send(ic->i_cm_id->qp, &send->s_wr, &failed_wr);
+	rdsdebug("ic %p send %p (wr %p) ret %d wr %p\n", ic,
+		 send, &send->s_wr, ret, failed_wr);
+	BUG_ON(failed_wr != &send->s_wr);
+	if (ret) {
+		printk(KERN_WARNING "RDS/IB: atomic ib_post_send to %pI4 "
+		       "returned %d\n", &conn->c_faddr, ret);
+		rds_ib_ring_unalloc(&ic->i_send_ring, work_alloc);
+		goto out;
+	}
+
+	if (unlikely(failed_wr != &send->s_wr)) {
+		printk(KERN_WARNING "RDS/IB: atomic ib_post_send() rc=%d, but failed_wqe updated!\n", ret);
+		BUG_ON(failed_wr != &send->s_wr);
+	}
+
+out:
+	return ret;
+}
+
 int rds_ib_xmit_rdma(struct rds_connection *conn, struct rds_rdma_op *op)
 {
 	struct rds_ib_connection *ic = conn->c_transport_data;
diff --git a/net/rds/rdma.c b/net/rds/rdma.c
index 4fda33045598..a7019df38c70 100644
--- a/net/rds/rdma.c
+++ b/net/rds/rdma.c
@@ -719,3 +719,76 @@ int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 
 	return __rds_rdma_map(rs, CMSG_DATA(cmsg), &rm->m_rdma_cookie, &rm->rdma.m_rdma_mr);
 }
+
+/*
+ * Fill in rds_message for an atomic request.
+ */
+int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
+		    struct cmsghdr *cmsg)
+{
+	struct page *page = NULL;
+	struct rds_atomic_args *args;
+	int ret = 0;
+
+	if (cmsg->cmsg_len < CMSG_LEN(sizeof(struct rds_atomic_args))
+	    || rm->atomic.op_active)
+		return -EINVAL;
+
+	args = CMSG_DATA(cmsg);
+
+	if (cmsg->cmsg_type == RDS_CMSG_ATOMIC_CSWP) {
+		rm->atomic.op_type = RDS_ATOMIC_TYPE_CSWP;
+		rm->atomic.op_swap_add = args->cswp.swap;
+		rm->atomic.op_compare = args->cswp.compare;
+	} else {
+		rm->atomic.op_type = RDS_ATOMIC_TYPE_FADD;
+		rm->atomic.op_swap_add = args->fadd.add;
+	}
+
+	rm->m_rdma_cookie = args->cookie;
+	rm->atomic.op_notify = !!(args->flags & RDS_RDMA_NOTIFY_ME);
+	rm->atomic.op_recverr = rs->rs_recverr;
+	rm->atomic.op_sg = rds_message_alloc_sgs(rm, 1);
+
+	/* verify 8 byte-aligned */
+	if (args->local_addr & 0x7) {
+		ret = -EFAULT;
+		goto err;
+	}
+
+	ret = rds_pin_pages(args->local_addr, 1, &page, 1);
+	if (ret != 1)
+		goto err;
+	ret = 0;
+
+	sg_set_page(rm->atomic.op_sg, page, 8, offset_in_page(args->local_addr));
+
+	if (rm->atomic.op_notify || rm->atomic.op_recverr) {
+		/* We allocate an uninitialized notifier here, because
+		 * we don't want to do that in the completion handler. We
+		 * would have to use GFP_ATOMIC there, and don't want to deal
+		 * with failed allocations.
+		 */
+		rm->atomic.op_notifier = kmalloc(sizeof(*rm->atomic.op_notifier), GFP_KERNEL);
+		if (!rm->atomic.op_notifier) {
+			ret = -ENOMEM;
+			goto err;
+		}
+
+		rm->atomic.op_notifier->n_user_token = args->user_token;
+		rm->atomic.op_notifier->n_status = RDS_RDMA_SUCCESS;
+	}
+
+	rm->atomic.op_rkey = rds_rdma_cookie_key(rm->m_rdma_cookie);
+	rm->atomic.op_remote_addr = args->remote_addr + rds_rdma_cookie_offset(args->cookie);
+
+	rm->atomic.op_active = 1;
+
+	return ret;
+err:
+	if (page)
+		put_page(page);
+	kfree(rm->atomic.op_notifier);
+
+	return ret;
+}
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 0bb4957e0cfc..830e2bbb3332 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -97,6 +97,7 @@ struct rds_connection {
 	unsigned int		c_xmit_hdr_off;
 	unsigned int		c_xmit_data_off;
 	unsigned int		c_xmit_rdma_sent;
+	unsigned int		c_xmit_atomic_sent;
 
 	spinlock_t		c_lock;		/* protect msg queues */
 	u64			c_next_tx_seq;
@@ -260,6 +261,10 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
 	return cookie >> 32;
 }
 
+/* atomic operation types */
+#define RDS_ATOMIC_TYPE_CSWP		0
+#define RDS_ATOMIC_TYPE_FADD		1
+
 /*
  * m_sock_item and m_conn_item are on lists that are serialized under
  * conn->c_lock. m_sock_item has additional meaning in that once it is empty
@@ -315,11 +320,27 @@ struct rds_message {
 	struct rds_sock		*m_rs;
 	rds_rdma_cookie_t	m_rdma_cookie;
 	struct {
-		struct {
+		struct rm_atomic_op {
+			int			op_type;
+			uint64_t		op_swap_add;
+			uint64_t		op_compare;
+
+			u32			op_rkey;
+			u64			op_remote_addr;
+			unsigned int		op_notify:1;
+			unsigned int		op_recverr:1;
+			unsigned int		op_mapped:1;
+			unsigned int		op_active:1;
+			struct rds_notifier	*op_notifier;
+			struct scatterlist	*op_sg;
+
+			struct rds_mr		*op_rdma_mr;
+		} atomic;
+		struct rm_rdma_op {
 			struct rds_rdma_op	m_rdma_op;
 			struct rds_mr		*m_rdma_mr;
 		} rdma;
-		struct {
+		struct rm_data_op {
 			unsigned int		m_nents;
 			unsigned int		m_count;
 			struct scatterlist	*m_sg;
@@ -397,6 +418,7 @@ struct rds_transport {
 	int (*xmit_cong_map)(struct rds_connection *conn,
 			     struct rds_cong_map *map, unsigned long offset);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rds_rdma_op *op);
+	int (*xmit_atomic)(struct rds_connection *conn, struct rm_atomic_op *op);
 	int (*recv)(struct rds_connection *conn);
 	int (*inc_copy_to_user)(struct rds_incoming *inc, struct iovec *iov,
 				size_t size);
@@ -546,6 +568,8 @@ struct rds_statistics {
 	uint64_t	s_cong_update_received;
 	uint64_t	s_cong_send_error;
 	uint64_t	s_cong_send_blocked;
+	uint64_t	s_atomic_cswp;
+	uint64_t	s_atomic_fadd;
 };
 
 /* af_rds.c */
@@ -722,7 +746,10 @@ int rds_cmsg_rdma_args(struct rds_sock *rs, struct rds_message *rm,
 int rds_cmsg_rdma_map(struct rds_sock *rs, struct rds_message *rm,
 		      struct cmsghdr *cmsg);
 void rds_rdma_free_op(struct rds_rdma_op *ro);
-void rds_rdma_send_complete(struct rds_message *rm, int);
+void rds_rdma_send_complete(struct rds_message *rm, int wc_status);
+void rds_atomic_send_complete(struct rds_message *rm, int wc_status);
+int rds_cmsg_atomic(struct rds_sock *rs, struct rds_message *rm,
+		    struct cmsghdr *cmsg);
 
 extern void __rds_put_mr_final(struct rds_mr *mr);
 static inline void rds_mr_put(struct rds_mr *mr)
diff --git a/net/rds/send.c b/net/rds/send.c
index b751a8e77c41..f3f4e79274bf 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -73,6 +73,7 @@ void rds_send_reset(struct rds_connection *conn)
 	conn->c_xmit_hdr_off = 0;
 	conn->c_xmit_data_off = 0;
 	conn->c_xmit_rdma_sent = 0;
+	conn->c_xmit_atomic_sent = 0;
 
 	conn->c_map_queued = 0;
 
@@ -171,6 +172,7 @@ int rds_send_xmit(struct rds_connection *conn)
 			conn->c_xmit_hdr_off = 0;
 			conn->c_xmit_data_off = 0;
 			conn->c_xmit_rdma_sent = 0;
+			conn->c_xmit_atomic_sent = 0;
 
 			/* Release the reference to the previous message. */
 			rds_message_put(rm);
@@ -262,6 +264,17 @@ int rds_send_xmit(struct rds_connection *conn)
 			conn->c_xmit_rm = rm;
 		}
 
+
+		if (rm->atomic.op_active && !conn->c_xmit_atomic_sent) {
+			ret = conn->c_trans->xmit_atomic(conn, &rm->atomic);
+			if (ret)
+				break;
+			conn->c_xmit_atomic_sent = 1;
+			/* The transport owns the mapped memory for now.
+			 * You can't unmap it while it's on the send queue */
+			set_bit(RDS_MSG_MAPPED, &rm->m_flags);
+		}
+
 		/*
 		 * Try and send an rdma message.  Let's see if we can
 		 * keep this simple and require that the transport either
@@ -443,6 +456,41 @@ void rds_rdma_send_complete(struct rds_message *rm, int status)
 EXPORT_SYMBOL_GPL(rds_rdma_send_complete);
 
 /*
+ * Just like above, except looks at atomic op
+ */
+void rds_atomic_send_complete(struct rds_message *rm, int status)
+{
+	struct rds_sock *rs = NULL;
+	struct rm_atomic_op *ao;
+	struct rds_notifier *notifier;
+
+	spin_lock(&rm->m_rs_lock);
+
+	ao = &rm->atomic;
+	if (test_bit(RDS_MSG_ON_SOCK, &rm->m_flags)
+	    && ao->op_active && ao->op_notify && ao->op_notifier) {
+		notifier = ao->op_notifier;
+		rs = rm->m_rs;
+		sock_hold(rds_rs_to_sk(rs));
+
+		notifier->n_status = status;
+		spin_lock(&rs->rs_lock);
+		list_add_tail(&notifier->n_list, &rs->rs_notify_queue);
+		spin_unlock(&rs->rs_lock);
+
+		ao->op_notifier = NULL;
+	}
+
+	spin_unlock(&rm->m_rs_lock);
+
+	if (rs) {
+		rds_wake_sk_sleep(rs);
+		sock_put(rds_rs_to_sk(rs));
+	}
+}
+EXPORT_SYMBOL_GPL(rds_atomic_send_complete);
+
+/*
  * This is the same as rds_rdma_send_complete except we
  * don't do any locking - we have all the ingredients (message,
  * socket, socket lock) and can just move the notifier.
@@ -788,6 +836,11 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
 			/* these are valid but do no add any size */
 			break;
 
+		case RDS_CMSG_ATOMIC_CSWP:
+		case RDS_CMSG_ATOMIC_FADD:
+			size += sizeof(struct scatterlist);
+			break;
+
 		default:
 			return -EINVAL;
 		}
@@ -813,7 +866,7 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			continue;
 
 		/* As a side effect, RDMA_DEST and RDMA_MAP will set
-		 * rm->m_rdma_cookie and rm->m_rdma_mr.
+		 * rm->rdma.m_rdma_cookie and rm->rdma.m_rdma_mr.
 		 */
 		switch (cmsg->cmsg_type) {
 		case RDS_CMSG_RDMA_ARGS:
@@ -829,6 +882,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
 			if (!ret)
 				*allocated_mr = 1;
 			break;
+		case RDS_CMSG_ATOMIC_CSWP:
+		case RDS_CMSG_ATOMIC_FADD:
+			ret = rds_cmsg_atomic(rs, rm, cmsg);
+			break;
 
 		default:
 			return -EINVAL;
@@ -926,10 +983,18 @@ int rds_sendmsg(struct kiocb *iocb, struct socket *sock, struct msghdr *msg,
 		goto out;
 
 	if ((rm->m_rdma_cookie || rm->rdma.m_rdma_op.r_active) &&
 	    !conn->c_trans->xmit_rdma) {
 		if (printk_ratelimit())
 			printk(KERN_NOTICE "rdma_op %p conn xmit_rdma %p\n",
 			       &rm->rdma.m_rdma_op, conn->c_trans->xmit_rdma);
+		ret = -EOPNOTSUPP;
+		goto out;
+	}
+
+	if (rm->atomic.op_active && !conn->c_trans->xmit_atomic) {
+		if (printk_ratelimit())
+			printk(KERN_NOTICE "atomic_op %p conn xmit_atomic %p\n",
+			       &rm->atomic, conn->c_trans->xmit_atomic);
 		ret = -EOPNOTSUPP;
 		goto out;
 	}
diff --git a/net/rds/stats.c b/net/rds/stats.c
index 7598eb07cfb1..c66d95d9c262 100644
--- a/net/rds/stats.c
+++ b/net/rds/stats.c
@@ -75,6 +75,8 @@ static const char *const rds_stat_names[] = {
75 "cong_update_received", 75 "cong_update_received",
76 "cong_send_error", 76 "cong_send_error",
77 "cong_send_blocked", 77 "cong_send_blocked",
78 "s_atomic_cswp",
79 "s_atomic_fadd",
78}; 80};
79 81
80void rds_stats_info_copy(struct rds_info_iterator *iter, 82void rds_stats_info_copy(struct rds_info_iterator *iter,