aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c527
1 files changed, 260 insertions, 267 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 800caaa699a1..43e4045e72bc 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -35,6 +35,8 @@
35 */ 35 */
36 36
37#include <linux/rhashtable.h> 37#include <linux/rhashtable.h>
38#include <linux/sched/signal.h>
39
38#include "core.h" 40#include "core.h"
39#include "name_table.h" 41#include "name_table.h"
40#include "node.h" 42#include "node.h"
@@ -67,16 +69,19 @@ enum {
67 * @max_pkt: maximum packet size "hint" used when building messages sent by port 69 * @max_pkt: maximum packet size "hint" used when building messages sent by port
68 * @portid: unique port identity in TIPC socket hash table 70 * @portid: unique port identity in TIPC socket hash table
69 * @phdr: preformatted message header used when sending messages 71 * @phdr: preformatted message header used when sending messages
72 * #cong_links: list of congested links
70 * @publications: list of publications for port 73 * @publications: list of publications for port
74 * @blocking_link: address of the congested link we are currently sleeping on
71 * @pub_count: total # of publications port has made during its lifetime 75 * @pub_count: total # of publications port has made during its lifetime
72 * @probing_state: 76 * @probing_state:
73 * @conn_timeout: the time we can wait for an unresponded setup request 77 * @conn_timeout: the time we can wait for an unresponded setup request
74 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 78 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
75 * @link_cong: non-zero if owner must sleep because of link congestion 79 * @cong_link_cnt: number of congested links
76 * @sent_unacked: # messages sent by socket, and not yet acked by peer 80 * @sent_unacked: # messages sent by socket, and not yet acked by peer
77 * @rcv_unacked: # messages read by user, but not yet acked back to peer 81 * @rcv_unacked: # messages read by user, but not yet acked back to peer
78 * @peer: 'connected' peer for dgram/rdm 82 * @peer: 'connected' peer for dgram/rdm
79 * @node: hash table node 83 * @node: hash table node
84 * @mc_method: cookie for use between socket and broadcast layer
80 * @rcu: rcu struct for tipc_sock 85 * @rcu: rcu struct for tipc_sock
81 */ 86 */
82struct tipc_sock { 87struct tipc_sock {
@@ -87,13 +92,13 @@ struct tipc_sock {
87 u32 max_pkt; 92 u32 max_pkt;
88 u32 portid; 93 u32 portid;
89 struct tipc_msg phdr; 94 struct tipc_msg phdr;
90 struct list_head sock_list; 95 struct list_head cong_links;
91 struct list_head publications; 96 struct list_head publications;
92 u32 pub_count; 97 u32 pub_count;
93 uint conn_timeout; 98 uint conn_timeout;
94 atomic_t dupl_rcvcnt; 99 atomic_t dupl_rcvcnt;
95 bool probe_unacked; 100 bool probe_unacked;
96 bool link_cong; 101 u16 cong_link_cnt;
97 u16 snt_unacked; 102 u16 snt_unacked;
98 u16 snd_win; 103 u16 snd_win;
99 u16 peer_caps; 104 u16 peer_caps;
@@ -101,6 +106,7 @@ struct tipc_sock {
101 u16 rcv_win; 106 u16 rcv_win;
102 struct sockaddr_tipc peer; 107 struct sockaddr_tipc peer;
103 struct rhash_head node; 108 struct rhash_head node;
109 struct tipc_mc_method mc_method;
104 struct rcu_head rcu; 110 struct rcu_head rcu;
105}; 111};
106 112
@@ -110,7 +116,6 @@ static void tipc_write_space(struct sock *sk);
110static void tipc_sock_destruct(struct sock *sk); 116static void tipc_sock_destruct(struct sock *sk);
111static int tipc_release(struct socket *sock); 117static int tipc_release(struct socket *sock);
112static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags); 118static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags);
113static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p);
114static void tipc_sk_timeout(unsigned long data); 119static void tipc_sk_timeout(unsigned long data);
115static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, 120static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
116 struct tipc_name_seq const *seq); 121 struct tipc_name_seq const *seq);
@@ -119,8 +124,7 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
119static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); 124static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
120static int tipc_sk_insert(struct tipc_sock *tsk); 125static int tipc_sk_insert(struct tipc_sock *tsk);
121static void tipc_sk_remove(struct tipc_sock *tsk); 126static void tipc_sk_remove(struct tipc_sock *tsk);
122static int __tipc_send_stream(struct socket *sock, struct msghdr *m, 127static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz);
123 size_t dsz);
124static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz); 128static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz);
125 129
126static const struct proto_ops packet_ops; 130static const struct proto_ops packet_ops;
@@ -334,6 +338,49 @@ static int tipc_set_sk_state(struct sock *sk, int state)
334 return res; 338 return res;
335} 339}
336 340
341static int tipc_sk_sock_err(struct socket *sock, long *timeout)
342{
343 struct sock *sk = sock->sk;
344 int err = sock_error(sk);
345 int typ = sock->type;
346
347 if (err)
348 return err;
349 if (typ == SOCK_STREAM || typ == SOCK_SEQPACKET) {
350 if (sk->sk_state == TIPC_DISCONNECTING)
351 return -EPIPE;
352 else if (!tipc_sk_connected(sk))
353 return -ENOTCONN;
354 }
355 if (!*timeout)
356 return -EAGAIN;
357 if (signal_pending(current))
358 return sock_intr_errno(*timeout);
359
360 return 0;
361}
362
363#define tipc_wait_for_cond(sock_, timeout_, condition_) \
364({ \
365 int rc_ = 0; \
366 int done_ = 0; \
367 \
368 while (!(condition_) && !done_) { \
369 struct sock *sk_ = sock->sk; \
370 DEFINE_WAIT_FUNC(wait_, woken_wake_function); \
371 \
372 rc_ = tipc_sk_sock_err(sock_, timeout_); \
373 if (rc_) \
374 break; \
375 prepare_to_wait(sk_sleep(sk_), &wait_, \
376 TASK_INTERRUPTIBLE); \
377 done_ = sk_wait_event(sk_, timeout_, \
378 (condition_), &wait_); \
379 remove_wait_queue(sk_sleep(sk_), &wait_); \
380 } \
381 rc_; \
382})
383
337/** 384/**
338 * tipc_sk_create - create a TIPC socket 385 * tipc_sk_create - create a TIPC socket
339 * @net: network namespace (must be default network) 386 * @net: network namespace (must be default network)
@@ -382,10 +429,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
382 tsk = tipc_sk(sk); 429 tsk = tipc_sk(sk);
383 tsk->max_pkt = MAX_PKT_DEFAULT; 430 tsk->max_pkt = MAX_PKT_DEFAULT;
384 INIT_LIST_HEAD(&tsk->publications); 431 INIT_LIST_HEAD(&tsk->publications);
432 INIT_LIST_HEAD(&tsk->cong_links);
385 msg = &tsk->phdr; 433 msg = &tsk->phdr;
386 tn = net_generic(sock_net(sk), tipc_net_id); 434 tn = net_generic(sock_net(sk), tipc_net_id);
387 tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
388 NAMED_H_SIZE, 0);
389 435
390 /* Finish initializing socket data structures */ 436 /* Finish initializing socket data structures */
391 sock->ops = ops; 437 sock->ops = ops;
@@ -395,6 +441,13 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
395 pr_warn("Socket create failed; port number exhausted\n"); 441 pr_warn("Socket create failed; port number exhausted\n");
396 return -EINVAL; 442 return -EINVAL;
397 } 443 }
444
445 /* Ensure tsk is visible before we read own_addr. */
446 smp_mb();
447
448 tipc_msg_init(tn->own_addr, msg, TIPC_LOW_IMPORTANCE, TIPC_NAMED_MSG,
449 NAMED_H_SIZE, 0);
450
398 msg_set_origport(msg, tsk->portid); 451 msg_set_origport(msg, tsk->portid);
399 setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); 452 setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk);
400 sk->sk_shutdown = 0; 453 sk->sk_shutdown = 0;
@@ -432,9 +485,14 @@ static void __tipc_shutdown(struct socket *sock, int error)
432 struct sock *sk = sock->sk; 485 struct sock *sk = sock->sk;
433 struct tipc_sock *tsk = tipc_sk(sk); 486 struct tipc_sock *tsk = tipc_sk(sk);
434 struct net *net = sock_net(sk); 487 struct net *net = sock_net(sk);
488 long timeout = CONN_TIMEOUT_DEFAULT;
435 u32 dnode = tsk_peer_node(tsk); 489 u32 dnode = tsk_peer_node(tsk);
436 struct sk_buff *skb; 490 struct sk_buff *skb;
437 491
492 /* Avoid that hi-prio shutdown msgs bypass msgs in link wakeup queue */
493 tipc_wait_for_cond(sock, &timeout, (!tsk->cong_link_cnt &&
494 !tsk_conn_cong(tsk)));
495
438 /* Reject all unreceived messages, except on an active connection 496 /* Reject all unreceived messages, except on an active connection
439 * (which disconnects locally & sends a 'FIN+' to peer). 497 * (which disconnects locally & sends a 'FIN+' to peer).
440 */ 498 */
@@ -505,7 +563,8 @@ static int tipc_release(struct socket *sock)
505 563
506 /* Reject any messages that accumulated in backlog queue */ 564 /* Reject any messages that accumulated in backlog queue */
507 release_sock(sk); 565 release_sock(sk);
508 566 u32_list_purge(&tsk->cong_links);
567 tsk->cong_link_cnt = 0;
509 call_rcu(&tsk->rcu, tipc_sk_callback); 568 call_rcu(&tsk->rcu, tipc_sk_callback);
510 sock->sk = NULL; 569 sock->sk = NULL;
511 570
@@ -648,7 +707,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
648 707
649 switch (sk->sk_state) { 708 switch (sk->sk_state) {
650 case TIPC_ESTABLISHED: 709 case TIPC_ESTABLISHED:
651 if (!tsk->link_cong && !tsk_conn_cong(tsk)) 710 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
652 mask |= POLLOUT; 711 mask |= POLLOUT;
653 /* fall thru' */ 712 /* fall thru' */
654 case TIPC_LISTEN: 713 case TIPC_LISTEN:
@@ -657,7 +716,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
657 mask |= (POLLIN | POLLRDNORM); 716 mask |= (POLLIN | POLLRDNORM);
658 break; 717 break;
659 case TIPC_OPEN: 718 case TIPC_OPEN:
660 if (!tsk->link_cong) 719 if (!tsk->cong_link_cnt)
661 mask |= POLLOUT; 720 mask |= POLLOUT;
662 if (tipc_sk_type_connectionless(sk) && 721 if (tipc_sk_type_connectionless(sk) &&
663 (!skb_queue_empty(&sk->sk_receive_queue))) 722 (!skb_queue_empty(&sk->sk_receive_queue)))
@@ -676,63 +735,60 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
676 * @sock: socket structure 735 * @sock: socket structure
677 * @seq: destination address 736 * @seq: destination address
678 * @msg: message to send 737 * @msg: message to send
679 * @dsz: total length of message data 738 * @dlen: length of data to send
680 * @timeo: timeout to wait for wakeup 739 * @timeout: timeout to wait for wakeup
681 * 740 *
682 * Called from function tipc_sendmsg(), which has done all sanity checks 741 * Called from function tipc_sendmsg(), which has done all sanity checks
683 * Returns the number of bytes sent on success, or errno 742 * Returns the number of bytes sent on success, or errno
684 */ 743 */
685static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq, 744static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
686 struct msghdr *msg, size_t dsz, long timeo) 745 struct msghdr *msg, size_t dlen, long timeout)
687{ 746{
688 struct sock *sk = sock->sk; 747 struct sock *sk = sock->sk;
689 struct tipc_sock *tsk = tipc_sk(sk); 748 struct tipc_sock *tsk = tipc_sk(sk);
749 struct tipc_msg *hdr = &tsk->phdr;
690 struct net *net = sock_net(sk); 750 struct net *net = sock_net(sk);
691 struct tipc_msg *mhdr = &tsk->phdr; 751 int mtu = tipc_bcast_get_mtu(net);
692 struct sk_buff_head pktchain; 752 struct tipc_mc_method *method = &tsk->mc_method;
693 struct iov_iter save = msg->msg_iter; 753 u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
694 uint mtu; 754 struct sk_buff_head pkts;
755 struct tipc_nlist dsts;
695 int rc; 756 int rc;
696 757
697 if (!timeo && tsk->link_cong) 758 /* Block or return if any destination link is congested */
698 return -ELINKCONG; 759 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
760 if (unlikely(rc))
761 return rc;
699 762
700 msg_set_type(mhdr, TIPC_MCAST_MSG); 763 /* Lookup destination nodes */
701 msg_set_lookup_scope(mhdr, TIPC_CLUSTER_SCOPE); 764 tipc_nlist_init(&dsts, tipc_own_addr(net));
702 msg_set_destport(mhdr, 0); 765 tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
703 msg_set_destnode(mhdr, 0); 766 seq->upper, domain, &dsts);
704 msg_set_nametype(mhdr, seq->type); 767 if (!dsts.local && !dsts.remote)
705 msg_set_namelower(mhdr, seq->lower); 768 return -EHOSTUNREACH;
706 msg_set_nameupper(mhdr, seq->upper);
707 msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
708 769
709 skb_queue_head_init(&pktchain); 770 /* Build message header */
771 msg_set_type(hdr, TIPC_MCAST_MSG);
772 msg_set_hdr_sz(hdr, MCAST_H_SIZE);
773 msg_set_lookup_scope(hdr, TIPC_CLUSTER_SCOPE);
774 msg_set_destport(hdr, 0);
775 msg_set_destnode(hdr, 0);
776 msg_set_nametype(hdr, seq->type);
777 msg_set_namelower(hdr, seq->lower);
778 msg_set_nameupper(hdr, seq->upper);
710 779
711new_mtu: 780 /* Build message as chain of buffers */
712 mtu = tipc_bcast_get_mtu(net); 781 skb_queue_head_init(&pkts);
713 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &pktchain); 782 rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
714 if (unlikely(rc < 0))
715 return rc;
716 783
717 do { 784 /* Send message if build was successful */
718 rc = tipc_bcast_xmit(net, &pktchain); 785 if (unlikely(rc == dlen))
719 if (likely(!rc)) 786 rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
720 return dsz; 787 &tsk->cong_link_cnt);
721 788
722 if (rc == -ELINKCONG) { 789 tipc_nlist_purge(&dsts);
723 tsk->link_cong = 1; 790
724 rc = tipc_wait_for_sndmsg(sock, &timeo); 791 return rc ? rc : dlen;
725 if (!rc)
726 continue;
727 }
728 __skb_queue_purge(&pktchain);
729 if (rc == -EMSGSIZE) {
730 msg->msg_iter = save;
731 goto new_mtu;
732 }
733 break;
734 } while (1);
735 return rc;
736} 792}
737 793
738/** 794/**
@@ -746,7 +802,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
746 struct sk_buff_head *inputq) 802 struct sk_buff_head *inputq)
747{ 803{
748 struct tipc_msg *msg; 804 struct tipc_msg *msg;
749 struct tipc_plist dports; 805 struct list_head dports;
750 u32 portid; 806 u32 portid;
751 u32 scope = TIPC_CLUSTER_SCOPE; 807 u32 scope = TIPC_CLUSTER_SCOPE;
752 struct sk_buff_head tmpq; 808 struct sk_buff_head tmpq;
@@ -754,7 +810,7 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
754 struct sk_buff *skb, *_skb; 810 struct sk_buff *skb, *_skb;
755 811
756 __skb_queue_head_init(&tmpq); 812 __skb_queue_head_init(&tmpq);
757 tipc_plist_init(&dports); 813 INIT_LIST_HEAD(&dports);
758 814
759 skb = tipc_skb_peek(arrvq, &inputq->lock); 815 skb = tipc_skb_peek(arrvq, &inputq->lock);
760 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 816 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
@@ -768,8 +824,8 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
768 tipc_nametbl_mc_translate(net, 824 tipc_nametbl_mc_translate(net,
769 msg_nametype(msg), msg_namelower(msg), 825 msg_nametype(msg), msg_namelower(msg),
770 msg_nameupper(msg), scope, &dports); 826 msg_nameupper(msg), scope, &dports);
771 portid = tipc_plist_pop(&dports); 827 portid = u32_pop(&dports);
772 for (; portid; portid = tipc_plist_pop(&dports)) { 828 for (; portid; portid = u32_pop(&dports)) {
773 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 829 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
774 if (_skb) { 830 if (_skb) {
775 msg_set_destport(buf_msg(_skb), portid); 831 msg_set_destport(buf_msg(_skb), portid);
@@ -830,31 +886,6 @@ exit:
830 kfree_skb(skb); 886 kfree_skb(skb);
831} 887}
832 888
833static int tipc_wait_for_sndmsg(struct socket *sock, long *timeo_p)
834{
835 DEFINE_WAIT_FUNC(wait, woken_wake_function);
836 struct sock *sk = sock->sk;
837 struct tipc_sock *tsk = tipc_sk(sk);
838 int done;
839
840 do {
841 int err = sock_error(sk);
842 if (err)
843 return err;
844 if (sk->sk_shutdown & SEND_SHUTDOWN)
845 return -EPIPE;
846 if (!*timeo_p)
847 return -EAGAIN;
848 if (signal_pending(current))
849 return sock_intr_errno(*timeo_p);
850
851 add_wait_queue(sk_sleep(sk), &wait);
852 done = sk_wait_event(sk, timeo_p, !tsk->link_cong, &wait);
853 remove_wait_queue(sk_sleep(sk), &wait);
854 } while (!done);
855 return 0;
856}
857
858/** 889/**
859 * tipc_sendmsg - send message in connectionless manner 890 * tipc_sendmsg - send message in connectionless manner
860 * @sock: socket structure 891 * @sock: socket structure
@@ -881,35 +912,38 @@ static int tipc_sendmsg(struct socket *sock,
881 return ret; 912 return ret;
882} 913}
883 914
884static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz) 915static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
885{ 916{
886 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
887 struct sock *sk = sock->sk; 917 struct sock *sk = sock->sk;
888 struct tipc_sock *tsk = tipc_sk(sk);
889 struct net *net = sock_net(sk); 918 struct net *net = sock_net(sk);
890 struct tipc_msg *mhdr = &tsk->phdr; 919 struct tipc_sock *tsk = tipc_sk(sk);
891 u32 dnode, dport; 920 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
892 struct sk_buff_head pktchain; 921 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
893 bool is_connectionless = tipc_sk_type_connectionless(sk); 922 struct list_head *clinks = &tsk->cong_links;
894 struct sk_buff *skb; 923 bool syn = !tipc_sk_type_connectionless(sk);
924 struct tipc_msg *hdr = &tsk->phdr;
895 struct tipc_name_seq *seq; 925 struct tipc_name_seq *seq;
896 struct iov_iter save; 926 struct sk_buff_head pkts;
897 u32 mtu; 927 u32 type, inst, domain;
898 long timeo; 928 u32 dnode, dport;
899 int rc; 929 int mtu, rc;
900 930
901 if (dsz > TIPC_MAX_USER_MSG_SIZE) 931 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
902 return -EMSGSIZE; 932 return -EMSGSIZE;
933
903 if (unlikely(!dest)) { 934 if (unlikely(!dest)) {
904 if (is_connectionless && tsk->peer.family == AF_TIPC) 935 dest = &tsk->peer;
905 dest = &tsk->peer; 936 if (!syn || dest->family != AF_TIPC)
906 else
907 return -EDESTADDRREQ; 937 return -EDESTADDRREQ;
908 } else if (unlikely(m->msg_namelen < sizeof(*dest)) ||
909 dest->family != AF_TIPC) {
910 return -EINVAL;
911 } 938 }
912 if (!is_connectionless) { 939
940 if (unlikely(m->msg_namelen < sizeof(*dest)))
941 return -EINVAL;
942
943 if (unlikely(dest->family != AF_TIPC))
944 return -EINVAL;
945
946 if (unlikely(syn)) {
913 if (sk->sk_state == TIPC_LISTEN) 947 if (sk->sk_state == TIPC_LISTEN)
914 return -EPIPE; 948 return -EPIPE;
915 if (sk->sk_state != TIPC_OPEN) 949 if (sk->sk_state != TIPC_OPEN)
@@ -921,102 +955,62 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dsz)
921 tsk->conn_instance = dest->addr.name.name.instance; 955 tsk->conn_instance = dest->addr.name.name.instance;
922 } 956 }
923 } 957 }
924 seq = &dest->addr.nameseq;
925 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
926 958
927 if (dest->addrtype == TIPC_ADDR_MCAST) { 959 seq = &dest->addr.nameseq;
928 return tipc_sendmcast(sock, seq, m, dsz, timeo); 960 if (dest->addrtype == TIPC_ADDR_MCAST)
929 } else if (dest->addrtype == TIPC_ADDR_NAME) { 961 return tipc_sendmcast(sock, seq, m, dlen, timeout);
930 u32 type = dest->addr.name.name.type;
931 u32 inst = dest->addr.name.name.instance;
932 u32 domain = dest->addr.name.domain;
933 962
963 if (dest->addrtype == TIPC_ADDR_NAME) {
964 type = dest->addr.name.name.type;
965 inst = dest->addr.name.name.instance;
966 domain = dest->addr.name.domain;
934 dnode = domain; 967 dnode = domain;
935 msg_set_type(mhdr, TIPC_NAMED_MSG); 968 msg_set_type(hdr, TIPC_NAMED_MSG);
936 msg_set_hdr_sz(mhdr, NAMED_H_SIZE); 969 msg_set_hdr_sz(hdr, NAMED_H_SIZE);
937 msg_set_nametype(mhdr, type); 970 msg_set_nametype(hdr, type);
938 msg_set_nameinst(mhdr, inst); 971 msg_set_nameinst(hdr, inst);
939 msg_set_lookup_scope(mhdr, tipc_addr_scope(domain)); 972 msg_set_lookup_scope(hdr, tipc_addr_scope(domain));
940 dport = tipc_nametbl_translate(net, type, inst, &dnode); 973 dport = tipc_nametbl_translate(net, type, inst, &dnode);
941 msg_set_destnode(mhdr, dnode); 974 msg_set_destnode(hdr, dnode);
942 msg_set_destport(mhdr, dport); 975 msg_set_destport(hdr, dport);
943 if (unlikely(!dport && !dnode)) 976 if (unlikely(!dport && !dnode))
944 return -EHOSTUNREACH; 977 return -EHOSTUNREACH;
978
945 } else if (dest->addrtype == TIPC_ADDR_ID) { 979 } else if (dest->addrtype == TIPC_ADDR_ID) {
946 dnode = dest->addr.id.node; 980 dnode = dest->addr.id.node;
947 msg_set_type(mhdr, TIPC_DIRECT_MSG); 981 msg_set_type(hdr, TIPC_DIRECT_MSG);
948 msg_set_lookup_scope(mhdr, 0); 982 msg_set_lookup_scope(hdr, 0);
949 msg_set_destnode(mhdr, dnode); 983 msg_set_destnode(hdr, dnode);
950 msg_set_destport(mhdr, dest->addr.id.ref); 984 msg_set_destport(hdr, dest->addr.id.ref);
951 msg_set_hdr_sz(mhdr, BASIC_H_SIZE); 985 msg_set_hdr_sz(hdr, BASIC_H_SIZE);
952 } 986 }
953 987
954 skb_queue_head_init(&pktchain); 988 /* Block or return if destination link is congested */
955 save = m->msg_iter; 989 rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode));
956new_mtu: 990 if (unlikely(rc))
957 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
958 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &pktchain);
959 if (rc < 0)
960 return rc; 991 return rc;
961 992
962 do { 993 skb_queue_head_init(&pkts);
963 skb = skb_peek(&pktchain); 994 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
964 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong; 995 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
965 rc = tipc_node_xmit(net, &pktchain, dnode, tsk->portid); 996 if (unlikely(rc != dlen))
966 if (likely(!rc)) { 997 return rc;
967 if (!is_connectionless)
968 tipc_set_sk_state(sk, TIPC_CONNECTING);
969 return dsz;
970 }
971 if (rc == -ELINKCONG) {
972 tsk->link_cong = 1;
973 rc = tipc_wait_for_sndmsg(sock, &timeo);
974 if (!rc)
975 continue;
976 }
977 __skb_queue_purge(&pktchain);
978 if (rc == -EMSGSIZE) {
979 m->msg_iter = save;
980 goto new_mtu;
981 }
982 break;
983 } while (1);
984
985 return rc;
986}
987 998
988static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p) 999 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
989{ 1000 if (unlikely(rc == -ELINKCONG)) {
990 DEFINE_WAIT_FUNC(wait, woken_wake_function); 1001 u32_push(clinks, dnode);
991 struct sock *sk = sock->sk; 1002 tsk->cong_link_cnt++;
992 struct tipc_sock *tsk = tipc_sk(sk); 1003 rc = 0;
993 int done; 1004 }
994 1005
995 do { 1006 if (unlikely(syn && !rc))
996 int err = sock_error(sk); 1007 tipc_set_sk_state(sk, TIPC_CONNECTING);
997 if (err)
998 return err;
999 if (sk->sk_state == TIPC_DISCONNECTING)
1000 return -EPIPE;
1001 else if (!tipc_sk_connected(sk))
1002 return -ENOTCONN;
1003 if (!*timeo_p)
1004 return -EAGAIN;
1005 if (signal_pending(current))
1006 return sock_intr_errno(*timeo_p);
1007 1008
1008 add_wait_queue(sk_sleep(sk), &wait); 1009 return rc ? rc : dlen;
1009 done = sk_wait_event(sk, timeo_p,
1010 (!tsk->link_cong &&
1011 !tsk_conn_cong(tsk)) ||
1012 !tipc_sk_connected(sk), &wait);
1013 remove_wait_queue(sk_sleep(sk), &wait);
1014 } while (!done);
1015 return 0;
1016} 1010}
1017 1011
1018/** 1012/**
1019 * tipc_send_stream - send stream-oriented data 1013 * tipc_sendstream - send stream-oriented data
1020 * @sock: socket structure 1014 * @sock: socket structure
1021 * @m: data to send 1015 * @m: data to send
1022 * @dsz: total length of data to be transmitted 1016 * @dsz: total length of data to be transmitted
@@ -1026,94 +1020,69 @@ static int tipc_wait_for_sndpkt(struct socket *sock, long *timeo_p)
1026 * Returns the number of bytes sent on success (or partial success), 1020 * Returns the number of bytes sent on success (or partial success),
1027 * or errno if no data sent 1021 * or errno if no data sent
1028 */ 1022 */
1029static int tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) 1023static int tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dsz)
1030{ 1024{
1031 struct sock *sk = sock->sk; 1025 struct sock *sk = sock->sk;
1032 int ret; 1026 int ret;
1033 1027
1034 lock_sock(sk); 1028 lock_sock(sk);
1035 ret = __tipc_send_stream(sock, m, dsz); 1029 ret = __tipc_sendstream(sock, m, dsz);
1036 release_sock(sk); 1030 release_sock(sk);
1037 1031
1038 return ret; 1032 return ret;
1039} 1033}
1040 1034
1041static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz) 1035static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
1042{ 1036{
1043 struct sock *sk = sock->sk; 1037 struct sock *sk = sock->sk;
1044 struct net *net = sock_net(sk);
1045 struct tipc_sock *tsk = tipc_sk(sk);
1046 struct tipc_msg *mhdr = &tsk->phdr;
1047 struct sk_buff_head pktchain;
1048 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1038 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1049 u32 portid = tsk->portid; 1039 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
1050 int rc = -EINVAL; 1040 struct tipc_sock *tsk = tipc_sk(sk);
1051 long timeo; 1041 struct tipc_msg *hdr = &tsk->phdr;
1052 u32 dnode; 1042 struct net *net = sock_net(sk);
1053 uint mtu, send, sent = 0; 1043 struct sk_buff_head pkts;
1054 struct iov_iter save; 1044 u32 dnode = tsk_peer_node(tsk);
1055 int hlen = MIN_H_SIZE; 1045 int send, sent = 0;
1056 1046 int rc = 0;
1057 /* Handle implied connection establishment */
1058 if (unlikely(dest)) {
1059 rc = __tipc_sendmsg(sock, m, dsz);
1060 hlen = msg_hdr_sz(mhdr);
1061 if (dsz && (dsz == rc))
1062 tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
1063 return rc;
1064 }
1065 if (dsz > (uint)INT_MAX)
1066 return -EMSGSIZE;
1067
1068 if (unlikely(!tipc_sk_connected(sk))) {
1069 if (sk->sk_state == TIPC_DISCONNECTING)
1070 return -EPIPE;
1071 else
1072 return -ENOTCONN;
1073 }
1074 1047
1075 timeo = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 1048 skb_queue_head_init(&pkts);
1076 if (!timeo && tsk->link_cong)
1077 return -ELINKCONG;
1078 1049
1079 dnode = tsk_peer_node(tsk); 1050 if (unlikely(dlen > INT_MAX))
1080 skb_queue_head_init(&pktchain); 1051 return -EMSGSIZE;
1081 1052
1082next: 1053 /* Handle implicit connection setup */
1083 save = m->msg_iter; 1054 if (unlikely(dest)) {
1084 mtu = tsk->max_pkt; 1055 rc = __tipc_sendmsg(sock, m, dlen);
1085 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); 1056 if (dlen && (dlen == rc))
1086 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &pktchain); 1057 tsk->snt_unacked = tsk_inc(tsk, dlen + msg_hdr_sz(hdr));
1087 if (unlikely(rc < 0))
1088 return rc; 1058 return rc;
1059 }
1089 1060
1090 do { 1061 do {
1091 if (likely(!tsk_conn_cong(tsk))) { 1062 rc = tipc_wait_for_cond(sock, &timeout,
1092 rc = tipc_node_xmit(net, &pktchain, dnode, portid); 1063 (!tsk->cong_link_cnt &&
1093 if (likely(!rc)) { 1064 !tsk_conn_cong(tsk) &&
1094 tsk->snt_unacked += tsk_inc(tsk, send + hlen); 1065 tipc_sk_connected(sk)));
1095 sent += send; 1066 if (unlikely(rc))
1096 if (sent == dsz) 1067 break;
1097 return dsz;
1098 goto next;
1099 }
1100 if (rc == -EMSGSIZE) {
1101 __skb_queue_purge(&pktchain);
1102 tsk->max_pkt = tipc_node_get_mtu(net, dnode,
1103 portid);
1104 m->msg_iter = save;
1105 goto next;
1106 }
1107 if (rc != -ELINKCONG)
1108 break;
1109 1068
1110 tsk->link_cong = 1; 1069 send = min_t(size_t, dlen - sent, TIPC_MAX_USER_MSG_SIZE);
1070 rc = tipc_msg_build(hdr, m, sent, send, tsk->max_pkt, &pkts);
1071 if (unlikely(rc != send))
1072 break;
1073
1074 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1075 if (unlikely(rc == -ELINKCONG)) {
1076 tsk->cong_link_cnt = 1;
1077 rc = 0;
1111 } 1078 }
1112 rc = tipc_wait_for_sndpkt(sock, &timeo); 1079 if (likely(!rc)) {
1113 } while (!rc); 1080 tsk->snt_unacked += tsk_inc(tsk, send + MIN_H_SIZE);
1081 sent += send;
1082 }
1083 } while (sent < dlen && !rc);
1114 1084
1115 __skb_queue_purge(&pktchain); 1085 return rc ? rc : sent;
1116 return sent ? sent : rc;
1117} 1086}
1118 1087
1119/** 1088/**
@@ -1131,7 +1100,7 @@ static int tipc_send_packet(struct socket *sock, struct msghdr *m, size_t dsz)
1131 if (dsz > TIPC_MAX_USER_MSG_SIZE) 1100 if (dsz > TIPC_MAX_USER_MSG_SIZE)
1132 return -EMSGSIZE; 1101 return -EMSGSIZE;
1133 1102
1134 return tipc_send_stream(sock, m, dsz); 1103 return tipc_sendstream(sock, m, dsz);
1135} 1104}
1136 1105
1137/* tipc_sk_finish_conn - complete the setup of a connection 1106/* tipc_sk_finish_conn - complete the setup of a connection
@@ -1698,6 +1667,7 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1698 unsigned int limit = rcvbuf_limit(sk, skb); 1667 unsigned int limit = rcvbuf_limit(sk, skb);
1699 int err = TIPC_OK; 1668 int err = TIPC_OK;
1700 int usr = msg_user(hdr); 1669 int usr = msg_user(hdr);
1670 u32 onode;
1701 1671
1702 if (unlikely(msg_user(hdr) == CONN_MANAGER)) { 1672 if (unlikely(msg_user(hdr) == CONN_MANAGER)) {
1703 tipc_sk_proto_rcv(tsk, skb, xmitq); 1673 tipc_sk_proto_rcv(tsk, skb, xmitq);
@@ -1705,8 +1675,10 @@ static bool filter_rcv(struct sock *sk, struct sk_buff *skb,
1705 } 1675 }
1706 1676
1707 if (unlikely(usr == SOCK_WAKEUP)) { 1677 if (unlikely(usr == SOCK_WAKEUP)) {
1678 onode = msg_orignode(hdr);
1708 kfree_skb(skb); 1679 kfree_skb(skb);
1709 tsk->link_cong = 0; 1680 u32_del(&tsk->cong_links, onode);
1681 tsk->cong_link_cnt--;
1710 sk->sk_write_space(sk); 1682 sk->sk_write_space(sk);
1711 return false; 1683 return false;
1712 } 1684 }
@@ -2114,7 +2086,7 @@ static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags)
2114 struct msghdr m = {NULL,}; 2086 struct msghdr m = {NULL,};
2115 2087
2116 tsk_advance_rx_queue(sk); 2088 tsk_advance_rx_queue(sk);
2117 __tipc_send_stream(new_sock, &m, 0); 2089 __tipc_sendstream(new_sock, &m, 0);
2118 } else { 2090 } else {
2119 __skb_dequeue(&sk->sk_receive_queue); 2091 __skb_dequeue(&sk->sk_receive_queue);
2120 __skb_queue_head(&new_sk->sk_receive_queue, buf); 2092 __skb_queue_head(&new_sk->sk_receive_queue, buf);
@@ -2269,24 +2241,27 @@ static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
2269void tipc_sk_reinit(struct net *net) 2241void tipc_sk_reinit(struct net *net)
2270{ 2242{
2271 struct tipc_net *tn = net_generic(net, tipc_net_id); 2243 struct tipc_net *tn = net_generic(net, tipc_net_id);
2272 const struct bucket_table *tbl; 2244 struct rhashtable_iter iter;
2273 struct rhash_head *pos;
2274 struct tipc_sock *tsk; 2245 struct tipc_sock *tsk;
2275 struct tipc_msg *msg; 2246 struct tipc_msg *msg;
2276 int i;
2277 2247
2278 rcu_read_lock(); 2248 rhashtable_walk_enter(&tn->sk_rht, &iter);
2279 tbl = rht_dereference_rcu((&tn->sk_rht)->tbl, &tn->sk_rht); 2249
2280 for (i = 0; i < tbl->size; i++) { 2250 do {
2281 rht_for_each_entry_rcu(tsk, pos, tbl, i, node) { 2251 tsk = ERR_PTR(rhashtable_walk_start(&iter));
2252 if (tsk)
2253 continue;
2254
2255 while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2282 spin_lock_bh(&tsk->sk.sk_lock.slock); 2256 spin_lock_bh(&tsk->sk.sk_lock.slock);
2283 msg = &tsk->phdr; 2257 msg = &tsk->phdr;
2284 msg_set_prevnode(msg, tn->own_addr); 2258 msg_set_prevnode(msg, tn->own_addr);
2285 msg_set_orignode(msg, tn->own_addr); 2259 msg_set_orignode(msg, tn->own_addr);
2286 spin_unlock_bh(&tsk->sk.sk_lock.slock); 2260 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2287 } 2261 }
2288 } 2262
2289 rcu_read_unlock(); 2263 rhashtable_walk_stop(&iter);
2264 } while (tsk == ERR_PTR(-EAGAIN));
2290} 2265}
2291 2266
2292static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid) 2267static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid)
@@ -2382,18 +2357,29 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2382{ 2357{
2383 struct sock *sk = sock->sk; 2358 struct sock *sk = sock->sk;
2384 struct tipc_sock *tsk = tipc_sk(sk); 2359 struct tipc_sock *tsk = tipc_sk(sk);
2385 u32 value; 2360 u32 value = 0;
2386 int res; 2361 int res = 0;
2387 2362
2388 if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM)) 2363 if ((lvl == IPPROTO_TCP) && (sock->type == SOCK_STREAM))
2389 return 0; 2364 return 0;
2390 if (lvl != SOL_TIPC) 2365 if (lvl != SOL_TIPC)
2391 return -ENOPROTOOPT; 2366 return -ENOPROTOOPT;
2392 if (ol < sizeof(value)) 2367
2393 return -EINVAL; 2368 switch (opt) {
2394 res = get_user(value, (u32 __user *)ov); 2369 case TIPC_IMPORTANCE:
2395 if (res) 2370 case TIPC_SRC_DROPPABLE:
2396 return res; 2371 case TIPC_DEST_DROPPABLE:
2372 case TIPC_CONN_TIMEOUT:
2373 if (ol < sizeof(value))
2374 return -EINVAL;
2375 res = get_user(value, (u32 __user *)ov);
2376 if (res)
2377 return res;
2378 break;
2379 default:
2380 if (ov || ol)
2381 return -EINVAL;
2382 }
2397 2383
2398 lock_sock(sk); 2384 lock_sock(sk);
2399 2385
@@ -2412,7 +2398,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2412 break; 2398 break;
2413 case TIPC_CONN_TIMEOUT: 2399 case TIPC_CONN_TIMEOUT:
2414 tipc_sk(sk)->conn_timeout = value; 2400 tipc_sk(sk)->conn_timeout = value;
2415 /* no need to set "res", since already 0 at this point */ 2401 break;
2402 case TIPC_MCAST_BROADCAST:
2403 tsk->mc_method.rcast = false;
2404 tsk->mc_method.mandatory = true;
2405 break;
2406 case TIPC_MCAST_REPLICAST:
2407 tsk->mc_method.rcast = true;
2408 tsk->mc_method.mandatory = true;
2416 break; 2409 break;
2417 default: 2410 default:
2418 res = -EINVAL; 2411 res = -EINVAL;
@@ -2575,7 +2568,7 @@ static const struct proto_ops stream_ops = {
2575 .shutdown = tipc_shutdown, 2568 .shutdown = tipc_shutdown,
2576 .setsockopt = tipc_setsockopt, 2569 .setsockopt = tipc_setsockopt,
2577 .getsockopt = tipc_getsockopt, 2570 .getsockopt = tipc_getsockopt,
2578 .sendmsg = tipc_send_stream, 2571 .sendmsg = tipc_sendstream,
2579 .recvmsg = tipc_recv_stream, 2572 .recvmsg = tipc_recv_stream,
2580 .mmap = sock_no_mmap, 2573 .mmap = sock_no_mmap,
2581 .sendpage = sock_no_sendpage 2574 .sendpage = sock_no_sendpage