summaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c163
-rw-r--r--net/tipc/bcast.h16
-rw-r--r--net/tipc/core.h5
-rw-r--r--net/tipc/link.c435
-rw-r--r--net/tipc/link.h43
-rw-r--r--net/tipc/msg.c1
-rw-r--r--net/tipc/msg.h5
-rw-r--r--net/tipc/node.c158
-rw-r--r--net/tipc/node.h9
9 files changed, 615 insertions, 220 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 7fdf895e7973..ea28c2919b38 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -112,11 +112,6 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
112 return tipc_net(net)->bcbase; 112 return tipc_net(net)->bcbase;
113} 113}
114 114
115static struct tipc_link *tipc_bc_sndlink(struct net *net)
116{
117 return tipc_net(net)->bcl;
118}
119
120/** 115/**
121 * tipc_nmap_equal - test for equality of node maps 116 * tipc_nmap_equal - test for equality of node maps
122 */ 117 */
@@ -169,31 +164,6 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
169 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); 164 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
170} 165}
171 166
172void tipc_bclink_add_node(struct net *net, u32 addr)
173{
174 struct tipc_net *tn = net_generic(net, tipc_net_id);
175 struct tipc_link *l = tipc_bc_sndlink(net);
176 tipc_bclink_lock(net);
177 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
178 tipc_link_add_bc_peer(l);
179 tipc_bclink_unlock(net);
180}
181
182void tipc_bclink_remove_node(struct net *net, u32 addr)
183{
184 struct tipc_net *tn = net_generic(net, tipc_net_id);
185
186 tipc_bclink_lock(net);
187 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
188 tn->bcl->ackers--;
189
190 /* Last node? => reset backlog queue */
191 if (!tn->bcbase->bcast_nodes.count)
192 tipc_link_purge_backlog(tn->bcbase->link);
193
194 tipc_bclink_unlock(net);
195}
196
197static void bclink_set_last_sent(struct net *net) 167static void bclink_set_last_sent(struct net *net)
198{ 168{
199 struct tipc_net *tn = net_generic(net, tipc_net_id); 169 struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -501,12 +471,141 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
501 __skb_queue_purge(&rcvq); 471 __skb_queue_purge(&rcvq);
502 return rc; 472 return rc;
503 } 473 }
474
504 /* Broadcast to all nodes, inluding local node */ 475 /* Broadcast to all nodes, inluding local node */
505 tipc_bcbearer_xmit(net, &xmitq); 476 tipc_bcbearer_xmit(net, &xmitq);
506 tipc_sk_mcast_rcv(net, &rcvq, &inputq); 477 tipc_sk_mcast_rcv(net, &rcvq, &inputq);
507 __skb_queue_purge(list); 478 __skb_queue_purge(list);
508 return 0; 479 return 0;
509} 480}
481
482/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
483 *
484 * RCU is locked, no other locks set
485 */
486int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
487{
488 struct tipc_msg *hdr = buf_msg(skb);
489 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
490 struct sk_buff_head xmitq;
491 int rc;
492
493 __skb_queue_head_init(&xmitq);
494
495 if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
496 kfree_skb(skb);
497 return 0;
498 }
499
500 tipc_bcast_lock(net);
501 if (msg_user(hdr) == BCAST_PROTOCOL)
502 rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
503 else
504 rc = tipc_link_rcv(l, skb, NULL);
505 tipc_bcast_unlock(net);
506
507 if (!skb_queue_empty(&xmitq))
508 tipc_bcbearer_xmit(net, &xmitq);
509
510 /* Any socket wakeup messages ? */
511 if (!skb_queue_empty(inputq))
512 tipc_sk_rcv(net, inputq);
513
514 return rc;
515}
516
517/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
518 *
519 * RCU is locked, no other locks set
520 */
521void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
522{
523 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
524 struct sk_buff_head xmitq;
525
526 __skb_queue_head_init(&xmitq);
527
528 tipc_bcast_lock(net);
529 tipc_link_bc_ack_rcv(l, acked, &xmitq);
530 tipc_bcast_unlock(net);
531
532 tipc_bcbearer_xmit(net, &xmitq);
533
534 /* Any socket wakeup messages ? */
535 if (!skb_queue_empty(inputq))
536 tipc_sk_rcv(net, inputq);
537}
538
539/* tipc_bcast_synch_rcv - check and update rcv link with peer's send state
540 *
541 * RCU is locked, no other locks set
542 */
543void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
544 struct tipc_msg *hdr)
545{
546 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
547 struct sk_buff_head xmitq;
548
549 __skb_queue_head_init(&xmitq);
550
551 tipc_bcast_lock(net);
552 if (msg_type(hdr) == STATE_MSG) {
553 tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
554 tipc_link_bc_sync_rcv(l, hdr, &xmitq);
555 } else {
556 tipc_link_bc_init_rcv(l, hdr);
557 }
558 tipc_bcast_unlock(net);
559
560 tipc_bcbearer_xmit(net, &xmitq);
561
562 /* Any socket wakeup messages ? */
563 if (!skb_queue_empty(inputq))
564 tipc_sk_rcv(net, inputq);
565}
566
567/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
568 *
569 * RCU is locked, node lock is set
570 */
571void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l,
572 struct sk_buff_head *xmitq)
573{
574 struct tipc_net *tn = net_generic(net, tipc_net_id);
575 struct tipc_link *snd_l = tipc_bc_sndlink(net);
576
577 tipc_bclink_lock(net);
578 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
579 tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
580 tipc_bclink_unlock(net);
581}
582
583/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
584 *
585 * RCU is locked, node lock is set
586 */
587void tipc_bcast_remove_peer(struct net *net, u32 addr,
588 struct tipc_link *rcv_l)
589{
590 struct tipc_net *tn = net_generic(net, tipc_net_id);
591 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
592 struct tipc_link *snd_l = tipc_bc_sndlink(net);
593 struct sk_buff_head xmitq;
594
595 __skb_queue_head_init(&xmitq);
596
597 tipc_bclink_lock(net);
598 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
599 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
600 tipc_bclink_unlock(net);
601
602 tipc_bcbearer_xmit(net, &xmitq);
603
604 /* Any socket wakeup messages ? */
605 if (!skb_queue_empty(inputq))
606 tipc_sk_rcv(net, inputq);
607}
608
510/** 609/**
511 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 610 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
512 * 611 *
@@ -728,6 +827,7 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
728 return 0; 827 return 0;
729 } 828 }
730 } 829 }
830 msg_set_mc_netid(msg, tn->net_id);
731 831
732 /* Send buffer over bearers until all targets reached */ 832 /* Send buffer over bearers until all targets reached */
733 bcbearer->remains = bclink->bcast_nodes; 833 bcbearer->remains = bclink->bcast_nodes;
@@ -1042,12 +1142,13 @@ int tipc_bcast_init(struct net *net)
1042 spin_lock_init(&tipc_net(net)->bclock); 1142 spin_lock_init(&tipc_net(net)->bclock);
1043 bb->node.net = net; 1143 bb->node.net = net;
1044 1144
1045 if (!tipc_link_bc_create(&bb->node, 1145 if (!tipc_link_bc_create(&bb->node, 0, 0,
1046 MAX_PKT_DEFAULT_MCAST, 1146 MAX_PKT_DEFAULT_MCAST,
1047 BCLINK_WIN_DEFAULT, 1147 BCLINK_WIN_DEFAULT,
1048 0, 1148 0,
1049 &bb->inputq, 1149 &bb->inputq,
1050 &bb->namedq, 1150 &bb->namedq,
1151 NULL,
1051 &l)) 1152 &l))
1052 goto enomem; 1153 goto enomem;
1053 bb->link = l; 1154 bb->link = l;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index a378fdde9b7a..568a57cd89e6 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -47,8 +47,11 @@ struct tipc_node_map;
47int tipc_bcast_init(struct net *net); 47int tipc_bcast_init(struct net *net);
48void tipc_bcast_reinit(struct net *net); 48void tipc_bcast_reinit(struct net *net);
49void tipc_bcast_stop(struct net *net); 49void tipc_bcast_stop(struct net *net);
50void tipc_bclink_add_node(struct net *net, u32 addr); 50void tipc_bcast_add_peer(struct net *net, u32 addr,
51void tipc_bclink_remove_node(struct net *net, u32 addr); 51 struct tipc_link *l,
52 struct sk_buff_head *xmitq);
53void tipc_bcast_remove_peer(struct net *net, u32 addr,
54 struct tipc_link *rcv_bcl);
52struct tipc_node *tipc_bclink_retransmit_to(struct net *tn); 55struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
53void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); 56void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
54void tipc_bclink_rcv(struct net *net, struct sk_buff *buf); 57void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
@@ -62,6 +65,10 @@ int tipc_bclink_reset_stats(struct net *net);
62int tipc_bclink_set_queue_limits(struct net *net, u32 limit); 65int tipc_bclink_set_queue_limits(struct net *net, u32 limit);
63uint tipc_bcast_get_mtu(void); 66uint tipc_bcast_get_mtu(void);
64int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); 67int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
68int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
69void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
70void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
71 struct tipc_msg *hdr);
65void tipc_bclink_wakeup_users(struct net *net); 72void tipc_bclink_wakeup_users(struct net *net);
66int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); 73int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
67int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); 74int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
@@ -78,4 +85,9 @@ static inline void tipc_bcast_unlock(struct net *net)
78 spin_unlock_bh(&tipc_net(net)->bclock); 85 spin_unlock_bh(&tipc_net(net)->bclock);
79} 86}
80 87
88static inline struct tipc_link *tipc_bc_sndlink(struct net *net)
89{
90 return tipc_net(net)->bcl;
91}
92
81#endif 93#endif
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 645dcac9575d..6d589aaad054 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -115,6 +115,11 @@ static inline struct tipc_net *tipc_net(struct net *net)
115 return net_generic(net, tipc_net_id); 115 return net_generic(net, tipc_net_id);
116} 116}
117 117
118static inline int tipc_netid(struct net *net)
119{
120 return tipc_net(net)->net_id;
121}
122
118static inline u16 mod(u16 x) 123static inline u16 mod(u16 x)
119{ 124{
120 return x & 0xffffu; 125 return x & 0xffffu;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 6a1a9d9239ae..ff725c398914 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -76,6 +76,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
76 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } 76 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
77}; 77};
78 78
79/* Send states for broadcast NACKs
80 */
81enum {
82 BC_NACK_SND_CONDITIONAL,
83 BC_NACK_SND_UNCONDITIONAL,
84 BC_NACK_SND_SUPPRESS,
85};
86
79/* 87/*
80 * Interval between NACKs when packets arrive out of order 88 * Interval between NACKs when packets arrive out of order
81 */ 89 */
@@ -111,7 +119,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
111 struct sk_buff_head *xmitq); 119 struct sk_buff_head *xmitq);
112static void link_reset_statistics(struct tipc_link *l_ptr); 120static void link_reset_statistics(struct tipc_link *l_ptr);
113static void link_print(struct tipc_link *l_ptr, const char *str); 121static void link_print(struct tipc_link *l_ptr, const char *str);
114static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 122static void tipc_link_build_nack_msg(struct tipc_link *l,
123 struct sk_buff_head *xmitq);
124static void tipc_link_build_bc_init_msg(struct tipc_link *l,
125 struct sk_buff_head *xmitq);
126static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
115 127
116/* 128/*
117 * Simple non-static link routines (i.e. referenced outside this file) 129 * Simple non-static link routines (i.e. referenced outside this file)
@@ -151,6 +163,16 @@ bool tipc_link_is_blocked(struct tipc_link *l)
151 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); 163 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER);
152} 164}
153 165
166bool link_is_bc_sndlink(struct tipc_link *l)
167{
168 return !l->bc_sndlink;
169}
170
171bool link_is_bc_rcvlink(struct tipc_link *l)
172{
173 return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l));
174}
175
154int tipc_link_is_active(struct tipc_link *l) 176int tipc_link_is_active(struct tipc_link *l)
155{ 177{
156 struct tipc_node *n = l->owner; 178 struct tipc_node *n = l->owner;
@@ -158,14 +180,31 @@ int tipc_link_is_active(struct tipc_link *l)
158 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); 180 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
159} 181}
160 182
161void tipc_link_add_bc_peer(struct tipc_link *l) 183void tipc_link_add_bc_peer(struct tipc_link *snd_l,
184 struct tipc_link *uc_l,
185 struct sk_buff_head *xmitq)
162{ 186{
163 l->ackers++; 187 struct tipc_link *rcv_l = uc_l->bc_rcvlink;
188
189 snd_l->ackers++;
190 rcv_l->acked = snd_l->snd_nxt - 1;
191 tipc_link_build_bc_init_msg(uc_l, xmitq);
164} 192}
165 193
166void tipc_link_remove_bc_peer(struct tipc_link *l) 194void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
195 struct tipc_link *rcv_l,
196 struct sk_buff_head *xmitq)
167{ 197{
168 l->ackers--; 198 u16 ack = snd_l->snd_nxt - 1;
199
200 snd_l->ackers--;
201 tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
202 tipc_link_reset(rcv_l);
203 rcv_l->state = LINK_RESET;
204 if (!snd_l->ackers) {
205 tipc_link_reset(snd_l);
206 __skb_queue_purge(xmitq);
207 }
169} 208}
170 209
171int tipc_link_bc_peers(struct tipc_link *l) 210int tipc_link_bc_peers(struct tipc_link *l)
@@ -193,6 +232,8 @@ static u32 link_own_addr(struct tipc_link *l)
193 * @peer: node id of peer node 232 * @peer: node id of peer node
194 * @peer_caps: bitmap describing peer node capabilities 233 * @peer_caps: bitmap describing peer node capabilities
195 * @maddr: media address to be used 234 * @maddr: media address to be used
235 * @bc_sndlink: the namespace global link used for broadcast sending
236 * @bc_rcvlink: the peer specific link used for broadcast reception
196 * @inputq: queue to put messages ready for delivery 237 * @inputq: queue to put messages ready for delivery
197 * @namedq: queue to put binding table update messages ready for delivery 238 * @namedq: queue to put binding table update messages ready for delivery
198 * @link: return value, pointer to put the created link 239 * @link: return value, pointer to put the created link
@@ -202,8 +243,12 @@ static u32 link_own_addr(struct tipc_link *l)
202bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, 243bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
203 int tolerance, char net_plane, u32 mtu, int priority, 244 int tolerance, char net_plane, u32 mtu, int priority,
204 int window, u32 session, u32 ownnode, u32 peer, 245 int window, u32 session, u32 ownnode, u32 peer,
205 u16 peer_caps, struct tipc_media_addr *maddr, 246 u16 peer_caps,
206 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 247 struct tipc_media_addr *maddr,
248 struct tipc_link *bc_sndlink,
249 struct tipc_link *bc_rcvlink,
250 struct sk_buff_head *inputq,
251 struct sk_buff_head *namedq,
207 struct tipc_link **link) 252 struct tipc_link **link)
208{ 253{
209 struct tipc_link *l; 254 struct tipc_link *l;
@@ -239,6 +284,8 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
239 l->priority = priority; 284 l->priority = priority;
240 tipc_link_set_queue_limits(l, window); 285 tipc_link_set_queue_limits(l, window);
241 l->ackers = 1; 286 l->ackers = 1;
287 l->bc_sndlink = bc_sndlink;
288 l->bc_rcvlink = bc_rcvlink;
242 l->inputq = inputq; 289 l->inputq = inputq;
243 l->namedq = namedq; 290 l->namedq = namedq;
244 l->state = LINK_RESETTING; 291 l->state = LINK_RESETTING;
@@ -261,46 +308,32 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
261 * 308 *
262 * Returns true if link was created, otherwise false 309 * Returns true if link was created, otherwise false
263 */ 310 */
264bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, 311bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer,
265 u16 peer_caps, 312 int mtu, int window, u16 peer_caps,
266 struct sk_buff_head *inputq, 313 struct sk_buff_head *inputq,
267 struct sk_buff_head *namedq, 314 struct sk_buff_head *namedq,
315 struct tipc_link *bc_sndlink,
268 struct tipc_link **link) 316 struct tipc_link **link)
269{ 317{
270 struct tipc_link *l; 318 struct tipc_link *l;
271 319
272 if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, 320 if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
273 0, 0, 0, peer_caps, NULL, inputq, namedq, link)) 321 0, ownnode, peer, peer_caps, NULL, bc_sndlink,
322 NULL, inputq, namedq, link))
274 return false; 323 return false;
275 324
276 l = *link; 325 l = *link;
277 strcpy(l->name, tipc_bclink_name); 326 strcpy(l->name, tipc_bclink_name);
278 tipc_link_reset(l); 327 tipc_link_reset(l);
328 l->state = LINK_RESET;
279 l->ackers = 0; 329 l->ackers = 0;
280 return true; 330 l->bc_rcvlink = l;
281}
282 331
283/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. 332 /* Broadcast send link is always up */
284 * 333 if (link_is_bc_sndlink(l))
285 * Give a newly added peer node the sequence number where it should 334 l->state = LINK_ESTABLISHED;
286 * start receiving and acking broadcast packets.
287 */
288void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
289 struct sk_buff_head *xmitq)
290{
291 struct sk_buff *skb;
292 struct sk_buff_head list;
293 u16 last_sent;
294 335
295 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, 336 return true;
296 0, l->addr, link_own_addr(l), 0, 0, 0);
297 if (!skb)
298 return;
299 last_sent = tipc_bclink_get_last_sent(l->owner->net);
300 msg_set_last_bcast(buf_msg(skb), last_sent);
301 __skb_queue_head_init(&list);
302 __skb_queue_tail(&list, skb);
303 tipc_link_xmit(l, &list, xmitq);
304} 337}
305 338
306/** 339/**
@@ -507,12 +540,17 @@ static void link_profile_stats(struct tipc_link *l)
507 540
508/* tipc_link_timeout - perform periodic task as instructed from node timeout 541/* tipc_link_timeout - perform periodic task as instructed from node timeout
509 */ 542 */
543/* tipc_link_timeout - perform periodic task as instructed from node timeout
544 */
510int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) 545int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
511{ 546{
512 int rc = 0; 547 int rc = 0;
513 int mtyp = STATE_MSG; 548 int mtyp = STATE_MSG;
514 bool xmit = false; 549 bool xmit = false;
515 bool prb = false; 550 bool prb = false;
551 u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
552 u16 bc_acked = l->bc_rcvlink->acked;
553 bool bc_up = link_is_up(l->bc_rcvlink);
516 554
517 link_profile_stats(l); 555 link_profile_stats(l);
518 556
@@ -520,7 +558,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
520 case LINK_ESTABLISHED: 558 case LINK_ESTABLISHED:
521 case LINK_SYNCHING: 559 case LINK_SYNCHING:
522 if (!l->silent_intv_cnt) { 560 if (!l->silent_intv_cnt) {
523 if (tipc_bclink_acks_missing(l->owner)) 561 if (bc_up && (bc_acked != bc_snt))
524 xmit = true; 562 xmit = true;
525 } else if (l->silent_intv_cnt <= l->abort_limit) { 563 } else if (l->silent_intv_cnt <= l->abort_limit) {
526 xmit = true; 564 xmit = true;
@@ -671,6 +709,7 @@ void tipc_link_reset(struct tipc_link *l)
671 l->silent_intv_cnt = 0; 709 l->silent_intv_cnt = 0;
672 l->stats.recv_info = 0; 710 l->stats.recv_info = 0;
673 l->stale_count = 0; 711 l->stale_count = 0;
712 l->bc_peer_is_up = false;
674 link_reset_statistics(l); 713 link_reset_statistics(l);
675} 714}
676 715
@@ -692,7 +731,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
692 uint mtu = link->mtu; 731 uint mtu = link->mtu;
693 u16 ack = mod(link->rcv_nxt - 1); 732 u16 ack = mod(link->rcv_nxt - 1);
694 u16 seqno = link->snd_nxt; 733 u16 seqno = link->snd_nxt;
695 u16 bc_last_in = link->owner->bclink.last_in; 734 u16 bc_ack = link->bc_rcvlink->rcv_nxt - 1;
696 struct tipc_media_addr *addr = link->media_addr; 735 struct tipc_media_addr *addr = link->media_addr;
697 struct sk_buff_head *transmq = &link->transmq; 736 struct sk_buff_head *transmq = &link->transmq;
698 struct sk_buff_head *backlogq = &link->backlogq; 737 struct sk_buff_head *backlogq = &link->backlogq;
@@ -712,7 +751,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
712 msg = buf_msg(skb); 751 msg = buf_msg(skb);
713 msg_set_seqno(msg, seqno); 752 msg_set_seqno(msg, seqno);
714 msg_set_ack(msg, ack); 753 msg_set_ack(msg, ack);
715 msg_set_bcast_ack(msg, bc_last_in); 754 msg_set_bcast_ack(msg, bc_ack);
716 755
717 if (likely(skb_queue_len(transmq) < maxwin)) { 756 if (likely(skb_queue_len(transmq) < maxwin)) {
718 __skb_dequeue(list); 757 __skb_dequeue(list);
@@ -762,7 +801,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
762 unsigned int mtu = l->mtu; 801 unsigned int mtu = l->mtu;
763 u16 ack = l->rcv_nxt - 1; 802 u16 ack = l->rcv_nxt - 1;
764 u16 seqno = l->snd_nxt; 803 u16 seqno = l->snd_nxt;
765 u16 bc_last_in = l->owner->bclink.last_in; 804 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
766 struct sk_buff_head *transmq = &l->transmq; 805 struct sk_buff_head *transmq = &l->transmq;
767 struct sk_buff_head *backlogq = &l->backlogq; 806 struct sk_buff_head *backlogq = &l->backlogq;
768 struct sk_buff *skb, *_skb, *bskb; 807 struct sk_buff *skb, *_skb, *bskb;
@@ -781,7 +820,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
781 hdr = buf_msg(skb); 820 hdr = buf_msg(skb);
782 msg_set_seqno(hdr, seqno); 821 msg_set_seqno(hdr, seqno);
783 msg_set_ack(hdr, ack); 822 msg_set_ack(hdr, ack);
784 msg_set_bcast_ack(hdr, bc_last_in); 823 msg_set_bcast_ack(hdr, bc_ack);
785 824
786 if (likely(skb_queue_len(transmq) < maxwin)) { 825 if (likely(skb_queue_len(transmq) < maxwin)) {
787 _skb = skb_clone(skb, GFP_ATOMIC); 826 _skb = skb_clone(skb, GFP_ATOMIC);
@@ -816,23 +855,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
816} 855}
817 856
818/* 857/*
819 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
820 * Receive the sequence number where we should start receiving and
821 * acking broadcast packets from a newly added peer node, and open
822 * up for reception of such packets.
823 *
824 * Called with node locked
825 */
826static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
827{
828 struct tipc_msg *msg = buf_msg(buf);
829
830 n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
831 n->bclink.recv_permitted = true;
832 kfree_skb(buf);
833}
834
835/*
836 * tipc_link_push_packets - push unsent packets to bearer 858 * tipc_link_push_packets - push unsent packets to bearer
837 * 859 *
838 * Push out the unsent messages of a link where congestion 860 * Push out the unsent messages of a link where congestion
@@ -872,6 +894,7 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
872 struct tipc_msg *hdr; 894 struct tipc_msg *hdr;
873 u16 seqno = l->snd_nxt; 895 u16 seqno = l->snd_nxt;
874 u16 ack = l->rcv_nxt - 1; 896 u16 ack = l->rcv_nxt - 1;
897 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
875 898
876 while (skb_queue_len(&l->transmq) < l->window) { 899 while (skb_queue_len(&l->transmq) < l->window) {
877 skb = skb_peek(&l->backlogq); 900 skb = skb_peek(&l->backlogq);
@@ -886,54 +909,25 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
886 __skb_queue_tail(&l->transmq, skb); 909 __skb_queue_tail(&l->transmq, skb);
887 __skb_queue_tail(xmitq, _skb); 910 __skb_queue_tail(xmitq, _skb);
888 TIPC_SKB_CB(skb)->ackers = l->ackers; 911 TIPC_SKB_CB(skb)->ackers = l->ackers;
889 msg_set_ack(hdr, ack);
890 msg_set_seqno(hdr, seqno); 912 msg_set_seqno(hdr, seqno);
891 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 913 msg_set_ack(hdr, ack);
914 msg_set_bcast_ack(hdr, bc_ack);
892 l->rcv_unacked = 0; 915 l->rcv_unacked = 0;
893 seqno++; 916 seqno++;
894 } 917 }
895 l->snd_nxt = seqno; 918 l->snd_nxt = seqno;
896} 919}
897 920
898static void link_retransmit_failure(struct tipc_link *l_ptr, 921static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
899 struct sk_buff *buf)
900{ 922{
901 struct tipc_msg *msg = buf_msg(buf); 923 struct tipc_msg *hdr = buf_msg(skb);
902 struct net *net = l_ptr->owner->net; 924
903 925 pr_warn("Retransmission failure on link <%s>\n", l->name);
904 pr_warn("Retransmission failure on link <%s>\n", l_ptr->name); 926 link_print(l, "Resetting link ");
905 927 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
906 if (l_ptr->addr) { 928 msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
907 /* Handle failure on standard link */ 929 pr_info("sqno %u, prev: %x, src: %x\n",
908 link_print(l_ptr, "Resetting link "); 930 msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
909 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
910 msg_user(msg), msg_type(msg), msg_size(msg),
911 msg_errcode(msg));
912 pr_info("sqno %u, prev: %x, src: %x\n",
913 msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
914 } else {
915 /* Handle failure on broadcast link */
916 struct tipc_node *n_ptr;
917 char addr_string[16];
918
919 pr_info("Msg seq number: %u, ", msg_seqno(msg));
920 pr_cont("Outstanding acks: %u\n", TIPC_SKB_CB(buf)->ackers);
921
922 n_ptr = tipc_bclink_retransmit_to(net);
923
924 tipc_addr_string_fill(addr_string, n_ptr->addr);
925 pr_info("Broadcast link info for %s\n", addr_string);
926 pr_info("Reception permitted: %d, Acked: %u\n",
927 n_ptr->bclink.recv_permitted,
928 n_ptr->bclink.acked);
929 pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
930 n_ptr->bclink.last_in,
931 n_ptr->bclink.oos_state,
932 n_ptr->bclink.last_sent);
933
934 n_ptr->action_flags |= TIPC_BCAST_RESET;
935 l_ptr->stale_count = 0;
936 }
937} 931}
938 932
939void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, 933void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
@@ -976,7 +970,7 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
976 struct sk_buff *_skb, *skb = skb_peek(&l->transmq); 970 struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
977 struct tipc_msg *hdr; 971 struct tipc_msg *hdr;
978 u16 ack = l->rcv_nxt - 1; 972 u16 ack = l->rcv_nxt - 1;
979 u16 bc_ack = l->owner->bclink.last_in; 973 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
980 974
981 if (!skb) 975 if (!skb)
982 return 0; 976 return 0;
@@ -1018,11 +1012,9 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
1018 * Consumes buffer if message is of right type 1012 * Consumes buffer if message is of right type
1019 * Node lock must be held 1013 * Node lock must be held
1020 */ 1014 */
1021static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, 1015static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1022 struct sk_buff_head *inputq) 1016 struct sk_buff_head *inputq)
1023{ 1017{
1024 struct tipc_node *node = link->owner;
1025
1026 switch (msg_user(buf_msg(skb))) { 1018 switch (msg_user(buf_msg(skb))) {
1027 case TIPC_LOW_IMPORTANCE: 1019 case TIPC_LOW_IMPORTANCE:
1028 case TIPC_MEDIUM_IMPORTANCE: 1020 case TIPC_MEDIUM_IMPORTANCE:
@@ -1032,8 +1024,8 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
1032 skb_queue_tail(inputq, skb); 1024 skb_queue_tail(inputq, skb);
1033 return true; 1025 return true;
1034 case NAME_DISTRIBUTOR: 1026 case NAME_DISTRIBUTOR:
1035 node->bclink.recv_permitted = true; 1027 l->bc_rcvlink->state = LINK_ESTABLISHED;
1036 skb_queue_tail(link->namedq, skb); 1028 skb_queue_tail(l->namedq, skb);
1037 return true; 1029 return true;
1038 case MSG_BUNDLER: 1030 case MSG_BUNDLER:
1039 case TUNNEL_PROTOCOL: 1031 case TUNNEL_PROTOCOL:
@@ -1054,7 +1046,6 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
1054static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, 1046static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1055 struct sk_buff_head *inputq) 1047 struct sk_buff_head *inputq)
1056{ 1048{
1057 struct tipc_node *node = l->owner;
1058 struct tipc_msg *hdr = buf_msg(skb); 1049 struct tipc_msg *hdr = buf_msg(skb);
1059 struct sk_buff **reasm_skb = &l->reasm_buf; 1050 struct sk_buff **reasm_skb = &l->reasm_buf;
1060 struct sk_buff *iskb; 1051 struct sk_buff *iskb;
@@ -1095,13 +1086,15 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1095 if (tipc_buf_append(reasm_skb, &skb)) { 1086 if (tipc_buf_append(reasm_skb, &skb)) {
1096 l->stats.recv_fragmented++; 1087 l->stats.recv_fragmented++;
1097 tipc_data_input(l, skb, inputq); 1088 tipc_data_input(l, skb, inputq);
1098 } else if (!*reasm_skb) { 1089 } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) {
1090 pr_warn_ratelimited("Unable to build fragment list\n");
1099 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 1091 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
1100 } 1092 }
1101 return 0; 1093 return 0;
1102 } else if (usr == BCAST_PROTOCOL) { 1094 } else if (usr == BCAST_PROTOCOL) {
1103 tipc_link_sync_rcv(node, skb); 1095 tipc_bcast_lock(l->owner->net);
1104 return 0; 1096 tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
1097 tipc_bcast_unlock(l->owner->net);
1105 } 1098 }
1106drop: 1099drop:
1107 kfree_skb(skb); 1100 kfree_skb(skb);
@@ -1124,12 +1117,28 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
1124} 1117}
1125 1118
1126/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission 1119/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission
1120 *
1121 * Note that sending of broadcast ack is coordinated among nodes, to reduce
1122 * risk of ack storms towards the sender
1127 */ 1123 */
1128void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) 1124int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
1129{ 1125{
1126 if (!l)
1127 return 0;
1128
1129 /* Broadcast ACK must be sent via a unicast link => defer to caller */
1130 if (link_is_bc_rcvlink(l)) {
1131 if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf)
1132 return 0;
1133 l->rcv_unacked = 0;
1134 return TIPC_LINK_SND_BC_ACK;
1135 }
1136
1137 /* Unicast ACK */
1130 l->rcv_unacked = 0; 1138 l->rcv_unacked = 0;
1131 l->stats.sent_acks++; 1139 l->stats.sent_acks++;
1132 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1140 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
1141 return 0;
1133} 1142}
1134 1143
1135/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message 1144/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message
@@ -1151,6 +1160,9 @@ static void tipc_link_build_nack_msg(struct tipc_link *l,
1151{ 1160{
1152 u32 def_cnt = ++l->stats.deferred_recv; 1161 u32 def_cnt = ++l->stats.deferred_recv;
1153 1162
1163 if (link_is_bc_rcvlink(l))
1164 return;
1165
1154 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) 1166 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
1155 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1167 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
1156} 1168}
@@ -1211,12 +1223,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
1211 l->rcv_nxt++; 1223 l->rcv_nxt++;
1212 l->stats.recv_info++; 1224 l->stats.recv_info++;
1213 if (!tipc_data_input(l, skb, l->inputq)) 1225 if (!tipc_data_input(l, skb, l->inputq))
1214 rc = tipc_link_input(l, skb, l->inputq); 1226 rc |= tipc_link_input(l, skb, l->inputq);
1215 if (unlikely(rc))
1216 break;
1217 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) 1227 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
1218 tipc_link_build_ack_msg(l, xmitq); 1228 rc |= tipc_link_build_ack_msg(l, xmitq);
1219 1229 if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
1230 break;
1220 } while ((skb = __skb_dequeue(defq))); 1231 } while ((skb = __skb_dequeue(defq)));
1221 1232
1222 return rc; 1233 return rc;
@@ -1284,18 +1295,13 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
1284 kfree_skb(skb); 1295 kfree_skb(skb);
1285} 1296}
1286 1297
1287/* tipc_link_build_proto_msg: prepare link protocol message for transmission
1288 */
1289static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, 1298static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1290 u16 rcvgap, int tolerance, int priority, 1299 u16 rcvgap, int tolerance, int priority,
1291 struct sk_buff_head *xmitq) 1300 struct sk_buff_head *xmitq)
1292{ 1301{
1293 struct sk_buff *skb = NULL; 1302 struct sk_buff *skb = NULL;
1294 struct tipc_msg *hdr = l->pmsg; 1303 struct tipc_msg *hdr = l->pmsg;
1295 u16 snd_nxt = l->snd_nxt; 1304 bool node_up = link_is_up(l->bc_rcvlink);
1296 u16 rcv_nxt = l->rcv_nxt;
1297 u16 rcv_last = rcv_nxt - 1;
1298 int node_up = l->owner->bclink.recv_permitted;
1299 1305
1300 /* Don't send protocol message during reset or link failover */ 1306 /* Don't send protocol message during reset or link failover */
1301 if (tipc_link_is_blocked(l)) 1307 if (tipc_link_is_blocked(l))
@@ -1303,33 +1309,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1303 1309
1304 msg_set_type(hdr, mtyp); 1310 msg_set_type(hdr, mtyp);
1305 msg_set_net_plane(hdr, l->net_plane); 1311 msg_set_net_plane(hdr, l->net_plane);
1306 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 1312 msg_set_next_sent(hdr, l->snd_nxt);
1307 msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); 1313 msg_set_ack(hdr, l->rcv_nxt - 1);
1314 msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
1315 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1308 msg_set_link_tolerance(hdr, tolerance); 1316 msg_set_link_tolerance(hdr, tolerance);
1309 msg_set_linkprio(hdr, priority); 1317 msg_set_linkprio(hdr, priority);
1310 msg_set_redundant_link(hdr, node_up); 1318 msg_set_redundant_link(hdr, node_up);
1311 msg_set_seq_gap(hdr, 0); 1319 msg_set_seq_gap(hdr, 0);
1312 1320
1313 /* Compatibility: created msg must not be in sequence with pkt flow */ 1321 /* Compatibility: created msg must not be in sequence with pkt flow */
1314 msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); 1322 msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2);
1315 1323
1316 if (mtyp == STATE_MSG) { 1324 if (mtyp == STATE_MSG) {
1317 if (!tipc_link_is_up(l)) 1325 if (!tipc_link_is_up(l))
1318 return; 1326 return;
1319 msg_set_next_sent(hdr, snd_nxt);
1320 1327
1321 /* Override rcvgap if there are packets in deferred queue */ 1328 /* Override rcvgap if there are packets in deferred queue */
1322 if (!skb_queue_empty(&l->deferdq)) 1329 if (!skb_queue_empty(&l->deferdq))
1323 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; 1330 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt;
1324 if (rcvgap) { 1331 if (rcvgap) {
1325 msg_set_seq_gap(hdr, rcvgap); 1332 msg_set_seq_gap(hdr, rcvgap);
1326 l->stats.sent_nacks++; 1333 l->stats.sent_nacks++;
1327 } 1334 }
1328 msg_set_ack(hdr, rcv_last);
1329 msg_set_probe(hdr, probe); 1335 msg_set_probe(hdr, probe);
1330 if (probe) 1336 if (probe)
1331 l->stats.sent_probes++; 1337 l->stats.sent_probes++;
1332 l->stats.sent_states++; 1338 l->stats.sent_states++;
1339 l->rcv_unacked = 0;
1333 } else { 1340 } else {
1334 /* RESET_MSG or ACTIVATE_MSG */ 1341 /* RESET_MSG or ACTIVATE_MSG */
1335 msg_set_max_pkt(hdr, l->advertised_mtu); 1342 msg_set_max_pkt(hdr, l->advertised_mtu);
@@ -1431,7 +1438,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1431 char *if_name; 1438 char *if_name;
1432 int rc = 0; 1439 int rc = 0;
1433 1440
1434 if (tipc_link_is_blocked(l)) 1441 if (tipc_link_is_blocked(l) || !xmitq)
1435 goto exit; 1442 goto exit;
1436 1443
1437 if (link_own_addr(l) > msg_prevnode(hdr)) 1444 if (link_own_addr(l) > msg_prevnode(hdr))
@@ -1518,6 +1525,188 @@ exit:
1518 return rc; 1525 return rc;
1519} 1526}
1520 1527
1528/* tipc_link_build_bc_proto_msg() - create broadcast protocol message
1529 */
1530static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast,
1531 u16 peers_snd_nxt,
1532 struct sk_buff_head *xmitq)
1533{
1534 struct sk_buff *skb;
1535 struct tipc_msg *hdr;
1536 struct sk_buff *dfrd_skb = skb_peek(&l->deferdq);
1537 u16 ack = l->rcv_nxt - 1;
1538 u16 gap_to = peers_snd_nxt - 1;
1539
1540 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
1541 0, l->addr, link_own_addr(l), 0, 0, 0);
1542 if (!skb)
1543 return false;
1544 hdr = buf_msg(skb);
1545 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1546 msg_set_bcast_ack(hdr, ack);
1547 msg_set_bcgap_after(hdr, ack);
1548 if (dfrd_skb)
1549 gap_to = buf_seqno(dfrd_skb) - 1;
1550 msg_set_bcgap_to(hdr, gap_to);
1551 msg_set_non_seq(hdr, bcast);
1552 __skb_queue_tail(xmitq, skb);
1553 return true;
1554}
1555
1556/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints.
1557 *
1558 * Give a newly added peer node the sequence number where it should
1559 * start receiving and acking broadcast packets.
1560 */
1561void tipc_link_build_bc_init_msg(struct tipc_link *l,
1562 struct sk_buff_head *xmitq)
1563{
1564 struct sk_buff_head list;
1565
1566 __skb_queue_head_init(&list);
1567 if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list))
1568 return;
1569 tipc_link_xmit(l, &list, xmitq);
1570}
1571
1572/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer
1573 */
1574void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
1575{
1576 int mtyp = msg_type(hdr);
1577 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1578
1579 if (link_is_up(l))
1580 return;
1581
1582 if (msg_user(hdr) == BCAST_PROTOCOL) {
1583 l->rcv_nxt = peers_snd_nxt;
1584 l->state = LINK_ESTABLISHED;
1585 return;
1586 }
1587
1588 if (l->peer_caps & TIPC_BCAST_SYNCH)
1589 return;
1590
1591 if (msg_peer_node_is_up(hdr))
1592 return;
1593
1594 /* Compatibility: accept older, less safe initial synch data */
1595 if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG))
1596 l->rcv_nxt = peers_snd_nxt;
1597}
1598
1599/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
1600 */
1601void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
1602 struct sk_buff_head *xmitq)
1603{
1604 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1605
1606 if (!link_is_up(l))
1607 return;
1608
1609 if (!msg_peer_node_is_up(hdr))
1610 return;
1611
1612 l->bc_peer_is_up = true;
1613
1614 /* Ignore if peers_snd_nxt goes beyond receive window */
1615 if (more(peers_snd_nxt, l->rcv_nxt + l->window))
1616 return;
1617
1618 if (!more(peers_snd_nxt, l->rcv_nxt)) {
1619 l->nack_state = BC_NACK_SND_CONDITIONAL;
1620 return;
1621 }
1622
1623 /* Don't NACK if one was recently sent or peeked */
1624 if (l->nack_state == BC_NACK_SND_SUPPRESS) {
1625 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1626 return;
1627 }
1628
1629 /* Conditionally delay NACK sending until next synch rcv */
1630 if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
1631 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1632 if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
1633 return;
1634 }
1635
1636 /* Send NACK now but suppress next one */
1637 tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
1638 l->nack_state = BC_NACK_SND_SUPPRESS;
1639}
1640
1641void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
1642 struct sk_buff_head *xmitq)
1643{
1644 struct sk_buff *skb, *tmp;
1645 struct tipc_link *snd_l = l->bc_sndlink;
1646
1647 if (!link_is_up(l) || !l->bc_peer_is_up)
1648 return;
1649
1650 if (!more(acked, l->acked))
1651 return;
1652
1653 /* Skip over packets peer has already acked */
1654 skb_queue_walk(&snd_l->transmq, skb) {
1655 if (more(buf_seqno(skb), l->acked))
1656 break;
1657 }
1658
1659 /* Update/release the packets peer is acking now */
1660 skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
1661 if (more(buf_seqno(skb), acked))
1662 break;
1663 if (!--TIPC_SKB_CB(skb)->ackers) {
1664 __skb_unlink(skb, &snd_l->transmq);
1665 kfree_skb(skb);
1666 }
1667 }
1668 l->acked = acked;
1669 tipc_link_advance_backlog(snd_l, xmitq);
1670 if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
1671 link_prepare_wakeup(snd_l);
1672}
1673
1674/* tipc_link_bc_nack_rcv(): receive broadcast nack message
1675 */
1676int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
1677 struct sk_buff_head *xmitq)
1678{
1679 struct tipc_msg *hdr = buf_msg(skb);
1680 u32 dnode = msg_destnode(hdr);
1681 int mtyp = msg_type(hdr);
1682 u16 acked = msg_bcast_ack(hdr);
1683 u16 from = acked + 1;
1684 u16 to = msg_bcgap_to(hdr);
1685 u16 peers_snd_nxt = to + 1;
1686 int rc = 0;
1687
1688 kfree_skb(skb);
1689
1690 if (!tipc_link_is_up(l) || !l->bc_peer_is_up)
1691 return 0;
1692
1693 if (mtyp != STATE_MSG)
1694 return 0;
1695
1696 if (dnode == link_own_addr(l)) {
1697 tipc_link_bc_ack_rcv(l, acked, xmitq);
1698 rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
1699 l->stats.recv_nacks++;
1700 return rc;
1701 }
1702
1703 /* Msg for other node => suppress own NACK at next sync if applicable */
1704 if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from))
1705 l->nack_state = BC_NACK_SND_SUPPRESS;
1706
1707 return 0;
1708}
1709
1521void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) 1710void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1522{ 1711{
1523 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); 1712 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
diff --git a/net/tipc/link.h b/net/tipc/link.h
index d23329db4b25..28a6396b6d31 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -66,7 +66,8 @@ enum {
66 */ 66 */
67enum { 67enum {
68 TIPC_LINK_UP_EVT = 1, 68 TIPC_LINK_UP_EVT = 1,
69 TIPC_LINK_DOWN_EVT = (1 << 1) 69 TIPC_LINK_DOWN_EVT = (1 << 1),
70 TIPC_LINK_SND_BC_ACK = (1 << 2)
70}; 71};
71 72
72/* Starting value for maximum packet size negotiation on unicast links 73/* Starting value for maximum packet size negotiation on unicast links
@@ -209,6 +210,10 @@ struct tipc_link {
209 /* Broadcast */ 210 /* Broadcast */
210 u16 ackers; 211 u16 ackers;
211 u16 acked; 212 u16 acked;
213 struct tipc_link *bc_rcvlink;
214 struct tipc_link *bc_sndlink;
215 int nack_state;
216 bool bc_peer_is_up;
212 217
213 /* Statistics */ 218 /* Statistics */
214 struct tipc_stats stats; 219 struct tipc_stats stats;
@@ -217,17 +222,21 @@ struct tipc_link {
217bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, 222bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
218 int tolerance, char net_plane, u32 mtu, int priority, 223 int tolerance, char net_plane, u32 mtu, int priority,
219 int window, u32 session, u32 ownnode, u32 peer, 224 int window, u32 session, u32 ownnode, u32 peer,
220 u16 peer_caps, struct tipc_media_addr *maddr, 225 u16 peer_caps,
221 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 226 struct tipc_media_addr *maddr,
227 struct tipc_link *bc_sndlink,
228 struct tipc_link *bc_rcvlink,
229 struct sk_buff_head *inputq,
230 struct sk_buff_head *namedq,
222 struct tipc_link **link); 231 struct tipc_link **link);
223bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, 232bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer,
224 u16 peer_caps, struct sk_buff_head *inputq, 233 int mtu, int window, u16 peer_caps,
234 struct sk_buff_head *inputq,
225 struct sk_buff_head *namedq, 235 struct sk_buff_head *namedq,
236 struct tipc_link *bc_sndlink,
226 struct tipc_link **link); 237 struct tipc_link **link);
227void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, 238void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
228 int mtyp, struct sk_buff_head *xmitq); 239 int mtyp, struct sk_buff_head *xmitq);
229void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
230 struct sk_buff_head *xmitq);
231void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq); 240void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
232int tipc_link_fsm_evt(struct tipc_link *l, int evt); 241int tipc_link_fsm_evt(struct tipc_link *l, int evt);
233void tipc_link_reset_fragments(struct tipc_link *l_ptr); 242void tipc_link_reset_fragments(struct tipc_link *l_ptr);
@@ -264,9 +273,21 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
264int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); 273int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
265int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, 274int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
266 struct sk_buff_head *xmitq); 275 struct sk_buff_head *xmitq);
267void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq); 276int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
268void tipc_link_add_bc_peer(struct tipc_link *l); 277void tipc_link_add_bc_peer(struct tipc_link *snd_l,
269void tipc_link_remove_bc_peer(struct tipc_link *l); 278 struct tipc_link *uc_l,
279 struct sk_buff_head *xmitq);
280void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
281 struct tipc_link *rcv_l,
282 struct sk_buff_head *xmitq);
270int tipc_link_bc_peers(struct tipc_link *l); 283int tipc_link_bc_peers(struct tipc_link *l);
271 284void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
285 struct sk_buff_head *xmitq);
286void tipc_link_build_bc_sync_msg(struct tipc_link *l,
287 struct sk_buff_head *xmitq);
288void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
289void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
290 struct sk_buff_head *xmitq);
291int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
292 struct sk_buff_head *xmitq);
272#endif 293#endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 5b47468739e7..8740930f0787 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -182,7 +182,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
182 *buf = NULL; 182 *buf = NULL;
183 return 0; 183 return 0;
184err: 184err:
185 pr_warn_ratelimited("Unable to build fragment list\n");
186 kfree_skb(*buf); 185 kfree_skb(*buf);
187 kfree_skb(*headbuf); 186 kfree_skb(*headbuf);
188 *buf = *headbuf = NULL; 187 *buf = *headbuf = NULL;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index fbf51fa1075d..55778a0aebf3 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -601,6 +601,11 @@ static inline u32 msg_last_bcast(struct tipc_msg *m)
601 return msg_bits(m, 4, 16, 0xffff); 601 return msg_bits(m, 4, 16, 0xffff);
602} 602}
603 603
604static inline u32 msg_bc_snd_nxt(struct tipc_msg *m)
605{
606 return msg_last_bcast(m) + 1;
607}
608
604static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) 609static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
605{ 610{
606 msg_set_bits(m, 4, 16, 0xffff, n); 611 msg_set_bits(m, 4, 16, 0xffff, n);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 28bcd7be23c6..cd924552244b 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -72,7 +72,6 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
72static void tipc_node_link_down(struct tipc_node *n, int bearer_id, 72static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
73 bool delete); 73 bool delete);
74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); 74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
75static void node_established_contact(struct tipc_node *n_ptr);
76static void tipc_node_delete(struct tipc_node *node); 75static void tipc_node_delete(struct tipc_node *node);
77static void tipc_node_timeout(unsigned long data); 76static void tipc_node_timeout(unsigned long data);
78static void tipc_node_fsm_evt(struct tipc_node *n, int evt); 77static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
@@ -165,8 +164,10 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
165 INIT_LIST_HEAD(&n_ptr->list); 164 INIT_LIST_HEAD(&n_ptr->list);
166 INIT_LIST_HEAD(&n_ptr->publ_list); 165 INIT_LIST_HEAD(&n_ptr->publ_list);
167 INIT_LIST_HEAD(&n_ptr->conn_sks); 166 INIT_LIST_HEAD(&n_ptr->conn_sks);
168 skb_queue_head_init(&n_ptr->bclink.namedq); 167 skb_queue_head_init(&n_ptr->bc_entry.namedq);
169 __skb_queue_head_init(&n_ptr->bclink.deferdq); 168 skb_queue_head_init(&n_ptr->bc_entry.inputq1);
169 __skb_queue_head_init(&n_ptr->bc_entry.arrvq);
170 skb_queue_head_init(&n_ptr->bc_entry.inputq2);
170 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 171 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
171 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 172 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
172 if (n_ptr->addr < temp_node->addr) 173 if (n_ptr->addr < temp_node->addr)
@@ -177,6 +178,18 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
177 n_ptr->signature = INVALID_NODE_SIG; 178 n_ptr->signature = INVALID_NODE_SIG;
178 n_ptr->active_links[0] = INVALID_BEARER_ID; 179 n_ptr->active_links[0] = INVALID_BEARER_ID;
179 n_ptr->active_links[1] = INVALID_BEARER_ID; 180 n_ptr->active_links[1] = INVALID_BEARER_ID;
181 if (!tipc_link_bc_create(n_ptr, tipc_own_addr(net), n_ptr->addr,
182 U16_MAX, tipc_bc_sndlink(net)->window,
183 n_ptr->capabilities,
184 &n_ptr->bc_entry.inputq1,
185 &n_ptr->bc_entry.namedq,
186 tipc_bc_sndlink(net),
187 &n_ptr->bc_entry.link)) {
188 pr_warn("Broadcast rcv link creation failed, no memory\n");
189 kfree(n_ptr);
190 n_ptr = NULL;
191 goto exit;
192 }
180 tipc_node_get(n_ptr); 193 tipc_node_get(n_ptr);
181 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); 194 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
182 n_ptr->keepalive_intv = U32_MAX; 195 n_ptr->keepalive_intv = U32_MAX;
@@ -203,6 +216,7 @@ static void tipc_node_delete(struct tipc_node *node)
203{ 216{
204 list_del_rcu(&node->list); 217 list_del_rcu(&node->list);
205 hlist_del_rcu(&node->hash); 218 hlist_del_rcu(&node->hash);
219 kfree(node->bc_entry.link);
206 kfree_rcu(node, rcu); 220 kfree_rcu(node, rcu);
207} 221}
208 222
@@ -340,8 +354,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
340 if (!ol) { 354 if (!ol) {
341 *slot0 = bearer_id; 355 *slot0 = bearer_id;
342 *slot1 = bearer_id; 356 *slot1 = bearer_id;
343 tipc_link_build_bcast_sync_msg(nl, xmitq); 357 tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
344 node_established_contact(n); 358 n->action_flags |= TIPC_NOTIFY_NODE_UP;
359 tipc_bcast_add_peer(n->net, n->addr, nl, xmitq);
345 return; 360 return;
346 } 361 }
347 362
@@ -585,9 +600,10 @@ void tipc_node_check_dest(struct net *net, u32 onode,
585 b->net_plane, b->mtu, b->priority, 600 b->net_plane, b->mtu, b->priority,
586 b->window, mod(tipc_net(net)->random), 601 b->window, mod(tipc_net(net)->random),
587 tipc_own_addr(net), onode, 602 tipc_own_addr(net), onode,
588 n->capabilities, 603 n->capabilities, &le->maddr,
589 &le->maddr, &le->inputq, 604 tipc_bc_sndlink(n->net), n->bc_entry.link,
590 &n->bclink.namedq, &l)) { 605 &le->inputq,
606 &n->bc_entry.namedq, &l)) {
591 *respond = false; 607 *respond = false;
592 goto exit; 608 goto exit;
593 } 609 }
@@ -830,58 +846,36 @@ bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
830 return true; 846 return true;
831} 847}
832 848
833static void node_established_contact(struct tipc_node *n_ptr) 849static void node_lost_contact(struct tipc_node *n,
834{
835 tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
836 n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
837 n_ptr->bclink.oos_state = 0;
838 n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
839 tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
840}
841
842static void node_lost_contact(struct tipc_node *n_ptr,
843 struct sk_buff_head *inputq) 850 struct sk_buff_head *inputq)
844{ 851{
845 char addr_string[16]; 852 char addr_string[16];
846 struct tipc_sock_conn *conn, *safe; 853 struct tipc_sock_conn *conn, *safe;
847 struct tipc_link *l; 854 struct tipc_link *l;
848 struct list_head *conns = &n_ptr->conn_sks; 855 struct list_head *conns = &n->conn_sks;
849 struct sk_buff *skb; 856 struct sk_buff *skb;
850 struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
851 uint i; 857 uint i;
852 858
853 pr_debug("Lost contact with %s\n", 859 pr_debug("Lost contact with %s\n",
854 tipc_addr_string_fill(addr_string, n_ptr->addr)); 860 tipc_addr_string_fill(addr_string, n->addr));
855
856 /* Flush broadcast link info associated with lost node */
857 if (n_ptr->bclink.recv_permitted) {
858 __skb_queue_purge(&n_ptr->bclink.deferdq);
859 861
860 if (n_ptr->bclink.reasm_buf) { 862 /* Clean up broadcast state */
861 kfree_skb(n_ptr->bclink.reasm_buf); 863 tipc_bcast_remove_peer(n->net, n->addr, n->bc_entry.link);
862 n_ptr->bclink.reasm_buf = NULL;
863 }
864
865 tipc_bclink_remove_node(n_ptr->net, n_ptr->addr);
866 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
867
868 n_ptr->bclink.recv_permitted = false;
869 }
870 864
871 /* Abort any ongoing link failover */ 865 /* Abort any ongoing link failover */
872 for (i = 0; i < MAX_BEARERS; i++) { 866 for (i = 0; i < MAX_BEARERS; i++) {
873 l = n_ptr->links[i].link; 867 l = n->links[i].link;
874 if (l) 868 if (l)
875 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); 869 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
876 } 870 }
877 871
878 /* Notify publications from this node */ 872 /* Notify publications from this node */
879 n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; 873 n->action_flags |= TIPC_NOTIFY_NODE_DOWN;
880 874
881 /* Notify sockets connected to node */ 875 /* Notify sockets connected to node */
882 list_for_each_entry_safe(conn, safe, conns, list) { 876 list_for_each_entry_safe(conn, safe, conns, list) {
883 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 877 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
884 SHORT_H_SIZE, 0, tn->own_addr, 878 SHORT_H_SIZE, 0, tipc_own_addr(n->net),
885 conn->peer_node, conn->port, 879 conn->peer_node, conn->port,
886 conn->peer_port, TIPC_ERR_NO_NODE); 880 conn->peer_port, TIPC_ERR_NO_NODE);
887 if (likely(skb)) 881 if (likely(skb))
@@ -1086,6 +1080,67 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1086} 1080}
1087 1081
1088/** 1082/**
1083 * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
1084 * @net: the applicable net namespace
1085 * @skb: TIPC packet
1086 * @bearer_id: id of bearer message arrived on
1087 *
1088 * Invoked with no locks held.
1089 */
1090void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id)
1091{
1092 int rc;
1093 struct sk_buff_head xmitq;
1094 struct tipc_bclink_entry *be;
1095 struct tipc_link_entry *le;
1096 struct tipc_msg *hdr = buf_msg(skb);
1097 int usr = msg_user(hdr);
1098 u32 dnode = msg_destnode(hdr);
1099 struct tipc_node *n;
1100
1101 __skb_queue_head_init(&xmitq);
1102
1103 /* If NACK for other node, let rcv link for that node peek into it */
1104 if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net)))
1105 n = tipc_node_find(net, dnode);
1106 else
1107 n = tipc_node_find(net, msg_prevnode(hdr));
1108 if (!n) {
1109 kfree_skb(skb);
1110 return;
1111 }
1112 be = &n->bc_entry;
1113 le = &n->links[bearer_id];
1114
1115 rc = tipc_bcast_rcv(net, be->link, skb);
1116
1117 /* Broadcast link reset may happen at reassembly failure */
1118 if (rc & TIPC_LINK_DOWN_EVT)
1119 tipc_node_reset_links(n);
1120
1121 /* Broadcast ACKs are sent on a unicast link */
1122 if (rc & TIPC_LINK_SND_BC_ACK) {
1123 tipc_node_lock(n);
1124 tipc_link_build_ack_msg(le->link, &xmitq);
1125 tipc_node_unlock(n);
1126 }
1127
1128 if (!skb_queue_empty(&xmitq))
1129 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1130
1131 /* Deliver. 'arrvq' is under inputq2's lock protection */
1132 if (!skb_queue_empty(&be->inputq1)) {
1133 spin_lock_bh(&be->inputq2.lock);
1134 spin_lock_bh(&be->inputq1.lock);
1135 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1136 spin_unlock_bh(&be->inputq1.lock);
1137 spin_unlock_bh(&be->inputq2.lock);
1138 tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
1139 }
1140 tipc_node_put(n);
1141}
1142
1143/**
1089 * tipc_node_check_state - check and if necessary update node state 1144 * tipc_node_check_state - check and if necessary update node state
1090 * @skb: TIPC packet 1145 * @skb: TIPC packet
1091 * @bearer_id: identity of bearer delivering the packet 1146 * @bearer_id: identity of bearer delivering the packet
@@ -1227,6 +1282,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1227 int usr = msg_user(hdr); 1282 int usr = msg_user(hdr);
1228 int bearer_id = b->identity; 1283 int bearer_id = b->identity;
1229 struct tipc_link_entry *le; 1284 struct tipc_link_entry *le;
1285 u16 bc_ack = msg_bcast_ack(hdr);
1230 int rc = 0; 1286 int rc = 0;
1231 1287
1232 __skb_queue_head_init(&xmitq); 1288 __skb_queue_head_init(&xmitq);
@@ -1235,13 +1291,12 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1235 if (unlikely(!tipc_msg_validate(skb))) 1291 if (unlikely(!tipc_msg_validate(skb)))
1236 goto discard; 1292 goto discard;
1237 1293
1238 /* Handle arrival of a non-unicast link packet */ 1294 /* Handle arrival of discovery or broadcast packet */
1239 if (unlikely(msg_non_seq(hdr))) { 1295 if (unlikely(msg_non_seq(hdr))) {
1240 if (usr == LINK_CONFIG) 1296 if (unlikely(usr == LINK_CONFIG))
1241 tipc_disc_rcv(net, skb, b); 1297 return tipc_disc_rcv(net, skb, b);
1242 else 1298 else
1243 tipc_bclink_rcv(net, skb); 1299 return tipc_node_bc_rcv(net, skb, bearer_id);
1244 return;
1245 } 1300 }
1246 1301
1247 /* Locate neighboring node that sent packet */ 1302 /* Locate neighboring node that sent packet */
@@ -1250,19 +1305,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1250 goto discard; 1305 goto discard;
1251 le = &n->links[bearer_id]; 1306 le = &n->links[bearer_id];
1252 1307
1308 /* Ensure broadcast reception is in synch with peer's send state */
1309 if (unlikely(usr == LINK_PROTOCOL))
1310 tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
1311 else if (unlikely(n->bc_entry.link->acked != bc_ack))
1312 tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
1313
1253 tipc_node_lock(n); 1314 tipc_node_lock(n);
1254 1315
1255 /* Is reception permitted at the moment ? */ 1316 /* Is reception permitted at the moment ? */
1256 if (!tipc_node_filter_pkt(n, hdr)) 1317 if (!tipc_node_filter_pkt(n, hdr))
1257 goto unlock; 1318 goto unlock;
1258 1319
1259 if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1260 tipc_bclink_sync_state(n, hdr);
1261
1262 /* Release acked broadcast packets */
1263 if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
1264 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
1265
1266 /* Check and if necessary update node state */ 1320 /* Check and if necessary update node state */
1267 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { 1321 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
1268 rc = tipc_link_rcv(le->link, skb, &xmitq); 1322 rc = tipc_link_rcv(le->link, skb, &xmitq);
@@ -1277,8 +1331,8 @@ unlock:
1277 if (unlikely(rc & TIPC_LINK_DOWN_EVT)) 1331 if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1278 tipc_node_link_down(n, bearer_id, false); 1332 tipc_node_link_down(n, bearer_id, false);
1279 1333
1280 if (unlikely(!skb_queue_empty(&n->bclink.namedq))) 1334 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1281 tipc_named_rcv(net, &n->bclink.namedq); 1335 tipc_named_rcv(net, &n->bc_entry.namedq);
1282 1336
1283 if (!skb_queue_empty(&le->inputq)) 1337 if (!skb_queue_empty(&le->inputq))
1284 tipc_sk_rcv(net, &le->inputq); 1338 tipc_sk_rcv(net, &le->inputq);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 1465774ad726..36a1cd0bc1f1 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -100,6 +100,14 @@ struct tipc_link_entry {
100 struct tipc_media_addr maddr; 100 struct tipc_media_addr maddr;
101}; 101};
102 102
103struct tipc_bclink_entry {
104 struct tipc_link *link;
105 struct sk_buff_head inputq1;
106 struct sk_buff_head arrvq;
107 struct sk_buff_head inputq2;
108 struct sk_buff_head namedq;
109};
110
103/** 111/**
104 * struct tipc_node - TIPC node structure 112 * struct tipc_node - TIPC node structure
105 * @addr: network address of node 113 * @addr: network address of node
@@ -132,6 +140,7 @@ struct tipc_node {
132 struct hlist_node hash; 140 struct hlist_node hash;
133 int active_links[2]; 141 int active_links[2];
134 struct tipc_link_entry links[MAX_BEARERS]; 142 struct tipc_link_entry links[MAX_BEARERS];
143 struct tipc_bclink_entry bc_entry;
135 int action_flags; 144 int action_flags;
136 struct tipc_node_bclink bclink; 145 struct tipc_node_bclink bclink;
137 struct list_head list; 146 struct list_head list;