aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSowmini Varadhan <sowmini.varadhan@oracle.com>2018-02-15 13:49:36 -0500
committerDavid S. Miller <davem@davemloft.net>2018-02-16 16:04:17 -0500
commit0cebaccef3acbdfbc2d85880a2efb765d2f4e2e3 (patch)
treea8e355ad69ddb3de228816062631f9e4d356b8cc
parent01883eda72bd3f0a6c81447e4f223de14033fd9d (diff)
rds: zerocopy Tx support.
If the MSG_ZEROCOPY flag is specified with rds_sendmsg(), and the SO_ZEROCOPY socket option has been set on the PF_RDS socket, application pages sent down with rds_sendmsg() are pinned. The pinning uses the accounting infrastructure added by commit a91dbff551a6 ("sock: ulimit on MSG_ZEROCOPY pages"). The payload bytes in the message may not be modified for as long as the message is pinned. A multi-threaded application using this infrastructure may thus need to be notified about send-completion so that it can free/reuse the buffers passed to rds_sendmsg(). Notification of send-completion will identify each message-buffer by a cookie that the application must specify as ancillary data to rds_sendmsg(). The ancillary data in this case has cmsg_level == SOL_RDS and cmsg_type == RDS_CMSG_ZCOPY_COOKIE. Signed-off-by: Sowmini Varadhan <sowmini.varadhan@oracle.com> Acked-by: Santosh Shilimkar <santosh.shilimkar@oracle.com> Acked-by: Willem de Bruijn <willemb@google.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--include/uapi/linux/rds.h1
-rw-r--r--net/rds/message.c51
-rw-r--r--net/rds/rds.h3
-rw-r--r--net/rds/send.c44
4 files changed, 91 insertions, 8 deletions
diff --git a/include/uapi/linux/rds.h b/include/uapi/linux/rds.h
index e71d4491f225..12e3bca32cad 100644
--- a/include/uapi/linux/rds.h
+++ b/include/uapi/linux/rds.h
@@ -103,6 +103,7 @@
103#define RDS_CMSG_MASKED_ATOMIC_FADD 8 103#define RDS_CMSG_MASKED_ATOMIC_FADD 8
104#define RDS_CMSG_MASKED_ATOMIC_CSWP 9 104#define RDS_CMSG_MASKED_ATOMIC_CSWP 9
105#define RDS_CMSG_RXPATH_LATENCY 11 105#define RDS_CMSG_RXPATH_LATENCY 11
106#define RDS_CMSG_ZCOPY_COOKIE 12
106 107
107#define RDS_INFO_FIRST 10000 108#define RDS_INFO_FIRST 10000
108#define RDS_INFO_COUNTERS 10000 109#define RDS_INFO_COUNTERS 10000
diff --git a/net/rds/message.c b/net/rds/message.c
index bf1a656b198a..651834513481 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -341,12 +341,14 @@ struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned in
341 return rm; 341 return rm;
342} 342}
343 343
344int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from) 344int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
345 bool zcopy)
345{ 346{
346 unsigned long to_copy, nbytes; 347 unsigned long to_copy, nbytes;
347 unsigned long sg_off; 348 unsigned long sg_off;
348 struct scatterlist *sg; 349 struct scatterlist *sg;
349 int ret = 0; 350 int ret = 0;
351 int length = iov_iter_count(from);
350 352
351 rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from)); 353 rm->m_inc.i_hdr.h_len = cpu_to_be32(iov_iter_count(from));
352 354
@@ -356,6 +358,53 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from)
356 sg = rm->data.op_sg; 358 sg = rm->data.op_sg;
357 sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */ 359 sg_off = 0; /* Dear gcc, sg->page will be null from kzalloc. */
358 360
361 if (zcopy) {
362 int total_copied = 0;
363 struct sk_buff *skb;
364
365 skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
366 GFP_KERNEL);
367 if (!skb)
368 return -ENOMEM;
369 rm->data.op_mmp_znotifier = RDS_ZCOPY_SKB(skb);
370 if (mm_account_pinned_pages(&rm->data.op_mmp_znotifier->z_mmp,
371 length)) {
372 ret = -ENOMEM;
373 goto err;
374 }
375 while (iov_iter_count(from)) {
376 struct page *pages;
377 size_t start;
378 ssize_t copied;
379
380 copied = iov_iter_get_pages(from, &pages, PAGE_SIZE,
381 1, &start);
382 if (copied < 0) {
383 struct mmpin *mmp;
384 int i;
385
386 for (i = 0; i < rm->data.op_nents; i++)
387 put_page(sg_page(&rm->data.op_sg[i]));
388 mmp = &rm->data.op_mmp_znotifier->z_mmp;
389 mm_unaccount_pinned_pages(mmp);
390 ret = -EFAULT;
391 goto err;
392 }
393 total_copied += copied;
394 iov_iter_advance(from, copied);
395 length -= copied;
396 sg_set_page(sg, pages, copied, start);
397 rm->data.op_nents++;
398 sg++;
399 }
400 WARN_ON_ONCE(length != 0);
401 return ret;
402err:
403 consume_skb(skb);
404 rm->data.op_mmp_znotifier = NULL;
405 return ret;
406 } /* zcopy */
407
359 while (iov_iter_count(from)) { 408 while (iov_iter_count(from)) {
360 if (!sg_page(sg)) { 409 if (!sg_page(sg)) {
361 ret = rds_page_remainder_alloc(sg, iov_iter_count(from), 410 ret = rds_page_remainder_alloc(sg, iov_iter_count(from),
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 24576bc4a5e9..31cd38852050 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -785,7 +785,8 @@ rds_conn_connecting(struct rds_connection *conn)
785/* message.c */ 785/* message.c */
786struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp); 786struct rds_message *rds_message_alloc(unsigned int nents, gfp_t gfp);
787struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents); 787struct scatterlist *rds_message_alloc_sgs(struct rds_message *rm, int nents);
788int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from); 788int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
789 bool zcopy);
789struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len); 790struct rds_message *rds_message_map_pages(unsigned long *page_addrs, unsigned int total_len);
790void rds_message_populate_header(struct rds_header *hdr, __be16 sport, 791void rds_message_populate_header(struct rds_header *hdr, __be16 sport,
791 __be16 dport, u64 seq); 792 __be16 dport, u64 seq);
diff --git a/net/rds/send.c b/net/rds/send.c
index e8f3ff471b15..028ab598ac1b 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -875,12 +875,13 @@ out:
875 * rds_message is getting to be quite complicated, and we'd like to allocate 875 * rds_message is getting to be quite complicated, and we'd like to allocate
876 * it all in one go. This figures out how big it needs to be up front. 876 * it all in one go. This figures out how big it needs to be up front.
877 */ 877 */
878static int rds_rm_size(struct msghdr *msg, int data_len) 878static int rds_rm_size(struct msghdr *msg, int num_sgs)
879{ 879{
880 struct cmsghdr *cmsg; 880 struct cmsghdr *cmsg;
881 int size = 0; 881 int size = 0;
882 int cmsg_groups = 0; 882 int cmsg_groups = 0;
883 int retval; 883 int retval;
884 bool zcopy_cookie = false;
884 885
885 for_each_cmsghdr(cmsg, msg) { 886 for_each_cmsghdr(cmsg, msg) {
886 if (!CMSG_OK(msg, cmsg)) 887 if (!CMSG_OK(msg, cmsg))
@@ -899,6 +900,8 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
899 900
900 break; 901 break;
901 902
903 case RDS_CMSG_ZCOPY_COOKIE:
904 zcopy_cookie = true;
902 case RDS_CMSG_RDMA_DEST: 905 case RDS_CMSG_RDMA_DEST:
903 case RDS_CMSG_RDMA_MAP: 906 case RDS_CMSG_RDMA_MAP:
904 cmsg_groups |= 2; 907 cmsg_groups |= 2;
@@ -919,7 +922,10 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
919 922
920 } 923 }
921 924
922 size += ceil(data_len, PAGE_SIZE) * sizeof(struct scatterlist); 925 if ((msg->msg_flags & MSG_ZEROCOPY) && !zcopy_cookie)
926 return -EINVAL;
927
928 size += num_sgs * sizeof(struct scatterlist);
923 929
924 /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */ 930 /* Ensure (DEST, MAP) are never used with (ARGS, ATOMIC) */
925 if (cmsg_groups == 3) 931 if (cmsg_groups == 3)
@@ -928,6 +934,18 @@ static int rds_rm_size(struct msghdr *msg, int data_len)
928 return size; 934 return size;
929} 935}
930 936
937static int rds_cmsg_zcopy(struct rds_sock *rs, struct rds_message *rm,
938 struct cmsghdr *cmsg)
939{
940 u32 *cookie;
941
942 if (cmsg->cmsg_len < CMSG_LEN(sizeof(*cookie)))
943 return -EINVAL;
944 cookie = CMSG_DATA(cmsg);
945 rm->data.op_mmp_znotifier->z_cookie = *cookie;
946 return 0;
947}
948
931static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm, 949static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
932 struct msghdr *msg, int *allocated_mr) 950 struct msghdr *msg, int *allocated_mr)
933{ 951{
@@ -970,6 +988,10 @@ static int rds_cmsg_send(struct rds_sock *rs, struct rds_message *rm,
970 ret = rds_cmsg_atomic(rs, rm, cmsg); 988 ret = rds_cmsg_atomic(rs, rm, cmsg);
971 break; 989 break;
972 990
991 case RDS_CMSG_ZCOPY_COOKIE:
992 ret = rds_cmsg_zcopy(rs, rm, cmsg);
993 break;
994
973 default: 995 default:
974 return -EINVAL; 996 return -EINVAL;
975 } 997 }
@@ -1040,10 +1062,13 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1040 long timeo = sock_sndtimeo(sk, nonblock); 1062 long timeo = sock_sndtimeo(sk, nonblock);
1041 struct rds_conn_path *cpath; 1063 struct rds_conn_path *cpath;
1042 size_t total_payload_len = payload_len, rdma_payload_len = 0; 1064 size_t total_payload_len = payload_len, rdma_payload_len = 0;
1065 bool zcopy = ((msg->msg_flags & MSG_ZEROCOPY) &&
1066 sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY));
1067 int num_sgs = ceil(payload_len, PAGE_SIZE);
1043 1068
1044 /* Mirror Linux UDP mirror of BSD error message compatibility */ 1069 /* Mirror Linux UDP mirror of BSD error message compatibility */
1045 /* XXX: Perhaps MSG_MORE someday */ 1070 /* XXX: Perhaps MSG_MORE someday */
1046 if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT)) { 1071 if (msg->msg_flags & ~(MSG_DONTWAIT | MSG_CMSG_COMPAT | MSG_ZEROCOPY)) {
1047 ret = -EOPNOTSUPP; 1072 ret = -EOPNOTSUPP;
1048 goto out; 1073 goto out;
1049 } 1074 }
@@ -1087,8 +1112,15 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1087 goto out; 1112 goto out;
1088 } 1113 }
1089 1114
1115 if (zcopy) {
1116 if (rs->rs_transport->t_type != RDS_TRANS_TCP) {
1117 ret = -EOPNOTSUPP;
1118 goto out;
1119 }
1120 num_sgs = iov_iter_npages(&msg->msg_iter, INT_MAX);
1121 }
1090 /* size of rm including all sgs */ 1122 /* size of rm including all sgs */
1091 ret = rds_rm_size(msg, payload_len); 1123 ret = rds_rm_size(msg, num_sgs);
1092 if (ret < 0) 1124 if (ret < 0)
1093 goto out; 1125 goto out;
1094 1126
@@ -1100,12 +1132,12 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
1100 1132
1101 /* Attach data to the rm */ 1133 /* Attach data to the rm */
1102 if (payload_len) { 1134 if (payload_len) {
1103 rm->data.op_sg = rds_message_alloc_sgs(rm, ceil(payload_len, PAGE_SIZE)); 1135 rm->data.op_sg = rds_message_alloc_sgs(rm, num_sgs);
1104 if (!rm->data.op_sg) { 1136 if (!rm->data.op_sg) {
1105 ret = -ENOMEM; 1137 ret = -ENOMEM;
1106 goto out; 1138 goto out;
1107 } 1139 }
1108 ret = rds_message_copy_from_user(rm, &msg->msg_iter); 1140 ret = rds_message_copy_from_user(rm, &msg->msg_iter, zcopy);
1109 if (ret) 1141 if (ret)
1110 goto out; 1142 goto out;
1111 } 1143 }