summaryrefslogtreecommitdiffstats
path: root/net/rds
diff options
context:
space:
mode:
authorSowmini Varadhan <sowmini.varadhan@oracle.com>2018-02-15 13:49:35 -0500
committerDavid S. Miller <davem@davemloft.net>2018-02-16 16:04:17 -0500
commit01883eda72bd3f0a6c81447e4f223de14033fd9d (patch)
tree1d70322d05664e531520ae30d47ee5ef7267c644 /net/rds
parent28190752c709272de3c2b6b092029da3f1614c5a (diff)
rds: support for zcopy completion notification
RDS removes a datagram (rds_message) from the retransmit queue when an ACK is received. The ACK indicates that the receiver has queued the RDS datagram, so that the sender can safely forget the datagram. When all references to the rds_message are quiesced, rds_message_purge is called to release resources used by the rds_message. If the datagram to be removed had pinned pages set up, add an entry to the rs->rs_znotify_queue so that the notification will be sent up via rds_rm_zerocopy_callback() when the rds_message is eventually freed by rds_message_purge. rds_rm_zerocopy_callback() attempts to batch the number of cookies sent with each notification to a max of SO_EE_ORIGIN_MAX_ZCOOKIES. This is achieved by checking the tail skb in the sk_error_queue: if this has room for one more cookie, the cookie from the current notification is added; else a new skb is added to the sk_error_queue. Every invocation of rds_rm_zerocopy_callback() will trigger a ->sk_error_report to notify the application. Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com> Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com> Acked-by: Willem de Bruijn <willemb@google.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/rds')
-rw-r--r--net/rds/af_rds.c2
-rw-r--r--net/rds/message.c83
-rw-r--r--net/rds/rds.h14
-rw-r--r--net/rds/recv.c2
4 files changed, 94 insertions, 7 deletions
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index 0a8eefd256b3..a937f18896ae 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -182,6 +182,8 @@ static __poll_t rds_poll(struct file *file, struct socket *sock,
182 mask |= (EPOLLIN | EPOLLRDNORM); 182 mask |= (EPOLLIN | EPOLLRDNORM);
183 if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) 183 if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
184 mask |= (EPOLLOUT | EPOLLWRNORM); 184 mask |= (EPOLLOUT | EPOLLWRNORM);
185 if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
186 mask |= POLLERR;
185 read_unlock_irqrestore(&rs->rs_recv_lock, flags); 187 read_unlock_irqrestore(&rs->rs_recv_lock, flags);
186 188
187 /* clear state any time we wake a seen-congested socket */ 189 /* clear state any time we wake a seen-congested socket */
diff --git a/net/rds/message.c b/net/rds/message.c
index ef3daafa3d79..bf1a656b198a 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -33,6 +33,9 @@
33#include <linux/kernel.h> 33#include <linux/kernel.h>
34#include <linux/slab.h> 34#include <linux/slab.h>
35#include <linux/export.h> 35#include <linux/export.h>
36#include <linux/skbuff.h>
37#include <linux/list.h>
38#include <linux/errqueue.h>
36 39
37#include "rds.h" 40#include "rds.h"
38 41
@@ -53,29 +56,95 @@ void rds_message_addref(struct rds_message *rm)
53} 56}
54EXPORT_SYMBOL_GPL(rds_message_addref); 57EXPORT_SYMBOL_GPL(rds_message_addref);
55 58
59static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
60{
61 struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
62 int ncookies;
63 u32 *ptr;
64
65 if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
66 return false;
67 ncookies = serr->ee.ee_data;
68 if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES)
69 return false;
70 ptr = skb_put(skb, sizeof(u32));
71 *ptr = cookie;
72 serr->ee.ee_data = ++ncookies;
73 return true;
74}
75
76static void rds_rm_zerocopy_callback(struct rds_sock *rs,
77 struct rds_znotifier *znotif)
78{
79 struct sock *sk = rds_rs_to_sk(rs);
80 struct sk_buff *skb, *tail;
81 struct sock_exterr_skb *serr;
82 unsigned long flags;
83 struct sk_buff_head *q;
84 u32 cookie = znotif->z_cookie;
85
86 q = &sk->sk_error_queue;
87 spin_lock_irqsave(&q->lock, flags);
88 tail = skb_peek_tail(q);
89
90 if (tail && skb_zcookie_add(tail, cookie)) {
91 spin_unlock_irqrestore(&q->lock, flags);
92 mm_unaccount_pinned_pages(&znotif->z_mmp);
93 consume_skb(rds_skb_from_znotifier(znotif));
94 sk->sk_error_report(sk);
95 return;
96 }
97
98 skb = rds_skb_from_znotifier(znotif);
99 serr = SKB_EXT_ERR(skb);
100 memset(&serr->ee, 0, sizeof(serr->ee));
101 serr->ee.ee_errno = 0;
102 serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
103 serr->ee.ee_info = 0;
104 WARN_ON(!skb_zcookie_add(skb, cookie));
105
106 __skb_queue_tail(q, skb);
107
108 spin_unlock_irqrestore(&q->lock, flags);
109 sk->sk_error_report(sk);
110
111 mm_unaccount_pinned_pages(&znotif->z_mmp);
112}
113
56/* 114/*
57 * This relies on dma_map_sg() not touching sg[].page during merging. 115 * This relies on dma_map_sg() not touching sg[].page during merging.
58 */ 116 */
59static void rds_message_purge(struct rds_message *rm) 117static void rds_message_purge(struct rds_message *rm)
60{ 118{
61 unsigned long i, flags; 119 unsigned long i, flags;
120 bool zcopy = false;
62 121
63 if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags))) 122 if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
64 return; 123 return;
65 124
66 for (i = 0; i < rm->data.op_nents; i++) {
67 rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
68 /* XXX will have to put_page for page refs */
69 __free_page(sg_page(&rm->data.op_sg[i]));
70 }
71 rm->data.op_nents = 0;
72 spin_lock_irqsave(&rm->m_rs_lock, flags); 125 spin_lock_irqsave(&rm->m_rs_lock, flags);
73 if (rm->m_rs) { 126 if (rm->m_rs) {
74 sock_put(rds_rs_to_sk(rm->m_rs)); 127 struct rds_sock *rs = rm->m_rs;
128
129 if (rm->data.op_mmp_znotifier) {
130 zcopy = true;
131 rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
132 rm->data.op_mmp_znotifier = NULL;
133 }
134 sock_put(rds_rs_to_sk(rs));
75 rm->m_rs = NULL; 135 rm->m_rs = NULL;
76 } 136 }
77 spin_unlock_irqrestore(&rm->m_rs_lock, flags); 137 spin_unlock_irqrestore(&rm->m_rs_lock, flags);
78 138
139 for (i = 0; i < rm->data.op_nents; i++) {
140 /* XXX will have to put_page for page refs */
141 if (!zcopy)
142 __free_page(sg_page(&rm->data.op_sg[i]));
143 else
144 put_page(sg_page(&rm->data.op_sg[i]));
145 }
146 rm->data.op_nents = 0;
147
79 if (rm->rdma.op_active) 148 if (rm->rdma.op_active)
80 rds_rdma_free_op(&rm->rdma); 149 rds_rdma_free_op(&rm->rdma);
81 if (rm->rdma.op_rdma_mr) 150 if (rm->rdma.op_rdma_mr)
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 7301b9b01890..24576bc4a5e9 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -356,6 +356,19 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
356#define RDS_MSG_PAGEVEC 7 356#define RDS_MSG_PAGEVEC 7
357#define RDS_MSG_FLUSH 8 357#define RDS_MSG_FLUSH 8
358 358
/*
 * Per-message state for a zerocopy completion notification.  The structure
 * is overlaid on an skb's control buffer: RDS_ZCOPY_SKB() casts &skb->cb[0]
 * to it and rds_skb_from_znotifier() maps it back to the owning skb.
 */
359struct rds_znotifier {
/* list linkage; not referenced by the code visible here -- presumably for rs_znotify_queue, TODO confirm */
360 struct list_head z_list;
/* pinned-page accounting, released with mm_unaccount_pinned_pages() on completion */
361 struct mmpin z_mmp;
/* cookie value reported to the application in the error-queue notification */
362 u32 z_cookie;
363};
364
365#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0]))
366
367static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
368{
369 return container_of((void *)z, struct sk_buff, cb);
370}
371
359struct rds_message { 372struct rds_message {
360 refcount_t m_refcount; 373 refcount_t m_refcount;
361 struct list_head m_sock_item; 374 struct list_head m_sock_item;
@@ -436,6 +449,7 @@ struct rds_message {
436 unsigned int op_count; 449 unsigned int op_count;
437 unsigned int op_dmasg; 450 unsigned int op_dmasg;
438 unsigned int op_dmaoff; 451 unsigned int op_dmaoff;
452 struct rds_znotifier *op_mmp_znotifier;
439 struct scatterlist *op_sg; 453 struct scatterlist *op_sg;
440 } data; 454 } data;
441 }; 455 };
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b25bcfe411ca..b080961464df 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -594,6 +594,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
594 594
595 if (msg_flags & MSG_OOB) 595 if (msg_flags & MSG_OOB)
596 goto out; 596 goto out;
597 if (msg_flags & MSG_ERRQUEUE)
598 return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);
597 599
598 while (1) { 600 while (1) {
599 /* If there are pending notifications, do those - and nothing else */ 601 /* If there are pending notifications, do those - and nothing else */