aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c12
-rw-r--r--net/tipc/core.h1
-rw-r--r--net/tipc/group.c372
-rw-r--r--net/tipc/group.h8
-rw-r--r--net/tipc/link.c2
-rw-r--r--net/tipc/msg.c51
-rw-r--r--net/tipc/msg.h3
-rw-r--r--net/tipc/name_table.c57
-rw-r--r--net/tipc/name_table.h9
-rw-r--r--net/tipc/server.c76
-rw-r--r--net/tipc/server.h13
-rw-r--r--net/tipc/socket.c113
-rw-r--r--net/tipc/subscr.c35
-rw-r--r--net/tipc/subscr.h2
14 files changed, 424 insertions, 330 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 329325bd553e..37892b3909af 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/bcast.c: TIPC broadcast code 2 * net/tipc/bcast.c: TIPC broadcast code
3 * 3 *
4 * Copyright (c) 2004-2006, 2014-2016, Ericsson AB 4 * Copyright (c) 2004-2006, 2014-2017, Ericsson AB
5 * Copyright (c) 2004, Intel Corporation. 5 * Copyright (c) 2004, Intel Corporation.
6 * Copyright (c) 2005, 2010-2011, Wind River Systems 6 * Copyright (c) 2005, 2010-2011, Wind River Systems
7 * All rights reserved. 7 * All rights reserved.
@@ -42,8 +42,8 @@
42#include "link.h" 42#include "link.h"
43#include "name_table.h" 43#include "name_table.h"
44 44
45#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ 45#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */
46#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ 46#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
47 47
48const char tipc_bclink_name[] = "broadcast-link"; 48const char tipc_bclink_name[] = "broadcast-link";
49 49
@@ -74,6 +74,10 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
74 return tipc_net(net)->bcbase; 74 return tipc_net(net)->bcbase;
75} 75}
76 76
77/* tipc_bcast_get_mtu(): -get the MTU currently used by broadcast link
78 * Note: the MTU is decremented to give room for a tunnel header, in
79 * case the message needs to be sent as replicast
80 */
77int tipc_bcast_get_mtu(struct net *net) 81int tipc_bcast_get_mtu(struct net *net)
78{ 82{
79 return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE; 83 return tipc_link_mtu(tipc_bc_sndlink(net)) - INT_H_SIZE;
@@ -515,7 +519,7 @@ int tipc_bcast_init(struct net *net)
515 spin_lock_init(&tipc_net(net)->bclock); 519 spin_lock_init(&tipc_net(net)->bclock);
516 520
517 if (!tipc_link_bc_create(net, 0, 0, 521 if (!tipc_link_bc_create(net, 0, 0,
518 U16_MAX, 522 FB_MTU,
519 BCLINK_WIN_DEFAULT, 523 BCLINK_WIN_DEFAULT,
520 0, 524 0,
521 &bb->inputq, 525 &bb->inputq,
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 964342689f2c..20b21af2ff14 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -49,7 +49,6 @@
49#include <linux/uaccess.h> 49#include <linux/uaccess.h>
50#include <linux/interrupt.h> 50#include <linux/interrupt.h>
51#include <linux/atomic.h> 51#include <linux/atomic.h>
52#include <asm/hardirq.h>
53#include <linux/netdevice.h> 52#include <linux/netdevice.h>
54#include <linux/in.h> 53#include <linux/in.h>
55#include <linux/list.h> 54#include <linux/list.h>
diff --git a/net/tipc/group.c b/net/tipc/group.c
index 5f4ffae807ee..497ee34bfab9 100644
--- a/net/tipc/group.c
+++ b/net/tipc/group.c
@@ -49,8 +49,6 @@
49#define ADV_ACTIVE (ADV_UNIT * 12) 49#define ADV_ACTIVE (ADV_UNIT * 12)
50 50
51enum mbr_state { 51enum mbr_state {
52 MBR_QUARANTINED,
53 MBR_DISCOVERED,
54 MBR_JOINING, 52 MBR_JOINING,
55 MBR_PUBLISHED, 53 MBR_PUBLISHED,
56 MBR_JOINED, 54 MBR_JOINED,
@@ -64,8 +62,7 @@ enum mbr_state {
64struct tipc_member { 62struct tipc_member {
65 struct rb_node tree_node; 63 struct rb_node tree_node;
66 struct list_head list; 64 struct list_head list;
67 struct list_head congested; 65 struct list_head small_win;
68 struct sk_buff *event_msg;
69 struct sk_buff_head deferredq; 66 struct sk_buff_head deferredq;
70 struct tipc_group *group; 67 struct tipc_group *group;
71 u32 node; 68 u32 node;
@@ -77,21 +74,18 @@ struct tipc_member {
77 u16 bc_rcv_nxt; 74 u16 bc_rcv_nxt;
78 u16 bc_syncpt; 75 u16 bc_syncpt;
79 u16 bc_acked; 76 u16 bc_acked;
80 bool usr_pending;
81}; 77};
82 78
83struct tipc_group { 79struct tipc_group {
84 struct rb_root members; 80 struct rb_root members;
85 struct list_head congested; 81 struct list_head small_win;
86 struct list_head pending; 82 struct list_head pending;
87 struct list_head active; 83 struct list_head active;
88 struct list_head reclaiming;
89 struct tipc_nlist dests; 84 struct tipc_nlist dests;
90 struct net *net; 85 struct net *net;
91 int subid; 86 int subid;
92 u32 type; 87 u32 type;
93 u32 instance; 88 u32 instance;
94 u32 domain;
95 u32 scope; 89 u32 scope;
96 u32 portid; 90 u32 portid;
97 u16 member_cnt; 91 u16 member_cnt;
@@ -101,11 +95,27 @@ struct tipc_group {
101 u16 bc_ackers; 95 u16 bc_ackers;
102 bool loopback; 96 bool loopback;
103 bool events; 97 bool events;
98 bool open;
104}; 99};
105 100
106static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 101static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
107 int mtyp, struct sk_buff_head *xmitq); 102 int mtyp, struct sk_buff_head *xmitq);
108 103
104bool tipc_group_is_open(struct tipc_group *grp)
105{
106 return grp->open;
107}
108
109static void tipc_group_open(struct tipc_member *m, bool *wakeup)
110{
111 *wakeup = false;
112 if (list_empty(&m->small_win))
113 return;
114 list_del_init(&m->small_win);
115 m->group->open = true;
116 *wakeup = true;
117}
118
109static void tipc_group_decr_active(struct tipc_group *grp, 119static void tipc_group_decr_active(struct tipc_group *grp,
110 struct tipc_member *m) 120 struct tipc_member *m)
111{ 121{
@@ -137,14 +147,14 @@ u16 tipc_group_bc_snd_nxt(struct tipc_group *grp)
137 return grp->bc_snd_nxt; 147 return grp->bc_snd_nxt;
138} 148}
139 149
140static bool tipc_group_is_enabled(struct tipc_member *m) 150static bool tipc_group_is_receiver(struct tipc_member *m)
141{ 151{
142 return m->state != MBR_QUARANTINED && m->state != MBR_LEAVING; 152 return m && m->state != MBR_JOINING && m->state != MBR_LEAVING;
143} 153}
144 154
145static bool tipc_group_is_receiver(struct tipc_member *m) 155static bool tipc_group_is_sender(struct tipc_member *m)
146{ 156{
147 return m && m->state >= MBR_JOINED; 157 return m && m->state != MBR_JOINING && m->state != MBR_PUBLISHED;
148} 158}
149 159
150u32 tipc_group_exclude(struct tipc_group *grp) 160u32 tipc_group_exclude(struct tipc_group *grp)
@@ -162,6 +172,8 @@ int tipc_group_size(struct tipc_group *grp)
162struct tipc_group *tipc_group_create(struct net *net, u32 portid, 172struct tipc_group *tipc_group_create(struct net *net, u32 portid,
163 struct tipc_group_req *mreq) 173 struct tipc_group_req *mreq)
164{ 174{
175 u32 filter = TIPC_SUB_PORTS | TIPC_SUB_NO_STATUS;
176 bool global = mreq->scope != TIPC_NODE_SCOPE;
165 struct tipc_group *grp; 177 struct tipc_group *grp;
166 u32 type = mreq->type; 178 u32 type = mreq->type;
167 179
@@ -169,25 +181,40 @@ struct tipc_group *tipc_group_create(struct net *net, u32 portid,
169 if (!grp) 181 if (!grp)
170 return NULL; 182 return NULL;
171 tipc_nlist_init(&grp->dests, tipc_own_addr(net)); 183 tipc_nlist_init(&grp->dests, tipc_own_addr(net));
172 INIT_LIST_HEAD(&grp->congested); 184 INIT_LIST_HEAD(&grp->small_win);
173 INIT_LIST_HEAD(&grp->active); 185 INIT_LIST_HEAD(&grp->active);
174 INIT_LIST_HEAD(&grp->pending); 186 INIT_LIST_HEAD(&grp->pending);
175 INIT_LIST_HEAD(&grp->reclaiming);
176 grp->members = RB_ROOT; 187 grp->members = RB_ROOT;
177 grp->net = net; 188 grp->net = net;
178 grp->portid = portid; 189 grp->portid = portid;
179 grp->domain = addr_domain(net, mreq->scope);
180 grp->type = type; 190 grp->type = type;
181 grp->instance = mreq->instance; 191 grp->instance = mreq->instance;
182 grp->scope = mreq->scope; 192 grp->scope = mreq->scope;
183 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK; 193 grp->loopback = mreq->flags & TIPC_GROUP_LOOPBACK;
184 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS; 194 grp->events = mreq->flags & TIPC_GROUP_MEMBER_EVTS;
185 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0, &grp->subid)) 195 filter |= global ? TIPC_SUB_CLUSTER_SCOPE : TIPC_SUB_NODE_SCOPE;
196 if (tipc_topsrv_kern_subscr(net, portid, type, 0, ~0,
197 filter, &grp->subid))
186 return grp; 198 return grp;
187 kfree(grp); 199 kfree(grp);
188 return NULL; 200 return NULL;
189} 201}
190 202
203void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcvbuf)
204{
205 struct rb_root *tree = &grp->members;
206 struct tipc_member *m, *tmp;
207 struct sk_buff_head xmitq;
208
209 skb_queue_head_init(&xmitq);
210 rbtree_postorder_for_each_entry_safe(m, tmp, tree, tree_node) {
211 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, &xmitq);
212 tipc_group_update_member(m, 0);
213 }
214 tipc_node_distr_xmit(net, &xmitq);
215 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
216}
217
191void tipc_group_delete(struct net *net, struct tipc_group *grp) 218void tipc_group_delete(struct net *net, struct tipc_group *grp)
192{ 219{
193 struct rb_root *tree = &grp->members; 220 struct rb_root *tree = &grp->members;
@@ -233,7 +260,7 @@ static struct tipc_member *tipc_group_find_dest(struct tipc_group *grp,
233 struct tipc_member *m; 260 struct tipc_member *m;
234 261
235 m = tipc_group_find_member(grp, node, port); 262 m = tipc_group_find_member(grp, node, port);
236 if (m && tipc_group_is_enabled(m)) 263 if (m && tipc_group_is_receiver(m))
237 return m; 264 return m;
238 return NULL; 265 return NULL;
239} 266}
@@ -278,7 +305,7 @@ static void tipc_group_add_to_tree(struct tipc_group *grp,
278 305
279static struct tipc_member *tipc_group_create_member(struct tipc_group *grp, 306static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
280 u32 node, u32 port, 307 u32 node, u32 port,
281 int state) 308 u32 instance, int state)
282{ 309{
283 struct tipc_member *m; 310 struct tipc_member *m;
284 311
@@ -286,11 +313,12 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
286 if (!m) 313 if (!m)
287 return NULL; 314 return NULL;
288 INIT_LIST_HEAD(&m->list); 315 INIT_LIST_HEAD(&m->list);
289 INIT_LIST_HEAD(&m->congested); 316 INIT_LIST_HEAD(&m->small_win);
290 __skb_queue_head_init(&m->deferredq); 317 __skb_queue_head_init(&m->deferredq);
291 m->group = grp; 318 m->group = grp;
292 m->node = node; 319 m->node = node;
293 m->port = port; 320 m->port = port;
321 m->instance = instance;
294 m->bc_acked = grp->bc_snd_nxt - 1; 322 m->bc_acked = grp->bc_snd_nxt - 1;
295 grp->member_cnt++; 323 grp->member_cnt++;
296 tipc_group_add_to_tree(grp, m); 324 tipc_group_add_to_tree(grp, m);
@@ -299,9 +327,10 @@ static struct tipc_member *tipc_group_create_member(struct tipc_group *grp,
299 return m; 327 return m;
300} 328}
301 329
302void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port) 330void tipc_group_add_member(struct tipc_group *grp, u32 node,
331 u32 port, u32 instance)
303{ 332{
304 tipc_group_create_member(grp, node, port, MBR_DISCOVERED); 333 tipc_group_create_member(grp, node, port, instance, MBR_PUBLISHED);
305} 334}
306 335
307static void tipc_group_delete_member(struct tipc_group *grp, 336static void tipc_group_delete_member(struct tipc_group *grp,
@@ -315,7 +344,7 @@ static void tipc_group_delete_member(struct tipc_group *grp,
315 grp->bc_ackers--; 344 grp->bc_ackers--;
316 345
317 list_del_init(&m->list); 346 list_del_init(&m->list);
318 list_del_init(&m->congested); 347 list_del_init(&m->small_win);
319 tipc_group_decr_active(grp, m); 348 tipc_group_decr_active(grp, m);
320 349
321 /* If last member on a node, remove node from dest list */ 350 /* If last member on a node, remove node from dest list */
@@ -344,7 +373,7 @@ void tipc_group_update_member(struct tipc_member *m, int len)
344 struct tipc_group *grp = m->group; 373 struct tipc_group *grp = m->group;
345 struct tipc_member *_m, *tmp; 374 struct tipc_member *_m, *tmp;
346 375
347 if (!tipc_group_is_enabled(m)) 376 if (!tipc_group_is_receiver(m))
348 return; 377 return;
349 378
350 m->window -= len; 379 m->window -= len;
@@ -352,16 +381,14 @@ void tipc_group_update_member(struct tipc_member *m, int len)
352 if (m->window >= ADV_IDLE) 381 if (m->window >= ADV_IDLE)
353 return; 382 return;
354 383
355 list_del_init(&m->congested); 384 list_del_init(&m->small_win);
356 385
357 /* Sort member into congested members' list */ 386 /* Sort member into small_window members' list */
358 list_for_each_entry_safe(_m, tmp, &grp->congested, congested) { 387 list_for_each_entry_safe(_m, tmp, &grp->small_win, small_win) {
359 if (m->window > _m->window) 388 if (_m->window > m->window)
360 continue; 389 break;
361 list_add_tail(&m->congested, &_m->congested);
362 return;
363 } 390 }
364 list_add_tail(&m->congested, &grp->congested); 391 list_add_tail(&m->small_win, &_m->small_win);
365} 392}
366 393
367void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack) 394void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
@@ -373,7 +400,7 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack)
373 400
374 for (n = rb_first(&grp->members); n; n = rb_next(n)) { 401 for (n = rb_first(&grp->members); n; n = rb_next(n)) {
375 m = container_of(n, struct tipc_member, tree_node); 402 m = container_of(n, struct tipc_member, tree_node);
376 if (tipc_group_is_enabled(m)) { 403 if (tipc_group_is_receiver(m)) {
377 tipc_group_update_member(m, len); 404 tipc_group_update_member(m, len);
378 m->bc_acked = prev; 405 m->bc_acked = prev;
379 ackers++; 406 ackers++;
@@ -394,20 +421,20 @@ bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
394 int adv, state; 421 int adv, state;
395 422
396 m = tipc_group_find_dest(grp, dnode, dport); 423 m = tipc_group_find_dest(grp, dnode, dport);
397 *mbr = m; 424 if (!tipc_group_is_receiver(m)) {
398 if (!m) 425 *mbr = NULL;
399 return false; 426 return false;
400 if (m->usr_pending) 427 }
401 return true; 428 *mbr = m;
429
402 if (m->window >= len) 430 if (m->window >= len)
403 return false; 431 return false;
404 m->usr_pending = true; 432
433 grp->open = false;
405 434
406 /* If not fully advertised, do it now to prevent mutual blocking */ 435 /* If not fully advertised, do it now to prevent mutual blocking */
407 adv = m->advertised; 436 adv = m->advertised;
408 state = m->state; 437 state = m->state;
409 if (state < MBR_JOINED)
410 return true;
411 if (state == MBR_JOINED && adv == ADV_IDLE) 438 if (state == MBR_JOINED && adv == ADV_IDLE)
412 return true; 439 return true;
413 if (state == MBR_ACTIVE && adv == ADV_ACTIVE) 440 if (state == MBR_ACTIVE && adv == ADV_ACTIVE)
@@ -425,13 +452,14 @@ bool tipc_group_bc_cong(struct tipc_group *grp, int len)
425 struct tipc_member *m = NULL; 452 struct tipc_member *m = NULL;
426 453
427 /* If prev bcast was replicast, reject until all receivers have acked */ 454 /* If prev bcast was replicast, reject until all receivers have acked */
428 if (grp->bc_ackers) 455 if (grp->bc_ackers) {
456 grp->open = false;
429 return true; 457 return true;
430 458 }
431 if (list_empty(&grp->congested)) 459 if (list_empty(&grp->small_win))
432 return false; 460 return false;
433 461
434 m = list_first_entry(&grp->congested, struct tipc_member, congested); 462 m = list_first_entry(&grp->small_win, struct tipc_member, small_win);
435 if (m->window >= len) 463 if (m->window >= len)
436 return false; 464 return false;
437 465
@@ -486,7 +514,7 @@ void tipc_group_filter_msg(struct tipc_group *grp, struct sk_buff_head *inputq,
486 goto drop; 514 goto drop;
487 515
488 m = tipc_group_find_member(grp, node, port); 516 m = tipc_group_find_member(grp, node, port);
489 if (!tipc_group_is_receiver(m)) 517 if (!tipc_group_is_sender(m))
490 goto drop; 518 goto drop;
491 519
492 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt)) 520 if (less(msg_grp_bc_seqno(hdr), m->bc_rcv_nxt))
@@ -573,24 +601,34 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
573 601
574 switch (m->state) { 602 switch (m->state) {
575 case MBR_JOINED: 603 case MBR_JOINED:
576 /* Reclaim advertised space from least active member */ 604 /* First, decide if member can go active */
577 if (!list_empty(active) && active_cnt >= reclaim_limit) { 605 if (active_cnt <= max_active) {
606 m->state = MBR_ACTIVE;
607 list_add_tail(&m->list, active);
608 grp->active_cnt++;
609 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
610 } else {
611 m->state = MBR_PENDING;
612 list_add_tail(&m->list, &grp->pending);
613 }
614
615 if (active_cnt < reclaim_limit)
616 break;
617
618 /* Reclaim from oldest active member, if possible */
619 if (!list_empty(active)) {
578 rm = list_first_entry(active, struct tipc_member, list); 620 rm = list_first_entry(active, struct tipc_member, list);
579 rm->state = MBR_RECLAIMING; 621 rm->state = MBR_RECLAIMING;
580 list_move_tail(&rm->list, &grp->reclaiming); 622 list_del_init(&rm->list);
581 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq); 623 tipc_group_proto_xmit(grp, rm, GRP_RECLAIM_MSG, xmitq);
582 }
583 /* If max active, become pending and wait for reclaimed space */
584 if (active_cnt >= max_active) {
585 m->state = MBR_PENDING;
586 list_add_tail(&m->list, &grp->pending);
587 break; 624 break;
588 } 625 }
589 /* Otherwise become active */ 626 /* Nobody to reclaim from; - revert oldest pending to JOINED */
590 m->state = MBR_ACTIVE; 627 pm = list_first_entry(&grp->pending, struct tipc_member, list);
591 list_add_tail(&m->list, &grp->active); 628 list_del_init(&pm->list);
592 grp->active_cnt++; 629 pm->state = MBR_JOINED;
593 /* Fall through */ 630 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
631 break;
594 case MBR_ACTIVE: 632 case MBR_ACTIVE:
595 if (!list_is_last(&m->list, &grp->active)) 633 if (!list_is_last(&m->list, &grp->active))
596 list_move_tail(&m->list, &grp->active); 634 list_move_tail(&m->list, &grp->active);
@@ -602,12 +640,12 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
602 if (m->advertised > ADV_IDLE) 640 if (m->advertised > ADV_IDLE)
603 break; 641 break;
604 m->state = MBR_JOINED; 642 m->state = MBR_JOINED;
643 grp->active_cnt--;
605 if (m->advertised < ADV_IDLE) { 644 if (m->advertised < ADV_IDLE) {
606 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n"); 645 pr_warn_ratelimited("Rcv unexpected msg after REMIT\n");
607 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 646 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
608 } 647 }
609 grp->active_cnt--; 648
610 list_del_init(&m->list);
611 if (list_empty(&grp->pending)) 649 if (list_empty(&grp->pending))
612 return; 650 return;
613 651
@@ -619,7 +657,6 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
619 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq); 657 tipc_group_proto_xmit(grp, pm, GRP_ADV_MSG, xmitq);
620 break; 658 break;
621 case MBR_RECLAIMING: 659 case MBR_RECLAIMING:
622 case MBR_DISCOVERED:
623 case MBR_JOINING: 660 case MBR_JOINING:
624 case MBR_LEAVING: 661 case MBR_LEAVING:
625 default: 662 default:
@@ -627,6 +664,40 @@ void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
627 } 664 }
628} 665}
629 666
667static void tipc_group_create_event(struct tipc_group *grp,
668 struct tipc_member *m,
669 u32 event, u16 seqno,
670 struct sk_buff_head *inputq)
671{ u32 dnode = tipc_own_addr(grp->net);
672 struct tipc_event evt;
673 struct sk_buff *skb;
674 struct tipc_msg *hdr;
675
676 evt.event = event;
677 evt.found_lower = m->instance;
678 evt.found_upper = m->instance;
679 evt.port.ref = m->port;
680 evt.port.node = m->node;
681 evt.s.seq.type = grp->type;
682 evt.s.seq.lower = m->instance;
683 evt.s.seq.upper = m->instance;
684
685 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_GRP_MEMBER_EVT,
686 GROUP_H_SIZE, sizeof(evt), dnode, m->node,
687 grp->portid, m->port, 0);
688 if (!skb)
689 return;
690
691 hdr = buf_msg(skb);
692 msg_set_nametype(hdr, grp->type);
693 msg_set_grp_evt(hdr, event);
694 msg_set_dest_droppable(hdr, true);
695 msg_set_grp_bc_seqno(hdr, seqno);
696 memcpy(msg_data(hdr), &evt, sizeof(evt));
697 TIPC_SKB_CB(skb)->orig_member = m->instance;
698 __skb_queue_tail(inputq, skb);
699}
700
630static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m, 701static void tipc_group_proto_xmit(struct tipc_group *grp, struct tipc_member *m,
631 int mtyp, struct sk_buff_head *xmitq) 702 int mtyp, struct sk_buff_head *xmitq)
632{ 703{
@@ -672,83 +743,73 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
672 u32 node = msg_orignode(hdr); 743 u32 node = msg_orignode(hdr);
673 u32 port = msg_origport(hdr); 744 u32 port = msg_origport(hdr);
674 struct tipc_member *m, *pm; 745 struct tipc_member *m, *pm;
675 struct tipc_msg *ehdr;
676 u16 remitted, in_flight; 746 u16 remitted, in_flight;
677 747
678 if (!grp) 748 if (!grp)
679 return; 749 return;
680 750
751 if (grp->scope == TIPC_NODE_SCOPE && node != tipc_own_addr(grp->net))
752 return;
753
681 m = tipc_group_find_member(grp, node, port); 754 m = tipc_group_find_member(grp, node, port);
682 755
683 switch (msg_type(hdr)) { 756 switch (msg_type(hdr)) {
684 case GRP_JOIN_MSG: 757 case GRP_JOIN_MSG:
685 if (!m) 758 if (!m)
686 m = tipc_group_create_member(grp, node, port, 759 m = tipc_group_create_member(grp, node, port,
687 MBR_QUARANTINED); 760 0, MBR_JOINING);
688 if (!m) 761 if (!m)
689 return; 762 return;
690 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 763 m->bc_syncpt = msg_grp_bc_syncpt(hdr);
691 m->bc_rcv_nxt = m->bc_syncpt; 764 m->bc_rcv_nxt = m->bc_syncpt;
692 m->window += msg_adv_win(hdr); 765 m->window += msg_adv_win(hdr);
693 766
694 /* Wait until PUBLISH event is received */ 767 /* Wait until PUBLISH event is received if necessary */
695 if (m->state == MBR_DISCOVERED) { 768 if (m->state != MBR_PUBLISHED)
696 m->state = MBR_JOINING; 769 return;
697 } else if (m->state == MBR_PUBLISHED) { 770
698 m->state = MBR_JOINED; 771 /* Member can be taken into service */
699 *usr_wakeup = true; 772 m->state = MBR_JOINED;
700 m->usr_pending = false; 773 tipc_group_open(m, usr_wakeup);
701 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
702 ehdr = buf_msg(m->event_msg);
703 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
704 __skb_queue_tail(inputq, m->event_msg);
705 }
706 list_del_init(&m->congested);
707 tipc_group_update_member(m, 0); 774 tipc_group_update_member(m, 0);
775 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq);
776 tipc_group_create_event(grp, m, TIPC_PUBLISHED,
777 m->bc_syncpt, inputq);
708 return; 778 return;
709 case GRP_LEAVE_MSG: 779 case GRP_LEAVE_MSG:
710 if (!m) 780 if (!m)
711 return; 781 return;
712 m->bc_syncpt = msg_grp_bc_syncpt(hdr); 782 m->bc_syncpt = msg_grp_bc_syncpt(hdr);
713 list_del_init(&m->list); 783 list_del_init(&m->list);
714 list_del_init(&m->congested); 784 tipc_group_open(m, usr_wakeup);
715 *usr_wakeup = true; 785 tipc_group_decr_active(grp, m);
716 786 m->state = MBR_LEAVING;
717 /* Wait until WITHDRAW event is received */ 787 tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
718 if (m->state != MBR_LEAVING) { 788 m->bc_syncpt, inputq);
719 tipc_group_decr_active(grp, m);
720 m->state = MBR_LEAVING;
721 return;
722 }
723 /* Otherwise deliver already received WITHDRAW event */
724 ehdr = buf_msg(m->event_msg);
725 msg_set_grp_bc_seqno(ehdr, m->bc_syncpt);
726 __skb_queue_tail(inputq, m->event_msg);
727 return; 789 return;
728 case GRP_ADV_MSG: 790 case GRP_ADV_MSG:
729 if (!m) 791 if (!m)
730 return; 792 return;
731 m->window += msg_adv_win(hdr); 793 m->window += msg_adv_win(hdr);
732 *usr_wakeup = m->usr_pending; 794 tipc_group_open(m, usr_wakeup);
733 m->usr_pending = false;
734 list_del_init(&m->congested);
735 return; 795 return;
736 case GRP_ACK_MSG: 796 case GRP_ACK_MSG:
737 if (!m) 797 if (!m)
738 return; 798 return;
739 m->bc_acked = msg_grp_bc_acked(hdr); 799 m->bc_acked = msg_grp_bc_acked(hdr);
740 if (--grp->bc_ackers) 800 if (--grp->bc_ackers)
741 break; 801 return;
802 list_del_init(&m->small_win);
803 m->group->open = true;
742 *usr_wakeup = true; 804 *usr_wakeup = true;
743 m->usr_pending = false; 805 tipc_group_update_member(m, 0);
744 return; 806 return;
745 case GRP_RECLAIM_MSG: 807 case GRP_RECLAIM_MSG:
746 if (!m) 808 if (!m)
747 return; 809 return;
748 *usr_wakeup = m->usr_pending;
749 m->usr_pending = false;
750 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq); 810 tipc_group_proto_xmit(grp, m, GRP_REMIT_MSG, xmitq);
751 m->window = ADV_IDLE; 811 m->window = ADV_IDLE;
812 tipc_group_open(m, usr_wakeup);
752 return; 813 return;
753 case GRP_REMIT_MSG: 814 case GRP_REMIT_MSG:
754 if (!m || m->state != MBR_RECLAIMING) 815 if (!m || m->state != MBR_RECLAIMING)
@@ -763,18 +824,14 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
763 m->advertised = ADV_IDLE + in_flight; 824 m->advertised = ADV_IDLE + in_flight;
764 return; 825 return;
765 } 826 }
766 /* All messages preceding the REMIT have been read */ 827 /* This should never happen */
767 if (m->advertised <= remitted) {
768 m->state = MBR_JOINED;
769 in_flight = 0;
770 }
771 /* ..and the REMIT overtaken by more messages => re-advertise */
772 if (m->advertised < remitted) 828 if (m->advertised < remitted)
773 tipc_group_proto_xmit(grp, m, GRP_ADV_MSG, xmitq); 829 pr_warn_ratelimited("Unexpected REMIT msg\n");
774 830
775 m->advertised = ADV_IDLE + in_flight; 831 /* All messages preceding the REMIT have been read */
832 m->state = MBR_JOINED;
776 grp->active_cnt--; 833 grp->active_cnt--;
777 list_del_init(&m->list); 834 m->advertised = ADV_IDLE;
778 835
779 /* Set oldest pending member to active and advertise */ 836 /* Set oldest pending member to active and advertise */
780 if (list_empty(&grp->pending)) 837 if (list_empty(&grp->pending))
@@ -796,11 +853,10 @@ void tipc_group_proto_rcv(struct tipc_group *grp, bool *usr_wakeup,
796void tipc_group_member_evt(struct tipc_group *grp, 853void tipc_group_member_evt(struct tipc_group *grp,
797 bool *usr_wakeup, 854 bool *usr_wakeup,
798 int *sk_rcvbuf, 855 int *sk_rcvbuf,
799 struct sk_buff *skb, 856 struct tipc_msg *hdr,
800 struct sk_buff_head *inputq, 857 struct sk_buff_head *inputq,
801 struct sk_buff_head *xmitq) 858 struct sk_buff_head *xmitq)
802{ 859{
803 struct tipc_msg *hdr = buf_msg(skb);
804 struct tipc_event *evt = (void *)msg_data(hdr); 860 struct tipc_event *evt = (void *)msg_data(hdr);
805 u32 instance = evt->found_lower; 861 u32 instance = evt->found_lower;
806 u32 node = evt->port.node; 862 u32 node = evt->port.node;
@@ -808,89 +864,59 @@ void tipc_group_member_evt(struct tipc_group *grp,
808 int event = evt->event; 864 int event = evt->event;
809 struct tipc_member *m; 865 struct tipc_member *m;
810 struct net *net; 866 struct net *net;
811 bool node_up;
812 u32 self; 867 u32 self;
813 868
814 if (!grp) 869 if (!grp)
815 goto drop; 870 return;
816 871
817 net = grp->net; 872 net = grp->net;
818 self = tipc_own_addr(net); 873 self = tipc_own_addr(net);
819 if (!grp->loopback && node == self && port == grp->portid) 874 if (!grp->loopback && node == self && port == grp->portid)
820 goto drop; 875 return;
821
822 /* Convert message before delivery to user */
823 msg_set_hdr_sz(hdr, GROUP_H_SIZE);
824 msg_set_user(hdr, TIPC_CRITICAL_IMPORTANCE);
825 msg_set_type(hdr, TIPC_GRP_MEMBER_EVT);
826 msg_set_origport(hdr, port);
827 msg_set_orignode(hdr, node);
828 msg_set_nametype(hdr, grp->type);
829 msg_set_grp_evt(hdr, event);
830 876
831 m = tipc_group_find_member(grp, node, port); 877 m = tipc_group_find_member(grp, node, port);
832 878
833 if (event == TIPC_PUBLISHED) { 879 switch (event) {
834 if (!m) 880 case TIPC_PUBLISHED:
835 m = tipc_group_create_member(grp, node, port, 881 /* Send and wait for arrival of JOIN message if necessary */
836 MBR_DISCOVERED); 882 if (!m) {
837 if (!m) 883 m = tipc_group_create_member(grp, node, port, instance,
838 goto drop; 884 MBR_PUBLISHED);
839 885 if (!m)
840 /* Hold back event if JOIN message not yet received */ 886 break;
841 if (m->state == MBR_DISCOVERED) { 887 tipc_group_update_member(m, 0);
842 m->event_msg = skb; 888 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
843 m->state = MBR_PUBLISHED; 889 break;
844 } else {
845 msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
846 __skb_queue_tail(inputq, skb);
847 m->state = MBR_JOINED;
848 *usr_wakeup = true;
849 m->usr_pending = false;
850 } 890 }
891
892 if (m->state != MBR_JOINING)
893 break;
894
895 /* Member can be taken into service */
851 m->instance = instance; 896 m->instance = instance;
852 TIPC_SKB_CB(skb)->orig_member = m->instance; 897 m->state = MBR_JOINED;
898 tipc_group_open(m, usr_wakeup);
899 tipc_group_update_member(m, 0);
853 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq); 900 tipc_group_proto_xmit(grp, m, GRP_JOIN_MSG, xmitq);
854 if (m->window < ADV_IDLE) 901 tipc_group_create_event(grp, m, TIPC_PUBLISHED,
855 tipc_group_update_member(m, 0); 902 m->bc_syncpt, inputq);
856 else 903 break;
857 list_del_init(&m->congested); 904 case TIPC_WITHDRAWN:
858 } else if (event == TIPC_WITHDRAWN) {
859 if (!m) 905 if (!m)
860 goto drop; 906 break;
861
862 TIPC_SKB_CB(skb)->orig_member = m->instance;
863 907
864 *usr_wakeup = true; 908 tipc_group_decr_active(grp, m);
865 m->usr_pending = false; 909 m->state = MBR_LEAVING;
866 node_up = tipc_node_is_up(net, node);
867 m->event_msg = NULL;
868
869 if (node_up) {
870 /* Hold back event if a LEAVE msg should be expected */
871 if (m->state != MBR_LEAVING) {
872 m->event_msg = skb;
873 tipc_group_decr_active(grp, m);
874 m->state = MBR_LEAVING;
875 } else {
876 msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
877 __skb_queue_tail(inputq, skb);
878 }
879 } else {
880 if (m->state != MBR_LEAVING) {
881 tipc_group_decr_active(grp, m);
882 m->state = MBR_LEAVING;
883 msg_set_grp_bc_seqno(hdr, m->bc_rcv_nxt);
884 } else {
885 msg_set_grp_bc_seqno(hdr, m->bc_syncpt);
886 }
887 __skb_queue_tail(inputq, skb);
888 }
889 list_del_init(&m->list); 910 list_del_init(&m->list);
890 list_del_init(&m->congested); 911 tipc_group_open(m, usr_wakeup);
912
913 /* Only send event if no LEAVE message can be expected */
914 if (!tipc_node_is_up(net, node))
915 tipc_group_create_event(grp, m, TIPC_WITHDRAWN,
916 m->bc_rcv_nxt, inputq);
917 break;
918 default:
919 break;
891 } 920 }
892 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp); 921 *sk_rcvbuf = tipc_group_rcvbuf_limit(grp);
893 return;
894drop:
895 kfree_skb(skb);
896} 922}
diff --git a/net/tipc/group.h b/net/tipc/group.h
index d525e1cd7de5..f4a596ed9848 100644
--- a/net/tipc/group.h
+++ b/net/tipc/group.h
@@ -44,8 +44,10 @@ struct tipc_msg;
44 44
45struct tipc_group *tipc_group_create(struct net *net, u32 portid, 45struct tipc_group *tipc_group_create(struct net *net, u32 portid,
46 struct tipc_group_req *mreq); 46 struct tipc_group_req *mreq);
47void tipc_group_join(struct net *net, struct tipc_group *grp, int *sk_rcv_buf);
47void tipc_group_delete(struct net *net, struct tipc_group *grp); 48void tipc_group_delete(struct net *net, struct tipc_group *grp);
48void tipc_group_add_member(struct tipc_group *grp, u32 node, u32 port); 49void tipc_group_add_member(struct tipc_group *grp, u32 node,
50 u32 port, u32 instance);
49struct tipc_nlist *tipc_group_dests(struct tipc_group *grp); 51struct tipc_nlist *tipc_group_dests(struct tipc_group *grp);
50void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq, 52void tipc_group_self(struct tipc_group *grp, struct tipc_name_seq *seq,
51 int *scope); 53 int *scope);
@@ -54,7 +56,7 @@ void tipc_group_filter_msg(struct tipc_group *grp,
54 struct sk_buff_head *inputq, 56 struct sk_buff_head *inputq,
55 struct sk_buff_head *xmitq); 57 struct sk_buff_head *xmitq);
56void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup, 58void tipc_group_member_evt(struct tipc_group *grp, bool *wakeup,
57 int *sk_rcvbuf, struct sk_buff *skb, 59 int *sk_rcvbuf, struct tipc_msg *hdr,
58 struct sk_buff_head *inputq, 60 struct sk_buff_head *inputq,
59 struct sk_buff_head *xmitq); 61 struct sk_buff_head *xmitq);
60void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup, 62void tipc_group_proto_rcv(struct tipc_group *grp, bool *wakeup,
@@ -65,9 +67,9 @@ void tipc_group_update_bc_members(struct tipc_group *grp, int len, bool ack);
65bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport, 67bool tipc_group_cong(struct tipc_group *grp, u32 dnode, u32 dport,
66 int len, struct tipc_member **m); 68 int len, struct tipc_member **m);
67bool tipc_group_bc_cong(struct tipc_group *grp, int len); 69bool tipc_group_bc_cong(struct tipc_group *grp, int len);
70bool tipc_group_is_open(struct tipc_group *grp);
68void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node, 71void tipc_group_update_rcv_win(struct tipc_group *grp, int blks, u32 node,
69 u32 port, struct sk_buff_head *xmitq); 72 u32 port, struct sk_buff_head *xmitq);
70u16 tipc_group_bc_snd_nxt(struct tipc_group *grp); 73u16 tipc_group_bc_snd_nxt(struct tipc_group *grp);
71void tipc_group_update_member(struct tipc_member *m, int len); 74void tipc_group_update_member(struct tipc_member *m, int len);
72int tipc_group_size(struct tipc_group *grp);
73#endif 75#endif
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 6bce0b1117bd..2d6b2aed30e0 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -483,7 +483,7 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
483/** 483/**
484 * tipc_link_bc_create - create new link to be used for broadcast 484 * tipc_link_bc_create - create new link to be used for broadcast
485 * @n: pointer to associated node 485 * @n: pointer to associated node
486 * @mtu: mtu to be used 486 * @mtu: mtu to be used initially if no peers
487 * @window: send window to be used 487 * @window: send window to be used
488 * @inputq: queue to put messages ready for delivery 488 * @inputq: queue to put messages ready for delivery
489 * @namedq: queue to put binding table update messages ready for delivery 489 * @namedq: queue to put binding table update messages ready for delivery
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index b0d07b35909d..55d8ba92291d 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -251,20 +251,23 @@ bool tipc_msg_validate(struct sk_buff **_skb)
251 * @pktmax: Max packet size that can be used 251 * @pktmax: Max packet size that can be used
252 * @list: Buffer or chain of buffers to be returned to caller 252 * @list: Buffer or chain of buffers to be returned to caller
253 * 253 *
254 * Note that the recursive call we are making here is safe, since it can
255 * logically go only one further level down.
256 *
254 * Returns message data size or errno: -ENOMEM, -EFAULT 257 * Returns message data size or errno: -ENOMEM, -EFAULT
255 */ 258 */
256int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 259int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
257 int offset, int dsz, int pktmax, struct sk_buff_head *list) 260 int dsz, int pktmax, struct sk_buff_head *list)
258{ 261{
259 int mhsz = msg_hdr_sz(mhdr); 262 int mhsz = msg_hdr_sz(mhdr);
263 struct tipc_msg pkthdr;
260 int msz = mhsz + dsz; 264 int msz = mhsz + dsz;
261 int pktno = 1;
262 int pktsz;
263 int pktrem = pktmax; 265 int pktrem = pktmax;
264 int drem = dsz;
265 struct tipc_msg pkthdr;
266 struct sk_buff *skb; 266 struct sk_buff *skb;
267 int drem = dsz;
268 int pktno = 1;
267 char *pktpos; 269 char *pktpos;
270 int pktsz;
268 int rc; 271 int rc;
269 272
270 msg_set_size(mhdr, msz); 273 msg_set_size(mhdr, msz);
@@ -272,8 +275,18 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
272 /* No fragmentation needed? */ 275 /* No fragmentation needed? */
273 if (likely(msz <= pktmax)) { 276 if (likely(msz <= pktmax)) {
274 skb = tipc_buf_acquire(msz, GFP_KERNEL); 277 skb = tipc_buf_acquire(msz, GFP_KERNEL);
275 if (unlikely(!skb)) 278
279 /* Fall back to smaller MTU if node local message */
280 if (unlikely(!skb)) {
281 if (pktmax != MAX_MSG_SIZE)
282 return -ENOMEM;
283 rc = tipc_msg_build(mhdr, m, offset, dsz, FB_MTU, list);
284 if (rc != dsz)
285 return rc;
286 if (tipc_msg_assemble(list))
287 return dsz;
276 return -ENOMEM; 288 return -ENOMEM;
289 }
277 skb_orphan(skb); 290 skb_orphan(skb);
278 __skb_queue_tail(list, skb); 291 __skb_queue_tail(list, skb);
279 skb_copy_to_linear_data(skb, mhdr, mhsz); 292 skb_copy_to_linear_data(skb, mhdr, mhsz);
@@ -589,6 +602,30 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
589 return true; 602 return true;
590} 603}
591 604
605/* tipc_msg_assemble() - assemble chain of fragments into one message
606 */
607bool tipc_msg_assemble(struct sk_buff_head *list)
608{
609 struct sk_buff *skb, *tmp = NULL;
610
611 if (skb_queue_len(list) == 1)
612 return true;
613
614 while ((skb = __skb_dequeue(list))) {
615 skb->next = NULL;
616 if (tipc_buf_append(&tmp, &skb)) {
617 __skb_queue_tail(list, skb);
618 return true;
619 }
620 if (!tmp)
621 break;
622 }
623 __skb_queue_purge(list);
624 __skb_queue_head_init(list);
625 pr_warn("Failed do assemble buffer\n");
626 return false;
627}
628
592/* tipc_msg_reassemble() - clone a buffer chain of fragments and 629/* tipc_msg_reassemble() - clone a buffer chain of fragments and
593 * reassemble the clones into one message 630 * reassemble the clones into one message
594 */ 631 */
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 3e4384c222f7..b4ba1b4f9ae7 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -98,7 +98,7 @@ struct plist;
98#define MAX_H_SIZE 60 /* Largest possible TIPC header size */ 98#define MAX_H_SIZE 60 /* Largest possible TIPC header size */
99 99
100#define MAX_MSG_SIZE (MAX_H_SIZE + TIPC_MAX_USER_MSG_SIZE) 100#define MAX_MSG_SIZE (MAX_H_SIZE + TIPC_MAX_USER_MSG_SIZE)
101 101#define FB_MTU 3744
102#define TIPC_MEDIA_INFO_OFFSET 5 102#define TIPC_MEDIA_INFO_OFFSET 5
103 103
104struct tipc_skb_cb { 104struct tipc_skb_cb {
@@ -943,6 +943,7 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
943int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 943int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
944 int offset, int dsz, int mtu, struct sk_buff_head *list); 944 int offset, int dsz, int mtu, struct sk_buff_head *list);
945bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 945bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
946bool tipc_msg_assemble(struct sk_buff_head *list);
946bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq); 947bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
947bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg, 948bool tipc_msg_pskb_copy(u32 dst, struct sk_buff_head *msg,
948 struct sk_buff_head *cpy); 949 struct sk_buff_head *cpy);
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index b3829bcf63c7..ed0457cc99d6 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -328,7 +328,8 @@ static struct publication *tipc_nameseq_insert_publ(struct net *net,
328 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { 328 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
329 tipc_subscrp_report_overlap(s, publ->lower, publ->upper, 329 tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
330 TIPC_PUBLISHED, publ->ref, 330 TIPC_PUBLISHED, publ->ref,
331 publ->node, created_subseq); 331 publ->node, publ->scope,
332 created_subseq);
332 } 333 }
333 return publ; 334 return publ;
334} 335}
@@ -398,19 +399,21 @@ found:
398 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) { 399 list_for_each_entry_safe(s, st, &nseq->subscriptions, nameseq_list) {
399 tipc_subscrp_report_overlap(s, publ->lower, publ->upper, 400 tipc_subscrp_report_overlap(s, publ->lower, publ->upper,
400 TIPC_WITHDRAWN, publ->ref, 401 TIPC_WITHDRAWN, publ->ref,
401 publ->node, removed_subseq); 402 publ->node, publ->scope,
403 removed_subseq);
402 } 404 }
403 405
404 return publ; 406 return publ;
405} 407}
406 408
407/** 409/**
408 * tipc_nameseq_subscribe - attach a subscription, and issue 410 * tipc_nameseq_subscribe - attach a subscription, and optionally
409 * the prescribed number of events if there is any sub- 411 * issue the prescribed number of events if there is any sub-
410 * sequence overlapping with the requested sequence 412 * sequence overlapping with the requested sequence
411 */ 413 */
412static void tipc_nameseq_subscribe(struct name_seq *nseq, 414static void tipc_nameseq_subscribe(struct name_seq *nseq,
413 struct tipc_subscription *s) 415 struct tipc_subscription *s,
416 bool status)
414{ 417{
415 struct sub_seq *sseq = nseq->sseqs; 418 struct sub_seq *sseq = nseq->sseqs;
416 struct tipc_name_seq ns; 419 struct tipc_name_seq ns;
@@ -420,7 +423,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
420 tipc_subscrp_get(s); 423 tipc_subscrp_get(s);
421 list_add(&s->nameseq_list, &nseq->subscriptions); 424 list_add(&s->nameseq_list, &nseq->subscriptions);
422 425
423 if (!sseq) 426 if (!status || !sseq)
424 return; 427 return;
425 428
426 while (sseq != &nseq->sseqs[nseq->first_free]) { 429 while (sseq != &nseq->sseqs[nseq->first_free]) {
@@ -434,6 +437,7 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
434 sseq->upper, 437 sseq->upper,
435 TIPC_PUBLISHED, 438 TIPC_PUBLISHED,
436 crs->ref, crs->node, 439 crs->ref, crs->node,
440 crs->scope,
437 must_report); 441 must_report);
438 must_report = 0; 442 must_report = 0;
439 } 443 }
@@ -597,7 +601,7 @@ not_found:
597 return ref; 601 return ref;
598} 602}
599 603
600bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, 604bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 scope,
601 struct list_head *dsts, int *dstcnt, u32 exclude, 605 struct list_head *dsts, int *dstcnt, u32 exclude,
602 bool all) 606 bool all)
603{ 607{
@@ -607,9 +611,6 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
607 struct name_seq *seq; 611 struct name_seq *seq;
608 struct sub_seq *sseq; 612 struct sub_seq *sseq;
609 613
610 if (!tipc_in_scope(domain, self))
611 return false;
612
613 *dstcnt = 0; 614 *dstcnt = 0;
614 rcu_read_lock(); 615 rcu_read_lock();
615 seq = nametbl_find_seq(net, type); 616 seq = nametbl_find_seq(net, type);
@@ -620,7 +621,7 @@ bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
620 if (likely(sseq)) { 621 if (likely(sseq)) {
621 info = sseq->info; 622 info = sseq->info;
622 list_for_each_entry(publ, &info->zone_list, zone_list) { 623 list_for_each_entry(publ, &info->zone_list, zone_list) {
623 if (!tipc_in_scope(domain, publ->node)) 624 if (publ->scope != scope)
624 continue; 625 continue;
625 if (publ->ref == exclude && publ->node == self) 626 if (publ->ref == exclude && publ->node == self)
626 continue; 627 continue;
@@ -638,13 +639,14 @@ exit:
638 return !list_empty(dsts); 639 return !list_empty(dsts);
639} 640}
640 641
641int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 642int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
642 u32 limit, struct list_head *dports) 643 u32 scope, bool exact, struct list_head *dports)
643{ 644{
644 struct name_seq *seq;
645 struct sub_seq *sseq;
646 struct sub_seq *sseq_stop; 645 struct sub_seq *sseq_stop;
647 struct name_info *info; 646 struct name_info *info;
647 struct publication *p;
648 struct name_seq *seq;
649 struct sub_seq *sseq;
648 int res = 0; 650 int res = 0;
649 651
650 rcu_read_lock(); 652 rcu_read_lock();
@@ -656,15 +658,12 @@ int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper,
656 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower); 658 sseq = seq->sseqs + nameseq_locate_subseq(seq, lower);
657 sseq_stop = seq->sseqs + seq->first_free; 659 sseq_stop = seq->sseqs + seq->first_free;
658 for (; sseq != sseq_stop; sseq++) { 660 for (; sseq != sseq_stop; sseq++) {
659 struct publication *publ;
660
661 if (sseq->lower > upper) 661 if (sseq->lower > upper)
662 break; 662 break;
663
664 info = sseq->info; 663 info = sseq->info;
665 list_for_each_entry(publ, &info->node_list, node_list) { 664 list_for_each_entry(p, &info->node_list, node_list) {
666 if (publ->scope <= limit) 665 if (p->scope == scope || (!exact && p->scope < scope))
667 tipc_dest_push(dports, 0, publ->ref); 666 tipc_dest_push(dports, 0, p->ref);
668 } 667 }
669 668
670 if (info->cluster_list_size != info->node_list_size) 669 if (info->cluster_list_size != info->node_list_size)
@@ -681,8 +680,7 @@ exit:
681 * - Determines if any node local ports overlap 680 * - Determines if any node local ports overlap
682 */ 681 */
683void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, 682void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
684 u32 upper, u32 domain, 683 u32 upper, struct tipc_nlist *nodes)
685 struct tipc_nlist *nodes)
686{ 684{
687 struct sub_seq *sseq, *stop; 685 struct sub_seq *sseq, *stop;
688 struct publication *publ; 686 struct publication *publ;
@@ -700,8 +698,7 @@ void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
700 for (; sseq != stop && sseq->lower <= upper; sseq++) { 698 for (; sseq != stop && sseq->lower <= upper; sseq++) {
701 info = sseq->info; 699 info = sseq->info;
702 list_for_each_entry(publ, &info->zone_list, zone_list) { 700 list_for_each_entry(publ, &info->zone_list, zone_list) {
703 if (tipc_in_scope(domain, publ->node)) 701 tipc_nlist_add(nodes, publ->node);
704 tipc_nlist_add(nodes, publ->node);
705 } 702 }
706 } 703 }
707 spin_unlock_bh(&seq->lock); 704 spin_unlock_bh(&seq->lock);
@@ -712,7 +709,7 @@ exit:
712/* tipc_nametbl_build_group - build list of communication group members 709/* tipc_nametbl_build_group - build list of communication group members
713 */ 710 */
714void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, 711void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
715 u32 type, u32 domain) 712 u32 type, u32 scope)
716{ 713{
717 struct sub_seq *sseq, *stop; 714 struct sub_seq *sseq, *stop;
718 struct name_info *info; 715 struct name_info *info;
@@ -730,9 +727,9 @@ void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
730 for (; sseq != stop; sseq++) { 727 for (; sseq != stop; sseq++) {
731 info = sseq->info; 728 info = sseq->info;
732 list_for_each_entry(p, &info->zone_list, zone_list) { 729 list_for_each_entry(p, &info->zone_list, zone_list) {
733 if (!tipc_in_scope(domain, p->node)) 730 if (p->scope != scope)
734 continue; 731 continue;
735 tipc_group_add_member(grp, p->node, p->ref); 732 tipc_group_add_member(grp, p->node, p->ref, p->lower);
736 } 733 }
737 } 734 }
738 spin_unlock_bh(&seq->lock); 735 spin_unlock_bh(&seq->lock);
@@ -811,7 +808,7 @@ int tipc_nametbl_withdraw(struct net *net, u32 type, u32 lower, u32 ref,
811/** 808/**
812 * tipc_nametbl_subscribe - add a subscription object to the name table 809 * tipc_nametbl_subscribe - add a subscription object to the name table
813 */ 810 */
814void tipc_nametbl_subscribe(struct tipc_subscription *s) 811void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status)
815{ 812{
816 struct tipc_net *tn = net_generic(s->net, tipc_net_id); 813 struct tipc_net *tn = net_generic(s->net, tipc_net_id);
817 u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap); 814 u32 type = tipc_subscrp_convert_seq_type(s->evt.s.seq.type, s->swap);
@@ -825,7 +822,7 @@ void tipc_nametbl_subscribe(struct tipc_subscription *s)
825 seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]); 822 seq = tipc_nameseq_create(type, &tn->nametbl->seq_hlist[index]);
826 if (seq) { 823 if (seq) {
827 spin_lock_bh(&seq->lock); 824 spin_lock_bh(&seq->lock);
828 tipc_nameseq_subscribe(seq, s); 825 tipc_nameseq_subscribe(seq, s, status);
829 spin_unlock_bh(&seq->lock); 826 spin_unlock_bh(&seq->lock);
830 } else { 827 } else {
831 tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns); 828 tipc_subscrp_convert_seq(&s->evt.s.seq, s->swap, &ns);
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 71926e429446..f56e7cb3d436 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -100,13 +100,12 @@ struct name_table {
100int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb); 100int tipc_nl_name_table_dump(struct sk_buff *skb, struct netlink_callback *cb);
101 101
102u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node); 102u32 tipc_nametbl_translate(struct net *net, u32 type, u32 instance, u32 *node);
103int tipc_nametbl_mc_translate(struct net *net, u32 type, u32 lower, u32 upper, 103int tipc_nametbl_mc_lookup(struct net *net, u32 type, u32 lower, u32 upper,
104 u32 limit, struct list_head *dports); 104 u32 scope, bool exact, struct list_head *dports);
105void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp, 105void tipc_nametbl_build_group(struct net *net, struct tipc_group *grp,
106 u32 type, u32 domain); 106 u32 type, u32 domain);
107void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower, 107void tipc_nametbl_lookup_dst_nodes(struct net *net, u32 type, u32 lower,
108 u32 upper, u32 domain, 108 u32 upper, struct tipc_nlist *nodes);
109 struct tipc_nlist *nodes);
110bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain, 109bool tipc_nametbl_lookup(struct net *net, u32 type, u32 instance, u32 domain,
111 struct list_head *dsts, int *dstcnt, u32 exclude, 110 struct list_head *dsts, int *dstcnt, u32 exclude,
112 bool all); 111 bool all);
@@ -121,7 +120,7 @@ struct publication *tipc_nametbl_insert_publ(struct net *net, u32 type,
121struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type, 120struct publication *tipc_nametbl_remove_publ(struct net *net, u32 type,
122 u32 lower, u32 node, u32 ref, 121 u32 lower, u32 node, u32 ref,
123 u32 key); 122 u32 key);
124void tipc_nametbl_subscribe(struct tipc_subscription *s); 123void tipc_nametbl_subscribe(struct tipc_subscription *s, bool status);
125void tipc_nametbl_unsubscribe(struct tipc_subscription *s); 124void tipc_nametbl_unsubscribe(struct tipc_subscription *s);
126int tipc_nametbl_init(struct net *net); 125int tipc_nametbl_init(struct net *net);
127void tipc_nametbl_stop(struct net *net); 126void tipc_nametbl_stop(struct net *net);
diff --git a/net/tipc/server.c b/net/tipc/server.c
index d60c30342327..c0d331f13eee 100644
--- a/net/tipc/server.c
+++ b/net/tipc/server.c
@@ -132,10 +132,11 @@ static struct tipc_conn *tipc_conn_lookup(struct tipc_server *s, int conid)
132 132
133 spin_lock_bh(&s->idr_lock); 133 spin_lock_bh(&s->idr_lock);
134 con = idr_find(&s->conn_idr, conid); 134 con = idr_find(&s->conn_idr, conid);
135 if (con && test_bit(CF_CONNECTED, &con->flags)) 135 if (con) {
136 conn_get(con); 136 if (!test_bit(CF_CONNECTED, &con->flags) ||
137 else 137 !kref_get_unless_zero(&con->kref))
138 con = NULL; 138 con = NULL;
139 }
139 spin_unlock_bh(&s->idr_lock); 140 spin_unlock_bh(&s->idr_lock);
140 return con; 141 return con;
141} 142}
@@ -183,35 +184,28 @@ static void tipc_register_callbacks(struct socket *sock, struct tipc_conn *con)
183 write_unlock_bh(&sk->sk_callback_lock); 184 write_unlock_bh(&sk->sk_callback_lock);
184} 185}
185 186
186static void tipc_unregister_callbacks(struct tipc_conn *con)
187{
188 struct sock *sk = con->sock->sk;
189
190 write_lock_bh(&sk->sk_callback_lock);
191 sk->sk_user_data = NULL;
192 write_unlock_bh(&sk->sk_callback_lock);
193}
194
195static void tipc_close_conn(struct tipc_conn *con) 187static void tipc_close_conn(struct tipc_conn *con)
196{ 188{
197 struct tipc_server *s = con->server; 189 struct tipc_server *s = con->server;
190 struct sock *sk = con->sock->sk;
191 bool disconnect = false;
198 192
199 if (test_and_clear_bit(CF_CONNECTED, &con->flags)) { 193 write_lock_bh(&sk->sk_callback_lock);
200 if (con->sock) 194 disconnect = test_and_clear_bit(CF_CONNECTED, &con->flags);
201 tipc_unregister_callbacks(con); 195 if (disconnect) {
202 196 sk->sk_user_data = NULL;
203 if (con->conid) 197 if (con->conid)
204 s->tipc_conn_release(con->conid, con->usr_data); 198 s->tipc_conn_release(con->conid, con->usr_data);
205
206 /* We shouldn't flush pending works as we may be in the
207 * thread. In fact the races with pending rx/tx work structs
208 * are harmless for us here as we have already deleted this
209 * connection from server connection list.
210 */
211 if (con->sock)
212 kernel_sock_shutdown(con->sock, SHUT_RDWR);
213 conn_put(con);
214 } 199 }
200 write_unlock_bh(&sk->sk_callback_lock);
201
202 /* Handle concurrent calls from sending and receiving threads */
203 if (!disconnect)
204 return;
205
206 /* Don't flush pending works, -just let them expire */
207 kernel_sock_shutdown(con->sock, SHUT_RDWR);
208 conn_put(con);
215} 209}
216 210
217static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s) 211static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
@@ -248,9 +242,10 @@ static struct tipc_conn *tipc_alloc_conn(struct tipc_server *s)
248 242
249static int tipc_receive_from_sock(struct tipc_conn *con) 243static int tipc_receive_from_sock(struct tipc_conn *con)
250{ 244{
251 struct msghdr msg = {};
252 struct tipc_server *s = con->server; 245 struct tipc_server *s = con->server;
246 struct sock *sk = con->sock->sk;
253 struct sockaddr_tipc addr; 247 struct sockaddr_tipc addr;
248 struct msghdr msg = {};
254 struct kvec iov; 249 struct kvec iov;
255 void *buf; 250 void *buf;
256 int ret; 251 int ret;
@@ -271,12 +266,15 @@ static int tipc_receive_from_sock(struct tipc_conn *con)
271 goto out_close; 266 goto out_close;
272 } 267 }
273 268
274 s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid, &addr, 269 read_lock_bh(&sk->sk_callback_lock);
275 con->usr_data, buf, ret); 270 if (test_bit(CF_CONNECTED, &con->flags))
276 271 ret = s->tipc_conn_recvmsg(sock_net(con->sock->sk), con->conid,
272 &addr, con->usr_data, buf, ret);
273 read_unlock_bh(&sk->sk_callback_lock);
277 kmem_cache_free(s->rcvbuf_cache, buf); 274 kmem_cache_free(s->rcvbuf_cache, buf);
278 275 if (ret < 0)
279 return 0; 276 tipc_conn_terminate(s, con->conid);
277 return ret;
280 278
281out_close: 279out_close:
282 if (ret != -EWOULDBLOCK) 280 if (ret != -EWOULDBLOCK)
@@ -489,8 +487,8 @@ void tipc_conn_terminate(struct tipc_server *s, int conid)
489 } 487 }
490} 488}
491 489
492bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, 490bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
493 u32 lower, u32 upper, int *conid) 491 u32 upper, u32 filter, int *conid)
494{ 492{
495 struct tipc_subscriber *scbr; 493 struct tipc_subscriber *scbr;
496 struct tipc_subscr sub; 494 struct tipc_subscr sub;
@@ -501,7 +499,7 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
501 sub.seq.lower = lower; 499 sub.seq.lower = lower;
502 sub.seq.upper = upper; 500 sub.seq.upper = upper;
503 sub.timeout = TIPC_WAIT_FOREVER; 501 sub.timeout = TIPC_WAIT_FOREVER;
504 sub.filter = TIPC_SUB_PORTS; 502 sub.filter = filter;
505 *(u32 *)&sub.usr_handle = port; 503 *(u32 *)&sub.usr_handle = port;
506 504
507 con = tipc_alloc_conn(tipc_topsrv(net)); 505 con = tipc_alloc_conn(tipc_topsrv(net));
@@ -525,11 +523,17 @@ bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type,
525void tipc_topsrv_kern_unsubscr(struct net *net, int conid) 523void tipc_topsrv_kern_unsubscr(struct net *net, int conid)
526{ 524{
527 struct tipc_conn *con; 525 struct tipc_conn *con;
526 struct tipc_server *srv;
528 527
529 con = tipc_conn_lookup(tipc_topsrv(net), conid); 528 con = tipc_conn_lookup(tipc_topsrv(net), conid);
530 if (!con) 529 if (!con)
531 return; 530 return;
532 tipc_close_conn(con); 531
532 test_and_clear_bit(CF_CONNECTED, &con->flags);
533 srv = con->server;
534 if (con->conid)
535 srv->tipc_conn_release(con->conid, con->usr_data);
536 conn_put(con);
533 conn_put(con); 537 conn_put(con);
534} 538}
535 539
diff --git a/net/tipc/server.h b/net/tipc/server.h
index 2113c9192633..64df7513cd70 100644
--- a/net/tipc/server.h
+++ b/net/tipc/server.h
@@ -41,6 +41,9 @@
41#include <net/net_namespace.h> 41#include <net/net_namespace.h>
42 42
43#define TIPC_SERVER_NAME_LEN 32 43#define TIPC_SERVER_NAME_LEN 32
44#define TIPC_SUB_CLUSTER_SCOPE 0x20
45#define TIPC_SUB_NODE_SCOPE 0x40
46#define TIPC_SUB_NO_STATUS 0x80
44 47
45/** 48/**
46 * struct tipc_server - TIPC server structure 49 * struct tipc_server - TIPC server structure
@@ -71,9 +74,9 @@ struct tipc_server {
71 int max_rcvbuf_size; 74 int max_rcvbuf_size;
72 void *(*tipc_conn_new)(int conid); 75 void *(*tipc_conn_new)(int conid);
73 void (*tipc_conn_release)(int conid, void *usr_data); 76 void (*tipc_conn_release)(int conid, void *usr_data);
74 void (*tipc_conn_recvmsg)(struct net *net, int conid, 77 int (*tipc_conn_recvmsg)(struct net *net, int conid,
75 struct sockaddr_tipc *addr, void *usr_data, 78 struct sockaddr_tipc *addr, void *usr_data,
76 void *buf, size_t len); 79 void *buf, size_t len);
77 struct sockaddr_tipc *saddr; 80 struct sockaddr_tipc *saddr;
78 char name[TIPC_SERVER_NAME_LEN]; 81 char name[TIPC_SERVER_NAME_LEN];
79 int imp; 82 int imp;
@@ -83,8 +86,8 @@ struct tipc_server {
83int tipc_conn_sendmsg(struct tipc_server *s, int conid, 86int tipc_conn_sendmsg(struct tipc_server *s, int conid,
84 struct sockaddr_tipc *addr, void *data, size_t len); 87 struct sockaddr_tipc *addr, void *data, size_t len);
85 88
86bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, 89bool tipc_topsrv_kern_subscr(struct net *net, u32 port, u32 type, u32 lower,
87 u32 lower, u32 upper, int *conid); 90 u32 upper, u32 filter, int *conid);
88void tipc_topsrv_kern_unsubscr(struct net *net, int conid); 91void tipc_topsrv_kern_unsubscr(struct net *net, int conid);
89 92
90/** 93/**
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 3b4084480377..d799e50ff722 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -715,7 +715,7 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
715{ 715{
716 struct sock *sk = sock->sk; 716 struct sock *sk = sock->sk;
717 struct tipc_sock *tsk = tipc_sk(sk); 717 struct tipc_sock *tsk = tipc_sk(sk);
718 struct tipc_group *grp = tsk->group; 718 struct tipc_group *grp;
719 u32 revents = 0; 719 u32 revents = 0;
720 720
721 sock_poll_wait(file, sk_sleep(sk), wait); 721 sock_poll_wait(file, sk_sleep(sk), wait);
@@ -736,9 +736,9 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
736 revents |= POLLIN | POLLRDNORM; 736 revents |= POLLIN | POLLRDNORM;
737 break; 737 break;
738 case TIPC_OPEN: 738 case TIPC_OPEN:
739 if (!grp || tipc_group_size(grp)) 739 grp = tsk->group;
740 if (!tsk->cong_link_cnt) 740 if ((!grp || tipc_group_is_open(grp)) && !tsk->cong_link_cnt)
741 revents |= POLLOUT; 741 revents |= POLLOUT;
742 if (!tipc_sk_type_connectionless(sk)) 742 if (!tipc_sk_type_connectionless(sk))
743 break; 743 break;
744 if (skb_queue_empty(&sk->sk_receive_queue)) 744 if (skb_queue_empty(&sk->sk_receive_queue))
@@ -772,7 +772,6 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
772 struct net *net = sock_net(sk); 772 struct net *net = sock_net(sk);
773 int mtu = tipc_bcast_get_mtu(net); 773 int mtu = tipc_bcast_get_mtu(net);
774 struct tipc_mc_method *method = &tsk->mc_method; 774 struct tipc_mc_method *method = &tsk->mc_method;
775 u32 domain = addr_domain(net, TIPC_CLUSTER_SCOPE);
776 struct sk_buff_head pkts; 775 struct sk_buff_head pkts;
777 struct tipc_nlist dsts; 776 struct tipc_nlist dsts;
778 int rc; 777 int rc;
@@ -788,7 +787,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
788 /* Lookup destination nodes */ 787 /* Lookup destination nodes */
789 tipc_nlist_init(&dsts, tipc_own_addr(net)); 788 tipc_nlist_init(&dsts, tipc_own_addr(net));
790 tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower, 789 tipc_nametbl_lookup_dst_nodes(net, seq->type, seq->lower,
791 seq->upper, domain, &dsts); 790 seq->upper, &dsts);
792 if (!dsts.local && !dsts.remote) 791 if (!dsts.local && !dsts.remote)
793 return -EHOSTUNREACH; 792 return -EHOSTUNREACH;
794 793
@@ -928,21 +927,22 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
928 struct list_head *cong_links = &tsk->cong_links; 927 struct list_head *cong_links = &tsk->cong_links;
929 int blks = tsk_blocks(GROUP_H_SIZE + dlen); 928 int blks = tsk_blocks(GROUP_H_SIZE + dlen);
930 struct tipc_group *grp = tsk->group; 929 struct tipc_group *grp = tsk->group;
930 struct tipc_msg *hdr = &tsk->phdr;
931 struct tipc_member *first = NULL; 931 struct tipc_member *first = NULL;
932 struct tipc_member *mbr = NULL; 932 struct tipc_member *mbr = NULL;
933 struct net *net = sock_net(sk); 933 struct net *net = sock_net(sk);
934 u32 node, port, exclude; 934 u32 node, port, exclude;
935 u32 type, inst, domain;
936 struct list_head dsts; 935 struct list_head dsts;
936 u32 type, inst, scope;
937 int lookups = 0; 937 int lookups = 0;
938 int dstcnt, rc; 938 int dstcnt, rc;
939 bool cong; 939 bool cong;
940 940
941 INIT_LIST_HEAD(&dsts); 941 INIT_LIST_HEAD(&dsts);
942 942
943 type = dest->addr.name.name.type; 943 type = msg_nametype(hdr);
944 inst = dest->addr.name.name.instance; 944 inst = dest->addr.name.name.instance;
945 domain = addr_domain(net, dest->scope); 945 scope = msg_lookup_scope(hdr);
946 exclude = tipc_group_exclude(grp); 946 exclude = tipc_group_exclude(grp);
947 947
948 while (++lookups < 4) { 948 while (++lookups < 4) {
@@ -950,7 +950,7 @@ static int tipc_send_group_anycast(struct socket *sock, struct msghdr *m,
950 950
951 /* Look for a non-congested destination member, if any */ 951 /* Look for a non-congested destination member, if any */
952 while (1) { 952 while (1) {
953 if (!tipc_nametbl_lookup(net, type, inst, domain, &dsts, 953 if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
954 &dstcnt, exclude, false)) 954 &dstcnt, exclude, false))
955 return -EHOSTUNREACH; 955 return -EHOSTUNREACH;
956 tipc_dest_pop(&dsts, &node, &port); 956 tipc_dest_pop(&dsts, &node, &port);
@@ -1079,22 +1079,23 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
1079{ 1079{
1080 struct sock *sk = sock->sk; 1080 struct sock *sk = sock->sk;
1081 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1081 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1082 struct tipc_name_seq *seq = &dest->addr.nameseq;
1083 struct tipc_sock *tsk = tipc_sk(sk); 1082 struct tipc_sock *tsk = tipc_sk(sk);
1084 struct tipc_group *grp = tsk->group; 1083 struct tipc_group *grp = tsk->group;
1084 struct tipc_msg *hdr = &tsk->phdr;
1085 struct net *net = sock_net(sk); 1085 struct net *net = sock_net(sk);
1086 u32 domain, exclude, dstcnt; 1086 u32 type, inst, scope, exclude;
1087 struct list_head dsts; 1087 struct list_head dsts;
1088 u32 dstcnt;
1088 1089
1089 INIT_LIST_HEAD(&dsts); 1090 INIT_LIST_HEAD(&dsts);
1090 1091
1091 if (seq->lower != seq->upper) 1092 type = msg_nametype(hdr);
1092 return -ENOTSUPP; 1093 inst = dest->addr.name.name.instance;
1093 1094 scope = msg_lookup_scope(hdr);
1094 domain = addr_domain(net, dest->scope);
1095 exclude = tipc_group_exclude(grp); 1095 exclude = tipc_group_exclude(grp);
1096 if (!tipc_nametbl_lookup(net, seq->type, seq->lower, domain, 1096
1097 &dsts, &dstcnt, exclude, true)) 1097 if (!tipc_nametbl_lookup(net, type, inst, scope, &dsts,
1098 &dstcnt, exclude, true))
1098 return -EHOSTUNREACH; 1099 return -EHOSTUNREACH;
1099 1100
1100 if (dstcnt == 1) { 1101 if (dstcnt == 1) {
@@ -1116,24 +1117,29 @@ static int tipc_send_group_mcast(struct socket *sock, struct msghdr *m,
1116void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq, 1117void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1117 struct sk_buff_head *inputq) 1118 struct sk_buff_head *inputq)
1118{ 1119{
1119 u32 scope = TIPC_CLUSTER_SCOPE;
1120 u32 self = tipc_own_addr(net); 1120 u32 self = tipc_own_addr(net);
1121 u32 type, lower, upper, scope;
1121 struct sk_buff *skb, *_skb; 1122 struct sk_buff *skb, *_skb;
1122 u32 lower = 0, upper = ~0;
1123 struct sk_buff_head tmpq;
1124 u32 portid, oport, onode; 1123 u32 portid, oport, onode;
1124 struct sk_buff_head tmpq;
1125 struct list_head dports; 1125 struct list_head dports;
1126 struct tipc_msg *msg; 1126 struct tipc_msg *hdr;
1127 int user, mtyp, hsz; 1127 int user, mtyp, hlen;
1128 bool exact;
1128 1129
1129 __skb_queue_head_init(&tmpq); 1130 __skb_queue_head_init(&tmpq);
1130 INIT_LIST_HEAD(&dports); 1131 INIT_LIST_HEAD(&dports);
1131 1132
1132 skb = tipc_skb_peek(arrvq, &inputq->lock); 1133 skb = tipc_skb_peek(arrvq, &inputq->lock);
1133 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) { 1134 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
1134 msg = buf_msg(skb); 1135 hdr = buf_msg(skb);
1135 user = msg_user(msg); 1136 user = msg_user(hdr);
1136 mtyp = msg_type(msg); 1137 mtyp = msg_type(hdr);
1138 hlen = skb_headroom(skb) + msg_hdr_sz(hdr);
1139 oport = msg_origport(hdr);
1140 onode = msg_orignode(hdr);
1141 type = msg_nametype(hdr);
1142
1137 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) { 1143 if (mtyp == TIPC_GRP_UCAST_MSG || user == GROUP_PROTOCOL) {
1138 spin_lock_bh(&inputq->lock); 1144 spin_lock_bh(&inputq->lock);
1139 if (skb_peek(arrvq) == skb) { 1145 if (skb_peek(arrvq) == skb) {
@@ -1144,21 +1150,31 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
1144 spin_unlock_bh(&inputq->lock); 1150 spin_unlock_bh(&inputq->lock);
1145 continue; 1151 continue;
1146 } 1152 }
1147 hsz = skb_headroom(skb) + msg_hdr_sz(msg); 1153
1148 oport = msg_origport(msg); 1154 /* Group messages require exact scope match */
1149 onode = msg_orignode(msg); 1155 if (msg_in_group(hdr)) {
1150 if (onode == self) 1156 lower = 0;
1151 scope = TIPC_NODE_SCOPE; 1157 upper = ~0;
1152 1158 scope = msg_lookup_scope(hdr);
1153 /* Create destination port list and message clones: */ 1159 exact = true;
1154 if (!msg_in_group(msg)) { 1160 } else {
1155 lower = msg_namelower(msg); 1161 /* TIPC_NODE_SCOPE means "any scope" in this context */
1156 upper = msg_nameupper(msg); 1162 if (onode == self)
1163 scope = TIPC_NODE_SCOPE;
1164 else
1165 scope = TIPC_CLUSTER_SCOPE;
1166 exact = false;
1167 lower = msg_namelower(hdr);
1168 upper = msg_nameupper(hdr);
1157 } 1169 }
1158 tipc_nametbl_mc_translate(net, msg_nametype(msg), lower, upper, 1170
1159 scope, &dports); 1171 /* Create destination port list: */
1172 tipc_nametbl_mc_lookup(net, type, lower, upper,
1173 scope, exact, &dports);
1174
1175 /* Clone message per destination */
1160 while (tipc_dest_pop(&dports, NULL, &portid)) { 1176 while (tipc_dest_pop(&dports, NULL, &portid)) {
1161 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC); 1177 _skb = __pskb_copy(skb, hlen, GFP_ATOMIC);
1162 if (_skb) { 1178 if (_skb) {
1163 msg_set_destport(buf_msg(_skb), portid); 1179 msg_set_destport(buf_msg(_skb), portid);
1164 __skb_queue_tail(&tmpq, _skb); 1180 __skb_queue_tail(&tmpq, _skb);
@@ -1933,8 +1949,7 @@ static void tipc_sk_proto_rcv(struct sock *sk,
1933 break; 1949 break;
1934 case TOP_SRV: 1950 case TOP_SRV:
1935 tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf, 1951 tipc_group_member_evt(tsk->group, &wakeup, &sk->sk_rcvbuf,
1936 skb, inputq, xmitq); 1952 hdr, inputq, xmitq);
1937 skb = NULL;
1938 break; 1953 break;
1939 default: 1954 default:
1940 break; 1955 break;
@@ -2640,9 +2655,7 @@ void tipc_sk_reinit(struct net *net)
2640 rhashtable_walk_enter(&tn->sk_rht, &iter); 2655 rhashtable_walk_enter(&tn->sk_rht, &iter);
2641 2656
2642 do { 2657 do {
2643 tsk = ERR_PTR(rhashtable_walk_start(&iter)); 2658 rhashtable_walk_start(&iter);
2644 if (IS_ERR(tsk))
2645 goto walk_stop;
2646 2659
2647 while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) { 2660 while ((tsk = rhashtable_walk_next(&iter)) && !IS_ERR(tsk)) {
2648 spin_lock_bh(&tsk->sk.sk_lock.slock); 2661 spin_lock_bh(&tsk->sk.sk_lock.slock);
@@ -2651,7 +2664,7 @@ void tipc_sk_reinit(struct net *net)
2651 msg_set_orignode(msg, tn->own_addr); 2664 msg_set_orignode(msg, tn->own_addr);
2652 spin_unlock_bh(&tsk->sk.sk_lock.slock); 2665 spin_unlock_bh(&tsk->sk.sk_lock.slock);
2653 } 2666 }
2654walk_stop: 2667
2655 rhashtable_walk_stop(&iter); 2668 rhashtable_walk_stop(&iter);
2656 } while (tsk == ERR_PTR(-EAGAIN)); 2669 } while (tsk == ERR_PTR(-EAGAIN));
2657} 2670}
@@ -2734,7 +2747,6 @@ void tipc_sk_rht_destroy(struct net *net)
2734static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq) 2747static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2735{ 2748{
2736 struct net *net = sock_net(&tsk->sk); 2749 struct net *net = sock_net(&tsk->sk);
2737 u32 domain = addr_domain(net, mreq->scope);
2738 struct tipc_group *grp = tsk->group; 2750 struct tipc_group *grp = tsk->group;
2739 struct tipc_msg *hdr = &tsk->phdr; 2751 struct tipc_msg *hdr = &tsk->phdr;
2740 struct tipc_name_seq seq; 2752 struct tipc_name_seq seq;
@@ -2742,6 +2754,8 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2742 2754
2743 if (mreq->type < TIPC_RESERVED_TYPES) 2755 if (mreq->type < TIPC_RESERVED_TYPES)
2744 return -EACCES; 2756 return -EACCES;
2757 if (mreq->scope > TIPC_NODE_SCOPE)
2758 return -EINVAL;
2745 if (grp) 2759 if (grp)
2746 return -EACCES; 2760 return -EACCES;
2747 grp = tipc_group_create(net, tsk->portid, mreq); 2761 grp = tipc_group_create(net, tsk->portid, mreq);
@@ -2754,16 +2768,17 @@ static int tipc_sk_join(struct tipc_sock *tsk, struct tipc_group_req *mreq)
2754 seq.type = mreq->type; 2768 seq.type = mreq->type;
2755 seq.lower = mreq->instance; 2769 seq.lower = mreq->instance;
2756 seq.upper = seq.lower; 2770 seq.upper = seq.lower;
2757 tipc_nametbl_build_group(net, grp, mreq->type, domain); 2771 tipc_nametbl_build_group(net, grp, mreq->type, mreq->scope);
2758 rc = tipc_sk_publish(tsk, mreq->scope, &seq); 2772 rc = tipc_sk_publish(tsk, mreq->scope, &seq);
2759 if (rc) { 2773 if (rc) {
2760 tipc_group_delete(net, grp); 2774 tipc_group_delete(net, grp);
2761 tsk->group = NULL; 2775 tsk->group = NULL;
2776 return rc;
2762 } 2777 }
2763 2778 /* Eliminate any risk that a broadcast overtakes sent JOINs */
2764 /* Eliminate any risk that a broadcast overtakes the sent JOIN */
2765 tsk->mc_method.rcast = true; 2779 tsk->mc_method.rcast = true;
2766 tsk->mc_method.mandatory = true; 2780 tsk->mc_method.mandatory = true;
2781 tipc_group_join(net, grp, &tsk->sk.sk_rcvbuf);
2767 return rc; 2782 return rc;
2768} 2783}
2769 2784
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 251065dfd8df..68e26470c516 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -118,15 +118,19 @@ void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
118 118
119void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower, 119void tipc_subscrp_report_overlap(struct tipc_subscription *sub, u32 found_lower,
120 u32 found_upper, u32 event, u32 port_ref, 120 u32 found_upper, u32 event, u32 port_ref,
121 u32 node, int must) 121 u32 node, u32 scope, int must)
122{ 122{
123 u32 filter = htohl(sub->evt.s.filter, sub->swap);
123 struct tipc_name_seq seq; 124 struct tipc_name_seq seq;
124 125
125 tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq); 126 tipc_subscrp_convert_seq(&sub->evt.s.seq, sub->swap, &seq);
126 if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper)) 127 if (!tipc_subscrp_check_overlap(&seq, found_lower, found_upper))
127 return; 128 return;
128 if (!must && 129 if (!must && !(filter & TIPC_SUB_PORTS))
129 !(htohl(sub->evt.s.filter, sub->swap) & TIPC_SUB_PORTS)) 130 return;
131 if (filter & TIPC_SUB_CLUSTER_SCOPE && scope == TIPC_NODE_SCOPE)
132 return;
133 if (filter & TIPC_SUB_NODE_SCOPE && scope != TIPC_NODE_SCOPE)
130 return; 134 return;
131 135
132 tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref, 136 tipc_subscrp_send_event(sub, found_lower, found_upper, event, port_ref,
@@ -285,21 +289,21 @@ static struct tipc_subscription *tipc_subscrp_create(struct net *net,
285 return sub; 289 return sub;
286} 290}
287 291
288static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s, 292static int tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
289 struct tipc_subscriber *subscriber, int swap) 293 struct tipc_subscriber *subscriber, int swap,
294 bool status)
290{ 295{
291 struct tipc_net *tn = net_generic(net, tipc_net_id);
292 struct tipc_subscription *sub = NULL; 296 struct tipc_subscription *sub = NULL;
293 u32 timeout; 297 u32 timeout;
294 298
295 sub = tipc_subscrp_create(net, s, swap); 299 sub = tipc_subscrp_create(net, s, swap);
296 if (!sub) 300 if (!sub)
297 return tipc_conn_terminate(tn->topsrv, subscriber->conid); 301 return -1;
298 302
299 spin_lock_bh(&subscriber->lock); 303 spin_lock_bh(&subscriber->lock);
300 list_add(&sub->subscrp_list, &subscriber->subscrp_list); 304 list_add(&sub->subscrp_list, &subscriber->subscrp_list);
301 sub->subscriber = subscriber; 305 sub->subscriber = subscriber;
302 tipc_nametbl_subscribe(sub); 306 tipc_nametbl_subscribe(sub, status);
303 tipc_subscrb_get(subscriber); 307 tipc_subscrb_get(subscriber);
304 spin_unlock_bh(&subscriber->lock); 308 spin_unlock_bh(&subscriber->lock);
305 309
@@ -308,6 +312,7 @@ static void tipc_subscrp_subscribe(struct net *net, struct tipc_subscr *s,
308 312
309 if (timeout != TIPC_WAIT_FOREVER) 313 if (timeout != TIPC_WAIT_FOREVER)
310 mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout)); 314 mod_timer(&sub->timer, jiffies + msecs_to_jiffies(timeout));
315 return 0;
311} 316}
312 317
313/* Handle one termination request for the subscriber */ 318/* Handle one termination request for the subscriber */
@@ -317,12 +322,13 @@ static void tipc_subscrb_release_cb(int conid, void *usr_data)
317} 322}
318 323
319/* Handle one request to create a new subscription for the subscriber */ 324/* Handle one request to create a new subscription for the subscriber */
320static void tipc_subscrb_rcv_cb(struct net *net, int conid, 325static int tipc_subscrb_rcv_cb(struct net *net, int conid,
321 struct sockaddr_tipc *addr, void *usr_data, 326 struct sockaddr_tipc *addr, void *usr_data,
322 void *buf, size_t len) 327 void *buf, size_t len)
323{ 328{
324 struct tipc_subscriber *subscriber = usr_data; 329 struct tipc_subscriber *subscriber = usr_data;
325 struct tipc_subscr *s = (struct tipc_subscr *)buf; 330 struct tipc_subscr *s = (struct tipc_subscr *)buf;
331 bool status;
326 int swap; 332 int swap;
327 333
328 /* Determine subscriber's endianness */ 334 /* Determine subscriber's endianness */
@@ -332,10 +338,11 @@ static void tipc_subscrb_rcv_cb(struct net *net, int conid,
332 /* Detect & process a subscription cancellation request */ 338 /* Detect & process a subscription cancellation request */
333 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) { 339 if (s->filter & htohl(TIPC_SUB_CANCEL, swap)) {
334 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap); 340 s->filter &= ~htohl(TIPC_SUB_CANCEL, swap);
335 return tipc_subscrp_cancel(s, subscriber); 341 tipc_subscrp_cancel(s, subscriber);
342 return 0;
336 } 343 }
337 344 status = !(s->filter & htohl(TIPC_SUB_NO_STATUS, swap));
338 tipc_subscrp_subscribe(net, s, subscriber, swap); 345 return tipc_subscrp_subscribe(net, s, subscriber, swap, status);
339} 346}
340 347
341/* Handle one request to establish a new subscriber */ 348/* Handle one request to establish a new subscriber */
diff --git a/net/tipc/subscr.h b/net/tipc/subscr.h
index ee52957dc952..f3edca775d9f 100644
--- a/net/tipc/subscr.h
+++ b/net/tipc/subscr.h
@@ -71,7 +71,7 @@ int tipc_subscrp_check_overlap(struct tipc_name_seq *seq, u32 found_lower,
71 u32 found_upper); 71 u32 found_upper);
72void tipc_subscrp_report_overlap(struct tipc_subscription *sub, 72void tipc_subscrp_report_overlap(struct tipc_subscription *sub,
73 u32 found_lower, u32 found_upper, u32 event, 73 u32 found_lower, u32 found_upper, u32 event,
74 u32 port_ref, u32 node, int must); 74 u32 port_ref, u32 node, u32 scope, int must);
75void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap, 75void tipc_subscrp_convert_seq(struct tipc_name_seq *in, int swap,
76 struct tipc_name_seq *out); 76 struct tipc_name_seq *out);
77u32 tipc_subscrp_convert_seq_type(u32 type, int swap); 77u32 tipc_subscrp_convert_seq_type(u32 type, int swap);