author     Jon Paul Maloy <jon.maloy@ericsson.com>   2015-10-22 08:51:41 -0400
committer  David S. Miller <davem@davemloft.net>     2015-10-24 09:56:37 -0400
commit     5266698661401afc5e4a1a521cf9ba10724d10dd (patch)
tree       cf3d466a2d9982f403a689e8a0c819c7e3693bde /net/tipc
parent     fd556f209af53b9cdc45df8c467feb235376c4df (diff)
tipc: let broadcast packet reception use new link receive function
The code path for receiving broadcast packets is currently distinct from the unicast path. This leads to unnecessary code and data duplication, something that can be avoided with some effort.

We now introduce separate per-peer tipc_link instances for handling broadcast packet reception. Each receive link keeps a pointer to the common, single, broadcast link instance, and can hence handle release and retransmission of send buffers as if they belonged to its own instance.

Furthermore, we let each unicast link instance keep a reference both to the pertaining broadcast receive link and to the common send link. This makes it possible for the unicast links to easily access data for broadcast link synchronization, as well as to carry acknowledgments for received broadcast packets.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
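In outline, the reference structure the commit message describes looks roughly like the sketch below: each unicast link points to the shared broadcast send link and to its peer's broadcast receive link, and each node carries a small per-peer broadcast entry. Field names follow the hunks in net/tipc/link.h and net/tipc/node.h further down; the structs are heavily abbreviated and the comments are illustrative, not authoritative.

/* Sketch of the broadcast link references added by this patch.
 * Field names follow net/tipc/link.h and net/tipc/node.h as changed below;
 * surrounding members are elided and the annotations are for orientation only.
 */
struct tipc_link {
	/* ... existing unicast link state ... */
	u16 ackers;                     /* on the bcast send link: peers expected to ack */
	u16 acked;                      /* on a bcast rcv link: last seqno acked by the peer */
	struct tipc_link *bc_rcvlink;   /* per-peer broadcast receive link */
	struct tipc_link *bc_sndlink;   /* shared, namespace-global broadcast send link */
	int nack_state;                 /* BC_NACK_SND_* coordination state */
	bool bc_peer_is_up;
	/* ... statistics ... */
};

struct tipc_bclink_entry {              /* per-peer broadcast state kept in struct tipc_node */
	struct tipc_link *link;         /* this peer's broadcast receive link */
	struct sk_buff_head inputq1;    /* filled by tipc_link_rcv() on the rcv link */
	struct sk_buff_head arrvq;      /* staging queue towards the sockets */
	struct sk_buff_head inputq2;    /* drained by tipc_sk_mcast_rcv() */
	struct sk_buff_head namedq;     /* binding table updates */
};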
Diffstat (limited to 'net/tipc')
-rw-r--r--  net/tipc/bcast.c  | 163
-rw-r--r--  net/tipc/bcast.h  |  16
-rw-r--r--  net/tipc/core.h   |   5
-rw-r--r--  net/tipc/link.c   | 435
-rw-r--r--  net/tipc/link.h   |  43
-rw-r--r--  net/tipc/msg.c    |   1
-rw-r--r--  net/tipc/msg.h    |   5
-rw-r--r--  net/tipc/node.c   | 158
-rw-r--r--  net/tipc/node.h   |   9
9 files changed, 615 insertions, 220 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 7fdf895e7973..ea28c2919b38 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -112,11 +112,6 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
112 return tipc_net(net)->bcbase; 112 return tipc_net(net)->bcbase;
113} 113}
114 114
115static struct tipc_link *tipc_bc_sndlink(struct net *net)
116{
117 return tipc_net(net)->bcl;
118}
119
120/** 115/**
121 * tipc_nmap_equal - test for equality of node maps 116 * tipc_nmap_equal - test for equality of node maps
122 */ 117 */
@@ -169,31 +164,6 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
169 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1); 164 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
170} 165}
171 166
172void tipc_bclink_add_node(struct net *net, u32 addr)
173{
174 struct tipc_net *tn = net_generic(net, tipc_net_id);
175 struct tipc_link *l = tipc_bc_sndlink(net);
176 tipc_bclink_lock(net);
177 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
178 tipc_link_add_bc_peer(l);
179 tipc_bclink_unlock(net);
180}
181
182void tipc_bclink_remove_node(struct net *net, u32 addr)
183{
184 struct tipc_net *tn = net_generic(net, tipc_net_id);
185
186 tipc_bclink_lock(net);
187 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
188 tn->bcl->ackers--;
189
190 /* Last node? => reset backlog queue */
191 if (!tn->bcbase->bcast_nodes.count)
192 tipc_link_purge_backlog(tn->bcbase->link);
193
194 tipc_bclink_unlock(net);
195}
196
197static void bclink_set_last_sent(struct net *net) 167static void bclink_set_last_sent(struct net *net)
198{ 168{
199 struct tipc_net *tn = net_generic(net, tipc_net_id); 169 struct tipc_net *tn = net_generic(net, tipc_net_id);
@@ -501,12 +471,141 @@ int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
501 __skb_queue_purge(&rcvq); 471 __skb_queue_purge(&rcvq);
502 return rc; 472 return rc;
503 } 473 }
474
504 /* Broadcast to all nodes, inluding local node */ 475 /* Broadcast to all nodes, inluding local node */
505 tipc_bcbearer_xmit(net, &xmitq); 476 tipc_bcbearer_xmit(net, &xmitq);
506 tipc_sk_mcast_rcv(net, &rcvq, &inputq); 477 tipc_sk_mcast_rcv(net, &rcvq, &inputq);
507 __skb_queue_purge(list); 478 __skb_queue_purge(list);
508 return 0; 479 return 0;
509} 480}
481
482/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
483 *
484 * RCU is locked, no other locks set
485 */
486int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
487{
488 struct tipc_msg *hdr = buf_msg(skb);
489 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
490 struct sk_buff_head xmitq;
491 int rc;
492
493 __skb_queue_head_init(&xmitq);
494
495 if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
496 kfree_skb(skb);
497 return 0;
498 }
499
500 tipc_bcast_lock(net);
501 if (msg_user(hdr) == BCAST_PROTOCOL)
502 rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
503 else
504 rc = tipc_link_rcv(l, skb, NULL);
505 tipc_bcast_unlock(net);
506
507 if (!skb_queue_empty(&xmitq))
508 tipc_bcbearer_xmit(net, &xmitq);
509
510 /* Any socket wakeup messages ? */
511 if (!skb_queue_empty(inputq))
512 tipc_sk_rcv(net, inputq);
513
514 return rc;
515}
516
517/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
518 *
519 * RCU is locked, no other locks set
520 */
521void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
522{
523 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
524 struct sk_buff_head xmitq;
525
526 __skb_queue_head_init(&xmitq);
527
528 tipc_bcast_lock(net);
529 tipc_link_bc_ack_rcv(l, acked, &xmitq);
530 tipc_bcast_unlock(net);
531
532 tipc_bcbearer_xmit(net, &xmitq);
533
534 /* Any socket wakeup messages ? */
535 if (!skb_queue_empty(inputq))
536 tipc_sk_rcv(net, inputq);
537}
538
539/* tipc_bcast_synch_rcv - check and update rcv link with peer's send state
540 *
541 * RCU is locked, no other locks set
542 */
543void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
544 struct tipc_msg *hdr)
545{
546 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
547 struct sk_buff_head xmitq;
548
549 __skb_queue_head_init(&xmitq);
550
551 tipc_bcast_lock(net);
552 if (msg_type(hdr) == STATE_MSG) {
553 tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
554 tipc_link_bc_sync_rcv(l, hdr, &xmitq);
555 } else {
556 tipc_link_bc_init_rcv(l, hdr);
557 }
558 tipc_bcast_unlock(net);
559
560 tipc_bcbearer_xmit(net, &xmitq);
561
562 /* Any socket wakeup messages ? */
563 if (!skb_queue_empty(inputq))
564 tipc_sk_rcv(net, inputq);
565}
566
567/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
568 *
569 * RCU is locked, node lock is set
570 */
571void tipc_bcast_add_peer(struct net *net, u32 addr, struct tipc_link *uc_l,
572 struct sk_buff_head *xmitq)
573{
574 struct tipc_net *tn = net_generic(net, tipc_net_id);
575 struct tipc_link *snd_l = tipc_bc_sndlink(net);
576
577 tipc_bclink_lock(net);
578 tipc_nmap_add(&tn->bcbase->bcast_nodes, addr);
579 tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
580 tipc_bclink_unlock(net);
581}
582
583/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
584 *
585 * RCU is locked, node lock is set
586 */
587void tipc_bcast_remove_peer(struct net *net, u32 addr,
588 struct tipc_link *rcv_l)
589{
590 struct tipc_net *tn = net_generic(net, tipc_net_id);
591 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
592 struct tipc_link *snd_l = tipc_bc_sndlink(net);
593 struct sk_buff_head xmitq;
594
595 __skb_queue_head_init(&xmitq);
596
597 tipc_bclink_lock(net);
598 tipc_nmap_remove(&tn->bcbase->bcast_nodes, addr);
599 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
600 tipc_bclink_unlock(net);
601
602 tipc_bcbearer_xmit(net, &xmitq);
603
604 /* Any socket wakeup messages ? */
605 if (!skb_queue_empty(inputq))
606 tipc_sk_rcv(net, inputq);
607}
608
510/** 609/**
511 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 610 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
512 * 611 *
@@ -728,6 +827,7 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
728 return 0; 827 return 0;
729 } 828 }
730 } 829 }
830 msg_set_mc_netid(msg, tn->net_id);
731 831
732 /* Send buffer over bearers until all targets reached */ 832 /* Send buffer over bearers until all targets reached */
733 bcbearer->remains = bclink->bcast_nodes; 833 bcbearer->remains = bclink->bcast_nodes;
@@ -1042,12 +1142,13 @@ int tipc_bcast_init(struct net *net)
1042 spin_lock_init(&tipc_net(net)->bclock); 1142 spin_lock_init(&tipc_net(net)->bclock);
1043 bb->node.net = net; 1143 bb->node.net = net;
1044 1144
1045 if (!tipc_link_bc_create(&bb->node, 1145 if (!tipc_link_bc_create(&bb->node, 0, 0,
1046 MAX_PKT_DEFAULT_MCAST, 1146 MAX_PKT_DEFAULT_MCAST,
1047 BCLINK_WIN_DEFAULT, 1147 BCLINK_WIN_DEFAULT,
1048 0, 1148 0,
1049 &bb->inputq, 1149 &bb->inputq,
1050 &bb->namedq, 1150 &bb->namedq,
1151 NULL,
1051 &l)) 1152 &l))
1052 goto enomem; 1153 goto enomem;
1053 bb->link = l; 1154 bb->link = l;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index a378fdde9b7a..568a57cd89e6 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -47,8 +47,11 @@ struct tipc_node_map;
47int tipc_bcast_init(struct net *net); 47int tipc_bcast_init(struct net *net);
48void tipc_bcast_reinit(struct net *net); 48void tipc_bcast_reinit(struct net *net);
49void tipc_bcast_stop(struct net *net); 49void tipc_bcast_stop(struct net *net);
50void tipc_bclink_add_node(struct net *net, u32 addr); 50void tipc_bcast_add_peer(struct net *net, u32 addr,
51void tipc_bclink_remove_node(struct net *net, u32 addr); 51 struct tipc_link *l,
52 struct sk_buff_head *xmitq);
53void tipc_bcast_remove_peer(struct net *net, u32 addr,
54 struct tipc_link *rcv_bcl);
52struct tipc_node *tipc_bclink_retransmit_to(struct net *tn); 55struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
53void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked); 56void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
54void tipc_bclink_rcv(struct net *net, struct sk_buff *buf); 57void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
@@ -62,6 +65,10 @@ int tipc_bclink_reset_stats(struct net *net);
62int tipc_bclink_set_queue_limits(struct net *net, u32 limit); 65int tipc_bclink_set_queue_limits(struct net *net, u32 limit);
63uint tipc_bcast_get_mtu(void); 66uint tipc_bcast_get_mtu(void);
64int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); 67int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
68int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
69void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
70void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
71 struct tipc_msg *hdr);
65void tipc_bclink_wakeup_users(struct net *net); 72void tipc_bclink_wakeup_users(struct net *net);
66int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); 73int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
67int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); 74int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
@@ -78,4 +85,9 @@ static inline void tipc_bcast_unlock(struct net *net)
78 spin_unlock_bh(&tipc_net(net)->bclock); 85 spin_unlock_bh(&tipc_net(net)->bclock);
79} 86}
80 87
88static inline struct tipc_link *tipc_bc_sndlink(struct net *net)
89{
90 return tipc_net(net)->bcl;
91}
92
81#endif 93#endif
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 645dcac9575d..6d589aaad054 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -115,6 +115,11 @@ static inline struct tipc_net *tipc_net(struct net *net)
115 return net_generic(net, tipc_net_id); 115 return net_generic(net, tipc_net_id);
116} 116}
117 117
118static inline int tipc_netid(struct net *net)
119{
120 return tipc_net(net)->net_id;
121}
122
118static inline u16 mod(u16 x) 123static inline u16 mod(u16 x)
119{ 124{
120 return x & 0xffffu; 125 return x & 0xffffu;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 6a1a9d9239ae..ff725c398914 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -76,6 +76,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
76 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } 76 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
77}; 77};
78 78
79/* Send states for broadcast NACKs
80 */
81enum {
82 BC_NACK_SND_CONDITIONAL,
83 BC_NACK_SND_UNCONDITIONAL,
84 BC_NACK_SND_SUPPRESS,
85};
86
79/* 87/*
80 * Interval between NACKs when packets arrive out of order 88 * Interval between NACKs when packets arrive out of order
81 */ 89 */
@@ -111,7 +119,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
111 struct sk_buff_head *xmitq); 119 struct sk_buff_head *xmitq);
112static void link_reset_statistics(struct tipc_link *l_ptr); 120static void link_reset_statistics(struct tipc_link *l_ptr);
113static void link_print(struct tipc_link *l_ptr, const char *str); 121static void link_print(struct tipc_link *l_ptr, const char *str);
114static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 122static void tipc_link_build_nack_msg(struct tipc_link *l,
123 struct sk_buff_head *xmitq);
124static void tipc_link_build_bc_init_msg(struct tipc_link *l,
125 struct sk_buff_head *xmitq);
126static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
115 127
116/* 128/*
117 * Simple non-static link routines (i.e. referenced outside this file) 129 * Simple non-static link routines (i.e. referenced outside this file)
@@ -151,6 +163,16 @@ bool tipc_link_is_blocked(struct tipc_link *l)
151 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); 163 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER);
152} 164}
153 165
166bool link_is_bc_sndlink(struct tipc_link *l)
167{
168 return !l->bc_sndlink;
169}
170
171bool link_is_bc_rcvlink(struct tipc_link *l)
172{
173 return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l));
174}
175
154int tipc_link_is_active(struct tipc_link *l) 176int tipc_link_is_active(struct tipc_link *l)
155{ 177{
156 struct tipc_node *n = l->owner; 178 struct tipc_node *n = l->owner;
@@ -158,14 +180,31 @@ int tipc_link_is_active(struct tipc_link *l)
158 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); 180 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
159} 181}
160 182
161void tipc_link_add_bc_peer(struct tipc_link *l) 183void tipc_link_add_bc_peer(struct tipc_link *snd_l,
184 struct tipc_link *uc_l,
185 struct sk_buff_head *xmitq)
162{ 186{
163 l->ackers++; 187 struct tipc_link *rcv_l = uc_l->bc_rcvlink;
188
189 snd_l->ackers++;
190 rcv_l->acked = snd_l->snd_nxt - 1;
191 tipc_link_build_bc_init_msg(uc_l, xmitq);
164} 192}
165 193
166void tipc_link_remove_bc_peer(struct tipc_link *l) 194void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
195 struct tipc_link *rcv_l,
196 struct sk_buff_head *xmitq)
167{ 197{
168 l->ackers--; 198 u16 ack = snd_l->snd_nxt - 1;
199
200 snd_l->ackers--;
201 tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
202 tipc_link_reset(rcv_l);
203 rcv_l->state = LINK_RESET;
204 if (!snd_l->ackers) {
205 tipc_link_reset(snd_l);
206 __skb_queue_purge(xmitq);
207 }
169} 208}
170 209
171int tipc_link_bc_peers(struct tipc_link *l) 210int tipc_link_bc_peers(struct tipc_link *l)
@@ -193,6 +232,8 @@ static u32 link_own_addr(struct tipc_link *l)
193 * @peer: node id of peer node 232 * @peer: node id of peer node
194 * @peer_caps: bitmap describing peer node capabilities 233 * @peer_caps: bitmap describing peer node capabilities
195 * @maddr: media address to be used 234 * @maddr: media address to be used
235 * @bc_sndlink: the namespace global link used for broadcast sending
236 * @bc_rcvlink: the peer specific link used for broadcast reception
196 * @inputq: queue to put messages ready for delivery 237 * @inputq: queue to put messages ready for delivery
197 * @namedq: queue to put binding table update messages ready for delivery 238 * @namedq: queue to put binding table update messages ready for delivery
198 * @link: return value, pointer to put the created link 239 * @link: return value, pointer to put the created link
@@ -202,8 +243,12 @@ static u32 link_own_addr(struct tipc_link *l)
202bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, 243bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
203 int tolerance, char net_plane, u32 mtu, int priority, 244 int tolerance, char net_plane, u32 mtu, int priority,
204 int window, u32 session, u32 ownnode, u32 peer, 245 int window, u32 session, u32 ownnode, u32 peer,
205 u16 peer_caps, struct tipc_media_addr *maddr, 246 u16 peer_caps,
206 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 247 struct tipc_media_addr *maddr,
248 struct tipc_link *bc_sndlink,
249 struct tipc_link *bc_rcvlink,
250 struct sk_buff_head *inputq,
251 struct sk_buff_head *namedq,
207 struct tipc_link **link) 252 struct tipc_link **link)
208{ 253{
209 struct tipc_link *l; 254 struct tipc_link *l;
@@ -239,6 +284,8 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
239 l->priority = priority; 284 l->priority = priority;
240 tipc_link_set_queue_limits(l, window); 285 tipc_link_set_queue_limits(l, window);
241 l->ackers = 1; 286 l->ackers = 1;
287 l->bc_sndlink = bc_sndlink;
288 l->bc_rcvlink = bc_rcvlink;
242 l->inputq = inputq; 289 l->inputq = inputq;
243 l->namedq = namedq; 290 l->namedq = namedq;
244 l->state = LINK_RESETTING; 291 l->state = LINK_RESETTING;
@@ -261,46 +308,32 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
261 * 308 *
262 * Returns true if link was created, otherwise false 309 * Returns true if link was created, otherwise false
263 */ 310 */
264bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, 311bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer,
265 u16 peer_caps, 312 int mtu, int window, u16 peer_caps,
266 struct sk_buff_head *inputq, 313 struct sk_buff_head *inputq,
267 struct sk_buff_head *namedq, 314 struct sk_buff_head *namedq,
315 struct tipc_link *bc_sndlink,
268 struct tipc_link **link) 316 struct tipc_link **link)
269{ 317{
270 struct tipc_link *l; 318 struct tipc_link *l;
271 319
272 if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, 320 if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
273 0, 0, 0, peer_caps, NULL, inputq, namedq, link)) 321 0, ownnode, peer, peer_caps, NULL, bc_sndlink,
322 NULL, inputq, namedq, link))
274 return false; 323 return false;
275 324
276 l = *link; 325 l = *link;
277 strcpy(l->name, tipc_bclink_name); 326 strcpy(l->name, tipc_bclink_name);
278 tipc_link_reset(l); 327 tipc_link_reset(l);
328 l->state = LINK_RESET;
279 l->ackers = 0; 329 l->ackers = 0;
280 return true; 330 l->bc_rcvlink = l;
281}
282 331
283/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. 332 /* Broadcast send link is always up */
284 * 333 if (link_is_bc_sndlink(l))
285 * Give a newly added peer node the sequence number where it should 334 l->state = LINK_ESTABLISHED;
286 * start receiving and acking broadcast packets.
287 */
288void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
289 struct sk_buff_head *xmitq)
290{
291 struct sk_buff *skb;
292 struct sk_buff_head list;
293 u16 last_sent;
294 335
295 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, 336 return true;
296 0, l->addr, link_own_addr(l), 0, 0, 0);
297 if (!skb)
298 return;
299 last_sent = tipc_bclink_get_last_sent(l->owner->net);
300 msg_set_last_bcast(buf_msg(skb), last_sent);
301 __skb_queue_head_init(&list);
302 __skb_queue_tail(&list, skb);
303 tipc_link_xmit(l, &list, xmitq);
304} 337}
305 338
306/** 339/**
@@ -507,12 +540,17 @@ static void link_profile_stats(struct tipc_link *l)
507 540
508/* tipc_link_timeout - perform periodic task as instructed from node timeout 541/* tipc_link_timeout - perform periodic task as instructed from node timeout
509 */ 542 */
543/* tipc_link_timeout - perform periodic task as instructed from node timeout
544 */
510int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) 545int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
511{ 546{
512 int rc = 0; 547 int rc = 0;
513 int mtyp = STATE_MSG; 548 int mtyp = STATE_MSG;
514 bool xmit = false; 549 bool xmit = false;
515 bool prb = false; 550 bool prb = false;
551 u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
552 u16 bc_acked = l->bc_rcvlink->acked;
553 bool bc_up = link_is_up(l->bc_rcvlink);
516 554
517 link_profile_stats(l); 555 link_profile_stats(l);
518 556
@@ -520,7 +558,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
520 case LINK_ESTABLISHED: 558 case LINK_ESTABLISHED:
521 case LINK_SYNCHING: 559 case LINK_SYNCHING:
522 if (!l->silent_intv_cnt) { 560 if (!l->silent_intv_cnt) {
523 if (tipc_bclink_acks_missing(l->owner)) 561 if (bc_up && (bc_acked != bc_snt))
524 xmit = true; 562 xmit = true;
525 } else if (l->silent_intv_cnt <= l->abort_limit) { 563 } else if (l->silent_intv_cnt <= l->abort_limit) {
526 xmit = true; 564 xmit = true;
@@ -671,6 +709,7 @@ void tipc_link_reset(struct tipc_link *l)
671 l->silent_intv_cnt = 0; 709 l->silent_intv_cnt = 0;
672 l->stats.recv_info = 0; 710 l->stats.recv_info = 0;
673 l->stale_count = 0; 711 l->stale_count = 0;
712 l->bc_peer_is_up = false;
674 link_reset_statistics(l); 713 link_reset_statistics(l);
675} 714}
676 715
@@ -692,7 +731,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
692 uint mtu = link->mtu; 731 uint mtu = link->mtu;
693 u16 ack = mod(link->rcv_nxt - 1); 732 u16 ack = mod(link->rcv_nxt - 1);
694 u16 seqno = link->snd_nxt; 733 u16 seqno = link->snd_nxt;
695 u16 bc_last_in = link->owner->bclink.last_in; 734 u16 bc_ack = link->bc_rcvlink->rcv_nxt - 1;
696 struct tipc_media_addr *addr = link->media_addr; 735 struct tipc_media_addr *addr = link->media_addr;
697 struct sk_buff_head *transmq = &link->transmq; 736 struct sk_buff_head *transmq = &link->transmq;
698 struct sk_buff_head *backlogq = &link->backlogq; 737 struct sk_buff_head *backlogq = &link->backlogq;
@@ -712,7 +751,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
712 msg = buf_msg(skb); 751 msg = buf_msg(skb);
713 msg_set_seqno(msg, seqno); 752 msg_set_seqno(msg, seqno);
714 msg_set_ack(msg, ack); 753 msg_set_ack(msg, ack);
715 msg_set_bcast_ack(msg, bc_last_in); 754 msg_set_bcast_ack(msg, bc_ack);
716 755
717 if (likely(skb_queue_len(transmq) < maxwin)) { 756 if (likely(skb_queue_len(transmq) < maxwin)) {
718 __skb_dequeue(list); 757 __skb_dequeue(list);
@@ -762,7 +801,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
762 unsigned int mtu = l->mtu; 801 unsigned int mtu = l->mtu;
763 u16 ack = l->rcv_nxt - 1; 802 u16 ack = l->rcv_nxt - 1;
764 u16 seqno = l->snd_nxt; 803 u16 seqno = l->snd_nxt;
765 u16 bc_last_in = l->owner->bclink.last_in; 804 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
766 struct sk_buff_head *transmq = &l->transmq; 805 struct sk_buff_head *transmq = &l->transmq;
767 struct sk_buff_head *backlogq = &l->backlogq; 806 struct sk_buff_head *backlogq = &l->backlogq;
768 struct sk_buff *skb, *_skb, *bskb; 807 struct sk_buff *skb, *_skb, *bskb;
@@ -781,7 +820,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
781 hdr = buf_msg(skb); 820 hdr = buf_msg(skb);
782 msg_set_seqno(hdr, seqno); 821 msg_set_seqno(hdr, seqno);
783 msg_set_ack(hdr, ack); 822 msg_set_ack(hdr, ack);
784 msg_set_bcast_ack(hdr, bc_last_in); 823 msg_set_bcast_ack(hdr, bc_ack);
785 824
786 if (likely(skb_queue_len(transmq) < maxwin)) { 825 if (likely(skb_queue_len(transmq) < maxwin)) {
787 _skb = skb_clone(skb, GFP_ATOMIC); 826 _skb = skb_clone(skb, GFP_ATOMIC);
@@ -816,23 +855,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
816} 855}
817 856
818/* 857/*
819 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
820 * Receive the sequence number where we should start receiving and
821 * acking broadcast packets from a newly added peer node, and open
822 * up for reception of such packets.
823 *
824 * Called with node locked
825 */
826static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
827{
828 struct tipc_msg *msg = buf_msg(buf);
829
830 n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
831 n->bclink.recv_permitted = true;
832 kfree_skb(buf);
833}
834
835/*
836 * tipc_link_push_packets - push unsent packets to bearer 858 * tipc_link_push_packets - push unsent packets to bearer
837 * 859 *
838 * Push out the unsent messages of a link where congestion 860 * Push out the unsent messages of a link where congestion
@@ -872,6 +894,7 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
872 struct tipc_msg *hdr; 894 struct tipc_msg *hdr;
873 u16 seqno = l->snd_nxt; 895 u16 seqno = l->snd_nxt;
874 u16 ack = l->rcv_nxt - 1; 896 u16 ack = l->rcv_nxt - 1;
897 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
875 898
876 while (skb_queue_len(&l->transmq) < l->window) { 899 while (skb_queue_len(&l->transmq) < l->window) {
877 skb = skb_peek(&l->backlogq); 900 skb = skb_peek(&l->backlogq);
@@ -886,54 +909,25 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
886 __skb_queue_tail(&l->transmq, skb); 909 __skb_queue_tail(&l->transmq, skb);
887 __skb_queue_tail(xmitq, _skb); 910 __skb_queue_tail(xmitq, _skb);
888 TIPC_SKB_CB(skb)->ackers = l->ackers; 911 TIPC_SKB_CB(skb)->ackers = l->ackers;
889 msg_set_ack(hdr, ack);
890 msg_set_seqno(hdr, seqno); 912 msg_set_seqno(hdr, seqno);
891 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 913 msg_set_ack(hdr, ack);
914 msg_set_bcast_ack(hdr, bc_ack);
892 l->rcv_unacked = 0; 915 l->rcv_unacked = 0;
893 seqno++; 916 seqno++;
894 } 917 }
895 l->snd_nxt = seqno; 918 l->snd_nxt = seqno;
896} 919}
897 920
898static void link_retransmit_failure(struct tipc_link *l_ptr, 921static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
899 struct sk_buff *buf)
900{ 922{
901 struct tipc_msg *msg = buf_msg(buf); 923 struct tipc_msg *hdr = buf_msg(skb);
902 struct net *net = l_ptr->owner->net; 924
903 925 pr_warn("Retransmission failure on link <%s>\n", l->name);
904 pr_warn("Retransmission failure on link <%s>\n", l_ptr->name); 926 link_print(l, "Resetting link ");
905 927 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
906 if (l_ptr->addr) { 928 msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
907 /* Handle failure on standard link */ 929 pr_info("sqno %u, prev: %x, src: %x\n",
908 link_print(l_ptr, "Resetting link "); 930 msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
909 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
910 msg_user(msg), msg_type(msg), msg_size(msg),
911 msg_errcode(msg));
912 pr_info("sqno %u, prev: %x, src: %x\n",
913 msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
914 } else {
915 /* Handle failure on broadcast link */
916 struct tipc_node *n_ptr;
917 char addr_string[16];
918
919 pr_info("Msg seq number: %u, ", msg_seqno(msg));
920 pr_cont("Outstanding acks: %u\n", TIPC_SKB_CB(buf)->ackers);
921
922 n_ptr = tipc_bclink_retransmit_to(net);
923
924 tipc_addr_string_fill(addr_string, n_ptr->addr);
925 pr_info("Broadcast link info for %s\n", addr_string);
926 pr_info("Reception permitted: %d, Acked: %u\n",
927 n_ptr->bclink.recv_permitted,
928 n_ptr->bclink.acked);
929 pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
930 n_ptr->bclink.last_in,
931 n_ptr->bclink.oos_state,
932 n_ptr->bclink.last_sent);
933
934 n_ptr->action_flags |= TIPC_BCAST_RESET;
935 l_ptr->stale_count = 0;
936 }
937} 931}
938 932
939void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, 933void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
@@ -976,7 +970,7 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
976 struct sk_buff *_skb, *skb = skb_peek(&l->transmq); 970 struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
977 struct tipc_msg *hdr; 971 struct tipc_msg *hdr;
978 u16 ack = l->rcv_nxt - 1; 972 u16 ack = l->rcv_nxt - 1;
979 u16 bc_ack = l->owner->bclink.last_in; 973 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
980 974
981 if (!skb) 975 if (!skb)
982 return 0; 976 return 0;
@@ -1018,11 +1012,9 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
1018 * Consumes buffer if message is of right type 1012 * Consumes buffer if message is of right type
1019 * Node lock must be held 1013 * Node lock must be held
1020 */ 1014 */
1021static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, 1015static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1022 struct sk_buff_head *inputq) 1016 struct sk_buff_head *inputq)
1023{ 1017{
1024 struct tipc_node *node = link->owner;
1025
1026 switch (msg_user(buf_msg(skb))) { 1018 switch (msg_user(buf_msg(skb))) {
1027 case TIPC_LOW_IMPORTANCE: 1019 case TIPC_LOW_IMPORTANCE:
1028 case TIPC_MEDIUM_IMPORTANCE: 1020 case TIPC_MEDIUM_IMPORTANCE:
@@ -1032,8 +1024,8 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
1032 skb_queue_tail(inputq, skb); 1024 skb_queue_tail(inputq, skb);
1033 return true; 1025 return true;
1034 case NAME_DISTRIBUTOR: 1026 case NAME_DISTRIBUTOR:
1035 node->bclink.recv_permitted = true; 1027 l->bc_rcvlink->state = LINK_ESTABLISHED;
1036 skb_queue_tail(link->namedq, skb); 1028 skb_queue_tail(l->namedq, skb);
1037 return true; 1029 return true;
1038 case MSG_BUNDLER: 1030 case MSG_BUNDLER:
1039 case TUNNEL_PROTOCOL: 1031 case TUNNEL_PROTOCOL:
@@ -1054,7 +1046,6 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
1054static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, 1046static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1055 struct sk_buff_head *inputq) 1047 struct sk_buff_head *inputq)
1056{ 1048{
1057 struct tipc_node *node = l->owner;
1058 struct tipc_msg *hdr = buf_msg(skb); 1049 struct tipc_msg *hdr = buf_msg(skb);
1059 struct sk_buff **reasm_skb = &l->reasm_buf; 1050 struct sk_buff **reasm_skb = &l->reasm_buf;
1060 struct sk_buff *iskb; 1051 struct sk_buff *iskb;
@@ -1095,13 +1086,15 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1095 if (tipc_buf_append(reasm_skb, &skb)) { 1086 if (tipc_buf_append(reasm_skb, &skb)) {
1096 l->stats.recv_fragmented++; 1087 l->stats.recv_fragmented++;
1097 tipc_data_input(l, skb, inputq); 1088 tipc_data_input(l, skb, inputq);
1098 } else if (!*reasm_skb) { 1089 } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) {
1090 pr_warn_ratelimited("Unable to build fragment list\n");
1099 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 1091 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
1100 } 1092 }
1101 return 0; 1093 return 0;
1102 } else if (usr == BCAST_PROTOCOL) { 1094 } else if (usr == BCAST_PROTOCOL) {
1103 tipc_link_sync_rcv(node, skb); 1095 tipc_bcast_lock(l->owner->net);
1104 return 0; 1096 tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
1097 tipc_bcast_unlock(l->owner->net);
1105 } 1098 }
1106drop: 1099drop:
1107 kfree_skb(skb); 1100 kfree_skb(skb);
@@ -1124,12 +1117,28 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
1124} 1117}
1125 1118
1126/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission 1119/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission
1120 *
1121 * Note that sending of broadcast ack is coordinated among nodes, to reduce
1122 * risk of ack storms towards the sender
1127 */ 1123 */
1128void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) 1124int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
1129{ 1125{
1126 if (!l)
1127 return 0;
1128
1129 /* Broadcast ACK must be sent via a unicast link => defer to caller */
1130 if (link_is_bc_rcvlink(l)) {
1131 if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf)
1132 return 0;
1133 l->rcv_unacked = 0;
1134 return TIPC_LINK_SND_BC_ACK;
1135 }
1136
1137 /* Unicast ACK */
1130 l->rcv_unacked = 0; 1138 l->rcv_unacked = 0;
1131 l->stats.sent_acks++; 1139 l->stats.sent_acks++;
1132 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1140 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
1141 return 0;
1133} 1142}
1134 1143
1135/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message 1144/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message
@@ -1151,6 +1160,9 @@ static void tipc_link_build_nack_msg(struct tipc_link *l,
1151{ 1160{
1152 u32 def_cnt = ++l->stats.deferred_recv; 1161 u32 def_cnt = ++l->stats.deferred_recv;
1153 1162
1163 if (link_is_bc_rcvlink(l))
1164 return;
1165
1154 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) 1166 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
1155 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1167 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
1156} 1168}
@@ -1211,12 +1223,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
1211 l->rcv_nxt++; 1223 l->rcv_nxt++;
1212 l->stats.recv_info++; 1224 l->stats.recv_info++;
1213 if (!tipc_data_input(l, skb, l->inputq)) 1225 if (!tipc_data_input(l, skb, l->inputq))
1214 rc = tipc_link_input(l, skb, l->inputq); 1226 rc |= tipc_link_input(l, skb, l->inputq);
1215 if (unlikely(rc))
1216 break;
1217 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) 1227 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
1218 tipc_link_build_ack_msg(l, xmitq); 1228 rc |= tipc_link_build_ack_msg(l, xmitq);
1219 1229 if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
1230 break;
1220 } while ((skb = __skb_dequeue(defq))); 1231 } while ((skb = __skb_dequeue(defq)));
1221 1232
1222 return rc; 1233 return rc;
@@ -1284,18 +1295,13 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
1284 kfree_skb(skb); 1295 kfree_skb(skb);
1285} 1296}
1286 1297
1287/* tipc_link_build_proto_msg: prepare link protocol message for transmission
1288 */
1289static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, 1298static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1290 u16 rcvgap, int tolerance, int priority, 1299 u16 rcvgap, int tolerance, int priority,
1291 struct sk_buff_head *xmitq) 1300 struct sk_buff_head *xmitq)
1292{ 1301{
1293 struct sk_buff *skb = NULL; 1302 struct sk_buff *skb = NULL;
1294 struct tipc_msg *hdr = l->pmsg; 1303 struct tipc_msg *hdr = l->pmsg;
1295 u16 snd_nxt = l->snd_nxt; 1304 bool node_up = link_is_up(l->bc_rcvlink);
1296 u16 rcv_nxt = l->rcv_nxt;
1297 u16 rcv_last = rcv_nxt - 1;
1298 int node_up = l->owner->bclink.recv_permitted;
1299 1305
1300 /* Don't send protocol message during reset or link failover */ 1306 /* Don't send protocol message during reset or link failover */
1301 if (tipc_link_is_blocked(l)) 1307 if (tipc_link_is_blocked(l))
@@ -1303,33 +1309,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1303 1309
1304 msg_set_type(hdr, mtyp); 1310 msg_set_type(hdr, mtyp);
1305 msg_set_net_plane(hdr, l->net_plane); 1311 msg_set_net_plane(hdr, l->net_plane);
1306 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 1312 msg_set_next_sent(hdr, l->snd_nxt);
1307 msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); 1313 msg_set_ack(hdr, l->rcv_nxt - 1);
1314 msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
1315 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1308 msg_set_link_tolerance(hdr, tolerance); 1316 msg_set_link_tolerance(hdr, tolerance);
1309 msg_set_linkprio(hdr, priority); 1317 msg_set_linkprio(hdr, priority);
1310 msg_set_redundant_link(hdr, node_up); 1318 msg_set_redundant_link(hdr, node_up);
1311 msg_set_seq_gap(hdr, 0); 1319 msg_set_seq_gap(hdr, 0);
1312 1320
1313 /* Compatibility: created msg must not be in sequence with pkt flow */ 1321 /* Compatibility: created msg must not be in sequence with pkt flow */
1314 msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); 1322 msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2);
1315 1323
1316 if (mtyp == STATE_MSG) { 1324 if (mtyp == STATE_MSG) {
1317 if (!tipc_link_is_up(l)) 1325 if (!tipc_link_is_up(l))
1318 return; 1326 return;
1319 msg_set_next_sent(hdr, snd_nxt);
1320 1327
1321 /* Override rcvgap if there are packets in deferred queue */ 1328 /* Override rcvgap if there are packets in deferred queue */
1322 if (!skb_queue_empty(&l->deferdq)) 1329 if (!skb_queue_empty(&l->deferdq))
1323 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; 1330 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt;
1324 if (rcvgap) { 1331 if (rcvgap) {
1325 msg_set_seq_gap(hdr, rcvgap); 1332 msg_set_seq_gap(hdr, rcvgap);
1326 l->stats.sent_nacks++; 1333 l->stats.sent_nacks++;
1327 } 1334 }
1328 msg_set_ack(hdr, rcv_last);
1329 msg_set_probe(hdr, probe); 1335 msg_set_probe(hdr, probe);
1330 if (probe) 1336 if (probe)
1331 l->stats.sent_probes++; 1337 l->stats.sent_probes++;
1332 l->stats.sent_states++; 1338 l->stats.sent_states++;
1339 l->rcv_unacked = 0;
1333 } else { 1340 } else {
1334 /* RESET_MSG or ACTIVATE_MSG */ 1341 /* RESET_MSG or ACTIVATE_MSG */
1335 msg_set_max_pkt(hdr, l->advertised_mtu); 1342 msg_set_max_pkt(hdr, l->advertised_mtu);
@@ -1431,7 +1438,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1431 char *if_name; 1438 char *if_name;
1432 int rc = 0; 1439 int rc = 0;
1433 1440
1434 if (tipc_link_is_blocked(l)) 1441 if (tipc_link_is_blocked(l) || !xmitq)
1435 goto exit; 1442 goto exit;
1436 1443
1437 if (link_own_addr(l) > msg_prevnode(hdr)) 1444 if (link_own_addr(l) > msg_prevnode(hdr))
@@ -1518,6 +1525,188 @@ exit:
1518 return rc; 1525 return rc;
1519} 1526}
1520 1527
1528/* tipc_link_build_bc_proto_msg() - create broadcast protocol message
1529 */
1530static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast,
1531 u16 peers_snd_nxt,
1532 struct sk_buff_head *xmitq)
1533{
1534 struct sk_buff *skb;
1535 struct tipc_msg *hdr;
1536 struct sk_buff *dfrd_skb = skb_peek(&l->deferdq);
1537 u16 ack = l->rcv_nxt - 1;
1538 u16 gap_to = peers_snd_nxt - 1;
1539
1540 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
1541 0, l->addr, link_own_addr(l), 0, 0, 0);
1542 if (!skb)
1543 return false;
1544 hdr = buf_msg(skb);
1545 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1546 msg_set_bcast_ack(hdr, ack);
1547 msg_set_bcgap_after(hdr, ack);
1548 if (dfrd_skb)
1549 gap_to = buf_seqno(dfrd_skb) - 1;
1550 msg_set_bcgap_to(hdr, gap_to);
1551 msg_set_non_seq(hdr, bcast);
1552 __skb_queue_tail(xmitq, skb);
1553 return true;
1554}
1555
1556/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints.
1557 *
1558 * Give a newly added peer node the sequence number where it should
1559 * start receiving and acking broadcast packets.
1560 */
1561void tipc_link_build_bc_init_msg(struct tipc_link *l,
1562 struct sk_buff_head *xmitq)
1563{
1564 struct sk_buff_head list;
1565
1566 __skb_queue_head_init(&list);
1567 if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list))
1568 return;
1569 tipc_link_xmit(l, &list, xmitq);
1570}
1571
1572/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer
1573 */
1574void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
1575{
1576 int mtyp = msg_type(hdr);
1577 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1578
1579 if (link_is_up(l))
1580 return;
1581
1582 if (msg_user(hdr) == BCAST_PROTOCOL) {
1583 l->rcv_nxt = peers_snd_nxt;
1584 l->state = LINK_ESTABLISHED;
1585 return;
1586 }
1587
1588 if (l->peer_caps & TIPC_BCAST_SYNCH)
1589 return;
1590
1591 if (msg_peer_node_is_up(hdr))
1592 return;
1593
1594 /* Compatibility: accept older, less safe initial synch data */
1595 if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG))
1596 l->rcv_nxt = peers_snd_nxt;
1597}
1598
1599/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
1600 */
1601void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
1602 struct sk_buff_head *xmitq)
1603{
1604 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1605
1606 if (!link_is_up(l))
1607 return;
1608
1609 if (!msg_peer_node_is_up(hdr))
1610 return;
1611
1612 l->bc_peer_is_up = true;
1613
1614 /* Ignore if peers_snd_nxt goes beyond receive window */
1615 if (more(peers_snd_nxt, l->rcv_nxt + l->window))
1616 return;
1617
1618 if (!more(peers_snd_nxt, l->rcv_nxt)) {
1619 l->nack_state = BC_NACK_SND_CONDITIONAL;
1620 return;
1621 }
1622
1623 /* Don't NACK if one was recently sent or peeked */
1624 if (l->nack_state == BC_NACK_SND_SUPPRESS) {
1625 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1626 return;
1627 }
1628
1629 /* Conditionally delay NACK sending until next synch rcv */
1630 if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
1631 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1632 if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
1633 return;
1634 }
1635
1636 /* Send NACK now but suppress next one */
1637 tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
1638 l->nack_state = BC_NACK_SND_SUPPRESS;
1639}
1640
1641void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
1642 struct sk_buff_head *xmitq)
1643{
1644 struct sk_buff *skb, *tmp;
1645 struct tipc_link *snd_l = l->bc_sndlink;
1646
1647 if (!link_is_up(l) || !l->bc_peer_is_up)
1648 return;
1649
1650 if (!more(acked, l->acked))
1651 return;
1652
1653 /* Skip over packets peer has already acked */
1654 skb_queue_walk(&snd_l->transmq, skb) {
1655 if (more(buf_seqno(skb), l->acked))
1656 break;
1657 }
1658
1659 /* Update/release the packets peer is acking now */
1660 skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
1661 if (more(buf_seqno(skb), acked))
1662 break;
1663 if (!--TIPC_SKB_CB(skb)->ackers) {
1664 __skb_unlink(skb, &snd_l->transmq);
1665 kfree_skb(skb);
1666 }
1667 }
1668 l->acked = acked;
1669 tipc_link_advance_backlog(snd_l, xmitq);
1670 if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
1671 link_prepare_wakeup(snd_l);
1672}
1673
1674/* tipc_link_bc_nack_rcv(): receive broadcast nack message
1675 */
1676int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
1677 struct sk_buff_head *xmitq)
1678{
1679 struct tipc_msg *hdr = buf_msg(skb);
1680 u32 dnode = msg_destnode(hdr);
1681 int mtyp = msg_type(hdr);
1682 u16 acked = msg_bcast_ack(hdr);
1683 u16 from = acked + 1;
1684 u16 to = msg_bcgap_to(hdr);
1685 u16 peers_snd_nxt = to + 1;
1686 int rc = 0;
1687
1688 kfree_skb(skb);
1689
1690 if (!tipc_link_is_up(l) || !l->bc_peer_is_up)
1691 return 0;
1692
1693 if (mtyp != STATE_MSG)
1694 return 0;
1695
1696 if (dnode == link_own_addr(l)) {
1697 tipc_link_bc_ack_rcv(l, acked, xmitq);
1698 rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
1699 l->stats.recv_nacks++;
1700 return rc;
1701 }
1702
1703 /* Msg for other node => suppress own NACK at next sync if applicable */
1704 if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from))
1705 l->nack_state = BC_NACK_SND_SUPPRESS;
1706
1707 return 0;
1708}
1709
1521void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) 1710void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1522{ 1711{
1523 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); 1712 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
diff --git a/net/tipc/link.h b/net/tipc/link.h
index d23329db4b25..28a6396b6d31 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -66,7 +66,8 @@ enum {
66 */ 66 */
67enum { 67enum {
68 TIPC_LINK_UP_EVT = 1, 68 TIPC_LINK_UP_EVT = 1,
69 TIPC_LINK_DOWN_EVT = (1 << 1) 69 TIPC_LINK_DOWN_EVT = (1 << 1),
70 TIPC_LINK_SND_BC_ACK = (1 << 2)
70}; 71};
71 72
72/* Starting value for maximum packet size negotiation on unicast links 73/* Starting value for maximum packet size negotiation on unicast links
@@ -209,6 +210,10 @@ struct tipc_link {
209 /* Broadcast */ 210 /* Broadcast */
210 u16 ackers; 211 u16 ackers;
211 u16 acked; 212 u16 acked;
213 struct tipc_link *bc_rcvlink;
214 struct tipc_link *bc_sndlink;
215 int nack_state;
216 bool bc_peer_is_up;
212 217
213 /* Statistics */ 218 /* Statistics */
214 struct tipc_stats stats; 219 struct tipc_stats stats;
@@ -217,17 +222,21 @@ struct tipc_link {
217bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, 222bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
218 int tolerance, char net_plane, u32 mtu, int priority, 223 int tolerance, char net_plane, u32 mtu, int priority,
219 int window, u32 session, u32 ownnode, u32 peer, 224 int window, u32 session, u32 ownnode, u32 peer,
220 u16 peer_caps, struct tipc_media_addr *maddr, 225 u16 peer_caps,
221 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 226 struct tipc_media_addr *maddr,
227 struct tipc_link *bc_sndlink,
228 struct tipc_link *bc_rcvlink,
229 struct sk_buff_head *inputq,
230 struct sk_buff_head *namedq,
222 struct tipc_link **link); 231 struct tipc_link **link);
223bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, 232bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer,
224 u16 peer_caps, struct sk_buff_head *inputq, 233 int mtu, int window, u16 peer_caps,
234 struct sk_buff_head *inputq,
225 struct sk_buff_head *namedq, 235 struct sk_buff_head *namedq,
236 struct tipc_link *bc_sndlink,
226 struct tipc_link **link); 237 struct tipc_link **link);
227void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, 238void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
228 int mtyp, struct sk_buff_head *xmitq); 239 int mtyp, struct sk_buff_head *xmitq);
229void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
230 struct sk_buff_head *xmitq);
231void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq); 240void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
232int tipc_link_fsm_evt(struct tipc_link *l, int evt); 241int tipc_link_fsm_evt(struct tipc_link *l, int evt);
233void tipc_link_reset_fragments(struct tipc_link *l_ptr); 242void tipc_link_reset_fragments(struct tipc_link *l_ptr);
@@ -264,9 +273,21 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
264int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); 273int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
265int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, 274int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
266 struct sk_buff_head *xmitq); 275 struct sk_buff_head *xmitq);
267void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq); 276int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
268void tipc_link_add_bc_peer(struct tipc_link *l); 277void tipc_link_add_bc_peer(struct tipc_link *snd_l,
269void tipc_link_remove_bc_peer(struct tipc_link *l); 278 struct tipc_link *uc_l,
279 struct sk_buff_head *xmitq);
280void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
281 struct tipc_link *rcv_l,
282 struct sk_buff_head *xmitq);
270int tipc_link_bc_peers(struct tipc_link *l); 283int tipc_link_bc_peers(struct tipc_link *l);
271 284void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
285 struct sk_buff_head *xmitq);
286void tipc_link_build_bc_sync_msg(struct tipc_link *l,
287 struct sk_buff_head *xmitq);
288void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
289void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
290 struct sk_buff_head *xmitq);
291int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
292 struct sk_buff_head *xmitq);
272#endif 293#endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 5b47468739e7..8740930f0787 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -182,7 +182,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
182 *buf = NULL; 182 *buf = NULL;
183 return 0; 183 return 0;
184err: 184err:
185 pr_warn_ratelimited("Unable to build fragment list\n");
186 kfree_skb(*buf); 185 kfree_skb(*buf);
187 kfree_skb(*headbuf); 186 kfree_skb(*headbuf);
188 *buf = *headbuf = NULL; 187 *buf = *headbuf = NULL;
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index fbf51fa1075d..55778a0aebf3 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -601,6 +601,11 @@ static inline u32 msg_last_bcast(struct tipc_msg *m)
601 return msg_bits(m, 4, 16, 0xffff); 601 return msg_bits(m, 4, 16, 0xffff);
602} 602}
603 603
604static inline u32 msg_bc_snd_nxt(struct tipc_msg *m)
605{
606 return msg_last_bcast(m) + 1;
607}
608
604static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) 609static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
605{ 610{
606 msg_set_bits(m, 4, 16, 0xffff, n); 611 msg_set_bits(m, 4, 16, 0xffff, n);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 28bcd7be23c6..cd924552244b 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -72,7 +72,6 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
72static void tipc_node_link_down(struct tipc_node *n, int bearer_id, 72static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
73 bool delete); 73 bool delete);
74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); 74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
75static void node_established_contact(struct tipc_node *n_ptr);
76static void tipc_node_delete(struct tipc_node *node); 75static void tipc_node_delete(struct tipc_node *node);
77static void tipc_node_timeout(unsigned long data); 76static void tipc_node_timeout(unsigned long data);
78static void tipc_node_fsm_evt(struct tipc_node *n, int evt); 77static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
@@ -165,8 +164,10 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
165 INIT_LIST_HEAD(&n_ptr->list); 164 INIT_LIST_HEAD(&n_ptr->list);
166 INIT_LIST_HEAD(&n_ptr->publ_list); 165 INIT_LIST_HEAD(&n_ptr->publ_list);
167 INIT_LIST_HEAD(&n_ptr->conn_sks); 166 INIT_LIST_HEAD(&n_ptr->conn_sks);
168 skb_queue_head_init(&n_ptr->bclink.namedq); 167 skb_queue_head_init(&n_ptr->bc_entry.namedq);
169 __skb_queue_head_init(&n_ptr->bclink.deferdq); 168 skb_queue_head_init(&n_ptr->bc_entry.inputq1);
169 __skb_queue_head_init(&n_ptr->bc_entry.arrvq);
170 skb_queue_head_init(&n_ptr->bc_entry.inputq2);
170 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 171 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
171 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 172 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
172 if (n_ptr->addr < temp_node->addr) 173 if (n_ptr->addr < temp_node->addr)
@@ -177,6 +178,18 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
177 n_ptr->signature = INVALID_NODE_SIG; 178 n_ptr->signature = INVALID_NODE_SIG;
178 n_ptr->active_links[0] = INVALID_BEARER_ID; 179 n_ptr->active_links[0] = INVALID_BEARER_ID;
179 n_ptr->active_links[1] = INVALID_BEARER_ID; 180 n_ptr->active_links[1] = INVALID_BEARER_ID;
181 if (!tipc_link_bc_create(n_ptr, tipc_own_addr(net), n_ptr->addr,
182 U16_MAX, tipc_bc_sndlink(net)->window,
183 n_ptr->capabilities,
184 &n_ptr->bc_entry.inputq1,
185 &n_ptr->bc_entry.namedq,
186 tipc_bc_sndlink(net),
187 &n_ptr->bc_entry.link)) {
188 pr_warn("Broadcast rcv link creation failed, no memory\n");
189 kfree(n_ptr);
190 n_ptr = NULL;
191 goto exit;
192 }
180 tipc_node_get(n_ptr); 193 tipc_node_get(n_ptr);
181 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); 194 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
182 n_ptr->keepalive_intv = U32_MAX; 195 n_ptr->keepalive_intv = U32_MAX;
@@ -203,6 +216,7 @@ static void tipc_node_delete(struct tipc_node *node)
203{ 216{
204 list_del_rcu(&node->list); 217 list_del_rcu(&node->list);
205 hlist_del_rcu(&node->hash); 218 hlist_del_rcu(&node->hash);
219 kfree(node->bc_entry.link);
206 kfree_rcu(node, rcu); 220 kfree_rcu(node, rcu);
207} 221}
208 222
@@ -340,8 +354,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
340 if (!ol) { 354 if (!ol) {
341 *slot0 = bearer_id; 355 *slot0 = bearer_id;
342 *slot1 = bearer_id; 356 *slot1 = bearer_id;
343 tipc_link_build_bcast_sync_msg(nl, xmitq); 357 tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
344 node_established_contact(n); 358 n->action_flags |= TIPC_NOTIFY_NODE_UP;
359 tipc_bcast_add_peer(n->net, n->addr, nl, xmitq);
345 return; 360 return;
346 } 361 }
347 362
@@ -585,9 +600,10 @@ void tipc_node_check_dest(struct net *net, u32 onode,
585 b->net_plane, b->mtu, b->priority, 600 b->net_plane, b->mtu, b->priority,
586 b->window, mod(tipc_net(net)->random), 601 b->window, mod(tipc_net(net)->random),
587 tipc_own_addr(net), onode, 602 tipc_own_addr(net), onode,
588 n->capabilities, 603 n->capabilities, &le->maddr,
589 &le->maddr, &le->inputq, 604 tipc_bc_sndlink(n->net), n->bc_entry.link,
590 &n->bclink.namedq, &l)) { 605 &le->inputq,
606 &n->bc_entry.namedq, &l)) {
591 *respond = false; 607 *respond = false;
592 goto exit; 608 goto exit;
593 } 609 }
@@ -830,58 +846,36 @@ bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
830 return true; 846 return true;
831} 847}
832 848
833static void node_established_contact(struct tipc_node *n_ptr) 849static void node_lost_contact(struct tipc_node *n,
834{
835 tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
836 n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
837 n_ptr->bclink.oos_state = 0;
838 n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
839 tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
840}
841
842static void node_lost_contact(struct tipc_node *n_ptr,
843 struct sk_buff_head *inputq) 850 struct sk_buff_head *inputq)
844{ 851{
845 char addr_string[16]; 852 char addr_string[16];
846 struct tipc_sock_conn *conn, *safe; 853 struct tipc_sock_conn *conn, *safe;
847 struct tipc_link *l; 854 struct tipc_link *l;
848 struct list_head *conns = &n_ptr->conn_sks; 855 struct list_head *conns = &n->conn_sks;
849 struct sk_buff *skb; 856 struct sk_buff *skb;
850 struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
851 uint i; 857 uint i;
852 858
853 pr_debug("Lost contact with %s\n", 859 pr_debug("Lost contact with %s\n",
854 tipc_addr_string_fill(addr_string, n_ptr->addr)); 860 tipc_addr_string_fill(addr_string, n->addr));
855
856 /* Flush broadcast link info associated with lost node */
857 if (n_ptr->bclink.recv_permitted) {
858 __skb_queue_purge(&n_ptr->bclink.deferdq);
859 861
860 if (n_ptr->bclink.reasm_buf) { 862 /* Clean up broadcast state */
861 kfree_skb(n_ptr->bclink.reasm_buf); 863 tipc_bcast_remove_peer(n->net, n->addr, n->bc_entry.link);
862 n_ptr->bclink.reasm_buf = NULL;
863 }
864
865 tipc_bclink_remove_node(n_ptr->net, n_ptr->addr);
866 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
867
868 n_ptr->bclink.recv_permitted = false;
869 }
870 864
871 /* Abort any ongoing link failover */ 865 /* Abort any ongoing link failover */
872 for (i = 0; i < MAX_BEARERS; i++) { 866 for (i = 0; i < MAX_BEARERS; i++) {
873 l = n_ptr->links[i].link; 867 l = n->links[i].link;
874 if (l) 868 if (l)
875 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); 869 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
876 } 870 }
877 871
878 /* Notify publications from this node */ 872 /* Notify publications from this node */
879 n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; 873 n->action_flags |= TIPC_NOTIFY_NODE_DOWN;
880 874
881 /* Notify sockets connected to node */ 875 /* Notify sockets connected to node */
882 list_for_each_entry_safe(conn, safe, conns, list) { 876 list_for_each_entry_safe(conn, safe, conns, list) {
883 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 877 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
884 SHORT_H_SIZE, 0, tn->own_addr, 878 SHORT_H_SIZE, 0, tipc_own_addr(n->net),
885 conn->peer_node, conn->port, 879 conn->peer_node, conn->port,
886 conn->peer_port, TIPC_ERR_NO_NODE); 880 conn->peer_port, TIPC_ERR_NO_NODE);
887 if (likely(skb)) 881 if (likely(skb))
@@ -1086,6 +1080,67 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1086} 1080}
1087 1081
1088/** 1082/**
1083 * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
1084 * @net: the applicable net namespace
1085 * @skb: TIPC packet
1086 * @bearer_id: id of bearer message arrived on
1087 *
1088 * Invoked with no locks held.
1089 */
1090void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id)
1091{
1092 int rc;
1093 struct sk_buff_head xmitq;
1094 struct tipc_bclink_entry *be;
1095 struct tipc_link_entry *le;
1096 struct tipc_msg *hdr = buf_msg(skb);
1097 int usr = msg_user(hdr);
1098 u32 dnode = msg_destnode(hdr);
1099 struct tipc_node *n;
1100
1101 __skb_queue_head_init(&xmitq);
1102
1103 /* If NACK for other node, let rcv link for that node peek into it */
1104 if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net)))
1105 n = tipc_node_find(net, dnode);
1106 else
1107 n = tipc_node_find(net, msg_prevnode(hdr));
1108 if (!n) {
1109 kfree_skb(skb);
1110 return;
1111 }
1112 be = &n->bc_entry;
1113 le = &n->links[bearer_id];
1114
1115 rc = tipc_bcast_rcv(net, be->link, skb);
1116
1117 /* Broadcast link reset may happen at reassembly failure */
1118 if (rc & TIPC_LINK_DOWN_EVT)
1119 tipc_node_reset_links(n);
1120
1121 /* Broadcast ACKs are sent on a unicast link */
1122 if (rc & TIPC_LINK_SND_BC_ACK) {
1123 tipc_node_lock(n);
1124 tipc_link_build_ack_msg(le->link, &xmitq);
1125 tipc_node_unlock(n);
1126 }
1127
1128 if (!skb_queue_empty(&xmitq))
1129 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1130
1131 /* Deliver. 'arrvq' is under inputq2's lock protection */
1132 if (!skb_queue_empty(&be->inputq1)) {
1133 spin_lock_bh(&be->inputq2.lock);
1134 spin_lock_bh(&be->inputq1.lock);
1135 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1136 spin_unlock_bh(&be->inputq1.lock);
1137 spin_unlock_bh(&be->inputq2.lock);
1138 tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
1139 }
1140 tipc_node_put(n);
1141}
1142
1143/**
1089 * tipc_node_check_state - check and if necessary update node state 1144 * tipc_node_check_state - check and if necessary update node state
1090 * @skb: TIPC packet 1145 * @skb: TIPC packet
1091 * @bearer_id: identity of bearer delivering the packet 1146 * @bearer_id: identity of bearer delivering the packet
@@ -1227,6 +1282,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1227 int usr = msg_user(hdr); 1282 int usr = msg_user(hdr);
1228 int bearer_id = b->identity; 1283 int bearer_id = b->identity;
1229 struct tipc_link_entry *le; 1284 struct tipc_link_entry *le;
1285 u16 bc_ack = msg_bcast_ack(hdr);
1230 int rc = 0; 1286 int rc = 0;
1231 1287
1232 __skb_queue_head_init(&xmitq); 1288 __skb_queue_head_init(&xmitq);
@@ -1235,13 +1291,12 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1235 if (unlikely(!tipc_msg_validate(skb))) 1291 if (unlikely(!tipc_msg_validate(skb)))
1236 goto discard; 1292 goto discard;
1237 1293
1238 /* Handle arrival of a non-unicast link packet */ 1294 /* Handle arrival of discovery or broadcast packet */
1239 if (unlikely(msg_non_seq(hdr))) { 1295 if (unlikely(msg_non_seq(hdr))) {
1240 if (usr == LINK_CONFIG) 1296 if (unlikely(usr == LINK_CONFIG))
1241 tipc_disc_rcv(net, skb, b); 1297 return tipc_disc_rcv(net, skb, b);
1242 else 1298 else
1243 tipc_bclink_rcv(net, skb); 1299 return tipc_node_bc_rcv(net, skb, bearer_id);
1244 return;
1245 } 1300 }
1246 1301
1247 /* Locate neighboring node that sent packet */ 1302 /* Locate neighboring node that sent packet */
@@ -1250,19 +1305,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1250 goto discard; 1305 goto discard;
1251 le = &n->links[bearer_id]; 1306 le = &n->links[bearer_id];
1252 1307
1308 /* Ensure broadcast reception is in synch with peer's send state */
1309 if (unlikely(usr == LINK_PROTOCOL))
1310 tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
1311 else if (unlikely(n->bc_entry.link->acked != bc_ack))
1312 tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
1313
1253 tipc_node_lock(n); 1314 tipc_node_lock(n);
1254 1315
1255 /* Is reception permitted at the moment ? */ 1316 /* Is reception permitted at the moment ? */
1256 if (!tipc_node_filter_pkt(n, hdr)) 1317 if (!tipc_node_filter_pkt(n, hdr))
1257 goto unlock; 1318 goto unlock;
1258 1319
1259 if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1260 tipc_bclink_sync_state(n, hdr);
1261
1262 /* Release acked broadcast packets */
1263 if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
1264 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
1265
1266 /* Check and if necessary update node state */ 1320 /* Check and if necessary update node state */
1267 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { 1321 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
1268 rc = tipc_link_rcv(le->link, skb, &xmitq); 1322 rc = tipc_link_rcv(le->link, skb, &xmitq);
@@ -1277,8 +1331,8 @@ unlock:
1277 if (unlikely(rc & TIPC_LINK_DOWN_EVT)) 1331 if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1278 tipc_node_link_down(n, bearer_id, false); 1332 tipc_node_link_down(n, bearer_id, false);
1279 1333
1280 if (unlikely(!skb_queue_empty(&n->bclink.namedq))) 1334 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1281 tipc_named_rcv(net, &n->bclink.namedq); 1335 tipc_named_rcv(net, &n->bc_entry.namedq);
1282 1336
1283 if (!skb_queue_empty(&le->inputq)) 1337 if (!skb_queue_empty(&le->inputq))
1284 tipc_sk_rcv(net, &le->inputq); 1338 tipc_sk_rcv(net, &le->inputq);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 1465774ad726..36a1cd0bc1f1 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -100,6 +100,14 @@ struct tipc_link_entry {
100 struct tipc_media_addr maddr; 100 struct tipc_media_addr maddr;
101}; 101};
102 102
103struct tipc_bclink_entry {
104 struct tipc_link *link;
105 struct sk_buff_head inputq1;
106 struct sk_buff_head arrvq;
107 struct sk_buff_head inputq2;
108 struct sk_buff_head namedq;
109};
110
103/** 111/**
104 * struct tipc_node - TIPC node structure 112 * struct tipc_node - TIPC node structure
105 * @addr: network address of node 113 * @addr: network address of node
@@ -132,6 +140,7 @@ struct tipc_node {
132 struct hlist_node hash; 140 struct hlist_node hash;
133 int active_links[2]; 141 int active_links[2];
134 struct tipc_link_entry links[MAX_BEARERS]; 142 struct tipc_link_entry links[MAX_BEARERS];
143 struct tipc_bclink_entry bc_entry;
135 int action_flags; 144 int action_flags;
136 struct tipc_node_bclink bclink; 145 struct tipc_node_bclink bclink;
137 struct list_head list; 146 struct list_head list;