aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
author Jon Paul Maloy <jon.maloy@ericsson.com> 2017-01-03 10:55:11 -0500
committer David S. Miller <davem@davemloft.net> 2017-01-03 11:13:05 -0500
commit 365ad353c2564bba8835290061308ba825166b3a (patch)
tree 06439f0724f3df34e29e8d3fb32432894e6d8ff0 /net/tipc/socket.c
parent 4d8642d896c53966d32d5e343c3620813dd0e7c8 (diff)
tipc: reduce risk of user starvation during link congestion
The socket code currently handles link congestion by either blocking and trying to send again when the congestion has abated, or just returning to the user with -EAGAIN and letting the user retry later. This mechanism is prone to starvation, because the wakeup algorithm is non-atomic. Between the time the link issues a wakeup signal and the time the socket wakes up and re-attempts sending, other senders may have come in and occupied the free buffer space in the link. This in turn may lead to a socket having to make many send attempts before it is successful. In extremely loaded systems we have observed latency times of several seconds before a low-priority socket is able to send out a message.

In this commit, we simplify this mechanism and reduce the risk of the described scenario happening. When an attempt is made to send a message via a congested link, we now let it be added to the link's backlog queue anyway, thus permitting an oversubscription of one message per source socket. We still create a wakeup item and return an error code, hence instructing the sender to block or stop sending. Only when enough space has been freed up in the link's backlog queue do we issue a wakeup event that allows the sender to continue with the next message, if any.

The fact that a socket can now consider a message sent even when the link returns a congestion code means that the sending socket code can be simplified. Also, since this is a good opportunity to get rid of the obsolete 'mtu change' condition in the three socket send functions, we now choose to refactor those functions completely.

Signed-off-by: Parthasarathy Bhuvaragan <parthasarathy.bhuvaragan@ericsson.com>
Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r-- net/tipc/socket.c 347
1 files changed, 154 insertions, 193 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index fae6a55ef1b0..d2f353934f82 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -67,12 +67,14 @@ enum {
67 * @max_pkt: maximum packet size "hint" used when building messages sent by port 67 * @max_pkt: maximum packet size "hint" used when building messages sent by port
68 * @portid: unique port identity in TIPC socket hash table 68 * @portid: unique port identity in TIPC socket hash table
69 * @phdr: preformatted message header used when sending messages 69 * @phdr: preformatted message header used when sending messages
70 * @cong_links: list of congested links
70 * @publications: list of publications for port 71 * @publications: list of publications for port
72 * @blocking_link: address of the congested link we are currently sleeping on
71 * @pub_count: total # of publications port has made during its lifetime 73 * @pub_count: total # of publications port has made during its lifetime
72 * @probing_state: 74 * @probing_state:
73 * @conn_timeout: the time we can wait for an unresponded setup request 75 * @conn_timeout: the time we can wait for an unresponded setup request
74 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 76 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
75 * @link_cong: non-zero if owner must sleep because of link congestion 77 * @cong_link_cnt: number of congested links
76 * @sent_unacked: # messages sent by socket, and not yet acked by peer 78 * @sent_unacked: # messages sent by socket, and not yet acked by peer
77 * @rcv_unacked: # messages read by user, but not yet acked back to peer 79 * @rcv_unacked: # messages read by user, but not yet acked back to peer
78 * @peer: 'connected' peer for dgram/rdm 80 * @peer: 'connected' peer for dgram/rdm
@@ -87,13 +89,13 @@ struct tipc_sock {
87 u32 max_pkt; 89 u32 max_pkt;
88 u32 portid; 90 u32 portid;
89 struct tipc_msg phdr; 91 struct tipc_msg phdr;
90 struct list_head sock_list; 92 struct list_head cong_links;
91 struct list_head publications; 93 struct list_head publications;
92 u32 pub_count; 94 u32 pub_count;
93 uint conn_timeout; 95 uint conn_timeout;
94 atomic_t dupl_rcvcnt; 96 atomic_t dupl_rcvcnt;
95 bool probe_unacked; 97 bool probe_unacked;
96 bool link_cong; 98 u16 cong_link_cnt;
97 u16 snt_unacked; 99 u16 snt_unacked;
98 u16 snd_win; 100 u16 snd_win;
99 u16 peer_caps; 101 u16 peer_caps;
@@ -118,8 +120,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
118static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); 120static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
119static int tipc_sk_insert(struct tipc_sock *tsk); 121static int tipc_sk_insert(struct tipc_sock *tsk);
120static void tipc_sk_remove(struct tipc_sock *tsk); 122static void tipc_sk_remove(struct tipc_sock *tsk);
121static int __tipc_send_stream(struct socket *sock, struct msghdr *m, 123static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
122 size_t dsz);
123static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); 124static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
124 125
125static const struct proto_ops packet_ops; 126static const struct proto_ops packet_ops;
@@ -424,6 +425,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
424 tsk = tipc_sk(sk); 425 tsk = tipc_sk(sk);
425 tsk->max_pkt = MAX_PKT_DEFAULT; 426 tsk->max_pkt = MAX_PKT_DEFAULT;
426 INIT_LIST_HEAD(&tsk->publications); 427 INIT_LIST_HEAD(&tsk->publications);
428 INIT_LIST_HEAD(&tsk->cong_links);
427 msg = &tsk->phdr; 429 msg = &tsk->phdr;
428 tn = net_generic(sock_net(sk), tipc_net_id); 430 tn = net_generic(sock_net(sk), tipc_net_id);
429 tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG, 431 tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
@@ -474,9 +476,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
474 struct sock *sk = sock->sk; 476 struct sock *sk = sock->sk;
475 struct tipc_sock *tsk = tipc_sk(sk); 477 struct tipc_sock *tsk = tipc_sk(sk);
476 struct net *net = sock_net(sk); 478 struct net *net = sock_net(sk);
479 long timeout = CONN_TIMEOUT_DEFAULT;
477 u32 dnode = tsk_peer_node(tsk); 480 u32 dnode = tsk_peer_node(tsk);
478 struct sk_buff *skb; 481 struct sk_buff *skb;
479 482
483 /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
484 tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
485 !tsk_conn_cong(tsk)));
486
480 /* Reject all unreceived messages, except on an active connection 487 /* Reject all unreceived messages, except on an active connection
481 * (which disconnects locally & sends a 'FIN+' to peer). 488 * (which disconnects locally & sends a 'FIN+' to peer).
482 */ 489 */
@@ -547,7 +554,8 @@ static int tipc_release(struct socket *sock)
547 554
548 /* Reject any messages that accumulated in backlog queue */ 555 /* Reject any messages that accumulated in backlog queue */
549 release_sock(sk); 556 release_sock(sk);
550 557 u32_list_purge(&tsk->cong_links);
558 tsk->cong_link_cnt = 0;
551 call_rcu(&tsk->rcu, tipc_sk_callback); 559 call_rcu(&tsk->rcu, tipc_sk_callback);
552 sock->sk = NULL; 560 sock->sk = NULL;
553 561
@@ -690,7 +698,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
690 698
691 switch (sk->sk_state) { 699 switch (sk->sk_state) {
692 case TIPC_ESTABLISHED: 700 case TIPC_ESTABLISHED:
693 if (!tsk->link_cong && !tsk_conn_cong(tsk)) 701 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
694 mask |= POLLOUT; 702 mask |= POLLOUT;
695 /* fall thru' */ 703 /* fall thru' */
696 case TIPC_LISTEN: 704 case TIPC_LISTEN:
@@ -699,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
699 mask |= (POLLIN | POLLRDNORM); 707 mask |= (POLLIN | POLLRDNORM);
700 break; 708 break;
701 case TIPC_OPEN: 709 case TIPC_OPEN:
702 if (!tsk->link_cong) 710 if (!tsk->cong_link_cnt)
703 mask |= POLLOUT; 711 mask |= POLLOUT;
704 if (tipc_sk_type_connectionless(sk) && 712 if (tipc_sk_type_connectionless(sk) &&
705 (!skb_queue_empty(&sk->sk_receive_queue))) 713 (!skb_queue_empty(&sk->sk_receive_queue)))
@@ -718,63 +726,48 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
718 * @sock: socket structure 726 * @sock: socket structure
719 * @seq: destination address 727 * @seq: destination address
720 * @msg: message to send 728 * @msg: message to send
721 * @dsz: total length of message data 729 * @dlen: length of data to send
722 * @timeo: timeout to wait for wakeup 730 * @timeout: timeout to wait for wakeup
723 * 731 *
724 * Called from function tipc_sendmsg(), which has done all sanity checks 732 * Called from function tipc_sendmsg(), which has done all sanity checks
725 * Returns the number of bytes sent on success, or errno 733 * Returns the number of bytes sent on success, or errno
726 */ 734 */
727static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, 735static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
728 struct msghdr *msg, size_t dsz, long timeo) 736 struct msghdr *msg, size_t dlen, long timeout)
729{ 737{
730 struct sock *sk = sock->sk; 738 struct sock *sk = sock->sk;
731 struct tipc_sock *tsk = tipc_sk(sk); 739 struct tipc_sock *tsk = tipc_sk(sk);
740 struct tipc_msg *hdr = &tsk->phdr;
732 struct net *net = sock_net(sk); 741 struct net *net = sock_net(sk);
733 struct tipc_msg *mhdr = &tsk->phdr; 742 int mtu = tipc_bcast_get_mtu(net);
734 struct sk_buff_head pktchain; 743 struct sk_buff_head pkts;
735 struct iov_iter save = msg->msg_iter;
736 uint mtu;
737 int rc; 744 int rc;
738 745
739 if (!timeo && tsk->link_cong) 746 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
740 return -ELINKCONG; 747 if (unlikely(rc))
741 748 return rc;
742 msg_set_type(mhdr, TIPC_MCAST_MSG);
743 msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE);
744 msg_set_destport(mhdr, 0);
745 msg_set_destnode(mhdr, 0);
746 msg_set_nametype(mhdr, seq->type);
747 msg_set_namelower(mhdr, seq->lower);
748 msg_set_nameupper(mhdr, seq->upper);
749 msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
750
751 skb_queue_head_init(&pktchain);
752 749
753new_mtu: 750 msg_set_type(hdr, TIPC_MCAST_MSG);
754 mtu = tipc_bcast_get_mtu(net); 751 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
755 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); 752 msg_set_destport(hdr, 0);
756 if (unlikely(rc < 0)) 753 msg_set_destnode(hdr, 0);
754 msg_set_nametype(hdr, seq->type);
755 msg_set_namelower(hdr, seq->lower);
756 msg_set_nameupper(hdr, seq->upper);
757 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
758
759 skb_queue_head_init(&pkts);
760 rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
761 if (unlikely(rc != dlen))
757 return rc; 762 return rc;
758 763
759 do { 764 rc = tipc_bcast_xmit(net, &pkts);
760 rc = tipc_bcast_xmit(net, &pktchain); 765 if (unlikely(rc == -ELINKCONG)) {
761 if (likely(!rc)) 766 tsk->cong_link_cnt = 1;
762 return dsz; 767 rc = 0;
763 768 }
764 if (rc == -ELINKCONG) { 769
765 tsk->link_cong = 1; 770 return rc ? rc : dlen;
766 rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong);
767 if (!rc)
768 continue;
769 }
770 __skb_queue_purge(&pktchain);
771 if (rc == -EMSGSIZE) {
772 msg->msg_iter = save;
773 goto new_mtu;
774 }
775 break;
776 } while (1);
777 return rc;
778} 771}
779 772
780/** 773/**
@@ -898,35 +891,38 @@ static int tipc_sendmsg(struct socket *sock,
898 return ret; 891 return ret;
899} 892}
900 893
901static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) 894static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
902{ 895{
903 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
904 struct sock *sk = sock->sk; 896 struct sock *sk = sock->sk;
905 struct tipc_sock *tsk = tipc_sk(sk);
906 struct net *net = sock_net(sk); 897 struct net *net = sock_net(sk);
907 struct tipc_msg *mhdr = &tsk->phdr; 898 struct tipc_sock *tsk = tipc_sk(sk);
908 u32 dnode, dport; 899 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
909 struct sk_buff_head pktchain; 900 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
910 bool is_connectionless = tipc_sk_type_connectionless(sk); 901 struct list_head *clinks = &tsk->cong_links;
911 struct sk_buff *skb; 902 bool syn = !tipc_sk_type_connectionless(sk);
903 struct tipc_msg *hdr = &tsk->phdr;
912 struct tipc_name_seq *seq; 904 struct tipc_name_seq *seq;
913 struct iov_iter save; 905 struct sk_buff_head pkts;
914 u32 mtu; 906 u32 type, inst, domain;
915 long timeo; 907 u32 dnode, dport;
916 int rc; 908 int mtu, rc;
917 909
918 if (dsz > TIPC_MAX_USER_MSG_SIZE) 910 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
919 return -EMSGSIZE; 911 return -EMSGSIZE;
912
920 if (unlikely(!dest)) { 913 if (unlikely(!dest)) {
921 if (is_connectionless && tsk->peer.family == AF_TIPC) 914 dest = &tsk->peer;
922 dest = &tsk->peer; 915 if (!syn || dest->family != AF_TIPC)
923 else
924 return -EDESTADDRREQ; 916 return -EDESTADDRREQ;
925 } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
926 dest->family != AF_TIPC) {
927 return -EINVAL;
928 } 917 }
929 if (!is_connectionless) { 918
919 if (unlikely(m->msg_namelen < sizeof(*dest)))
920 return -EINVAL;
921
922 if (unlikely(dest->family != AF_TIPC))
923 return -EINVAL;
924
925 if (unlikely(syn)) {
930 if (sk->sk_state == TIPC_LISTEN) 926 if (sk->sk_state == TIPC_LISTEN)
931 return -EPIPE; 927 return -EPIPE;
932 if (sk->sk_state != TIPC_OPEN) 928 if (sk->sk_state != TIPC_OPEN)
@@ -938,72 +934,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
938 tsk->conn_instance = dest->addr.name.name.instance; 934 tsk->conn_instance = dest->addr.name.name.instance;
939 } 935 }
940 } 936 }
941 seq = &dest->addr.nameseq;
942 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
943 937
944 if (dest->addrtype == TIPC_ADDR_MCAST) { 938 seq = &dest->addr.nameseq;
945 return tipc_sendmcast(sock, seq, m, dsz, timeo); 939 if (dest->addrtype == TIPC_ADDR_MCAST)
946 } else if (dest->addrtype == TIPC_ADDR_NAME) { 940 return tipc_sendmcast(sock, seq, m, dlen, timeout);
947 u32 type = dest->addr.name.name.type;
948 u32 inst = dest->addr.name.name.instance;
949 u32 domain = dest->addr.name.domain;
950 941
942 if (dest->addrtype == TIPC_ADDR_NAME) {
943 type = dest->addr.name.name.type;
944 inst = dest->addr.name.name.instance;
945 domain = dest->addr.name.domain;
951 dnode = domain; 946 dnode = domain;
952 msg_set_type(mhdr, TIPC_NAMED_MSG); 947 msg_set_type(hdr, TIPC_NAMED_MSG);
953 msg_set_hdr_sz(mhdr, NAMED_H_SIZE); 948 msg_set_hdr_sz(hdr, NAMED_H_SIZE);
954 msg_set_nametype(mhdr, type); 949 msg_set_nametype(hdr, type);
955 msg_set_nameinst(mhdr, inst); 950 msg_set_nameinst(hdr, inst);
956 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); 951 msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
957 dport = tipc_nametbl_translate(net, type, inst, &dnode); 952 dport = tipc_nametbl_translate(net, type, inst, &dnode);
958 msg_set_destnode(mhdr, dnode); 953 msg_set_destnode(hdr, dnode);
959 msg_set_destport(mhdr, dport); 954 msg_set_destport(hdr, dport);
960 if (unlikely(!dport && !dnode)) 955 if (unlikely(!dport && !dnode))
961 return -EHOSTUNREACH; 956 return -EHOSTUNREACH;
957
962 } else if (dest->addrtype == TIPC_ADDR_ID) { 958 } else if (dest->addrtype == TIPC_ADDR_ID) {
963 dnode = dest->addr.id.node; 959 dnode = dest->addr.id.node;
964 msg_set_type(mhdr, TIPC_DIRECT_MSG); 960 msg_set_type(hdr, TIPC_DIRECT_MSG);
965 msg_set_lookup_scope(mhdr, 0); 961 msg_set_lookup_scope(hdr, 0);
966 msg_set_destnode(mhdr, dnode); 962 msg_set_destnode(hdr, dnode);
967 msg_set_destport(mhdr, dest->addr.id.ref); 963 msg_set_destport(hdr, dest->addr.id.ref);
968 msg_set_hdr_sz(mhdr, BASIC_H_SIZE); 964 msg_set_hdr_sz(hdr, BASIC_H_SIZE);
969 } 965 }
970 966
971 skb_queue_head_init(&pktchain); 967 /* Block or return if destination link is congested */
972 save = m->msg_iter; 968 rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
973new_mtu: 969 if (unlikely(rc))
970 return rc;
971
972 skb_queue_head_init(&pkts);
974 mtu = tipc_node_get_mtu(net, dnode, tsk->portid); 973 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
975 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain); 974 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
976 if (rc < 0) 975 if (unlikely(rc != dlen))
977 return rc; 976 return rc;
978 977
979 do { 978 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
980 skb = skb_peek(&pktchain); 979 if (unlikely(rc == -ELINKCONG)) {
981 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; 980 u32_push(clinks, dnode);
982 rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); 981 tsk->cong_link_cnt++;
983 if (likely(!rc)) { 982 rc = 0;
984 if (!is_connectionless) 983 }
985 tipc_set_sk_state(sk, TIPC_CONNECTING);
986 return dsz;
987 }
988 if (rc == -ELINKCONG) {
989 tsk->link_cong = 1;
990 rc = tipc_wait_for_cond(sock, &timeo, !tsk->link_cong);
991 if (!rc)
992 continue;
993 }
994 __skb_queue_purge(&pktchain);
995 if (rc == -EMSGSIZE) {
996 m->msg_iter = save;
997 goto new_mtu;
998 }
999 break;
1000 } while (1);
1001 984
1002 return rc; 985 if (unlikely(syn && !rc))
986 tipc_set_sk_state(sk, TIPC_CONNECTING);
987
988 return rc ? rc : dlen;
1003} 989}
1004 990
1005/** 991/**
1006 * tipc_send_stream - send stream-oriented data 992 * tipc_sendstream - send stream-oriented data
1007 * @sock: socket structure 993 * @sock: socket structure
1008 * @m: data to send 994 * @m: data to send
1009 * @dsz: total length of data to be transmitted 995 * @dsz: total length of data to be transmitted
@@ -1013,97 +999,69 @@ new_mtu:
1013 * Returns the number of bytes sent on success (or partial success), 999 * Returns the number of bytes sent on success (or partial success),
1014 * or errno if no data sent 1000 * or errno if no data sent
1015 */ 1001 */
1016static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) 1002static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
1017{ 1003{
1018 struct sock *sk = sock->sk; 1004 struct sock *sk = sock->sk;
1019 int ret; 1005 int ret;
1020 1006
1021 lock_sock(sk); 1007 lock_sock(sk);
1022 ret = __tipc_send_stream(sock, m, dsz); 1008 ret = __tipc_sendstream(sock, m, dsz);
1023 release_sock(sk); 1009 release_sock(sk);
1024 1010
1025 return ret; 1011 return ret;
1026} 1012}
1027 1013
1028static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) 1014static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
1029{ 1015{
1030 struct sock *sk = sock->sk; 1016 struct sock *sk = sock->sk;
1031 struct net *net = sock_net(sk);
1032 struct tipc_sock *tsk = tipc_sk(sk);
1033 struct tipc_msg *mhdr = &tsk->phdr;
1034 struct sk_buff_head pktchain;
1035 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1017 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1036 u32 portid = tsk->portid; 1018 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1037 int rc = -EINVAL; 1019 struct tipc_sock *tsk = tipc_sk(sk);
1038 long timeo; 1020 struct tipc_msg *hdr = &tsk->phdr;
1039 u32 dnode; 1021 struct net *net = sock_net(sk);
1040 uint mtu, send, sent = 0; 1022 struct sk_buff_head pkts;
1041 struct iov_iter save; 1023 u32 dnode = tsk_peer_node(tsk);
1042 int hlen = MIN_H_SIZE; 1024 int send, sent = 0;
1043 1025 int rc = 0;
1044 /* Handle implied connection establishment */
1045 if (unlikely(dest)) {
1046 rc = __tipc_sendmsg(sock, m, dsz);
1047 hlen = msg_hdr_sz(mhdr);
1048 if (dsz && (dsz == rc))
1049 tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
1050 return rc;
1051 }
1052 if (dsz > (uint)INT_MAX)
1053 return -EMSGSIZE;
1054
1055 if (unlikely(!tipc_sk_connected(sk))) {
1056 if (sk->sk_state == TIPC_DISCONNECTING)
1057 return -EPIPE;
1058 else
1059 return -ENOTCONN;
1060 }
1061 1026
1062 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 1027 skb_queue_head_init(&pkts);
1063 if (!timeo && tsk->link_cong)
1064 return -ELINKCONG;
1065 1028
1066 dnode = tsk_peer_node(tsk); 1029 if (unlikely(dlen > INT_MAX))
1067 skb_queue_head_init(&pktchain); 1030 return -EMSGSIZE;
1068 1031
1069next: 1032 /* Handle implicit connection setup */
1070 save = m->msg_iter; 1033 if (unlikely(dest)) {
1071 mtu = tsk->max_pkt; 1034 rc = __tipc_sendmsg(sock, m, dlen);
1072 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); 1035 if (dlen && (dlen == rc))
1073 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); 1036 tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
1074 if (unlikely(rc < 0))
1075 return rc; 1037 return rc;
1038 }
1076 1039
1077 do { 1040 do {
1078 if (likely(!tsk_conn_cong(tsk))) { 1041 rc = tipc_wait_for_cond(sock, &timeout,
1079 rc = tipc_node_xmit(net, &pktchain, dnode, portid); 1042 (!tsk->cong_link_cnt &&
1080 if (likely(!rc)) {
1081 tsk->snt_unacked += tsk_inc(tsk, send + hlen);
1082 sent += send;
1083 if (sent == dsz)
1084 return dsz;
1085 goto next;
1086 }
1087 if (rc == -EMSGSIZE) {
1088 __skb_queue_purge(&pktchain);
1089 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1090 portid);
1091 m->msg_iter = save;
1092 goto next;
1093 }
1094 if (rc != -ELINKCONG)
1095 break;
1096
1097 tsk->link_cong = 1;
1098 }
1099 rc = tipc_wait_for_cond(sock, &timeo,
1100 (!tsk->link_cong &&
1101 !tsk_conn_cong(tsk) && 1043 !tsk_conn_cong(tsk) &&
1102 tipc_sk_connected(sk))); 1044 tipc_sk_connected(sk)));
1103 } while (!rc); 1045 if (unlikely(rc))
1046 break;
1047
1048 send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
1049 rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
1050 if (unlikely(rc != send))
1051 break;
1052
1053 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1054 if (unlikely(rc == -ELINKCONG)) {
1055 tsk->cong_link_cnt = 1;
1056 rc = 0;
1057 }
1058 if (likely(!rc)) {
1059 tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
1060 sent += send;
1061 }
1062 } while (sent < dlen && !rc);
1104 1063
1105 __skb_queue_purge(&pktchain); 1064 return rc ? rc : sent;
1106 return sent ? sent : rc;
1107} 1065}
1108 1066
1109/** 1067/**
@@ -1121,7 +1079,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1121 if (dsz > TIPC_MAX_USER_MSG_SIZE) 1079 if (dsz > TIPC_MAX_USER_MSG_SIZE)
1122 return -EMSGSIZE; 1080 return -EMSGSIZE;
1123 1081
1124 return tipc_send_stream(sock, m, dsz); 1082 return tipc_sendstream(sock, m, dsz);
1125} 1083}
1126 1084
1127/* tipc_sk_finish_conn - complete the setup of a connection 1085/* tipc_sk_finish_conn - complete the setup of a connection
@@ -1688,6 +1646,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1688 unsigned int limit = rcvbuf_limit(sk, skb); 1646 unsigned int limit = rcvbuf_limit(sk, skb);
1689 int err = TIPC_OK; 1647 int err = TIPC_OK;
1690 int usr = msg_user(hdr); 1648 int usr = msg_user(hdr);
1649 u32 onode;
1691 1650
1692 if (unlikely(msg_user(hdr) == CONN_MANAGER)) { 1651 if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1693 tipc_sk_proto_rcv(tsk, skb, xmitq); 1652 tipc_sk_proto_rcv(tsk, skb, xmitq);
@@ -1695,8 +1654,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1695 } 1654 }
1696 1655
1697 if (unlikely(usr == SOCK_WAKEUP)) { 1656 if (unlikely(usr == SOCK_WAKEUP)) {
1657 onode = msg_orignode(hdr);
1698 kfree_skb(skb); 1658 kfree_skb(skb);
1699 tsk->link_cong = 0; 1659 u32_del(&tsk->cong_links, onode);
1660 tsk->cong_link_cnt--;
1700 sk->sk_write_space(sk); 1661 sk->sk_write_space(sk);
1701 return false; 1662 return false;
1702 } 1663 }
@@ -2104,7 +2065,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
2104 struct msghdr m = {NULL,}; 2065 struct msghdr m = {NULL,};
2105 2066
2106 tsk_advance_rx_queue(sk); 2067 tsk_advance_rx_queue(sk);
2107 __tipc_send_stream(new_sock, &m, 0); 2068 __tipc_sendstream(new_sock, &m, 0);
2108 } else { 2069 } else {
2109 __skb_dequeue(&sk->sk_receive_queue); 2070 __skb_dequeue(&sk->sk_receive_queue);
2110 __skb_queue_head(&new_sk->sk_receive_queue, buf); 2071 __skb_queue_head(&new_sk->sk_receive_queue, buf);
@@ -2565,7 +2526,7 @@ static const struct proto_ops stream_ops = {
2565 .shutdown = tipc_shutdown, 2526 .shutdown = tipc_shutdown,
2566 .setsockopt = tipc_setsockopt, 2527 .setsockopt = tipc_setsockopt,
2567 .getsockopt = tipc_getsockopt, 2528 .getsockopt = tipc_getsockopt,
2568 .sendmsg = tipc_send_stream, 2529 .sendmsg = tipc_sendstream,
2569 .recvmsg = tipc_recv_stream, 2530 .recvmsg = tipc_recv_stream,
2570 .mmap = sock_no_mmap, 2531 .mmap = sock_no_mmap,
2571 .sendpage = sock_no_sendpage 2532 .sendpage = sock_no_sendpage