Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--  net/tipc/socket.c  845
1 file changed, 651 insertions(+), 194 deletions(-)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index d50edd6e0019..5d18c0caa92b 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/socket.c: TIPC socket API 2 * net/tipc/socket.c: TIPC socket API
3 * 3 *
4 * Copyright (c) 2001-2007, 2012-2016, Ericsson AB 4 * Copyright (c) 2001-2007, 2012-2017, Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2013, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -45,9 +45,10 @@
45#include "socket.h" 45#include "socket.h"
46#include "bcast.h" 46#include "bcast.h"
47#include "netlink.h" 47#include "netlink.h"
48#include "group.h"
48 49
49#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 50#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
50#define CONN_PROBING_INTERVAL msecs_to_jiffies(3600000) /* [ms] => 1 h */ 51#define CONN_PROBING_INTV msecs_to_jiffies(3600000) /* [ms] => 1 h */
51#define TIPC_FWD_MSG 1 52#define TIPC_FWD_MSG 1
52#define TIPC_MAX_PORT 0xffffffff 53#define TIPC_MAX_PORT 0xffffffff
53#define TIPC_MIN_PORT 1 54#define TIPC_MIN_PORT 1
@@ -61,6 +62,11 @@ enum {
61 TIPC_CONNECTING = TCP_SYN_SENT, 62 TIPC_CONNECTING = TCP_SYN_SENT,
62}; 63};
63 64
65struct sockaddr_pair {
66 struct sockaddr_tipc sock;
67 struct sockaddr_tipc member;
68};
69
64/** 70/**
65 * struct tipc_sock - TIPC socket structure 71 * struct tipc_sock - TIPC socket structure
66 * @sk: socket - interacts with 'port' and with user via the socket API 72 * @sk: socket - interacts with 'port' and with user via the socket API
@@ -78,7 +84,7 @@ enum {
78 * @conn_timeout: the time we can wait for an unresponded setup request 84 * @conn_timeout: the time we can wait for an unresponded setup request
79 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue 85 * @dupl_rcvcnt: number of bytes counted twice, in both backlog and rcv queue
80 * @cong_link_cnt: number of congested links 86 * @cong_link_cnt: number of congested links
81 * @sent_unacked: # messages sent by socket, and not yet acked by peer 87 * @snt_unacked: # messages sent by socket, and not yet acked by peer
82 * @rcv_unacked: # messages read by user, but not yet acked back to peer 88 * @rcv_unacked: # messages read by user, but not yet acked back to peer
83 * @peer: 'connected' peer for dgram/rdm 89 * @peer: 'connected' peer for dgram/rdm
84 * @node: hash table node 90 * @node: hash table node
@@ -109,20 +115,22 @@ struct tipc_sock {
109 struct rhash_head node; 115 struct rhash_head node;
110 struct tipc_mc_method mc_method; 116 struct tipc_mc_method mc_method;
111 struct rcu_head rcu; 117 struct rcu_head rcu;
118 struct tipc_group *group;
112}; 119};
113 120
114static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb); 121static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb);
115static void tipc_data_ready(struct sock *sk); 122static void tipc_data_ready(struct sock *sk);
116static void tipc_write_space(struct sock *sk); 123static void tipc_write_space(struct sock *sk);
117static void tipc_sock_destruct(struct sock *sk); 124static void tipc_sock_destruct(struct sock *sk);
118static int tipc_release(struct socket *sock); 125static int tipc_release(struct socket *sock);
119static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags, 126static int tipc_accept(struct socket *sock, struct socket *new_sock, int flags,
120 bool kern); 127 bool kern);
121static void tipc_sk_timeout(unsigned long data); 128static void tipc_sk_timeout(struct timer_list *t);
122static int tipc_sk_publish(struct tipc_sock *tsk, uint scope, 129static int tipc_sk_publish(struct tipc_sock *tsk, uint scope,
123 struct tipc_name_seq const *seq); 130 struct tipc_name_seq const *seq);
124static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope, 131static int tipc_sk_withdraw(struct tipc_sock *tsk, uint scope,
125 struct tipc_name_seq const *seq); 132 struct tipc_name_seq const *seq);
133static int tipc_sk_leave(struct tipc_sock *tsk);
126static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid); 134static struct tipc_sock *tipc_sk_lookup(struct net *net, u32 portid);
127static int tipc_sk_insert(struct tipc_sock *tsk); 135static int tipc_sk_insert(struct tipc_sock *tsk);
128static void tipc_sk_remove(struct tipc_sock *tsk); 136static void tipc_sk_remove(struct tipc_sock *tsk);
@@ -193,6 +201,11 @@ static bool tsk_conn_cong(struct tipc_sock *tsk)
193 return tsk->snt_unacked > tsk->snd_win; 201 return tsk->snt_unacked > tsk->snd_win;
194} 202}
195 203
204static u16 tsk_blocks(int len)
205{
206 return ((len / FLOWCTL_BLK_SZ) + 1);
207}
208
196/* tsk_blocks(): translate a buffer size in bytes to number of 209/* tsk_blocks(): translate a buffer size in bytes to number of
197 * advertisable blocks, taking into account the ratio truesize(len)/len 210 * advertisable blocks, taking into account the ratio truesize(len)/len
198 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ 211 * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
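The helper added above converts a message length into flow-control block units charged against a member's send window. A stand-alone sketch of that arithmetic; the constant values are assumptions for illustration only, not taken from this patch:

/* Illustration of the tsk_blocks() arithmetic above; constants are
 * placeholders assumed for the example.
 */
enum { EX_FLOWCTL_BLK_SZ = 1024, EX_GROUP_H_SIZE = 44 };

static int ex_blocks(int len)
{
	return len / EX_FLOWCTL_BLK_SZ + 1;
}

/* e.g. a 2000-byte group payload costs ex_blocks(EX_GROUP_H_SIZE + 2000)
 * = 2044 / 1024 + 1 = 2 window units at the receiving member.
 */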
@@ -451,9 +464,9 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
451 NAMED_H_SIZE, 0); 464 NAMED_H_SIZE, 0);
452 465
453 msg_set_origport(msg, tsk->portid); 466 msg_set_origport(msg, tsk->portid);
454 setup_timer(&sk->sk_timer, tipc_sk_timeout, (unsigned long)tsk); 467 timer_setup(&sk->sk_timer, tipc_sk_timeout, 0);
455 sk->sk_shutdown = 0; 468 sk->sk_shutdown = 0;
456 sk->sk_backlog_rcv = tipc_backlog_rcv; 469 sk->sk_backlog_rcv = tipc_sk_backlog_rcv;
457 sk->sk_rcvbuf = sysctl_tipc_rmem[1]; 470 sk->sk_rcvbuf = sysctl_tipc_rmem[1];
458 sk->sk_data_ready = tipc_data_ready; 471 sk->sk_data_ready = tipc_data_ready;
459 sk->sk_write_space = tipc_write_space; 472 sk->sk_write_space = tipc_write_space;
@@ -559,13 +572,14 @@ static int tipc_release(struct socket *sock)
559 572
560 __tipc_shutdown(sock, TIPC_ERR_NO_PORT); 573 __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
561 sk->sk_shutdown = SHUTDOWN_MASK; 574 sk->sk_shutdown = SHUTDOWN_MASK;
575 tipc_sk_leave(tsk);
562 tipc_sk_withdraw(tsk, 0, NULL); 576 tipc_sk_withdraw(tsk, 0, NULL);
563 sk_stop_timer(sk, &sk->sk_timer); 577 sk_stop_timer(sk, &sk->sk_timer);
564 tipc_sk_remove(tsk); 578 tipc_sk_remove(tsk);
565 579
566 /* Reject any messages that accumulated in backlog queue */ 580 /* Reject any messages that accumulated in backlog queue */
567 release_sock(sk); 581 release_sock(sk);
568 u32_list_purge(&tsk->cong_links); 582 tipc_dest_list_purge(&tsk->cong_links);
569 tsk->cong_link_cnt = 0; 583 tsk->cong_link_cnt = 0;
570 call_rcu(&tsk->rcu, tipc_sk_callback); 584 call_rcu(&tsk->rcu, tipc_sk_callback);
571 sock->sk = NULL; 585 sock->sk = NULL;
@@ -601,7 +615,10 @@ static int tipc_bind(struct socket *sock, struct sockaddr *uaddr,
601 res = tipc_sk_withdraw(tsk, 0, NULL); 615 res = tipc_sk_withdraw(tsk, 0, NULL);
602 goto exit; 616 goto exit;
603 } 617 }
604 618 if (tsk->group) {
619 res = -EACCES;
620 goto exit;
621 }
605 if (uaddr_len < sizeof(struct sockaddr_tipc)) { 622 if (uaddr_len < sizeof(struct sockaddr_tipc)) {
606 res = -EINVAL; 623 res = -EINVAL;
607 goto exit; 624 goto exit;
@@ -698,38 +715,41 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
698{ 715{
699 struct sock *sk = sock->sk; 716 struct sock *sk = sock->sk;
700 struct tipc_sock *tsk = tipc_sk(sk); 717 struct tipc_sock *tsk = tipc_sk(sk);
701 u32 mask = 0; 718 struct tipc_group *grp = tsk->group;
719 u32 revents = 0;
702 720
703 sock_poll_wait(file, sk_sleep(sk), wait); 721 sock_poll_wait(file, sk_sleep(sk), wait);
704 722
705 if (sk->sk_shutdown & RCV_SHUTDOWN) 723 if (sk->sk_shutdown & RCV_SHUTDOWN)
706 mask |= POLLRDHUP | POLLIN | POLLRDNORM; 724 revents |= POLLRDHUP | POLLIN | POLLRDNORM;
707 if (sk->sk_shutdown == SHUTDOWN_MASK) 725 if (sk->sk_shutdown == SHUTDOWN_MASK)
708 mask |= POLLHUP; 726 revents |= POLLHUP;
709 727
710 switch (sk->sk_state) { 728 switch (sk->sk_state) {
711 case TIPC_ESTABLISHED: 729 case TIPC_ESTABLISHED:
712 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk)) 730 if (!tsk->cong_link_cnt && !tsk_conn_cong(tsk))
713 mask |= POLLOUT; 731 revents |= POLLOUT;
714 /* fall thru' */ 732 /* fall thru' */
715 case TIPC_LISTEN: 733 case TIPC_LISTEN:
716 case TIPC_CONNECTING: 734 case TIPC_CONNECTING:
717 if (!skb_queue_empty(&sk->sk_receive_queue)) 735 if (!skb_queue_empty(&sk->sk_receive_queue))
718 mask |= (POLLIN | POLLRDNORM); 736 revents |= POLLIN | POLLRDNORM;
719 break; 737 break;
720 case TIPC_OPEN: 738 case TIPC_OPEN:
721 if (!tsk->cong_link_cnt) 739 if (!grp || tipc_group_size(grp))
722 mask |= POLLOUT; 740 if (!tsk->cong_link_cnt)
723 if (tipc_sk_type_connectionless(sk) && 741 revents |= POLLOUT;
724 (!skb_queue_empty(&sk->sk_receive_queue))) 742 if (!tipc_sk_type_connectionless(sk))
725 mask |= (POLLIN | POLLRDNORM); 743 break;
744 if (skb_queue_empty(&sk->sk_receive_queue))
745 break;
746 revents |= POLLIN | POLLRDNORM;
726 break; 747 break;
727 case TIPC_DISCONNECTING: 748 case TIPC_DISCONNECTING:
728 mask = (POLLIN | POLLRDNORM | POLLHUP); 749 revents = POLLIN | POLLRDNORM | POLLHUP;
729 break; 750 break;
730 } 751 }
731 752 return revents;
732 return mask;
733} 753}
734 754
735/** 755/**
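With the reworked tipc_poll() above, a group member socket reports POLLOUT only once the group is non-empty and no destination link is congested. A hedged userspace sketch of waiting for that condition before sending; it uses plain poll(2), nothing TIPC-specific is assumed:

#include <poll.h>

/* Wait until the group socket is writable (or the timeout expires). */
static int wait_group_writable(int sd, int timeout_ms)
{
	struct pollfd pfd = { .fd = sd, .events = POLLOUT };
	int n = poll(&pfd, 1, timeout_ms);

	if (n <= 0)
		return -1;			/* error or timeout */
	return (pfd.revents & POLLOUT) ? 0 : -1;
}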
@@ -757,6 +777,9 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
757 struct tipc_nlist dsts; 777 struct tipc_nlist dsts;
758 int rc; 778 int rc;
759 779
780 if (tsk->group)
781 return -EACCES;
782
760 /* Block or return if any destination link is congested */ 783 /* Block or return if any destination link is congested */
761 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt); 784 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt);
762 if (unlikely(rc)) 785 if (unlikely(rc))
@@ -794,6 +817,296 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
794} 817}
795 818
796/** 819/**
820 * tipc_send_group_msg - send a message to a member in the group
821 * @net: network namespace
822 * @m: message to send
823 * @mb: group member
824 * @dnode: destination node
825 * @dport: destination port
826 * @dlen: total length of message data
827 */
828static int tipc_send_group_msg(struct net *net, struct tipc_sock *tsk,
829 struct msghdr *m, struct tipc_member *mb,
830 u32 dnode, u32 dport, int dlen)
831{
832 u16 bc_snd_nxt = tipc_group_bc_snd_nxt(tsk->group);
833 struct tipc_mc_method *method = &tsk->mc_method;
834 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
835 struct tipc_msg *hdr = &tsk->phdr;
836 struct sk_buff_head pkts;
837 int mtu, rc;
838
839 /* Complete message header */
840 msg_set_type(hdr, TIPC_GRP_UCAST_MSG);
841 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
842 msg_set_destport(hdr, dport);
843 msg_set_destnode(hdr, dnode);
844 msg_set_grp_bc_seqno(hdr, bc_snd_nxt);
845
846 /* Build message as chain of buffers */
847 skb_queue_head_init(&pkts);
848 mtu = tipc_node_get_mtu(net, dnode, tsk->portid);
849 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
850 if (unlikely(rc != dlen))
851 return rc;
852
853 /* Send message */
854 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
855 if (unlikely(rc == -ELINKCONG)) {
856 tipc_dest_push(&tsk->cong_links, dnode, 0);
857 tsk->cong_link_cnt++;
858 }
859
860 /* Update send window */
861 tipc_group_update_member(mb, blks);
862
863 /* A broadcast sent within next EXPIRE period must follow same path */
864 method->rcast = true;
865 method->mandatory = true;
866 return dlen;
867}
868
869/**
870 * tipc_send_group_unicast - send message to a member in the group
871 * @sock: socket structure
872 * @m: message to send
873 * @dlen: total length of message data
874 * @timeout: timeout to wait for wakeup
875 *
876 * Called from function tipc_sendmsg(), which has done all sanity checks
877 * Returns the number of bytes sent on success, or errno
878 */
879static int tipc_send_group_unicast(struct socket *sock, struct msghdr *m,
880 int dlen, long timeout)
881{
882 struct sock *sk = sock->sk;
883 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
884 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
885 struct tipc_sock *tsk = tipc_sk(sk);
886 struct tipc_group *grp = tsk->group;
887 struct net *net = sock_net(sk);
888 struct tipc_member *mb = NULL;
889 u32 node, port;
890 int rc;
891
892 node = dest->addr.id.node;
893 port = dest->addr.id.ref;
894 if (!port && !node)
895 return -EHOSTUNREACH;
896
897 /* Block or return if destination link or member is congested */
898 rc = tipc_wait_for_cond(sock, &timeout,
899 !tipc_dest_find(&tsk->cong_links, node, 0) &&
900 !tipc_group_cong(grp, node, port, blks, &mb));
901 if (unlikely(rc))
902 return rc;
903
904 if (unlikely(!mb))
905 return -EHOSTUNREACH;
906
907 rc = tipc_send_group_msg(net, tsk, m, mb, node, port, dlen);
908
909 return rc ? rc : dlen;
910}
911
912/**
913 * tipc_send_group_anycast - send message to any member with given identity
914 * @sock: socket structure
915 * @m: message to send
916 * @dlen: total length of message data
917 * @timeout: timeout to wait for wakeup
918 *
919 * Called from function tipc_sendmsg(), which has done all sanity checks
920 * Returns the number of bytes sent on success, or errno
921 */
922static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
923 int dlen, long timeout)
924{
925 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
926 struct sock *sk = sock->sk;
927 struct tipc_sock *tsk = tipc_sk(sk);
928 struct list_head *cong_links = &tsk->cong_links;
929 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
930 struct tipc_group *grp = tsk->group;
931 struct tipc_member *first = NULL;
932 struct tipc_member *mbr = NULL;
933 struct net *net = sock_net(sk);
934 u32 node, port, exclude;
935 u32 type, inst, domain;
936 struct list_head dsts;
937 int lookups = 0;
938 int dstcnt, rc;
939 bool cong;
940
941 INIT_LIST_HEAD(&dsts);
942
943 type = dest->addr.name.name.type;
944 inst = dest->addr.name.name.instance;
945 domain = addr_domain(net, dest->scope);
946 exclude = tipc_group_exclude(grp);
947
948 while (++lookups < 4) {
949 first = NULL;
950
951 /* Look for a non-congested destination member, if any */
952 while (1) {
953 if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts,
954 &dstcnt, exclude, false))
955 return -EHOSTUNREACH;
956 tipc_dest_pop(&dsts, &node, &port);
957 cong = tipc_group_cong(grp, node, port, blks, &mbr);
958 if (!cong)
959 break;
960 if (mbr == first)
961 break;
962 if (!first)
963 first = mbr;
964 }
965
966 /* Start over if destination was not in member list */
967 if (unlikely(!mbr))
968 continue;
969
970 if (likely(!cong && !tipc_dest_find(cong_links, node, 0)))
971 break;
972
973 /* Block or return if destination link or member is congested */
974 rc = tipc_wait_for_cond(sock, &timeout,
975 !tipc_dest_find(cong_links, node, 0) &&
976 !tipc_group_cong(grp, node, port,
977 blks, &mbr));
978 if (unlikely(rc))
979 return rc;
980
981 /* Send, unless destination disappeared while waiting */
982 if (likely(mbr))
983 break;
984 }
985
986 if (unlikely(lookups >= 4))
987 return -EHOSTUNREACH;
988
989 rc = tipc_send_group_msg(net, tsk, m, mbr, node, port, dlen);
990
991 return rc ? rc : dlen;
992}
993
994/**
995 * tipc_send_group_bcast - send message to all members in communication group
996 * @sk: socket structure
997 * @m: message to send
998 * @dlen: total length of message data
999 * @timeout: timeout to wait for wakeup
1000 *
1001 * Called from function tipc_sendmsg(), which has done all sanity checks
1002 * Returns the number of bytes sent on success, or errno
1003 */
1004static int tipc_send_group_bcast(struct socket *sock, struct msghdr *m,
1005 int dlen, long timeout)
1006{
1007 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1008 struct sock *sk = sock->sk;
1009 struct net *net = sock_net(sk);
1010 struct tipc_sock *tsk = tipc_sk(sk);
1011 struct tipc_group *grp = tsk->group;
1012 struct tipc_nlist *dsts = tipc_group_dests(grp);
1013 struct tipc_mc_method *method = &tsk->mc_method;
1014 bool ack = method->mandatory && method->rcast;
1015 int blks = tsk_blocks(MCAST_H_SIZE + dlen);
1016 struct tipc_msg *hdr = &tsk->phdr;
1017 int mtu = tipc_bcast_get_mtu(net);
1018 struct sk_buff_head pkts;
1019 int rc = -EHOSTUNREACH;
1020
1021 if (!dsts->local && !dsts->remote)
1022 return -EHOSTUNREACH;
1023
1024 /* Block or return if any destination link or member is congested */
1025 rc = tipc_wait_for_cond(sock, &timeout, !tsk->cong_link_cnt &&
1026 !tipc_group_bc_cong(grp, blks));
1027 if (unlikely(rc))
1028 return rc;
1029
1030 /* Complete message header */
1031 if (dest) {
1032 msg_set_type(hdr, TIPC_GRP_MCAST_MSG);
1033 msg_set_nameinst(hdr, dest->addr.name.name.instance);
1034 } else {
1035 msg_set_type(hdr, TIPC_GRP_BCAST_MSG);
1036 msg_set_nameinst(hdr, 0);
1037 }
1038 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
1039 msg_set_destport(hdr, 0);
1040 msg_set_destnode(hdr, 0);
1041 msg_set_grp_bc_seqno(hdr, tipc_group_bc_snd_nxt(grp));
1042
1043 /* Avoid getting stuck with repeated forced replicasts */
1044 msg_set_grp_bc_ack_req(hdr, ack);
1045
1046 /* Build message as chain of buffers */
1047 skb_queue_head_init(&pkts);
1048 rc = tipc_msg_build(hdr, m, 0, dlen, mtu, &pkts);
1049 if (unlikely(rc != dlen))
1050 return rc;
1051
1052 /* Send message */
1053 rc = tipc_mcast_xmit(net, &pkts, method, dsts, &tsk->cong_link_cnt);
1054 if (unlikely(rc))
1055 return rc;
1056
1057 /* Update broadcast sequence number and send windows */
1058 tipc_group_update_bc_members(tsk->group, blks, ack);
1059
1060 /* Broadcast link is now free to choose method for next broadcast */
1061 method->mandatory = false;
1062 method->expires = jiffies;
1063
1064 return dlen;
1065}
1066
1067/**
1068 * tipc_send_group_mcast - send message to all members with given identity
1069 * @sock: socket structure
1070 * @m: message to send
1071 * @dlen: total length of message data
1072 * @timeout: timeout to wait for wakeup
1073 *
1074 * Called from function tipc_sendmsg(), which has done all sanity checks
1075 * Returns the number of bytes sent on success, or errno
1076 */
1077static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
1078 int dlen, long timeout)
1079{
1080 struct sock *sk = sock->sk;
1081 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1082 struct tipc_name_seq *seq = &dest->addr.nameseq;
1083 struct tipc_sock *tsk = tipc_sk(sk);
1084 struct tipc_group *grp = tsk->group;
1085 struct net *net = sock_net(sk);
1086 u32 domain, exclude, dstcnt;
1087 struct list_head dsts;
1088
1089 INIT_LIST_HEAD(&dsts);
1090
1091 if (seq->lower != seq->upper)
1092 return -ENOTSUPP;
1093
1094 domain = addr_domain(net, dest->scope);
1095 exclude = tipc_group_exclude(grp);
1096 if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain,
1097 &dsts, &dstcnt, exclude, true))
1098 return -EHOSTUNREACH;
1099
1100 if (dstcnt == 1) {
1101 tipc_dest_pop(&dsts, &dest->addr.id.node, &dest->addr.id.ref);
1102 return tipc_send_group_unicast(sock, m, dlen, timeout);
1103 }
1104
1105 tipc_dest_list_purge(&dsts);
1106 return tipc_send_group_bcast(sock, m, dlen, timeout);
1107}
1108
1109/**
797 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets 1110 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
798 * @arrvq: queue with arriving messages, to be cloned after destination lookup 1111 * @arrvq: queue with arriving messages, to be cloned after destination lookup
799 * @inputq: queue with cloned messages, delivered to socket after dest lookup 1112 * @inputq: queue with cloned messages, delivered to socket after dest lookup
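tipc_send_group_mcast() above accepts only a degenerate name sequence (lower == upper) and then fans the message out to every member bound to that instance. A hedged userspace sketch of issuing such a send; the sockaddr layout follows <linux/tipc.h>, and the helper name is invented for the example:

#include <string.h>
#include <sys/socket.h>
#include <linux/tipc.h>

/* Multicast to every group member bound to (type, instance). */
static int group_mcast(int sd, __u32 type, __u32 instance,
		       const void *buf, size_t len)
{
	struct sockaddr_tipc dst;

	memset(&dst, 0, sizeof(dst));
	dst.family = AF_TIPC;
	dst.addrtype = TIPC_ADDR_MCAST;
	dst.scope = TIPC_CLUSTER_SCOPE;
	dst.addr.nameseq.type = type;
	dst.addr.nameseq.lower = instance;	/* must equal upper, see above */
	dst.addr.nameseq.upper = instance;

	return sendto(sd, buf, len, 0, (struct sockaddr *)&dst, sizeof(dst));
}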
@@ -803,13 +1116,15 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
803void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, 1116void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
804 struct sk_buff_head *inputq) 1117 struct sk_buff_head *inputq)
805{ 1118{
806 struct tipc_msg *msg;
807 struct list_head dports;
808 u32 portid;
809 u32 scope = TIPC_CLUSTER_SCOPE; 1119 u32 scope = TIPC_CLUSTER_SCOPE;
810 struct sk_buff_head tmpq; 1120 u32 self = tipc_own_addr(net);
811 uint hsz;
812 struct sk_buff *skb, *_skb; 1121 struct sk_buff *skb, *_skb;
1122 u32 lower = 0, upper = ~0;
1123 struct sk_buff_head tmpq;
1124 u32 portid, oport, onode;
1125 struct list_head dports;
1126 struct tipc_msg *msg;
1127 int user, mtyp, hsz;
813 1128
814 __skb_queue_head_init(&tmpq); 1129 __skb_queue_head_init(&tmpq);
815 INIT_LIST_HEAD(&dports); 1130 INIT_LIST_HEAD(&dports);
@@ -817,17 +1132,32 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
817 skb = tipc_skb_peek(arrvq, &inputq->lock); 1132 skb = tipc_skb_peek(arrvq, &inputq->lock);
818 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 1133 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
819 msg = buf_msg(skb); 1134 msg = buf_msg(skb);
1135 user = msg_user(msg);
1136 mtyp = msg_type(msg);
1137 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
1138 spin_lock_bh(&inputq->lock);
1139 if (skb_peek(arrvq) == skb) {
1140 __skb_dequeue(arrvq);
1141 __skb_queue_tail(inputq, skb);
1142 }
1143 refcount_dec(&skb->users);
1144 spin_unlock_bh(&inputq->lock);
1145 continue;
1146 }
820 hsz = skb_headroom(skb) + msg_hdr_sz(msg); 1147 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
821 1148 oport = msg_origport(msg);
822 if (in_own_node(net, msg_orignode(msg))) 1149 onode = msg_orignode(msg);
1150 if (onode == self)
823 scope = TIPC_NODE_SCOPE; 1151 scope = TIPC_NODE_SCOPE;
824 1152
825 /* Create destination port list and message clones: */ 1153 /* Create destination port list and message clones: */
826 tipc_nametbl_mc_translate(net, 1154 if (!msg_in_group(msg)) {
827 msg_nametype(msg), msg_namelower(msg), 1155 lower = msg_namelower(msg);
828 msg_nameupper(msg), scope, &dports); 1156 upper = msg_nameupper(msg);
829 portid = u32_pop(&dports); 1157 }
830 for (; portid; portid = u32_pop(&dports)) { 1158 tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper,
1159 scope, &dports);
1160 while (tipc_dest_pop(&dports, NULL, &portid)) {
831 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 1161 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
832 if (_skb) { 1162 if (_skb) {
833 msg_set_destport(buf_msg(_skb), portid); 1163 msg_set_destport(buf_msg(_skb), portid);
@@ -850,16 +1180,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
850} 1180}
851 1181
852/** 1182/**
853 * tipc_sk_proto_rcv - receive a connection mng protocol message 1183 * tipc_sk_conn_proto_rcv - receive a connection mng protocol message
854 * @tsk: receiving socket 1184 * @tsk: receiving socket
855 * @skb: pointer to message buffer. 1185 * @skb: pointer to message buffer.
856 */ 1186 */
857static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb, 1187static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
858 struct sk_buff_head *xmitq) 1188 struct sk_buff_head *xmitq)
859{ 1189{
860 struct sock *sk = &tsk->sk;
861 u32 onode = tsk_own_node(tsk);
862 struct tipc_msg *hdr = buf_msg(skb); 1190 struct tipc_msg *hdr = buf_msg(skb);
1191 u32 onode = tsk_own_node(tsk);
1192 struct sock *sk = &tsk->sk;
863 int mtyp = msg_type(hdr); 1193 int mtyp = msg_type(hdr);
864 bool conn_cong; 1194 bool conn_cong;
865 1195
@@ -931,6 +1261,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
931 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT); 1261 long timeout = sock_sndtimeo(sk, m->msg_flags & MSG_DONTWAIT);
932 struct list_head *clinks = &tsk->cong_links; 1262 struct list_head *clinks = &tsk->cong_links;
933 bool syn = !tipc_sk_type_connectionless(sk); 1263 bool syn = !tipc_sk_type_connectionless(sk);
1264 struct tipc_group *grp = tsk->group;
934 struct tipc_msg *hdr = &tsk->phdr; 1265 struct tipc_msg *hdr = &tsk->phdr;
935 struct tipc_name_seq *seq; 1266 struct tipc_name_seq *seq;
936 struct sk_buff_head pkts; 1267 struct sk_buff_head pkts;
@@ -941,18 +1272,31 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
941 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE)) 1272 if (unlikely(dlen > TIPC_MAX_USER_MSG_SIZE))
942 return -EMSGSIZE; 1273 return -EMSGSIZE;
943 1274
1275 if (likely(dest)) {
1276 if (unlikely(m->msg_namelen < sizeof(*dest)))
1277 return -EINVAL;
1278 if (unlikely(dest->family != AF_TIPC))
1279 return -EINVAL;
1280 }
1281
1282 if (grp) {
1283 if (!dest)
1284 return tipc_send_group_bcast(sock, m, dlen, timeout);
1285 if (dest->addrtype == TIPC_ADDR_NAME)
1286 return tipc_send_group_anycast(sock, m, dlen, timeout);
1287 if (dest->addrtype == TIPC_ADDR_ID)
1288 return tipc_send_group_unicast(sock, m, dlen, timeout);
1289 if (dest->addrtype == TIPC_ADDR_MCAST)
1290 return tipc_send_group_mcast(sock, m, dlen, timeout);
1291 return -EINVAL;
1292 }
1293
944 if (unlikely(!dest)) { 1294 if (unlikely(!dest)) {
945 dest = &tsk->peer; 1295 dest = &tsk->peer;
946 if (!syn || dest->family != AF_TIPC) 1296 if (!syn || dest->family != AF_TIPC)
947 return -EDESTADDRREQ; 1297 return -EDESTADDRREQ;
948 } 1298 }
949 1299
950 if (unlikely(m->msg_namelen < sizeof(*dest)))
951 return -EINVAL;
952
953 if (unlikely(dest->family != AF_TIPC))
954 return -EINVAL;
955
956 if (unlikely(syn)) { 1300 if (unlikely(syn)) {
957 if (sk->sk_state == TIPC_LISTEN) 1301 if (sk->sk_state == TIPC_LISTEN)
958 return -EPIPE; 1302 return -EPIPE;
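The group branch added to __tipc_sendmsg() above dispatches purely on the destination address type: no address means group broadcast, TIPC_ADDR_NAME anycast, TIPC_ADDR_ID member unicast, TIPC_ADDR_MCAST multicast. A hedged userspace sketch of the first two cases; the helper names are invented for the example:

#include <string.h>
#include <sys/socket.h>
#include <linux/tipc.h>

/* Group broadcast: no destination address at all. */
static int group_bcast(int sd, const void *buf, size_t len)
{
	return send(sd, buf, len, 0);
}

/* Group anycast: one (non-congested) member bound to (type, instance). */
static int group_anycast(int sd, __u32 type, __u32 instance,
			 const void *buf, size_t len)
{
	struct sockaddr_tipc dst;

	memset(&dst, 0, sizeof(dst));
	dst.family = AF_TIPC;
	dst.addrtype = TIPC_ADDR_NAME;
	dst.scope = TIPC_CLUSTER_SCOPE;
	dst.addr.name.name.type = type;
	dst.addr.name.name.instance = instance;

	return sendto(sd, buf, len, 0, (struct sockaddr *)&dst, sizeof(dst));
}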
@@ -985,7 +1329,6 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
985 msg_set_destport(hdr, dport); 1329 msg_set_destport(hdr, dport);
986 if (unlikely(!dport && !dnode)) 1330 if (unlikely(!dport && !dnode))
987 return -EHOSTUNREACH; 1331 return -EHOSTUNREACH;
988
989 } else if (dest->addrtype == TIPC_ADDR_ID) { 1332 } else if (dest->addrtype == TIPC_ADDR_ID) {
990 dnode = dest->addr.id.node; 1333 dnode = dest->addr.id.node;
991 msg_set_type(hdr, TIPC_DIRECT_MSG); 1334 msg_set_type(hdr, TIPC_DIRECT_MSG);
@@ -996,7 +1339,8 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
996 } 1339 }
997 1340
998 /* Block or return if destination link is congested */ 1341 /* Block or return if destination link is congested */
999 rc = tipc_wait_for_cond(sock, &timeout, !u32_find(clinks, dnode)); 1342 rc = tipc_wait_for_cond(sock, &timeout,
1343 !tipc_dest_find(clinks, dnode, 0));
1000 if (unlikely(rc)) 1344 if (unlikely(rc))
1001 return rc; 1345 return rc;
1002 1346
@@ -1008,7 +1352,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
1008 1352
1009 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid); 1353 rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
1010 if (unlikely(rc == -ELINKCONG)) { 1354 if (unlikely(rc == -ELINKCONG)) {
1011 u32_push(clinks, dnode); 1355 tipc_dest_push(clinks, dnode, 0);
1012 tsk->cong_link_cnt++; 1356 tsk->cong_link_cnt++;
1013 rc = 0; 1357 rc = 0;
1014 } 1358 }
@@ -1128,7 +1472,7 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1128 msg_set_lookup_scope(msg, 0); 1472 msg_set_lookup_scope(msg, 0);
1129 msg_set_hdr_sz(msg, SHORT_H_SIZE); 1473 msg_set_hdr_sz(msg, SHORT_H_SIZE);
1130 1474
1131 sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL); 1475 sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
1132 tipc_set_sk_state(sk, TIPC_ESTABLISHED); 1476 tipc_set_sk_state(sk, TIPC_ESTABLISHED);
1133 tipc_node_add_conn(net, peer_node, tsk->portid, peer_port); 1477 tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
1134 tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid); 1478 tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
@@ -1142,26 +1486,38 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
1142} 1486}
1143 1487
1144/** 1488/**
1145 * set_orig_addr - capture sender's address for received message 1489 * tipc_sk_set_orig_addr - capture sender's address for received message
1146 * @m: descriptor for message info 1490 * @m: descriptor for message info
1147 * @msg: received message header 1491 * @hdr: received message header
1148 * 1492 *
1149 * Note: Address is not captured if not requested by receiver. 1493 * Note: Address is not captured if not requested by receiver.
1150 */ 1494 */
1151static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg) 1495static void tipc_sk_set_orig_addr(struct msghdr *m, struct sk_buff *skb)
1152{ 1496{
1153 DECLARE_SOCKADDR(struct sockaddr_tipc *, addr, m->msg_name); 1497 DECLARE_SOCKADDR(struct sockaddr_pair *, srcaddr, m->msg_name);
1498 struct tipc_msg *hdr = buf_msg(skb);
1154 1499
1155 if (addr) { 1500 if (!srcaddr)
1156 addr->family = AF_TIPC; 1501 return;
1157 addr->addrtype = TIPC_ADDR_ID; 1502
1158 memset(&addr->addr, 0, sizeof(addr->addr)); 1503 srcaddr->sock.family = AF_TIPC;
1159 addr->addr.id.ref = msg_origport(msg); 1504 srcaddr->sock.addrtype = TIPC_ADDR_ID;
1160 addr->addr.id.node = msg_orignode(msg); 1505 srcaddr->sock.addr.id.ref = msg_origport(hdr);
1161 addr->addr.name.domain = 0; /* could leave uninitialized */ 1506 srcaddr->sock.addr.id.node = msg_orignode(hdr);
1162 addr->scope = 0; /* could leave uninitialized */ 1507 srcaddr->sock.addr.name.domain = 0;
1163 m->msg_namelen = sizeof(struct sockaddr_tipc); 1508 srcaddr->sock.scope = 0;
1164 } 1509 m->msg_namelen = sizeof(struct sockaddr_tipc);
1510
1511 if (!msg_in_group(hdr))
1512 return;
1513
1514 /* Group message users may also want to know sending member's id */
1515 srcaddr->member.family = AF_TIPC;
1516 srcaddr->member.addrtype = TIPC_ADDR_NAME;
1517 srcaddr->member.addr.name.name.type = msg_nametype(hdr);
1518 srcaddr->member.addr.name.name.instance = TIPC_SKB_CB(skb)->orig_member;
1519 srcaddr->member.addr.name.domain = 0;
1520 m->msg_namelen = sizeof(*srcaddr);
1165} 1521}
1166 1522
1167/** 1523/**
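For group traffic, tipc_sk_set_orig_addr() above now fills msg_name with a struct sockaddr_pair: the sending socket id followed by the sender's member identity. A hedged userspace sketch of reading both entries; buffer sizes and names are illustrative only:

#include <stdio.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <linux/tipc.h>

static void recv_from_group(int sd)
{
	struct sockaddr_tipc src[2];	/* [0] = socket id, [1] = member name */
	char buf[1024];
	struct iovec iov = { .iov_base = buf, .iov_len = sizeof(buf) };
	struct msghdr m = {
		.msg_name = src,
		.msg_namelen = sizeof(src),
		.msg_iov = &iov,
		.msg_iovlen = 1,
	};

	if (recvmsg(sd, &m, 0) < 0)
		return;
	if (m.msg_namelen == sizeof(src))	/* group message: member id present */
		printf("from member %u:%u\n",
		       src[1].addr.name.name.type,
		       src[1].addr.name.name.instance);
}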
@@ -1318,11 +1674,13 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1318 size_t buflen, int flags) 1674 size_t buflen, int flags)
1319{ 1675{
1320 struct sock *sk = sock->sk; 1676 struct sock *sk = sock->sk;
1321 struct tipc_sock *tsk = tipc_sk(sk);
1322 struct sk_buff *skb;
1323 struct tipc_msg *hdr;
1324 bool connected = !tipc_sk_type_connectionless(sk); 1677 bool connected = !tipc_sk_type_connectionless(sk);
1678 struct tipc_sock *tsk = tipc_sk(sk);
1325 int rc, err, hlen, dlen, copy; 1679 int rc, err, hlen, dlen, copy;
1680 struct sk_buff_head xmitq;
1681 struct tipc_msg *hdr;
1682 struct sk_buff *skb;
1683 bool grp_evt;
1326 long timeout; 1684 long timeout;
1327 1685
1328 /* Catch invalid receive requests */ 1686 /* Catch invalid receive requests */
@@ -1336,8 +1694,8 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1336 } 1694 }
1337 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); 1695 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1338 1696
1697 /* Step rcv queue to first msg with data or error; wait if necessary */
1339 do { 1698 do {
1340 /* Look at first msg in receive queue; wait if necessary */
1341 rc = tipc_wait_for_rcvmsg(sock, &timeout); 1699 rc = tipc_wait_for_rcvmsg(sock, &timeout);
1342 if (unlikely(rc)) 1700 if (unlikely(rc))
1343 goto exit; 1701 goto exit;
@@ -1346,13 +1704,14 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1346 dlen = msg_data_sz(hdr); 1704 dlen = msg_data_sz(hdr);
1347 hlen = msg_hdr_sz(hdr); 1705 hlen = msg_hdr_sz(hdr);
1348 err = msg_errcode(hdr); 1706 err = msg_errcode(hdr);
1707 grp_evt = msg_is_grp_evt(hdr);
1349 if (likely(dlen || err)) 1708 if (likely(dlen || err))
1350 break; 1709 break;
1351 tsk_advance_rx_queue(sk); 1710 tsk_advance_rx_queue(sk);
1352 } while (1); 1711 } while (1);
1353 1712
1354 /* Collect msg meta data, including error code and rejected data */ 1713 /* Collect msg meta data, including error code and rejected data */
1355 set_orig_addr(m, hdr); 1714 tipc_sk_set_orig_addr(m, skb);
1356 rc = tipc_sk_anc_data_recv(m, hdr, tsk); 1715 rc = tipc_sk_anc_data_recv(m, hdr, tsk);
1357 if (unlikely(rc)) 1716 if (unlikely(rc))
1358 goto exit; 1717 goto exit;
@@ -1372,15 +1731,33 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m,
1372 if (unlikely(rc)) 1731 if (unlikely(rc))
1373 goto exit; 1732 goto exit;
1374 1733
1734 /* Mark message as group event if applicable */
1735 if (unlikely(grp_evt)) {
1736 if (msg_grp_evt(hdr) == TIPC_WITHDRAWN)
1737 m->msg_flags |= MSG_EOR;
1738 m->msg_flags |= MSG_OOB;
1739 copy = 0;
1740 }
1741
1375 /* Caption of data or error code/rejected data was successful */ 1742 /* Caption of data or error code/rejected data was successful */
1376 if (unlikely(flags & MSG_PEEK)) 1743 if (unlikely(flags & MSG_PEEK))
1377 goto exit; 1744 goto exit;
1378 1745
1746 /* Send group flow control advertisement when applicable */
1747 if (tsk->group && msg_in_group(hdr) && !grp_evt) {
1748 skb_queue_head_init(&xmitq);
1749 tipc_group_update_rcv_win(tsk->group, tsk_blocks(hlen + dlen),
1750 msg_orignode(hdr), msg_origport(hdr),
1751 &xmitq);
1752 tipc_node_distr_xmit(sock_net(sk), &xmitq);
1753 }
1754
1379 tsk_advance_rx_queue(sk); 1755 tsk_advance_rx_queue(sk);
1756
1380 if (likely(!connected)) 1757 if (likely(!connected))
1381 goto exit; 1758 goto exit;
1382 1759
1383 /* Send connection flow control ack when applicable */ 1760 /* Send connection flow control advertisement when applicable */
1384 tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen); 1761 tsk->rcv_unacked += tsk_inc(tsk, hlen + dlen);
1385 if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE) 1762 if (tsk->rcv_unacked >= tsk->rcv_win / TIPC_ACK_RATE)
1386 tipc_sk_send_ack(tsk); 1763 tipc_sk_send_ack(tsk);
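Membership events are flagged in the code above with MSG_OOB (plus MSG_EOR for a withdrawal) and carry no payload. A small hedged helper classifying a received group buffer by those flags:

#include <sys/socket.h>
#include <sys/types.h>

static const char *classify_group_rcv(const struct msghdr *m, ssize_t len)
{
	if (m->msg_flags & MSG_OOB)
		return (m->msg_flags & MSG_EOR) ? "member withdrawn"
						: "member joined";
	return len ? "member data" : "empty message";
}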
@@ -1446,7 +1823,7 @@ static int tipc_recvstream(struct socket *sock, struct msghdr *m,
1446 1823
1447 /* Collect msg meta data, incl. error code and rejected data */ 1824 /* Collect msg meta data, incl. error code and rejected data */
1448 if (!copied) { 1825 if (!copied) {
1449 set_orig_addr(m, hdr); 1826 tipc_sk_set_orig_addr(m, skb);
1450 rc = tipc_sk_anc_data_recv(m, hdr, tsk); 1827 rc = tipc_sk_anc_data_recv(m, hdr, tsk);
1451 if (rc) 1828 if (rc)
1452 break; 1829 break;
@@ -1532,14 +1909,51 @@ static void tipc_sock_destruct(struct sock *sk)
1532 __skb_queue_purge(&sk->sk_receive_queue); 1909 __skb_queue_purge(&sk->sk_receive_queue);
1533} 1910}
1534 1911
1912static void tipc_sk_proto_rcv(struct sock *sk,
1913 struct sk_buff_head *inputq,
1914 struct sk_buff_head *xmitq)
1915{
1916 struct sk_buff *skb = __skb_dequeue(inputq);
1917 struct tipc_sock *tsk = tipc_sk(sk);
1918 struct tipc_msg *hdr = buf_msg(skb);
1919 struct tipc_group *grp = tsk->group;
1920 bool wakeup = false;
1921
1922 switch (msg_user(hdr)) {
1923 case CONN_MANAGER:
1924 tipc_sk_conn_proto_rcv(tsk, skb, xmitq);
1925 return;
1926 case SOCK_WAKEUP:
1927 tipc_dest_del(&tsk->cong_links, msg_orignode(hdr), 0);
1928 tsk->cong_link_cnt--;
1929 wakeup = true;
1930 break;
1931 case GROUP_PROTOCOL:
1932 tipc_group_proto_rcv(grp, &wakeup, hdr, inputq, xmitq);
1933 break;
1934 case TOP_SRV:
1935 tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
1936 skb, inputq, xmitq);
1937 skb = NULL;
1938 break;
1939 default:
1940 break;
1941 }
1942
1943 if (wakeup)
1944 sk->sk_write_space(sk);
1945
1946 kfree_skb(skb);
1947}
1948
1535/** 1949/**
1536 * filter_connect - Handle all incoming messages for a connection-based socket 1950 * tipc_filter_connect - Handle incoming message for a connection-based socket
1537 * @tsk: TIPC socket 1951 * @tsk: TIPC socket
1538 * @skb: pointer to message buffer. Set to NULL if buffer is consumed 1952 * @skb: pointer to message buffer. Set to NULL if buffer is consumed
1539 * 1953 *
1540 * Returns true if everything ok, false otherwise 1954 * Returns true if everything ok, false otherwise
1541 */ 1955 */
1542static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb) 1956static bool tipc_sk_filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
1543{ 1957{
1544 struct sock *sk = &tsk->sk; 1958 struct sock *sk = &tsk->sk;
1545 struct net *net = sock_net(sk); 1959 struct net *net = sock_net(sk);
@@ -1643,6 +2057,9 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1643 struct tipc_sock *tsk = tipc_sk(sk); 2057 struct tipc_sock *tsk = tipc_sk(sk);
1644 struct tipc_msg *hdr = buf_msg(skb); 2058 struct tipc_msg *hdr = buf_msg(skb);
1645 2059
2060 if (unlikely(msg_in_group(hdr)))
2061 return sk->sk_rcvbuf;
2062
1646 if (unlikely(!msg_connected(hdr))) 2063 if (unlikely(!msg_connected(hdr)))
1647 return sk->sk_rcvbuf << msg_importance(hdr); 2064 return sk->sk_rcvbuf << msg_importance(hdr);
1648 2065
@@ -1653,7 +2070,7 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1653} 2070}
1654 2071
1655/** 2072/**
1656 * filter_rcv - validate incoming message 2073 * tipc_sk_filter_rcv - validate incoming message
1657 * @sk: socket 2074 * @sk: socket
1658 * @skb: pointer to message. 2075 * @skb: pointer to message.
1659 * 2076 *
@@ -1662,99 +2079,71 @@ static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
1662 * 2079 *
1663 * Called with socket lock already taken 2080 * Called with socket lock already taken
1664 * 2081 *
1665 * Returns true if message was added to socket receive queue, otherwise false
1666 */ 2082 */
1667static bool filter_rcv(struct sock *sk, struct sk_buff *skb, 2083static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
1668 struct sk_buff_head *xmitq) 2084 struct sk_buff_head *xmitq)
1669{ 2085{
2086 bool sk_conn = !tipc_sk_type_connectionless(sk);
1670 struct tipc_sock *tsk = tipc_sk(sk); 2087 struct tipc_sock *tsk = tipc_sk(sk);
2088 struct tipc_group *grp = tsk->group;
1671 struct tipc_msg *hdr = buf_msg(skb); 2089 struct tipc_msg *hdr = buf_msg(skb);
1672 unsigned int limit = rcvbuf_limit(sk, skb); 2090 struct net *net = sock_net(sk);
1673 int err = TIPC_OK; 2091 struct sk_buff_head inputq;
1674 int usr = msg_user(hdr); 2092 int limit, err = TIPC_OK;
1675 u32 onode;
1676 2093
1677 if (unlikely(msg_user(hdr) == CONN_MANAGER)) { 2094 TIPC_SKB_CB(skb)->bytes_read = 0;
1678 tipc_sk_proto_rcv(tsk, skb, xmitq); 2095 __skb_queue_head_init(&inputq);
1679 return false; 2096 __skb_queue_tail(&inputq, skb);
1680 }
1681 2097
1682 if (unlikely(usr == SOCK_WAKEUP)) { 2098 if (unlikely(!msg_isdata(hdr)))
1683 onode = msg_orignode(hdr); 2099 tipc_sk_proto_rcv(sk, &inputq, xmitq);
1684 kfree_skb(skb);
1685 u32_del(&tsk->cong_links, onode);
1686 tsk->cong_link_cnt--;
1687 sk->sk_write_space(sk);
1688 return false;
1689 }
1690 2100
1691 /* Drop if illegal message type */ 2101 if (unlikely(grp))
1692 if (unlikely(msg_type(hdr) > TIPC_DIRECT_MSG)) { 2102 tipc_group_filter_msg(grp, &inputq, xmitq);
1693 kfree_skb(skb);
1694 return false;
1695 }
1696 2103
1697 /* Reject if wrong message type for current socket state */ 2104 /* Validate and add to receive buffer if there is space */
1698 if (tipc_sk_type_connectionless(sk)) { 2105 while ((skb = __skb_dequeue(&inputq))) {
1699 if (msg_connected(hdr)) { 2106 hdr = buf_msg(skb);
2107 limit = rcvbuf_limit(sk, skb);
2108 if ((sk_conn && !tipc_sk_filter_connect(tsk, skb)) ||
2109 (!sk_conn && msg_connected(hdr)) ||
2110 (!grp && msg_in_group(hdr)))
1700 err = TIPC_ERR_NO_PORT; 2111 err = TIPC_ERR_NO_PORT;
1701 goto reject; 2112 else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit)
1702 } 2113 err = TIPC_ERR_OVERLOAD;
1703 } else if (unlikely(!filter_connect(tsk, skb))) {
1704 err = TIPC_ERR_NO_PORT;
1705 goto reject;
1706 }
1707 2114
1708 /* Reject message if there isn't room to queue it */ 2115 if (unlikely(err)) {
1709 if (unlikely(sk_rmem_alloc_get(sk) + skb->truesize >= limit)) { 2116 tipc_skb_reject(net, err, skb, xmitq);
1710 err = TIPC_ERR_OVERLOAD; 2117 err = TIPC_OK;
1711 goto reject; 2118 continue;
2119 }
2120 __skb_queue_tail(&sk->sk_receive_queue, skb);
2121 skb_set_owner_r(skb, sk);
2122 sk->sk_data_ready(sk);
1712 } 2123 }
1713
1714 /* Enqueue message */
1715 TIPC_SKB_CB(skb)->bytes_read = 0;
1716 __skb_queue_tail(&sk->sk_receive_queue, skb);
1717 skb_set_owner_r(skb, sk);
1718
1719 sk->sk_data_ready(sk);
1720 return true;
1721
1722reject:
1723 if (tipc_msg_reverse(tsk_own_node(tsk), &skb, err))
1724 __skb_queue_tail(xmitq, skb);
1725 return false;
1726} 2124}
1727 2125
1728/** 2126/**
1729 * tipc_backlog_rcv - handle incoming message from backlog queue 2127 * tipc_sk_backlog_rcv - handle incoming message from backlog queue
1730 * @sk: socket 2128 * @sk: socket
1731 * @skb: message 2129 * @skb: message
1732 * 2130 *
1733 * Caller must hold socket lock 2131 * Caller must hold socket lock
1734 *
1735 * Returns 0
1736 */ 2132 */
1737static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb) 2133static int tipc_sk_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1738{ 2134{
1739 unsigned int truesize = skb->truesize; 2135 unsigned int before = sk_rmem_alloc_get(sk);
1740 struct sk_buff_head xmitq; 2136 struct sk_buff_head xmitq;
1741 u32 dnode, selector; 2137 unsigned int added;
1742 2138
1743 __skb_queue_head_init(&xmitq); 2139 __skb_queue_head_init(&xmitq);
1744 2140
1745 if (likely(filter_rcv(sk, skb, &xmitq))) { 2141 tipc_sk_filter_rcv(sk, skb, &xmitq);
1746 atomic_add(truesize, &tipc_sk(sk)->dupl_rcvcnt); 2142 added = sk_rmem_alloc_get(sk) - before;
1747 return 0; 2143 atomic_add(added, &tipc_sk(sk)->dupl_rcvcnt);
1748 }
1749 2144
1750 if (skb_queue_empty(&xmitq)) 2145 /* Send pending response/rejected messages, if any */
1751 return 0; 2146 tipc_node_distr_xmit(sock_net(sk), &xmitq);
1752
1753 /* Send response/rejected message */
1754 skb = __skb_dequeue(&xmitq);
1755 dnode = msg_destnode(buf_msg(skb));
1756 selector = msg_origport(buf_msg(skb));
1757 tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
1758 return 0; 2147 return 0;
1759} 2148}
1760 2149
@@ -1786,7 +2175,7 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1786 2175
1787 /* Add message directly to receive queue if possible */ 2176 /* Add message directly to receive queue if possible */
1788 if (!sock_owned_by_user(sk)) { 2177 if (!sock_owned_by_user(sk)) {
1789 filter_rcv(sk, skb, xmitq); 2178 tipc_sk_filter_rcv(sk, skb, xmitq);
1790 continue; 2179 continue;
1791 } 2180 }
1792 2181
@@ -1833,14 +2222,10 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1833 spin_unlock_bh(&sk->sk_lock.slock); 2222 spin_unlock_bh(&sk->sk_lock.slock);
1834 } 2223 }
1835 /* Send pending response/rejected messages, if any */ 2224 /* Send pending response/rejected messages, if any */
1836 while ((skb = __skb_dequeue(&xmitq))) { 2225 tipc_node_distr_xmit(sock_net(sk), &xmitq);
1837 dnode = msg_destnode(buf_msg(skb));
1838 tipc_node_xmit_skb(net, skb, dnode, dport);
1839 }
1840 sock_put(sk); 2226 sock_put(sk);
1841 continue; 2227 continue;
1842 } 2228 }
1843
1844 /* No destination socket => dequeue skb if still there */ 2229 /* No destination socket => dequeue skb if still there */
1845 skb = tipc_skb_dequeue(inputq, dport); 2230 skb = tipc_skb_dequeue(inputq, dport);
1846 if (!skb) 2231 if (!skb)
@@ -1903,28 +2288,32 @@ static int tipc_connect(struct socket *sock, struct sockaddr *dest,
1903 int previous; 2288 int previous;
1904 int res = 0; 2289 int res = 0;
1905 2290
2291 if (destlen != sizeof(struct sockaddr_tipc))
2292 return -EINVAL;
2293
1906 lock_sock(sk); 2294 lock_sock(sk);
1907 2295
1908 /* DGRAM/RDM connect(), just save the destaddr */ 2296 if (tsk->group) {
1909 if (tipc_sk_type_connectionless(sk)) { 2297 res = -EINVAL;
1910 if (dst->family == AF_UNSPEC) {
1911 memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
1912 } else if (destlen != sizeof(struct sockaddr_tipc)) {
1913 res = -EINVAL;
1914 } else {
1915 memcpy(&tsk->peer, dest, destlen);
1916 }
1917 goto exit; 2298 goto exit;
1918 } 2299 }
1919 2300
1920 /* 2301 if (dst->family == AF_UNSPEC) {
1921 * Reject connection attempt using multicast address 2302 memset(&tsk->peer, 0, sizeof(struct sockaddr_tipc));
1922 * 2303 if (!tipc_sk_type_connectionless(sk))
1923 * Note: send_msg() validates the rest of the address fields, 2304 res = -EINVAL;
1924 * so there's no need to do it here 2305 goto exit;
1925 */ 2306 } else if (dst->family != AF_TIPC) {
1926 if (dst->addrtype == TIPC_ADDR_MCAST) {
1927 res = -EINVAL; 2307 res = -EINVAL;
2308 }
2309 if (dst->addrtype != TIPC_ADDR_ID && dst->addrtype != TIPC_ADDR_NAME)
2310 res = -EINVAL;
2311 if (res)
2312 goto exit;
2313
2314 /* DGRAM/RDM connect(), just save the destaddr */
2315 if (tipc_sk_type_connectionless(sk)) {
2316 memcpy(&tsk->peer, dest, destlen);
1928 goto exit; 2317 goto exit;
1929 } 2318 }
1930 2319
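The rewritten connect() path above still lets SOCK_RDM/SOCK_DGRAM sockets record a default destination, and an AF_UNSPEC address clears it again; the full sockaddr length is now required in both cases. A hedged userspace sketch:

#include <string.h>
#include <sys/socket.h>
#include <linux/tipc.h>

static int set_default_peer(int sd, __u32 node, __u32 port)
{
	struct sockaddr_tipc peer;

	memset(&peer, 0, sizeof(peer));
	peer.family = AF_TIPC;
	peer.addrtype = TIPC_ADDR_ID;
	peer.addr.id.node = node;
	peer.addr.id.ref = port;

	return connect(sd, (struct sockaddr *)&peer, sizeof(peer));
}

static int clear_default_peer(int sd)
{
	struct sockaddr_tipc unspec;

	memset(&unspec, 0, sizeof(unspec));
	unspec.family = AF_UNSPEC;	/* resets tsk->peer, see above */

	return connect(sd, (struct sockaddr *)&unspec, sizeof(unspec));
}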
@@ -2141,46 +2530,43 @@ static int tipc_shutdown(struct socket *sock, int how)
2141 return res; 2530 return res;
2142} 2531}
2143 2532
2144static void tipc_sk_timeout(unsigned long data) 2533static void tipc_sk_timeout(struct timer_list *t)
2145{ 2534{
2146 struct tipc_sock *tsk = (struct tipc_sock *)data; 2535 struct sock *sk = from_timer(sk, t, sk_timer);
2147 struct sock *sk = &tsk->sk; 2536 struct tipc_sock *tsk = tipc_sk(sk);
2148 struct sk_buff *skb = NULL; 2537 u32 peer_port = tsk_peer_port(tsk);
2149 u32 peer_port, peer_node; 2538 u32 peer_node = tsk_peer_node(tsk);
2150 u32 own_node = tsk_own_node(tsk); 2539 u32 own_node = tsk_own_node(tsk);
2540 u32 own_port = tsk->portid;
2541 struct net *net = sock_net(sk);
2542 struct sk_buff *skb = NULL;
2151 2543
2152 bh_lock_sock(sk); 2544 bh_lock_sock(sk);
2153 if (!tipc_sk_connected(sk)) { 2545 if (!tipc_sk_connected(sk))
2154 bh_unlock_sock(sk); 2546 goto exit;
2547
2548 /* Try again later if socket is busy */
2549 if (sock_owned_by_user(sk)) {
2550 sk_reset_timer(sk, &sk->sk_timer, jiffies + HZ / 20);
2155 goto exit; 2551 goto exit;
2156 } 2552 }
2157 peer_port = tsk_peer_port(tsk);
2158 peer_node = tsk_peer_node(tsk);
2159 2553
2160 if (tsk->probe_unacked) { 2554 if (tsk->probe_unacked) {
2161 if (!sock_owned_by_user(sk)) { 2555 tipc_set_sk_state(sk, TIPC_DISCONNECTING);
2162 tipc_set_sk_state(sk, TIPC_DISCONNECTING); 2556 tipc_node_remove_conn(net, peer_node, peer_port);
2163 tipc_node_remove_conn(sock_net(sk), tsk_peer_node(tsk), 2557 sk->sk_state_change(sk);
2164 tsk_peer_port(tsk));
2165 sk->sk_state_change(sk);
2166 } else {
2167 /* Try again later */
2168 sk_reset_timer(sk, &sk->sk_timer, (HZ / 20));
2169 }
2170
2171 bh_unlock_sock(sk);
2172 goto exit; 2558 goto exit;
2173 } 2559 }
2174 2560 /* Send new probe */
2175 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, 2561 skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE, 0,
2176 INT_H_SIZE, 0, peer_node, own_node, 2562 peer_node, own_node, peer_port, own_port,
2177 peer_port, tsk->portid, TIPC_OK); 2563 TIPC_OK);
2178 tsk->probe_unacked = true; 2564 tsk->probe_unacked = true;
2179 sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTERVAL); 2565 sk_reset_timer(sk, &sk->sk_timer, jiffies + CONN_PROBING_INTV);
2566exit:
2180 bh_unlock_sock(sk); 2567 bh_unlock_sock(sk);
2181 if (skb) 2568 if (skb)
2182 tipc_node_xmit_skb(sock_net(sk), skb, peer_node, tsk->portid); 2569 tipc_node_xmit_skb(net, skb, peer_node, own_port);
2183exit:
2184 sock_put(sk); 2570 sock_put(sk);
2185} 2571}
2186 2572
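tipc_sk_timeout() above is also converted to the timer_list callback convention: timer_setup()/from_timer() replace setup_timer() with its opaque unsigned long argument. A minimal, generic sketch of that pattern, independent of TIPC:

#include <linux/jiffies.h>
#include <linux/timer.h>

struct ex_sock {
	struct timer_list timer;
	int probes;
};

static void ex_timeout(struct timer_list *t)
{
	struct ex_sock *es = from_timer(es, t, timer);	/* container lookup */

	es->probes++;
	mod_timer(&es->timer, jiffies + HZ);
}

static void ex_init(struct ex_sock *es)
{
	es->probes = 0;
	timer_setup(&es->timer, ex_timeout, 0);
	mod_timer(&es->timer, jiffies + HZ);
}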
@@ -2345,6 +2731,58 @@ void tipc_sk_rht_destroy(struct net *net)
2345 rhashtable_destroy(&tn->sk_rht); 2731 rhashtable_destroy(&tn->sk_rht);
2346} 2732}
2347 2733
2734static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2735{
2736 struct net *net = sock_net(&tsk->sk);
2737 u32 domain = addr_domain(net, mreq->scope);
2738 struct tipc_group *grp = tsk->group;
2739 struct tipc_msg *hdr = &tsk->phdr;
2740 struct tipc_name_seq seq;
2741 int rc;
2742
2743 if (mreq->type < TIPC_RESERVED_TYPES)
2744 return -EACCES;
2745 if (grp)
2746 return -EACCES;
2747 grp = tipc_group_create(net, tsk->portid, mreq);
2748 if (!grp)
2749 return -ENOMEM;
2750 tsk->group = grp;
2751 msg_set_lookup_scope(hdr, mreq->scope);
2752 msg_set_nametype(hdr, mreq->type);
2753 msg_set_dest_droppable(hdr, true);
2754 seq.type = mreq->type;
2755 seq.lower = mreq->instance;
2756 seq.upper = seq.lower;
2757 tipc_nametbl_build_group(net, grp, mreq->type, domain);
2758 rc = tipc_sk_publish(tsk, mreq->scope, &seq);
2759 if (rc) {
2760 tipc_group_delete(net, grp);
2761 tsk->group = NULL;
2762 }
2763
2764 /* Eliminate any risk that a broadcast overtakes the sent JOIN */
2765 tsk->mc_method.rcast = true;
2766 tsk->mc_method.mandatory = true;
2767 return rc;
2768}
2769
2770static int tipc_sk_leave(struct tipc_sock *tsk)
2771{
2772 struct net *net = sock_net(&tsk->sk);
2773 struct tipc_group *grp = tsk->group;
2774 struct tipc_name_seq seq;
2775 int scope;
2776
2777 if (!grp)
2778 return -EINVAL;
2779 tipc_group_self(grp, &seq, &scope);
2780 tipc_group_delete(net, grp);
2781 tsk->group = NULL;
2782 tipc_sk_withdraw(tsk, scope, &seq);
2783 return 0;
2784}
2785
2348/** 2786/**
2349 * tipc_setsockopt - set socket option 2787 * tipc_setsockopt - set socket option
2350 * @sock: socket structure 2788 * @sock: socket structure
@@ -2363,6 +2801,7 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2363{ 2801{
2364 struct sock *sk = sock->sk; 2802 struct sock *sk = sock->sk;
2365 struct tipc_sock *tsk = tipc_sk(sk); 2803 struct tipc_sock *tsk = tipc_sk(sk);
2804 struct tipc_group_req mreq;
2366 u32 value = 0; 2805 u32 value = 0;
2367 int res = 0; 2806 int res = 0;
2368 2807
@@ -2378,9 +2817,14 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2378 case TIPC_CONN_TIMEOUT: 2817 case TIPC_CONN_TIMEOUT:
2379 if (ol < sizeof(value)) 2818 if (ol < sizeof(value))
2380 return -EINVAL; 2819 return -EINVAL;
2381 res = get_user(value, (u32 __user *)ov); 2820 if (get_user(value, (u32 __user *)ov))
2382 if (res) 2821 return -EFAULT;
2383 return res; 2822 break;
2823 case TIPC_GROUP_JOIN:
2824 if (ol < sizeof(mreq))
2825 return -EINVAL;
2826 if (copy_from_user(&mreq, ov, sizeof(mreq)))
2827 return -EFAULT;
2384 break; 2828 break;
2385 default: 2829 default:
2386 if (ov || ol) 2830 if (ov || ol)
@@ -2413,6 +2857,12 @@ static int tipc_setsockopt(struct socket *sock, int lvl, int opt,
2413 tsk->mc_method.rcast = true; 2857 tsk->mc_method.rcast = true;
2414 tsk->mc_method.mandatory = true; 2858 tsk->mc_method.mandatory = true;
2415 break; 2859 break;
2860 case TIPC_GROUP_JOIN:
2861 res = tipc_sk_join(tsk, &mreq);
2862 break;
2863 case TIPC_GROUP_LEAVE:
2864 res = tipc_sk_leave(tsk);
2865 break;
2416 default: 2866 default:
2417 res = -EINVAL; 2867 res = -EINVAL;
2418 } 2868 }
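Joining and leaving a group from userspace goes through the two new setsockopt cases above. A hedged sketch, assuming the struct tipc_group_req layout (type, instance, scope) and the TIPC_GROUP_JOIN/TIPC_GROUP_LEAVE option names this series adds to the uapi header:

#include <string.h>
#include <sys/socket.h>
#include <linux/tipc.h>

static int join_group(int sd, __u32 group_type, __u32 member_instance)
{
	struct tipc_group_req req;

	memset(&req, 0, sizeof(req));
	req.type = group_type;		/* must not be a reserved type, see tipc_sk_join() */
	req.instance = member_instance;
	req.scope = TIPC_CLUSTER_SCOPE;

	return setsockopt(sd, SOL_TIPC, TIPC_GROUP_JOIN, &req, sizeof(req));
}

static int leave_group(int sd)
{
	/* TIPC_GROUP_LEAVE accepts no option value (default case above) */
	return setsockopt(sd, SOL_TIPC, TIPC_GROUP_LEAVE, NULL, 0);
}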
@@ -2440,7 +2890,8 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2440{ 2890{
2441 struct sock *sk = sock->sk; 2891 struct sock *sk = sock->sk;
2442 struct tipc_sock *tsk = tipc_sk(sk); 2892 struct tipc_sock *tsk = tipc_sk(sk);
2443 int len; 2893 struct tipc_name_seq seq;
2894 int len, scope;
2444 u32 value; 2895 u32 value;
2445 int res; 2896 int res;
2446 2897
@@ -2474,6 +2925,12 @@ static int tipc_getsockopt(struct socket *sock, int lvl, int opt,
2474 case TIPC_SOCK_RECVQ_DEPTH: 2925 case TIPC_SOCK_RECVQ_DEPTH:
2475 value = skb_queue_len(&sk->sk_receive_queue); 2926 value = skb_queue_len(&sk->sk_receive_queue);
2476 break; 2927 break;
2928 case TIPC_GROUP_JOIN:
2929 seq.type = 0;
2930 if (tsk->group)
2931 tipc_group_self(tsk->group, &seq, &scope);
2932 value = seq.type;
2933 break;
2477 default: 2934 default:
2478 res = -EINVAL; 2935 res = -EINVAL;
2479 } 2936 }
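The new getsockopt case above reports the name type of the group the socket has joined, or 0 when it is not a member. A hedged userspace sketch of querying it:

#include <sys/socket.h>
#include <linux/tipc.h>

static __u32 current_group_type(int sd)
{
	__u32 type = 0;
	socklen_t len = sizeof(type);

	if (getsockopt(sd, SOL_TIPC, TIPC_GROUP_JOIN, &type, &len) < 0)
		return 0;
	return type;			/* 0 => not a group member */
}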