aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2018-01-08 15:03:30 -0500
committerDavid S. Miller <davem@davemloft.net>2018-01-09 12:35:58 -0500
commit232d07b74a33b9f5d48516dc1d8ce41723ada593 (patch)
tree04a2df7d400b7aa4fbb0028d033bbace6e0ebdcc /net/tipc
parent8348500f80d5660af29c475e1f15d412d83564c9 (diff)
tipc: improve groupcast scope handling
When a member joins a group, it also indicates a binding scope. This makes it possible to create both node local groups, invisible to other nodes, as well as cluster global groups, visible everywhere. In order to avoid that different members end up having permanently differing views of group size and memberhip, we must inhibit locally and globally bound members from joining the same group. We do this by using the binding scope as an additional separator between groups. I.e., a member must ignore all membership events from sockets using a different scope than itself, and all lookups for message destinations must require an exact match between the message's lookup scope and the potential target's binding scope. Apart from making it possible to create local groups using the same identity on different nodes, a side effect of this is that it now also becomes possible to create a cluster global group with the same identity across the same nodes, without interfering with the local groups. Acked-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/group.c13
-rw-r--r--net/tipc/name_table.c40
-rw-r--r--net/tipc/name_table.h4
-rw-r--r--net/tipc/server.c4
-rw-r--r--net/tipc/server.h6
-rw-r--r--net/tipc/socket.c88
-rw-r--r--net/tipc/subscr.c10
-rw-r--r--net/tipc/subscr.h2
8 files changed, 96 insertions, 71 deletions
diff --git a/net/tipc/group.c b/net/tipc/group.c
index cf996bd6ec98..1908773c9fca 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -87,7 +87,6 @@ struct tipc_group {
87 int subid; 87 int subid;
88 u32 type; 88 u32 type;
89 u32 instance; 89 u32 instance;
90 u32 domain;
91 u32 scope; 90 u32 scope;
92 u32 portid; 91 u32 portid;
93 u16 member_cnt; 92 u16 member_cnt;
@@ -158,6 +157,8 @@ int tipc_group_size(struct tipc_group *grp)
158struct tipc_group *tipc_group_create(struct net *net, u32 portid, 157struct tipc_group *tipc_group_create(struct net *net, u32 portid,
159 struct tipc_group_req *mreq) 158 struct tipc_group_req *mreq)
160{ 159{
160 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS;
161 bool global = mreq->scope != TIPC_NODE_SCOPE;
161 struct tipc_group *grp; 162 struct tipc_group *grp;
162 u32 type = mreq->type; 163 u32 type = mreq->type;
163 164
@@ -171,15 +172,14 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
171 grp->members = RB_ROOT; 172 grp->members = RB_ROOT;
172 grp->net = net; 173 grp->net = net;
173 grp->portid = portid; 174 grp->portid = portid;
174 grp->domain = addr_domain(net, mreq->scope);
175 grp->type = type; 175 grp->type = type;
176 grp->instance = mreq->instance; 176 grp->instance = mreq->instance;
177 grp->scope = mreq->scope; 177 grp->scope = mreq->scope;
178 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 178 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
179 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 179 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
180 if (tipc_topsrv_kern_subscr(net, portid, type, 180 filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE;
181 TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS, 181 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0,
182 0, ~0, &grp->subid)) 182 filter, &grp->subid))
183 return grp; 183 return grp;
184 kfree(grp); 184 kfree(grp);
185 return NULL; 185 return NULL;
@@ -732,6 +732,9 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
732 if (!grp) 732 if (!grp)
733 return; 733 return;
734 734
735 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net))
736 return;
737
735 m = tipc_group_find_member(grp, node, port); 738 m = tipc_group_find_member(grp, node, port);
736 739
737 switch (msg_type(hdr)) { 740 switch (msg_type(hdr)) {
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 60af9885f160..64cdd3c302b0 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -328,7 +328,8 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
328 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { 328 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
329 tipc_subscrp_report_overlap(s, publ->lower, publ->upper, 329 tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
330 TIPC_PUBLISHED, publ->ref, 330 TIPC_PUBLISHED, publ->ref,
331 publ->node, created_subseq); 331 publ->node, publ->scope,
332 created_subseq);
332 } 333 }
333 return publ; 334 return publ;
334} 335}
@@ -398,7 +399,8 @@ found:
398 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { 399 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
399 tipc_subscrp_report_overlap(s, publ->lower, publ->upper, 400 tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
400 TIPC_WITHDRAWN, publ->ref, 401 TIPC_WITHDRAWN, publ->ref,
401 publ->node, removed_subseq); 402 publ->node, publ->scope,
403 removed_subseq);
402 } 404 }
403 405
404 return publ; 406 return publ;
@@ -435,6 +437,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
435 sseq->upper, 437 sseq->upper,
436 TIPC_PUBLISHED, 438 TIPC_PUBLISHED,
437 crs->ref, crs->node, 439 crs->ref, crs->node,
440 crs->scope,
438 must_report); 441 must_report);
439 must_report = 0; 442 must_report = 0;
440 } 443 }
@@ -598,7 +601,7 @@ not_found:
598 return ref; 601 return ref;
599} 602}
600 603
601bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, 604bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
602 struct list_head *dsts, int *dstcnt, u32 exclude, 605 struct list_head *dsts, int *dstcnt, u32 exclude,
603 bool all) 606 bool all)
604{ 607{
@@ -608,9 +611,6 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
608 struct name_seq *seq; 611 struct name_seq *seq;
609 struct sub_seq *sseq; 612 struct sub_seq *sseq;
610 613
611 if (!tipc_in_scope(domain, self))
612 return false;
613
614 *dstcnt = 0; 614 *dstcnt = 0;
615 rcu_read_lock(); 615 rcu_read_lock();
616 seq = nametbl_find_seq(net, type); 616 seq = nametbl_find_seq(net, type);
@@ -621,7 +621,7 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
621 if (likely(sseq)) { 621 if (likely(sseq)) {
622 info = sseq->info; 622 info = sseq->info;
623 list_for_each_entry(publ, &info->zone_list, zone_list) { 623 list_for_each_entry(publ, &info->zone_list, zone_list) {
624 if (!tipc_in_scope(domain, publ->node)) 624 if (publ->scope != scope)
625 continue; 625 continue;
626 if (publ->ref == exclude && publ->node == self) 626 if (publ->ref == exclude && publ->node == self)
627 continue; 627 continue;
@@ -639,13 +639,14 @@ exit:
639 return !list_empty(dsts); 639 return !list_empty(dsts);
640} 640}
641 641
642int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 642int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
643 u32 limit, struct list_head *dports) 643 u32 scope, bool exact, struct list_head *dports)
644{ 644{
645 struct name_seq *seq;
646 struct sub_seq *sseq;
647 struct sub_seq *sseq_stop; 645 struct sub_seq *sseq_stop;
648 struct name_info *info; 646 struct name_info *info;
647 struct publication *p;
648 struct name_seq *seq;
649 struct sub_seq *sseq;
649 int res = 0; 650 int res = 0;
650 651
651 rcu_read_lock(); 652 rcu_read_lock();
@@ -657,15 +658,12 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
657 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); 658 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
658 sseq_stop = seq->sseqs + seq->first_free; 659 sseq_stop = seq->sseqs + seq->first_free;
659 for (; sseq != sseq_stop; sseq++) { 660 for (; sseq != sseq_stop; sseq++) {
660 struct publication *publ;
661
662 if (sseq->lower > upper) 661 if (sseq->lower > upper)
663 break; 662 break;
664
665 info = sseq->info; 663 info = sseq->info;
666 list_for_each_entry(publ, &info->node_list, node_list) { 664 list_for_each_entry(p, &info->node_list, node_list) {
667 if (publ->scope <= limit) 665 if (p->scope == scope || (!exact && p->scope < scope))
668 tipc_dest_push(dports, 0, publ->ref); 666 tipc_dest_push(dports, 0, p->ref);
669 } 667 }
670 668
671 if (info->cluster_list_size != info->node_list_size) 669 if (info->cluster_list_size != info->node_list_size)
@@ -682,7 +680,7 @@ exit:
682 * - Determines if any node local ports overlap 680 * - Determines if any node local ports overlap
683 */ 681 */
684void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, 682void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
685 u32 upper, u32 domain, 683 u32 upper, u32 scope,
686 struct tipc_nlist *nodes) 684 struct tipc_nlist *nodes)
687{ 685{
688 struct sub_seq *sseq, *stop; 686 struct sub_seq *sseq, *stop;
@@ -701,7 +699,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
701 for (; sseq != stop && sseq->lower <= upper; sseq++) { 699 for (; sseq != stop && sseq->lower <= upper; sseq++) {
702 info = sseq->info; 700 info = sseq->info;
703 list_for_each_entry(publ, &info->zone_list, zone_list) { 701 list_for_each_entry(publ, &info->zone_list, zone_list) {
704 if (tipc_in_scope(domain, publ->node)) 702 if (publ->scope == scope)
705 tipc_nlist_add(nodes, publ->node); 703 tipc_nlist_add(nodes, publ->node);
706 } 704 }
707 } 705 }
@@ -713,7 +711,7 @@ exit:
713/* tipc_nametbl_build_group - build list of communication group members 711/* tipc_nametbl_build_group - build list of communication group members
714 */ 712 */
715void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, 713void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
716 u32 type, u32 domain) 714 u32 type, u32 scope)
717{ 715{
718 struct sub_seq *sseq, *stop; 716 struct sub_seq *sseq, *stop;
719 struct name_info *info; 717 struct name_info *info;
@@ -731,7 +729,7 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
731 for (; sseq != stop; sseq++) { 729 for (; sseq != stop; sseq++) {
732 info = sseq->info; 730 info = sseq->info;
733 list_for_each_entry(p, &info->zone_list, zone_list) { 731 list_for_each_entry(p, &info->zone_list, zone_list) {
734 if (!tipc_in_scope(domain, p->node)) 732 if (p->scope != scope)
735 continue; 733 continue;
736 tipc_group_add_member(grp, p->node, p->ref, p->lower); 734 tipc_group_add_member(grp, p->node, p->ref, p->lower);
737 } 735 }
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 73a148c85c15..b595d8aa00f0 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -100,8 +100,8 @@ struct name_table {
100int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); 100int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
101 101
102u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); 102u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
103int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 103int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
104 u32 limit, struct list_head *dports); 104 u32 scope, bool exact, struct list_head *dports);
105void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, 105void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
106 u32 type, u32 domain); 106 u32 type, u32 domain);
107void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, 107void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
diff --git a/net/tipc/server.c b/net/tipc/server.c
index 950c54cbcf3a..8ee5e86b7870 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -489,8 +489,8 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
489 } 489 }
490} 490}
491 491
492bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, 492bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
493 u32 filter, u32 lower, u32 upper, int *conid) 493 u32 upper, u32 filter, int *conid)
494{ 494{
495 struct tipc_subscriber *scbr; 495 struct tipc_subscriber *scbr;
496 struct tipc_subscr sub; 496 struct tipc_subscr sub;
diff --git a/net/tipc/server.h b/net/tipc/server.h
index ea1effbff23e..17f49ee44cfd 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -41,6 +41,8 @@
41#include <net/net_namespace.h> 41#include <net/net_namespace.h>
42 42
43#define TIPC_SERVER_NAME_LEN 32 43#define TIPC_SERVER_NAME_LEN 32
44#define TIPC_SUB_CLUSTER_SCOPE 0x20
45#define TIPC_SUB_NODE_SCOPE 0x40
44#define TIPC_SUB_NO_STATUS 0x80 46#define TIPC_SUB_NO_STATUS 0x80
45 47
46/** 48/**
@@ -84,8 +86,8 @@ struct tipc_server {
84int tipc_conn_sendmsg(struct tipc_server *s, int conid, 86int tipc_conn_sendmsg(struct tipc_server *s, int conid,
85 struct sockaddr_tipc *addr, void *data, size_t len); 87 struct sockaddr_tipc *addr, void *data, size_t len);
86 88
87bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, 89bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
88 u32 filter, u32 lower, u32 upper, int *conid); 90 u32 upper, u32 filter, int *conid);
89void tipc_topsrv_kern_unsubscr(struct net *net, int conid); 91void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
90 92
91/** 93/**
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index e3a02f1fcab5..b24dab3996c9 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -928,21 +928,22 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
928 struct list_head *cong_links = &tsk->cong_links; 928 struct list_head *cong_links = &tsk->cong_links;
929 int blks = tsk_blocks(GROUP_H_SIZE + dlen); 929 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
930 struct tipc_group *grp = tsk->group; 930 struct tipc_group *grp = tsk->group;
931 struct tipc_msg *hdr = &tsk->phdr;
931 struct tipc_member *first = NULL; 932 struct tipc_member *first = NULL;
932 struct tipc_member *mbr = NULL; 933 struct tipc_member *mbr = NULL;
933 struct net *net = sock_net(sk); 934 struct net *net = sock_net(sk);
934 u32 node, port, exclude; 935 u32 node, port, exclude;
935 u32 type, inst, domain;
936 struct list_head dsts; 936 struct list_head dsts;
937 u32 type, inst, scope;
937 int lookups = 0; 938 int lookups = 0;
938 int dstcnt, rc; 939 int dstcnt, rc;
939 bool cong; 940 bool cong;
940 941
941 INIT_LIST_HEAD(&dsts); 942 INIT_LIST_HEAD(&dsts);
942 943
943 type = dest->addr.name.name.type; 944 type = msg_nametype(hdr);
944 inst = dest->addr.name.name.instance; 945 inst = dest->addr.name.name.instance;
945 domain = addr_domain(net, dest->scope); 946 scope = msg_lookup_scope(hdr);
946 exclude = tipc_group_exclude(grp); 947 exclude = tipc_group_exclude(grp);
947 948
948 while (++lookups < 4) { 949 while (++lookups < 4) {
@@ -950,7 +951,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
950 951
951 /* Look for a non-congested destination member, if any */ 952 /* Look for a non-congested destination member, if any */
952 while (1) { 953 while (1) {
953 if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts, 954 if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
954 &dstcnt, exclude, false)) 955 &dstcnt, exclude, false))
955 return -EHOSTUNREACH; 956 return -EHOSTUNREACH;
956 tipc_dest_pop(&dsts, &node, &port); 957 tipc_dest_pop(&dsts, &node, &port);
@@ -1079,22 +1080,23 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
1079{ 1080{
1080 struct sock *sk = sock->sk; 1081 struct sock *sk = sock->sk;
1081 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1082 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1082 struct tipc_name_seq *seq = &dest->addr.nameseq;
1083 struct tipc_sock *tsk = tipc_sk(sk); 1083 struct tipc_sock *tsk = tipc_sk(sk);
1084 struct tipc_group *grp = tsk->group; 1084 struct tipc_group *grp = tsk->group;
1085 struct tipc_msg *hdr = &tsk->phdr;
1085 struct net *net = sock_net(sk); 1086 struct net *net = sock_net(sk);
1086 u32 domain, exclude, dstcnt; 1087 u32 type, inst, scope, exclude;
1087 struct list_head dsts; 1088 struct list_head dsts;
1089 u32 dstcnt;
1088 1090
1089 INIT_LIST_HEAD(&dsts); 1091 INIT_LIST_HEAD(&dsts);
1090 1092
1091 if (seq->lower != seq->upper) 1093 type = msg_nametype(hdr);
1092 return -ENOTSUPP; 1094 inst = dest->addr.name.name.instance;
1093 1095 scope = msg_lookup_scope(hdr);
1094 domain = addr_domain(net, dest->scope);
1095 exclude = tipc_group_exclude(grp); 1096 exclude = tipc_group_exclude(grp);
1096 if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain, 1097
1097 &dsts, &dstcnt, exclude, true)) 1098 if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
1099 &dstcnt, exclude, true))
1098 return -EHOSTUNREACH; 1100 return -EHOSTUNREACH;
1099 1101
1100 if (dstcnt == 1) { 1102 if (dstcnt == 1) {
@@ -1116,24 +1118,29 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
1116void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, 1118void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1117 struct sk_buff_head *inputq) 1119 struct sk_buff_head *inputq)
1118{ 1120{
1119 u32 scope = TIPC_CLUSTER_SCOPE;
1120 u32 self = tipc_own_addr(net); 1121 u32 self = tipc_own_addr(net);
1122 u32 type, lower, upper, scope;
1121 struct sk_buff *skb, *_skb; 1123 struct sk_buff *skb, *_skb;
1122 u32 lower = 0, upper = ~0;
1123 struct sk_buff_head tmpq;
1124 u32 portid, oport, onode; 1124 u32 portid, oport, onode;
1125 struct sk_buff_head tmpq;
1125 struct list_head dports; 1126 struct list_head dports;
1126 struct tipc_msg *msg; 1127 struct tipc_msg *hdr;
1127 int user, mtyp, hsz; 1128 int user, mtyp, hlen;
1129 bool exact;
1128 1130
1129 __skb_queue_head_init(&tmpq); 1131 __skb_queue_head_init(&tmpq);
1130 INIT_LIST_HEAD(&dports); 1132 INIT_LIST_HEAD(&dports);
1131 1133
1132 skb = tipc_skb_peek(arrvq, &inputq->lock); 1134 skb = tipc_skb_peek(arrvq, &inputq->lock);
1133 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 1135 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
1134 msg = buf_msg(skb); 1136 hdr = buf_msg(skb);
1135 user = msg_user(msg); 1137 user = msg_user(hdr);
1136 mtyp = msg_type(msg); 1138 mtyp = msg_type(hdr);
1139 hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
1140 oport = msg_origport(hdr);
1141 onode = msg_orignode(hdr);
1142 type = msg_nametype(hdr);
1143
1137 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) { 1144 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
1138 spin_lock_bh(&inputq->lock); 1145 spin_lock_bh(&inputq->lock);
1139 if (skb_peek(arrvq) == skb) { 1146 if (skb_peek(arrvq) == skb) {
@@ -1144,21 +1151,31 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1144 spin_unlock_bh(&inputq->lock); 1151 spin_unlock_bh(&inputq->lock);
1145 continue; 1152 continue;
1146 } 1153 }
1147 hsz = skb_headroom(skb) + msg_hdr_sz(msg); 1154
1148 oport = msg_origport(msg); 1155 /* Group messages require exact scope match */
1149 onode = msg_orignode(msg); 1156 if (msg_in_group(hdr)) {
1150 if (onode == self) 1157 lower = 0;
1151 scope = TIPC_NODE_SCOPE; 1158 upper = ~0;
1152 1159 scope = msg_lookup_scope(hdr);
1153 /* Create destination port list and message clones: */ 1160 exact = true;
1154 if (!msg_in_group(msg)) { 1161 } else {
1155 lower = msg_namelower(msg); 1162 /* TIPC_NODE_SCOPE means "any scope" in this context */
1156 upper = msg_nameupper(msg); 1163 if (onode == self)
1164 scope = TIPC_NODE_SCOPE;
1165 else
1166 scope = TIPC_CLUSTER_SCOPE;
1167 exact = false;
1168 lower = msg_namelower(hdr);
1169 upper = msg_nameupper(hdr);
1157 } 1170 }
1158 tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper, 1171
1159 scope, &dports); 1172 /* Create destination port list: */
1173 tipc_nametbl_mc_lookup(net, type, lower, upper,
1174 scope, exact, &dports);
1175
1176 /* Clone message per destination */
1160 while (tipc_dest_pop(&dports, NULL, &portid)) { 1177 while (tipc_dest_pop(&dports, NULL, &portid)) {
1161 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 1178 _skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
1162 if (_skb) { 1179 if (_skb) {
1163 msg_set_destport(buf_msg(_skb), portid); 1180 msg_set_destport(buf_msg(_skb), portid);
1164 __skb_queue_tail(&tmpq, _skb); 1181 __skb_queue_tail(&tmpq, _skb);
@@ -2731,7 +2748,6 @@ void tipc_sk_rht_destroy(struct net *net)
2731static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq) 2748static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2732{ 2749{
2733 struct net *net = sock_net(&tsk->sk); 2750 struct net *net = sock_net(&tsk->sk);
2734 u32 domain = addr_domain(net, mreq->scope);
2735 struct tipc_group *grp = tsk->group; 2751 struct tipc_group *grp = tsk->group;
2736 struct tipc_msg *hdr = &tsk->phdr; 2752 struct tipc_msg *hdr = &tsk->phdr;
2737 struct tipc_name_seq seq; 2753 struct tipc_name_seq seq;
@@ -2739,6 +2755,8 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2739 2755
2740 if (mreq->type < TIPC_RESERVED_TYPES) 2756 if (mreq->type < TIPC_RESERVED_TYPES)
2741 return -EACCES; 2757 return -EACCES;
2758 if (mreq->scope > TIPC_NODE_SCOPE)
2759 return -EINVAL;
2742 if (grp) 2760 if (grp)
2743 return -EACCES; 2761 return -EACCES;
2744 grp = tipc_group_create(net, tsk->portid, mreq); 2762 grp = tipc_group_create(net, tsk->portid, mreq);
@@ -2751,7 +2769,7 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2751 seq.type = mreq->type; 2769 seq.type = mreq->type;
2752 seq.lower = mreq->instance; 2770 seq.lower = mreq->instance;
2753 seq.upper = seq.lower; 2771 seq.upper = seq.lower;
2754 tipc_nametbl_build_group(net, grp, mreq->type, domain); 2772 tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope);
2755 rc = tipc_sk_publish(tsk, mreq->scope, &seq); 2773 rc = tipc_sk_publish(tsk, mreq->scope, &seq);
2756 if (rc) { 2774 if (rc) {
2757 tipc_group_delete(net, grp); 2775 tipc_group_delete(net, grp);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 1052341a0ea9..44df528ed6ab 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -118,15 +118,19 @@ void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
118 118
119void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, 119void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
120 u32 found_upper, u32 event, u32 port_ref, 120 u32 found_upper, u32 event, u32 port_ref,
121 u32 node, int must) 121 u32 node, u32 scope, int must)
122{ 122{
123 u32 filter = htohl(sub->evt.s.filter, sub->swap);
123 struct tipc_name_seq seq; 124 struct tipc_name_seq seq;
124 125
125 tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq); 126 tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq);
126 if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) 127 if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper))
127 return; 128 return;
128 if (!must && 129 if (!must && !(filter & TIPC_SUB_PORTS))
129 !(htohl(sub->evt.s.filter, sub->swap) & TIPC_SUB_PORTS)) 130 return;
131 if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE)
132 return;
133 if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
130 return; 134 return;
131 135
132 tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, 136 tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index ee52957dc952..f3edca775d9f 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -71,7 +71,7 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
71 u32 found_upper); 71 u32 found_upper);
72void tipc_subscrp_report_overlap(struct tipc_subscription *sub, 72void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
73 u32 found_lower, u32 found_upper, u32 event, 73 u32 found_lower, u32 found_upper, u32 event,
74 u32 port_ref, u32 node, int must); 74 u32 port_ref, u32 node, u32 scope, int must);
75void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, 75void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
76 struct tipc_name_seq *out); 76 struct tipc_name_seq *out);
77u32 tipc_subscrp_convert_seq_type(u32 type, int swap); 77u32 tipc_subscrp_convert_seq_type(u32 type, int swap);