aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c988
-rw-r--r--net/tipc/bcast.h122
-rw-r--r--net/tipc/bearer.c94
-rw-r--r--net/tipc/bearer.h9
-rw-r--r--net/tipc/core.c9
-rw-r--r--net/tipc/core.h12
-rw-r--r--net/tipc/discover.c28
-rw-r--r--net/tipc/link.c775
-rw-r--r--net/tipc/link.h76
-rw-r--r--net/tipc/msg.c20
-rw-r--r--net/tipc/msg.h8
-rw-r--r--net/tipc/name_distr.c4
-rw-r--r--net/tipc/net.c6
-rw-r--r--net/tipc/node.c184
-rw-r--r--net/tipc/node.h41
-rw-r--r--net/tipc/socket.c4
-rw-r--r--net/tipc/udp_media.c12
17 files changed, 1005 insertions, 1387 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index eadba62afa85..9dc239dfe192 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -35,742 +35,301 @@
35 * POSSIBILITY OF SUCH DAMAGE. 35 * POSSIBILITY OF SUCH DAMAGE.
36 */ 36 */
37 37
38#include <linux/tipc_config.h>
38#include "socket.h" 39#include "socket.h"
39#include "msg.h" 40#include "msg.h"
40#include "bcast.h" 41#include "bcast.h"
41#include "name_distr.h" 42#include "name_distr.h"
42#include "core.h" 43#include "link.h"
44#include "node.h"
43 45
44#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */
45#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ 46#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */
46#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ 47#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
47 48
48const char tipc_bclink_name[] = "broadcast-link"; 49const char tipc_bclink_name[] = "broadcast-link";
49 50
50static void tipc_nmap_diff(struct tipc_node_map *nm_a, 51/**
51 struct tipc_node_map *nm_b, 52 * struct tipc_bc_base - base structure for keeping broadcast send state
52 struct tipc_node_map *nm_diff); 53 * @link: broadcast send link structure
53static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); 54 * @inputq: data input queue; will only carry SOCK_WAKEUP messages
54static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); 55 * @dest: array keeping number of reachable destinations per bearer
55 56 * @primary_bearer: a bearer having links to all broadcast destinations, if any
56static void tipc_bclink_lock(struct net *net) 57 */
57{ 58struct tipc_bc_base {
58 struct tipc_net *tn = net_generic(net, tipc_net_id); 59 struct tipc_link *link;
59 60 struct sk_buff_head inputq;
60 spin_lock_bh(&tn->bclink->lock); 61 int dests[MAX_BEARERS];
61} 62 int primary_bearer;
62 63};
63static void tipc_bclink_unlock(struct net *net)
64{
65 struct tipc_net *tn = net_generic(net, tipc_net_id);
66
67 spin_unlock_bh(&tn->bclink->lock);
68}
69
70void tipc_bclink_input(struct net *net)
71{
72 struct tipc_net *tn = net_generic(net, tipc_net_id);
73
74 tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
75}
76
77uint tipc_bclink_get_mtu(void)
78{
79 return MAX_PKT_DEFAULT_MCAST;
80}
81
82static u32 bcbuf_acks(struct sk_buff *buf)
83{
84 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
85}
86
87static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
88{
89 TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
90}
91
92static void bcbuf_decr_acks(struct sk_buff *buf)
93{
94 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
95}
96 64
97void tipc_bclink_add_node(struct net *net, u32 addr) 65static struct tipc_bc_base *tipc_bc_base(struct net *net)
98{ 66{
99 struct tipc_net *tn = net_generic(net, tipc_net_id); 67 return tipc_net(net)->bcbase;
100
101 tipc_bclink_lock(net);
102 tipc_nmap_add(&tn->bclink->bcast_nodes, addr);
103 tipc_bclink_unlock(net);
104} 68}
105 69
106void tipc_bclink_remove_node(struct net *net, u32 addr) 70int tipc_bcast_get_mtu(struct net *net)
107{ 71{
108 struct tipc_net *tn = net_generic(net, tipc_net_id); 72 return tipc_link_mtu(tipc_bc_sndlink(net));
109
110 tipc_bclink_lock(net);
111 tipc_nmap_remove(&tn->bclink->bcast_nodes, addr);
112
113 /* Last node? => reset backlog queue */
114 if (!tn->bclink->bcast_nodes.count)
115 tipc_link_purge_backlog(&tn->bclink->link);
116
117 tipc_bclink_unlock(net);
118} 73}
119 74
120static void bclink_set_last_sent(struct net *net) 75/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
76 * if any, and make it primary bearer
77 */
78static void tipc_bcbase_select_primary(struct net *net)
121{ 79{
122 struct tipc_net *tn = net_generic(net, tipc_net_id); 80 struct tipc_bc_base *bb = tipc_bc_base(net);
123 struct tipc_link *bcl = tn->bcl; 81 int all_dests = tipc_link_bc_peers(bb->link);
82 int i, mtu;
124 83
125 bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1); 84 bb->primary_bearer = INVALID_BEARER_ID;
126}
127 85
128u32 tipc_bclink_get_last_sent(struct net *net) 86 if (!all_dests)
129{ 87 return;
130 struct tipc_net *tn = net_generic(net, tipc_net_id);
131 88
132 return tn->bcl->silent_intv_cnt; 89 for (i = 0; i < MAX_BEARERS; i++) {
133} 90 if (!bb->dests[i])
91 continue;
134 92
135static void bclink_update_last_sent(struct tipc_node *node, u32 seqno) 93 mtu = tipc_bearer_mtu(net, i);
136{ 94 if (mtu < tipc_link_mtu(bb->link))
137 node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ? 95 tipc_link_set_mtu(bb->link, mtu);
138 seqno : node->bclink.last_sent;
139}
140 96
141/** 97 if (bb->dests[i] < all_dests)
142 * tipc_bclink_retransmit_to - get most recent node to request retransmission 98 continue;
143 *
144 * Called with bclink_lock locked
145 */
146struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
147{
148 struct tipc_net *tn = net_generic(net, tipc_net_id);
149
150 return tn->bclink->retransmit_to;
151}
152 99
153/** 100 bb->primary_bearer = i;
154 * bclink_retransmit_pkt - retransmit broadcast packets
155 * @after: sequence number of last packet to *not* retransmit
156 * @to: sequence number of last packet to retransmit
157 *
158 * Called with bclink_lock locked
159 */
160static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
161{
162 struct sk_buff *skb;
163 struct tipc_link *bcl = tn->bcl;
164 101
165 skb_queue_walk(&bcl->transmq, skb) { 102 /* Reduce risk that all nodes select same primary */
166 if (more(buf_seqno(skb), after)) { 103 if ((i ^ tipc_own_addr(net)) & 1)
167 tipc_link_retransmit(bcl, skb, mod(to - after));
168 break; 104 break;
169 }
170 } 105 }
171} 106}
172 107
173/** 108void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
174 * bclink_prepare_wakeup - prepare users for wakeup after congestion
175 * @bcl: broadcast link
176 * @resultq: queue for users which can be woken up
177 * Move a number of waiting users, as permitted by available space in
178 * the send queue, from link wait queue to specified queue for wakeup
179 */
180static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
181{ 109{
182 int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,}; 110 struct tipc_bc_base *bb = tipc_bc_base(net);
183 int imp, lim;
184 struct sk_buff *skb, *tmp;
185
186 skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
187 imp = TIPC_SKB_CB(skb)->chain_imp;
188 lim = bcl->window + bcl->backlog[imp].limit;
189 pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
190 if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
191 continue;
192 skb_unlink(skb, &bcl->wakeupq);
193 skb_queue_tail(resultq, skb);
194 }
195}
196 111
197/** 112 tipc_bcast_lock(net);
198 * tipc_bclink_wakeup_users - wake up pending users 113 bb->dests[bearer_id]++;
199 * 114 tipc_bcbase_select_primary(net);
200 * Called with no locks taken 115 tipc_bcast_unlock(net);
201 */
202void tipc_bclink_wakeup_users(struct net *net)
203{
204 struct tipc_net *tn = net_generic(net, tipc_net_id);
205 struct tipc_link *bcl = tn->bcl;
206 struct sk_buff_head resultq;
207
208 skb_queue_head_init(&resultq);
209 bclink_prepare_wakeup(bcl, &resultq);
210 tipc_sk_rcv(net, &resultq);
211} 116}
212 117
213/** 118void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
214 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
215 * @n_ptr: node that sent acknowledgement info
216 * @acked: broadcast sequence # that has been acknowledged
217 *
218 * Node is locked, bclink_lock unlocked.
219 */
220void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
221{ 119{
222 struct sk_buff *skb, *tmp; 120 struct tipc_bc_base *bb = tipc_bc_base(net);
223 unsigned int released = 0;
224 struct net *net = n_ptr->net;
225 struct tipc_net *tn = net_generic(net, tipc_net_id);
226
227 if (unlikely(!n_ptr->bclink.recv_permitted))
228 return;
229 121
230 tipc_bclink_lock(net); 122 tipc_bcast_lock(net);
231 123 bb->dests[bearer_id]--;
232 /* Bail out if tx queue is empty (no clean up is required) */ 124 tipc_bcbase_select_primary(net);
233 skb = skb_peek(&tn->bcl->transmq); 125 tipc_bcast_unlock(net);
234 if (!skb)
235 goto exit;
236
237 /* Determine which messages need to be acknowledged */
238 if (acked == INVALID_LINK_SEQ) {
239 /*
240 * Contact with specified node has been lost, so need to
241 * acknowledge sent messages only (if other nodes still exist)
242 * or both sent and unsent messages (otherwise)
243 */
244 if (tn->bclink->bcast_nodes.count)
245 acked = tn->bcl->silent_intv_cnt;
246 else
247 acked = tn->bcl->snd_nxt;
248 } else {
249 /*
250 * Bail out if specified sequence number does not correspond
251 * to a message that has been sent and not yet acknowledged
252 */
253 if (less(acked, buf_seqno(skb)) ||
254 less(tn->bcl->silent_intv_cnt, acked) ||
255 less_eq(acked, n_ptr->bclink.acked))
256 goto exit;
257 }
258
259 /* Skip over packets that node has previously acknowledged */
260 skb_queue_walk(&tn->bcl->transmq, skb) {
261 if (more(buf_seqno(skb), n_ptr->bclink.acked))
262 break;
263 }
264
265 /* Update packets that node is now acknowledging */
266 skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
267 if (more(buf_seqno(skb), acked))
268 break;
269 bcbuf_decr_acks(skb);
270 bclink_set_last_sent(net);
271 if (bcbuf_acks(skb) == 0) {
272 __skb_unlink(skb, &tn->bcl->transmq);
273 kfree_skb(skb);
274 released = 1;
275 }
276 }
277 n_ptr->bclink.acked = acked;
278
279 /* Try resolving broadcast link congestion, if necessary */
280 if (unlikely(skb_peek(&tn->bcl->backlogq))) {
281 tipc_link_push_packets(tn->bcl);
282 bclink_set_last_sent(net);
283 }
284 if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
285 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
286exit:
287 tipc_bclink_unlock(net);
288} 126}
289 127
290/** 128/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
291 * tipc_bclink_update_link_state - update broadcast link state
292 * 129 *
293 * RCU and node lock set 130 * Note that number of reachable destinations, as indicated in the dests[]
131 * array, may transitionally differ from the number of destinations indicated
132 * in each sent buffer. We can sustain this. Excess destination nodes will
133 * drop and never acknowledge the unexpected packets, and missing destinations
134 * will either require retransmission (if they are just about to be added to
135 * the bearer), or be removed from the buffer's 'ackers' counter (if they
136 * just went down)
294 */ 137 */
295void tipc_bclink_update_link_state(struct tipc_node *n_ptr, 138static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
296 u32 last_sent)
297{ 139{
298 struct sk_buff *buf; 140 int bearer_id;
299 struct net *net = n_ptr->net; 141 struct tipc_bc_base *bb = tipc_bc_base(net);
300 struct tipc_net *tn = net_generic(net, tipc_net_id); 142 struct sk_buff *skb, *_skb;
301 143 struct sk_buff_head _xmitq;
302 /* Ignore "stale" link state info */
303 if (less_eq(last_sent, n_ptr->bclink.last_in))
304 return;
305 144
306 /* Update link synchronization state; quit if in sync */ 145 if (skb_queue_empty(xmitq))
307 bclink_update_last_sent(n_ptr, last_sent);
308
309 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
310 return; 146 return;
311 147
312 /* Update out-of-sync state; quit if loss is still unconfirmed */ 148 /* The typical case: at least one bearer has links to all nodes */
313 if ((++n_ptr->bclink.oos_state) == 1) { 149 bearer_id = bb->primary_bearer;
314 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2)) 150 if (bearer_id >= 0) {
315 return; 151 tipc_bearer_bc_xmit(net, bearer_id, xmitq);
316 n_ptr->bclink.oos_state++;
317 }
318
319 /* Don't NACK if one has been recently sent (or seen) */
320 if (n_ptr->bclink.oos_state & 0x1)
321 return; 152 return;
322
323 /* Send NACK */
324 buf = tipc_buf_acquire(INT_H_SIZE);
325 if (buf) {
326 struct tipc_msg *msg = buf_msg(buf);
327 struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
328 u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
329
330 tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
331 INT_H_SIZE, n_ptr->addr);
332 msg_set_non_seq(msg, 1);
333 msg_set_mc_netid(msg, tn->net_id);
334 msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
335 msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
336 msg_set_bcgap_to(msg, to);
337
338 tipc_bclink_lock(net);
339 tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
340 tn->bcl->stats.sent_nacks++;
341 tipc_bclink_unlock(net);
342 kfree_skb(buf);
343
344 n_ptr->bclink.oos_state++;
345 } 153 }
346}
347 154
348void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr) 155 /* We have to transmit across all bearers */
349{ 156 skb_queue_head_init(&_xmitq);
350 u16 last = msg_last_bcast(hdr); 157 for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
351 int mtyp = msg_type(hdr); 158 if (!bb->dests[bearer_id])
159 continue;
352 160
353 if (unlikely(msg_user(hdr) != LINK_PROTOCOL)) 161 skb_queue_walk(xmitq, skb) {
354 return; 162 _skb = pskb_copy_for_clone(skb, GFP_ATOMIC);
355 if (mtyp == STATE_MSG) { 163 if (!_skb)
356 tipc_bclink_update_link_state(n, last); 164 break;
357 return; 165 __skb_queue_tail(&_xmitq, _skb);
166 }
167 tipc_bearer_bc_xmit(net, bearer_id, &_xmitq);
358 } 168 }
359 /* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization, 169 __skb_queue_purge(xmitq);
360 * and transfer synch info in LINK_PROTOCOL messages. 170 __skb_queue_purge(&_xmitq);
361 */
362 if (tipc_node_is_up(n))
363 return;
364 if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
365 return;
366 n->bclink.last_sent = last;
367 n->bclink.last_in = last;
368 n->bclink.oos_state = 0;
369} 171}
370 172
371/** 173/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
372 * bclink_peek_nack - monitor retransmission requests sent by other nodes
373 *
374 * Delay any upcoming NACK by this node if another node has already
375 * requested the first message this node is going to ask for.
376 */
377static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
378{
379 struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
380
381 if (unlikely(!n_ptr))
382 return;
383
384 tipc_node_lock(n_ptr);
385 if (n_ptr->bclink.recv_permitted &&
386 (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
387 (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
388 n_ptr->bclink.oos_state = 2;
389 tipc_node_unlock(n_ptr);
390 tipc_node_put(n_ptr);
391}
392
393/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
394 * and to identified node local sockets 174 * and to identified node local sockets
395 * @net: the applicable net namespace 175 * @net: the applicable net namespace
396 * @list: chain of buffers containing message 176 * @list: chain of buffers containing message
397 * Consumes the buffer chain, except when returning -ELINKCONG 177 * Consumes the buffer chain, except when returning -ELINKCONG
398 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE 178 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
399 */ 179 */
400int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list) 180int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
401{ 181{
402 struct tipc_net *tn = net_generic(net, tipc_net_id); 182 struct tipc_link *l = tipc_bc_sndlink(net);
403 struct tipc_link *bcl = tn->bcl; 183 struct sk_buff_head xmitq, inputq, rcvq;
404 struct tipc_bclink *bclink = tn->bclink;
405 int rc = 0; 184 int rc = 0;
406 int bc = 0;
407 struct sk_buff *skb;
408 struct sk_buff_head arrvq;
409 struct sk_buff_head inputq;
410 185
411 /* Prepare clone of message for local node */ 186 __skb_queue_head_init(&rcvq);
412 skb = tipc_msg_reassemble(list); 187 __skb_queue_head_init(&xmitq);
413 if (unlikely(!skb)) 188 skb_queue_head_init(&inputq);
414 return -EHOSTUNREACH;
415 189
416 /* Broadcast to all nodes */ 190 /* Prepare message clone for local node */
417 if (likely(bclink)) { 191 if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
418 tipc_bclink_lock(net); 192 return -EHOSTUNREACH;
419 if (likely(bclink->bcast_nodes.count)) {
420 rc = __tipc_link_xmit(net, bcl, list);
421 if (likely(!rc)) {
422 u32 len = skb_queue_len(&bcl->transmq);
423
424 bclink_set_last_sent(net);
425 bcl->stats.queue_sz_counts++;
426 bcl->stats.accu_queue_sz += len;
427 }
428 bc = 1;
429 }
430 tipc_bclink_unlock(net);
431 }
432 193
433 if (unlikely(!bc)) 194 tipc_bcast_lock(net);
434 __skb_queue_purge(list); 195 if (tipc_link_bc_peers(l))
196 rc = tipc_link_xmit(l, list, &xmitq);
197 tipc_bcast_unlock(net);
435 198
199 /* Don't send to local node if adding to link failed */
436 if (unlikely(rc)) { 200 if (unlikely(rc)) {
437 kfree_skb(skb); 201 __skb_queue_purge(&rcvq);
438 return rc; 202 return rc;
439 } 203 }
440 /* Deliver message clone */
441 __skb_queue_head_init(&arrvq);
442 skb_queue_head_init(&inputq);
443 __skb_queue_tail(&arrvq, skb);
444 tipc_sk_mcast_rcv(net, &arrvq, &inputq);
445 return rc;
446}
447 204
448/** 205 /* Broadcast to all nodes, inluding local node */
449 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 206 tipc_bcbase_xmit(net, &xmitq);
450 * 207 tipc_sk_mcast_rcv(net, &rcvq, &inputq);
451 * Called with both sending node's lock and bclink_lock taken. 208 __skb_queue_purge(list);
452 */ 209 return 0;
453static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
454{
455 struct tipc_net *tn = net_generic(node->net, tipc_net_id);
456
457 bclink_update_last_sent(node, seqno);
458 node->bclink.last_in = seqno;
459 node->bclink.oos_state = 0;
460 tn->bcl->stats.recv_info++;
461
462 /*
463 * Unicast an ACK periodically, ensuring that
464 * all nodes in the cluster don't ACK at the same time
465 */
466 if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
467 tipc_link_proto_xmit(node_active_link(node, node->addr),
468 STATE_MSG, 0, 0, 0, 0);
469 tn->bcl->stats.sent_acks++;
470 }
471} 210}
472 211
473/** 212/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
474 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
475 * 213 *
476 * RCU is locked, no other locks set 214 * RCU is locked, no other locks set
477 */ 215 */
478void tipc_bclink_rcv(struct net *net, struct sk_buff *buf) 216int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
479{ 217{
480 struct tipc_net *tn = net_generic(net, tipc_net_id); 218 struct tipc_msg *hdr = buf_msg(skb);
481 struct tipc_link *bcl = tn->bcl; 219 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
482 struct tipc_msg *msg = buf_msg(buf); 220 struct sk_buff_head xmitq;
483 struct tipc_node *node; 221 int rc;
484 u32 next_in;
485 u32 seqno;
486 int deferred = 0;
487 int pos = 0;
488 struct sk_buff *iskb;
489 struct sk_buff_head *arrvq, *inputq;
490
491 /* Screen out unwanted broadcast messages */
492 if (msg_mc_netid(msg) != tn->net_id)
493 goto exit;
494
495 node = tipc_node_find(net, msg_prevnode(msg));
496 if (unlikely(!node))
497 goto exit;
498
499 tipc_node_lock(node);
500 if (unlikely(!node->bclink.recv_permitted))
501 goto unlock;
502
503 /* Handle broadcast protocol message */
504 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
505 if (msg_type(msg) != STATE_MSG)
506 goto unlock;
507 if (msg_destnode(msg) == tn->own_addr) {
508 tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
509 tipc_bclink_lock(net);
510 bcl->stats.recv_nacks++;
511 tn->bclink->retransmit_to = node;
512 bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
513 msg_bcgap_to(msg));
514 tipc_bclink_unlock(net);
515 tipc_node_unlock(node);
516 } else {
517 tipc_node_unlock(node);
518 bclink_peek_nack(net, msg);
519 }
520 tipc_node_put(node);
521 goto exit;
522 }
523
524 /* Handle in-sequence broadcast message */
525 seqno = msg_seqno(msg);
526 next_in = mod(node->bclink.last_in + 1);
527 arrvq = &tn->bclink->arrvq;
528 inputq = &tn->bclink->inputq;
529
530 if (likely(seqno == next_in)) {
531receive:
532 /* Deliver message to destination */
533 if (likely(msg_isdata(msg))) {
534 tipc_bclink_lock(net);
535 bclink_accept_pkt(node, seqno);
536 spin_lock_bh(&inputq->lock);
537 __skb_queue_tail(arrvq, buf);
538 spin_unlock_bh(&inputq->lock);
539 node->action_flags |= TIPC_BCAST_MSG_EVT;
540 tipc_bclink_unlock(net);
541 tipc_node_unlock(node);
542 } else if (msg_user(msg) == MSG_BUNDLER) {
543 tipc_bclink_lock(net);
544 bclink_accept_pkt(node, seqno);
545 bcl->stats.recv_bundles++;
546 bcl->stats.recv_bundled += msg_msgcnt(msg);
547 pos = 0;
548 while (tipc_msg_extract(buf, &iskb, &pos)) {
549 spin_lock_bh(&inputq->lock);
550 __skb_queue_tail(arrvq, iskb);
551 spin_unlock_bh(&inputq->lock);
552 }
553 node->action_flags |= TIPC_BCAST_MSG_EVT;
554 tipc_bclink_unlock(net);
555 tipc_node_unlock(node);
556 } else if (msg_user(msg) == MSG_FRAGMENTER) {
557 tipc_bclink_lock(net);
558 bclink_accept_pkt(node, seqno);
559 tipc_buf_append(&node->bclink.reasm_buf, &buf);
560 if (unlikely(!buf && !node->bclink.reasm_buf)) {
561 tipc_bclink_unlock(net);
562 goto unlock;
563 }
564 bcl->stats.recv_fragments++;
565 if (buf) {
566 bcl->stats.recv_fragmented++;
567 msg = buf_msg(buf);
568 tipc_bclink_unlock(net);
569 goto receive;
570 }
571 tipc_bclink_unlock(net);
572 tipc_node_unlock(node);
573 } else {
574 tipc_bclink_lock(net);
575 bclink_accept_pkt(node, seqno);
576 tipc_bclink_unlock(net);
577 tipc_node_unlock(node);
578 kfree_skb(buf);
579 }
580 buf = NULL;
581 222
582 /* Determine new synchronization state */ 223 __skb_queue_head_init(&xmitq);
583 tipc_node_lock(node);
584 if (unlikely(!tipc_node_is_up(node)))
585 goto unlock;
586 224
587 if (node->bclink.last_in == node->bclink.last_sent) 225 if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
588 goto unlock; 226 kfree_skb(skb);
589 227 return 0;
590 if (skb_queue_empty(&node->bclink.deferdq)) {
591 node->bclink.oos_state = 1;
592 goto unlock;
593 }
594
595 msg = buf_msg(skb_peek(&node->bclink.deferdq));
596 seqno = msg_seqno(msg);
597 next_in = mod(next_in + 1);
598 if (seqno != next_in)
599 goto unlock;
600
601 /* Take in-sequence message from deferred queue & deliver it */
602 buf = __skb_dequeue(&node->bclink.deferdq);
603 goto receive;
604 }
605
606 /* Handle out-of-sequence broadcast message */
607 if (less(next_in, seqno)) {
608 deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
609 buf);
610 bclink_update_last_sent(node, seqno);
611 buf = NULL;
612 } 228 }
613 229
614 tipc_bclink_lock(net); 230 tipc_bcast_lock(net);
615 231 if (msg_user(hdr) == BCAST_PROTOCOL)
616 if (deferred) 232 rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
617 bcl->stats.deferred_recv++;
618 else 233 else
619 bcl->stats.duplicates++; 234 rc = tipc_link_rcv(l, skb, NULL);
235 tipc_bcast_unlock(net);
620 236
621 tipc_bclink_unlock(net); 237 tipc_bcbase_xmit(net, &xmitq);
622 238
623unlock: 239 /* Any socket wakeup messages ? */
624 tipc_node_unlock(node); 240 if (!skb_queue_empty(inputq))
625 tipc_node_put(node); 241 tipc_sk_rcv(net, inputq);
626exit:
627 kfree_skb(buf);
628}
629 242
630u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) 243 return rc;
631{
632 return (n_ptr->bclink.recv_permitted &&
633 (tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
634} 244}
635 245
636 246/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
637/**
638 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
639 * 247 *
640 * Send packet over as many bearers as necessary to reach all nodes 248 * RCU is locked, no other locks set
641 * that have joined the broadcast link.
642 *
643 * Returns 0 (packet sent successfully) under all circumstances,
644 * since the broadcast link's pseudo-bearer never blocks
645 */ 249 */
646static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf, 250void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
647 struct tipc_bearer *unused1,
648 struct tipc_media_addr *unused2)
649{ 251{
650 int bp_index; 252 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
651 struct tipc_msg *msg = buf_msg(buf); 253 struct sk_buff_head xmitq;
652 struct tipc_net *tn = net_generic(net, tipc_net_id);
653 struct tipc_bcbearer *bcbearer = tn->bcbearer;
654 struct tipc_bclink *bclink = tn->bclink;
655
656 /* Prepare broadcast link message for reliable transmission,
657 * if first time trying to send it;
658 * preparation is skipped for broadcast link protocol messages
659 * since they are sent in an unreliable manner and don't need it
660 */
661 if (likely(!msg_non_seq(buf_msg(buf)))) {
662 bcbuf_set_acks(buf, bclink->bcast_nodes.count);
663 msg_set_non_seq(msg, 1);
664 msg_set_mc_netid(msg, tn->net_id);
665 tn->bcl->stats.sent_info++;
666 if (WARN_ON(!bclink->bcast_nodes.count)) {
667 dump_stack();
668 return 0;
669 }
670 }
671 254
672 /* Send buffer over bearers until all targets reached */ 255 __skb_queue_head_init(&xmitq);
673 bcbearer->remains = bclink->bcast_nodes;
674
675 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
676 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
677 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
678 struct tipc_bearer *bp[2] = {p, s};
679 struct tipc_bearer *b = bp[msg_link_selector(msg)];
680 struct sk_buff *tbuf;
681
682 if (!p)
683 break; /* No more bearers to try */
684 if (!b)
685 b = p;
686 tipc_nmap_diff(&bcbearer->remains, &b->nodes,
687 &bcbearer->remains_new);
688 if (bcbearer->remains_new.count == bcbearer->remains.count)
689 continue; /* Nothing added by bearer pair */
690
691 if (bp_index == 0) {
692 /* Use original buffer for first bearer */
693 tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
694 } else {
695 /* Avoid concurrent buffer access */
696 tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
697 if (!tbuf)
698 break;
699 tipc_bearer_send(net, b->identity, tbuf,
700 &b->bcast_addr);
701 kfree_skb(tbuf); /* Bearer keeps a clone */
702 }
703 if (bcbearer->remains_new.count == 0)
704 break; /* All targets reached */
705 256
706 bcbearer->remains = bcbearer->remains_new; 257 tipc_bcast_lock(net);
707 } 258 tipc_link_bc_ack_rcv(l, acked, &xmitq);
259 tipc_bcast_unlock(net);
708 260
709 return 0; 261 tipc_bcbase_xmit(net, &xmitq);
262
263 /* Any socket wakeup messages ? */
264 if (!skb_queue_empty(inputq))
265 tipc_sk_rcv(net, inputq);
710} 266}
711 267
712/** 268/* tipc_bcast_synch_rcv - check and update rcv link with peer's send state
713 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 269 *
270 * RCU is locked, no other locks set
714 */ 271 */
715void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr, 272void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
716 u32 node, bool action) 273 struct tipc_msg *hdr)
717{ 274{
718 struct tipc_net *tn = net_generic(net, tipc_net_id); 275 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
719 struct tipc_bcbearer *bcbearer = tn->bcbearer; 276 struct sk_buff_head xmitq;
720 struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
721 struct tipc_bcbearer_pair *bp_curr;
722 struct tipc_bearer *b;
723 int b_index;
724 int pri;
725
726 tipc_bclink_lock(net);
727 277
728 if (action) 278 __skb_queue_head_init(&xmitq);
729 tipc_nmap_add(nm_ptr, node);
730 else
731 tipc_nmap_remove(nm_ptr, node);
732 279
733 /* Group bearers by priority (can assume max of two per priority) */ 280 tipc_bcast_lock(net);
734 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); 281 if (msg_type(hdr) == STATE_MSG) {
282 tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
283 tipc_link_bc_sync_rcv(l, hdr, &xmitq);
284 } else {
285 tipc_link_bc_init_rcv(l, hdr);
286 }
287 tipc_bcast_unlock(net);
735 288
736 rcu_read_lock(); 289 tipc_bcbase_xmit(net, &xmitq);
737 for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
738 b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
739 if (!b || !b->nodes.count)
740 continue;
741 290
742 if (!bp_temp[b->priority].primary) 291 /* Any socket wakeup messages ? */
743 bp_temp[b->priority].primary = b; 292 if (!skb_queue_empty(inputq))
744 else 293 tipc_sk_rcv(net, inputq);
745 bp_temp[b->priority].secondary = b; 294}
746 }
747 rcu_read_unlock();
748 295
749 /* Create array of bearer pairs for broadcasting */ 296/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
750 bp_curr = bcbearer->bpairs; 297 *
751 memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs)); 298 * RCU is locked, node lock is set
299 */
300void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
301 struct sk_buff_head *xmitq)
302{
303 struct tipc_link *snd_l = tipc_bc_sndlink(net);
752 304
753 for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) { 305 tipc_bcast_lock(net);
306 tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
307 tipc_bcbase_select_primary(net);
308 tipc_bcast_unlock(net);
309}
754 310
755 if (!bp_temp[pri].primary) 311/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
756 continue; 312 *
313 * RCU is locked, node lock is set
314 */
315void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
316{
317 struct tipc_link *snd_l = tipc_bc_sndlink(net);
318 struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
319 struct sk_buff_head xmitq;
757 320
758 bp_curr->primary = bp_temp[pri].primary; 321 __skb_queue_head_init(&xmitq);
759 322
760 if (bp_temp[pri].secondary) { 323 tipc_bcast_lock(net);
761 if (tipc_nmap_equal(&bp_temp[pri].primary->nodes, 324 tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
762 &bp_temp[pri].secondary->nodes)) { 325 tipc_bcbase_select_primary(net);
763 bp_curr->secondary = bp_temp[pri].secondary; 326 tipc_bcast_unlock(net);
764 } else {
765 bp_curr++;
766 bp_curr->primary = bp_temp[pri].secondary;
767 }
768 }
769 327
770 bp_curr++; 328 tipc_bcbase_xmit(net, &xmitq);
771 }
772 329
773 tipc_bclink_unlock(net); 330 /* Any socket wakeup messages ? */
331 if (!skb_queue_empty(inputq))
332 tipc_sk_rcv(net, inputq);
774} 333}
775 334
776static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, 335static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
@@ -836,7 +395,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
836 if (!bcl) 395 if (!bcl)
837 return 0; 396 return 0;
838 397
839 tipc_bclink_lock(net); 398 tipc_bcast_lock(net);
840 399
841 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family, 400 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
842 NLM_F_MULTI, TIPC_NL_LINK_GET); 401 NLM_F_MULTI, TIPC_NL_LINK_GET);
@@ -871,7 +430,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
871 if (err) 430 if (err)
872 goto attr_msg_full; 431 goto attr_msg_full;
873 432
874 tipc_bclink_unlock(net); 433 tipc_bcast_unlock(net);
875 nla_nest_end(msg->skb, attrs); 434 nla_nest_end(msg->skb, attrs);
876 genlmsg_end(msg->skb, hdr); 435 genlmsg_end(msg->skb, hdr);
877 436
@@ -882,7 +441,7 @@ prop_msg_full:
882attr_msg_full: 441attr_msg_full:
883 nla_nest_cancel(msg->skb, attrs); 442 nla_nest_cancel(msg->skb, attrs);
884msg_full: 443msg_full:
885 tipc_bclink_unlock(net); 444 tipc_bcast_unlock(net);
886 genlmsg_cancel(msg->skb, hdr); 445 genlmsg_cancel(msg->skb, hdr);
887 446
888 return -EMSGSIZE; 447 return -EMSGSIZE;
@@ -896,26 +455,25 @@ int tipc_bclink_reset_stats(struct net *net)
896 if (!bcl) 455 if (!bcl)
897 return -ENOPROTOOPT; 456 return -ENOPROTOOPT;
898 457
899 tipc_bclink_lock(net); 458 tipc_bcast_lock(net);
900 memset(&bcl->stats, 0, sizeof(bcl->stats)); 459 memset(&bcl->stats, 0, sizeof(bcl->stats));
901 tipc_bclink_unlock(net); 460 tipc_bcast_unlock(net);
902 return 0; 461 return 0;
903} 462}
904 463
905int tipc_bclink_set_queue_limits(struct net *net, u32 limit) 464static int tipc_bc_link_set_queue_limits(struct net *net, u32 limit)
906{ 465{
907 struct tipc_net *tn = net_generic(net, tipc_net_id); 466 struct tipc_link *l = tipc_bc_sndlink(net);
908 struct tipc_link *bcl = tn->bcl;
909 467
910 if (!bcl) 468 if (!l)
911 return -ENOPROTOOPT; 469 return -ENOPROTOOPT;
912 if (limit < BCLINK_WIN_MIN) 470 if (limit < BCLINK_WIN_MIN)
913 limit = BCLINK_WIN_MIN; 471 limit = BCLINK_WIN_MIN;
914 if (limit > TIPC_MAX_LINK_WIN) 472 if (limit > TIPC_MAX_LINK_WIN)
915 return -EINVAL; 473 return -EINVAL;
916 tipc_bclink_lock(net); 474 tipc_bcast_lock(net);
917 tipc_link_set_queue_limits(bcl, limit); 475 tipc_link_set_queue_limits(l, limit);
918 tipc_bclink_unlock(net); 476 tipc_bcast_unlock(net);
919 return 0; 477 return 0;
920} 478}
921 479
@@ -937,123 +495,51 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
937 495
938 win = nla_get_u32(props[TIPC_NLA_PROP_WIN]); 496 win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
939 497
940 return tipc_bclink_set_queue_limits(net, win); 498 return tipc_bc_link_set_queue_limits(net, win);
941} 499}
942 500
943int tipc_bclink_init(struct net *net) 501int tipc_bcast_init(struct net *net)
944{ 502{
945 struct tipc_net *tn = net_generic(net, tipc_net_id); 503 struct tipc_net *tn = tipc_net(net);
946 struct tipc_bcbearer *bcbearer; 504 struct tipc_bc_base *bb = NULL;
947 struct tipc_bclink *bclink; 505 struct tipc_link *l = NULL;
948 struct tipc_link *bcl;
949
950 bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
951 if (!bcbearer)
952 return -ENOMEM;
953
954 bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
955 if (!bclink) {
956 kfree(bcbearer);
957 return -ENOMEM;
958 }
959 506
960 bcl = &bclink->link; 507 bb = kzalloc(sizeof(*bb), GFP_ATOMIC);
961 bcbearer->bearer.media = &bcbearer->media; 508 if (!bb)
962 bcbearer->media.send_msg = tipc_bcbearer_send; 509 goto enomem;
963 sprintf(bcbearer->media.name, "tipc-broadcast"); 510 tn->bcbase = bb;
964 511 spin_lock_init(&tipc_net(net)->bclock);
965 spin_lock_init(&bclink->lock);
966 __skb_queue_head_init(&bcl->transmq);
967 __skb_queue_head_init(&bcl->backlogq);
968 __skb_queue_head_init(&bcl->deferdq);
969 skb_queue_head_init(&bcl->wakeupq);
970 bcl->snd_nxt = 1;
971 spin_lock_init(&bclink->node.lock);
972 __skb_queue_head_init(&bclink->arrvq);
973 skb_queue_head_init(&bclink->inputq);
974 bcl->owner = &bclink->node;
975 bcl->owner->net = net;
976 bcl->mtu = MAX_PKT_DEFAULT_MCAST;
977 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
978 bcl->bearer_id = MAX_BEARERS;
979 rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
980 bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
981 msg_set_prevnode(bcl->pmsg, tn->own_addr);
982 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
983 tn->bcbearer = bcbearer;
984 tn->bclink = bclink;
985 tn->bcl = bcl;
986 return 0;
987}
988 512
989void tipc_bclink_stop(struct net *net) 513 if (!tipc_link_bc_create(net, 0, 0,
990{ 514 U16_MAX,
991 struct tipc_net *tn = net_generic(net, tipc_net_id); 515 BCLINK_WIN_DEFAULT,
992 516 0,
993 tipc_bclink_lock(net); 517 &bb->inputq,
994 tipc_link_purge_queues(tn->bcl); 518 NULL,
995 tipc_bclink_unlock(net); 519 NULL,
996 520 &l))
997 RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL); 521 goto enomem;
998 synchronize_net(); 522 bb->link = l;
999 kfree(tn->bcbearer); 523 tn->bcl = l;
1000 kfree(tn->bclink); 524 return 0;
525enomem:
526 kfree(bb);
527 kfree(l);
528 return -ENOMEM;
1001} 529}
1002 530
1003/** 531void tipc_bcast_reinit(struct net *net)
1004 * tipc_nmap_add - add a node to a node map
1005 */
1006static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
1007{ 532{
1008 int n = tipc_node(node); 533 struct tipc_bc_base *b = tipc_bc_base(net);
1009 int w = n / WSIZE;
1010 u32 mask = (1 << (n % WSIZE));
1011 534
1012 if ((nm_ptr->map[w] & mask) == 0) { 535 msg_set_prevnode(b->link->pmsg, tipc_own_addr(net));
1013 nm_ptr->count++;
1014 nm_ptr->map[w] |= mask;
1015 }
1016} 536}
1017 537
1018/** 538void tipc_bcast_stop(struct net *net)
1019 * tipc_nmap_remove - remove a node from a node map
1020 */
1021static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
1022{ 539{
1023 int n = tipc_node(node); 540 struct tipc_net *tn = net_generic(net, tipc_net_id);
1024 int w = n / WSIZE;
1025 u32 mask = (1 << (n % WSIZE));
1026
1027 if ((nm_ptr->map[w] & mask) != 0) {
1028 nm_ptr->map[w] &= ~mask;
1029 nm_ptr->count--;
1030 }
1031}
1032 541
1033/** 542 synchronize_net();
1034 * tipc_nmap_diff - find differences between node maps 543 kfree(tn->bcbase);
1035 * @nm_a: input node map A 544 kfree(tn->bcl);
1036 * @nm_b: input node map B
1037 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
1038 */
1039static void tipc_nmap_diff(struct tipc_node_map *nm_a,
1040 struct tipc_node_map *nm_b,
1041 struct tipc_node_map *nm_diff)
1042{
1043 int stop = ARRAY_SIZE(nm_a->map);
1044 int w;
1045 int b;
1046 u32 map;
1047
1048 memset(nm_diff, 0, sizeof(*nm_diff));
1049 for (w = 0; w < stop; w++) {
1050 map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
1051 nm_diff->map[w] = map;
1052 if (map != 0) {
1053 for (b = 0 ; b < WSIZE; b++) {
1054 if (map & (1 << b))
1055 nm_diff->count++;
1056 }
1057 }
1058 }
1059} 545}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index d74c69bcf60b..2855b9356a15 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -37,102 +37,44 @@
37#ifndef _TIPC_BCAST_H 37#ifndef _TIPC_BCAST_H
38#define _TIPC_BCAST_H 38#define _TIPC_BCAST_H
39 39
40#include <linux/tipc_config.h> 40#include "core.h"
41#include "link.h"
42#include "node.h"
43 41
44/** 42struct tipc_node;
45 * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link 43struct tipc_msg;
46 * @primary: pointer to primary bearer 44struct tipc_nl_msg;
47 * @secondary: pointer to secondary bearer 45struct tipc_node_map;
48 *
49 * Bearers must have same priority and same set of reachable destinations
50 * to be paired.
51 */
52
53struct tipc_bcbearer_pair {
54 struct tipc_bearer *primary;
55 struct tipc_bearer *secondary;
56};
57
58#define BCBEARER MAX_BEARERS
59
60/**
61 * struct tipc_bcbearer - bearer used by broadcast link
62 * @bearer: (non-standard) broadcast bearer structure
63 * @media: (non-standard) broadcast media structure
64 * @bpairs: array of bearer pairs
65 * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
66 * @remains: temporary node map used by tipc_bcbearer_send()
67 * @remains_new: temporary node map used tipc_bcbearer_send()
68 *
69 * Note: The fields labelled "temporary" are incorporated into the bearer
70 * to avoid consuming potentially limited stack space through the use of
71 * large local variables within multicast routines. Concurrent access is
72 * prevented through use of the spinlock "bclink_lock".
73 */
74struct tipc_bcbearer {
75 struct tipc_bearer bearer;
76 struct tipc_media media;
77 struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
78 struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
79 struct tipc_node_map remains;
80 struct tipc_node_map remains_new;
81};
82 46
83/** 47int tipc_bcast_init(struct net *net);
84 * struct tipc_bclink - link used for broadcast messages 48void tipc_bcast_reinit(struct net *net);
85 * @lock: spinlock governing access to structure 49void tipc_bcast_stop(struct net *net);
86 * @link: (non-standard) broadcast link structure 50void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
87 * @node: (non-standard) node structure representing b'cast link's peer node 51 struct sk_buff_head *xmitq);
88 * @bcast_nodes: map of broadcast-capable nodes 52void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
89 * @retransmit_to: node that most recently requested a retransmit 53void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
90 * 54void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
91 * Handles sequence numbering, fragmentation, bundling, etc. 55int tipc_bcast_get_mtu(struct net *net);
92 */ 56int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
93struct tipc_bclink { 57int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
94 spinlock_t lock; 58void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
95 struct tipc_link link; 59void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
96 struct tipc_node node; 60 struct tipc_msg *hdr);
97 struct sk_buff_head arrvq; 61int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
98 struct sk_buff_head inputq; 62int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
99 struct tipc_node_map bcast_nodes; 63int tipc_bclink_reset_stats(struct net *net);
100 struct tipc_node *retransmit_to;
101};
102 64
103struct tipc_node; 65static inline void tipc_bcast_lock(struct net *net)
104extern const char tipc_bclink_name[]; 66{
67 spin_lock_bh(&tipc_net(net)->bclock);
68}
105 69
106/** 70static inline void tipc_bcast_unlock(struct net *net)
107 * tipc_nmap_equal - test for equality of node maps
108 */
109static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
110 struct tipc_node_map *nm_b)
111{ 71{
112 return !memcmp(nm_a, nm_b, sizeof(*nm_a)); 72 spin_unlock_bh(&tipc_net(net)->bclock);
113} 73}
114 74
115int tipc_bclink_init(struct net *net); 75static inline struct tipc_link *tipc_bc_sndlink(struct net *net)
116void tipc_bclink_stop(struct net *net); 76{
117void tipc_bclink_add_node(struct net *net, u32 addr); 77 return tipc_net(net)->bcl;
118void tipc_bclink_remove_node(struct net *net, u32 addr); 78}
119struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
120void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
121void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
122u32 tipc_bclink_get_last_sent(struct net *net);
123u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
124void tipc_bclink_update_link_state(struct tipc_node *node,
125 u32 last_sent);
126int tipc_bclink_reset_stats(struct net *net);
127int tipc_bclink_set_queue_limits(struct net *net, u32 limit);
128void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
129 u32 node, bool action);
130uint tipc_bclink_get_mtu(void);
131int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
132void tipc_bclink_wakeup_users(struct net *net);
133int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
134int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
135void tipc_bclink_input(struct net *net);
136void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);
137 79
138#endif 80#endif
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 82b278668ab7..648f2a67f314 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -193,10 +193,8 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest)
193 193
194 rcu_read_lock(); 194 rcu_read_lock();
195 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); 195 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
196 if (b_ptr) { 196 if (b_ptr)
197 tipc_bcbearer_sort(net, &b_ptr->nodes, dest, true);
198 tipc_disc_add_dest(b_ptr->link_req); 197 tipc_disc_add_dest(b_ptr->link_req);
199 }
200 rcu_read_unlock(); 198 rcu_read_unlock();
201} 199}
202 200
@@ -207,10 +205,8 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest)
207 205
208 rcu_read_lock(); 206 rcu_read_lock();
209 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); 207 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
210 if (b_ptr) { 208 if (b_ptr)
211 tipc_bcbearer_sort(net, &b_ptr->nodes, dest, false);
212 tipc_disc_remove_dest(b_ptr->link_req); 209 tipc_disc_remove_dest(b_ptr->link_req);
213 }
214 rcu_read_unlock(); 210 rcu_read_unlock();
215} 211}
216 212
@@ -418,10 +414,9 @@ void tipc_disable_l2_media(struct tipc_bearer *b)
418 * @b_ptr: the bearer through which the packet is to be sent 414 * @b_ptr: the bearer through which the packet is to be sent
419 * @dest: peer destination address 415 * @dest: peer destination address
420 */ 416 */
421int tipc_l2_send_msg(struct net *net, struct sk_buff *buf, 417int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
422 struct tipc_bearer *b, struct tipc_media_addr *dest) 418 struct tipc_bearer *b, struct tipc_media_addr *dest)
423{ 419{
424 struct sk_buff *clone;
425 struct net_device *dev; 420 struct net_device *dev;
426 int delta; 421 int delta;
427 422
@@ -429,42 +424,48 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *buf,
429 if (!dev) 424 if (!dev)
430 return 0; 425 return 0;
431 426
432 clone = skb_clone(buf, GFP_ATOMIC); 427 delta = dev->hard_header_len - skb_headroom(skb);
433 if (!clone)
434 return 0;
435
436 delta = dev->hard_header_len - skb_headroom(buf);
437 if ((delta > 0) && 428 if ((delta > 0) &&
438 pskb_expand_head(clone, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) { 429 pskb_expand_head(skb, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) {
439 kfree_skb(clone); 430 kfree_skb(skb);
440 return 0; 431 return 0;
441 } 432 }
442 433
443 skb_reset_network_header(clone); 434 skb_reset_network_header(skb);
444 clone->dev = dev; 435 skb->dev = dev;
445 clone->protocol = htons(ETH_P_TIPC); 436 skb->protocol = htons(ETH_P_TIPC);
446 dev_hard_header(clone, dev, ETH_P_TIPC, dest->value, 437 dev_hard_header(skb, dev, ETH_P_TIPC, dest->value,
447 dev->dev_addr, clone->len); 438 dev->dev_addr, skb->len);
448 dev_queue_xmit(clone); 439 dev_queue_xmit(skb);
449 return 0; 440 return 0;
450} 441}
451 442
452/* tipc_bearer_send- sends buffer to destination over bearer 443int tipc_bearer_mtu(struct net *net, u32 bearer_id)
453 * 444{
454 * IMPORTANT: 445 int mtu = 0;
455 * The media send routine must not alter the buffer being passed in 446 struct tipc_bearer *b;
456 * as it may be needed for later retransmission! 447
448 rcu_read_lock();
449 b = rcu_dereference_rtnl(tipc_net(net)->bearer_list[bearer_id]);
450 if (b)
451 mtu = b->mtu;
452 rcu_read_unlock();
453 return mtu;
454}
455
456/* tipc_bearer_xmit_skb - sends buffer to destination over bearer
457 */ 457 */
458void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, 458void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
459 struct tipc_media_addr *dest) 459 struct sk_buff *skb,
460 struct tipc_media_addr *dest)
460{ 461{
461 struct tipc_net *tn = net_generic(net, tipc_net_id); 462 struct tipc_net *tn = tipc_net(net);
462 struct tipc_bearer *b_ptr; 463 struct tipc_bearer *b;
463 464
464 rcu_read_lock(); 465 rcu_read_lock();
465 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]); 466 b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
466 if (likely(b_ptr)) 467 if (likely(b))
467 b_ptr->media->send_msg(net, buf, b_ptr, dest); 468 b->media->send_msg(net, skb, b, dest);
468 rcu_read_unlock(); 469 rcu_read_unlock();
469} 470}
470 471
@@ -487,8 +488,31 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,
487 skb_queue_walk_safe(xmitq, skb, tmp) { 488 skb_queue_walk_safe(xmitq, skb, tmp) {
488 __skb_dequeue(xmitq); 489 __skb_dequeue(xmitq);
489 b->media->send_msg(net, skb, b, dst); 490 b->media->send_msg(net, skb, b, dst);
490 /* Until we remove cloning in tipc_l2_send_msg(): */ 491 }
491 kfree_skb(skb); 492 }
493 rcu_read_unlock();
494}
495
496/* tipc_bearer_bc_xmit() - broadcast buffers to all destinations
497 */
498void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
499 struct sk_buff_head *xmitq)
500{
501 struct tipc_net *tn = tipc_net(net);
502 int net_id = tn->net_id;
503 struct tipc_bearer *b;
504 struct sk_buff *skb, *tmp;
505 struct tipc_msg *hdr;
506
507 rcu_read_lock();
508 b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
509 if (likely(b)) {
510 skb_queue_walk_safe(xmitq, skb, tmp) {
511 hdr = buf_msg(skb);
512 msg_set_non_seq(hdr, 1);
513 msg_set_mc_netid(hdr, net_id);
514 __skb_dequeue(xmitq);
515 b->media->send_msg(net, skb, b, &b->bcast_addr);
492 } 516 }
493 } 517 }
494 rcu_read_unlock(); 518 rcu_read_unlock();
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 6426f242f626..552185bc4773 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -163,6 +163,7 @@ struct tipc_bearer {
163 u32 identity; 163 u32 identity;
164 struct tipc_link_req *link_req; 164 struct tipc_link_req *link_req;
165 char net_plane; 165 char net_plane;
166 int node_cnt;
166 struct tipc_node_map nodes; 167 struct tipc_node_map nodes;
167}; 168};
168 169
@@ -215,10 +216,14 @@ struct tipc_media *tipc_media_find(const char *name);
215int tipc_bearer_setup(void); 216int tipc_bearer_setup(void);
216void tipc_bearer_cleanup(void); 217void tipc_bearer_cleanup(void);
217void tipc_bearer_stop(struct net *net); 218void tipc_bearer_stop(struct net *net);
218void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf, 219int tipc_bearer_mtu(struct net *net, u32 bearer_id);
219 struct tipc_media_addr *dest); 220void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
221 struct sk_buff *skb,
222 struct tipc_media_addr *dest);
220void tipc_bearer_xmit(struct net *net, u32 bearer_id, 223void tipc_bearer_xmit(struct net *net, u32 bearer_id,
221 struct sk_buff_head *xmitq, 224 struct sk_buff_head *xmitq,
222 struct tipc_media_addr *dst); 225 struct tipc_media_addr *dst);
226void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
227 struct sk_buff_head *xmitq);
223 228
224#endif /* _TIPC_BEARER_H */ 229#endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 005ba5eb0ea4..03a842870c52 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -42,6 +42,7 @@
42#include "bearer.h" 42#include "bearer.h"
43#include "net.h" 43#include "net.h"
44#include "socket.h" 44#include "socket.h"
45#include "bcast.h"
45 46
46#include <linux/module.h> 47#include <linux/module.h>
47 48
@@ -71,8 +72,15 @@ static int __net_init tipc_init_net(struct net *net)
71 err = tipc_topsrv_start(net); 72 err = tipc_topsrv_start(net);
72 if (err) 73 if (err)
73 goto out_subscr; 74 goto out_subscr;
75
76 err = tipc_bcast_init(net);
77 if (err)
78 goto out_bclink;
79
74 return 0; 80 return 0;
75 81
82out_bclink:
83 tipc_bcast_stop(net);
76out_subscr: 84out_subscr:
77 tipc_nametbl_stop(net); 85 tipc_nametbl_stop(net);
78out_nametbl: 86out_nametbl:
@@ -85,6 +93,7 @@ static void __net_exit tipc_exit_net(struct net *net)
85{ 93{
86 tipc_topsrv_stop(net); 94 tipc_topsrv_stop(net);
87 tipc_net_stop(net); 95 tipc_net_stop(net);
96 tipc_bcast_stop(net);
88 tipc_nametbl_stop(net); 97 tipc_nametbl_stop(net);
89 tipc_sk_rht_destroy(net); 98 tipc_sk_rht_destroy(net);
90} 99}
diff --git a/net/tipc/core.h b/net/tipc/core.h
index b96b41eabf12..18e95a8020cd 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -62,8 +62,7 @@
62 62
63struct tipc_node; 63struct tipc_node;
64struct tipc_bearer; 64struct tipc_bearer;
65struct tipc_bcbearer; 65struct tipc_bc_base;
66struct tipc_bclink;
67struct tipc_link; 66struct tipc_link;
68struct tipc_name_table; 67struct tipc_name_table;
69struct tipc_server; 68struct tipc_server;
@@ -93,8 +92,8 @@ struct tipc_net {
93 struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1]; 92 struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1];
94 93
95 /* Broadcast link */ 94 /* Broadcast link */
96 struct tipc_bcbearer *bcbearer; 95 spinlock_t bclock;
97 struct tipc_bclink *bclink; 96 struct tipc_bc_base *bcbase;
98 struct tipc_link *bcl; 97 struct tipc_link *bcl;
99 98
100 /* Socket hash table */ 99 /* Socket hash table */
@@ -114,6 +113,11 @@ static inline struct tipc_net *tipc_net(struct net *net)
114 return net_generic(net, tipc_net_id); 113 return net_generic(net, tipc_net_id);
115} 114}
116 115
116static inline int tipc_netid(struct net *net)
117{
118 return tipc_net(net)->net_id;
119}
120
117static inline u16 mod(u16 x) 121static inline u16 mod(u16 x)
118{ 122{
119 return x & 0xffffu; 123 return x & 0xffffu;
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index d14e0a4aa9af..afe8c47c4085 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -89,7 +89,7 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,
89 MAX_H_SIZE, dest_domain); 89 MAX_H_SIZE, dest_domain);
90 msg_set_non_seq(msg, 1); 90 msg_set_non_seq(msg, 1);
91 msg_set_node_sig(msg, tn->random); 91 msg_set_node_sig(msg, tn->random);
92 msg_set_node_capabilities(msg, 0); 92 msg_set_node_capabilities(msg, TIPC_NODE_CAPABILITIES);
93 msg_set_dest_domain(msg, dest_domain); 93 msg_set_dest_domain(msg, dest_domain);
94 msg_set_bc_netid(msg, tn->net_id); 94 msg_set_bc_netid(msg, tn->net_id);
95 b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr); 95 b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr);
@@ -167,11 +167,10 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *skb,
167 /* Send response, if necessary */ 167 /* Send response, if necessary */
168 if (respond && (mtyp == DSC_REQ_MSG)) { 168 if (respond && (mtyp == DSC_REQ_MSG)) {
169 rskb = tipc_buf_acquire(MAX_H_SIZE); 169 rskb = tipc_buf_acquire(MAX_H_SIZE);
170 if (rskb) { 170 if (!rskb)
171 tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer); 171 return;
172 tipc_bearer_send(net, bearer->identity, rskb, &maddr); 172 tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer);
173 kfree_skb(rskb); 173 tipc_bearer_xmit_skb(net, bearer->identity, rskb, &maddr);
174 }
175 } 174 }
176} 175}
177 176
@@ -225,6 +224,7 @@ void tipc_disc_remove_dest(struct tipc_link_req *req)
225static void disc_timeout(unsigned long data) 224static void disc_timeout(unsigned long data)
226{ 225{
227 struct tipc_link_req *req = (struct tipc_link_req *)data; 226 struct tipc_link_req *req = (struct tipc_link_req *)data;
227 struct sk_buff *skb;
228 int max_delay; 228 int max_delay;
229 229
230 spin_lock_bh(&req->lock); 230 spin_lock_bh(&req->lock);
@@ -242,9 +242,9 @@ static void disc_timeout(unsigned long data)
242 * hold at fast polling rate if don't have any associated nodes, 242 * hold at fast polling rate if don't have any associated nodes,
243 * otherwise hold at slow polling rate 243 * otherwise hold at slow polling rate
244 */ 244 */
245 tipc_bearer_send(req->net, req->bearer_id, req->buf, &req->dest); 245 skb = skb_clone(req->buf, GFP_ATOMIC);
246 246 if (skb)
247 247 tipc_bearer_xmit_skb(req->net, req->bearer_id, skb, &req->dest);
248 req->timer_intv *= 2; 248 req->timer_intv *= 2;
249 if (req->num_nodes) 249 if (req->num_nodes)
250 max_delay = TIPC_LINK_REQ_SLOW; 250 max_delay = TIPC_LINK_REQ_SLOW;
@@ -271,6 +271,7 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr,
271 struct tipc_media_addr *dest) 271 struct tipc_media_addr *dest)
272{ 272{
273 struct tipc_link_req *req; 273 struct tipc_link_req *req;
274 struct sk_buff *skb;
274 275
275 req = kmalloc(sizeof(*req), GFP_ATOMIC); 276 req = kmalloc(sizeof(*req), GFP_ATOMIC);
276 if (!req) 277 if (!req)
@@ -292,7 +293,9 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr,
292 setup_timer(&req->timer, disc_timeout, (unsigned long)req); 293 setup_timer(&req->timer, disc_timeout, (unsigned long)req);
293 mod_timer(&req->timer, jiffies + req->timer_intv); 294 mod_timer(&req->timer, jiffies + req->timer_intv);
294 b_ptr->link_req = req; 295 b_ptr->link_req = req;
295 tipc_bearer_send(net, req->bearer_id, req->buf, &req->dest); 296 skb = skb_clone(req->buf, GFP_ATOMIC);
297 if (skb)
298 tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest);
296 return 0; 299 return 0;
297} 300}
298 301
@@ -316,6 +319,7 @@ void tipc_disc_delete(struct tipc_link_req *req)
316void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr) 319void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr)
317{ 320{
318 struct tipc_link_req *req = b_ptr->link_req; 321 struct tipc_link_req *req = b_ptr->link_req;
322 struct sk_buff *skb;
319 323
320 spin_lock_bh(&req->lock); 324 spin_lock_bh(&req->lock);
321 tipc_disc_init_msg(net, req->buf, DSC_REQ_MSG, b_ptr); 325 tipc_disc_init_msg(net, req->buf, DSC_REQ_MSG, b_ptr);
@@ -325,6 +329,8 @@ void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr)
325 req->num_nodes = 0; 329 req->num_nodes = 0;
326 req->timer_intv = TIPC_LINK_REQ_INIT; 330 req->timer_intv = TIPC_LINK_REQ_INIT;
327 mod_timer(&req->timer, jiffies + req->timer_intv); 331 mod_timer(&req->timer, jiffies + req->timer_intv);
328 tipc_bearer_send(net, req->bearer_id, req->buf, &req->dest); 332 skb = skb_clone(req->buf, GFP_ATOMIC);
333 if (skb)
334 tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest);
329 spin_unlock_bh(&req->lock); 335 spin_unlock_bh(&req->lock);
330} 336}
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ff9b0b92e62e..4449fa01e232 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -50,6 +50,7 @@
50 */ 50 */
51static const char *link_co_err = "Link tunneling error, "; 51static const char *link_co_err = "Link tunneling error, ";
52static const char *link_rst_msg = "Resetting link "; 52static const char *link_rst_msg = "Resetting link ";
53static const char tipc_bclink_name[] = "broadcast-link";
53 54
54static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = { 55static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
55 [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC }, 56 [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC },
@@ -75,6 +76,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
75 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } 76 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
76}; 77};
77 78
79/* Send states for broadcast NACKs
80 */
81enum {
82 BC_NACK_SND_CONDITIONAL,
83 BC_NACK_SND_UNCONDITIONAL,
84 BC_NACK_SND_SUPPRESS,
85};
86
78/* 87/*
79 * Interval between NACKs when packets arrive out of order 88 * Interval between NACKs when packets arrive out of order
80 */ 89 */
@@ -110,7 +119,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
110 struct sk_buff_head *xmitq); 119 struct sk_buff_head *xmitq);
111static void link_reset_statistics(struct tipc_link *l_ptr); 120static void link_reset_statistics(struct tipc_link *l_ptr);
112static void link_print(struct tipc_link *l_ptr, const char *str); 121static void link_print(struct tipc_link *l_ptr, const char *str);
113static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 122static void tipc_link_build_nack_msg(struct tipc_link *l,
123 struct sk_buff_head *xmitq);
124static void tipc_link_build_bc_init_msg(struct tipc_link *l,
125 struct sk_buff_head *xmitq);
126static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
114 127
115/* 128/*
116 * Simple non-static link routines (i.e. referenced outside this file) 129 * Simple non-static link routines (i.e. referenced outside this file)
@@ -150,11 +163,66 @@ bool tipc_link_is_blocked(struct tipc_link *l)
150 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); 163 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER);
151} 164}
152 165
166bool link_is_bc_sndlink(struct tipc_link *l)
167{
168 return !l->bc_sndlink;
169}
170
171bool link_is_bc_rcvlink(struct tipc_link *l)
172{
173 return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l));
174}
175
153int tipc_link_is_active(struct tipc_link *l) 176int tipc_link_is_active(struct tipc_link *l)
154{ 177{
155 struct tipc_node *n = l->owner; 178 return l->active;
179}
180
181void tipc_link_set_active(struct tipc_link *l, bool active)
182{
183 l->active = active;
184}
185
186void tipc_link_add_bc_peer(struct tipc_link *snd_l,
187 struct tipc_link *uc_l,
188 struct sk_buff_head *xmitq)
189{
190 struct tipc_link *rcv_l = uc_l->bc_rcvlink;
156 191
157 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); 192 snd_l->ackers++;
193 rcv_l->acked = snd_l->snd_nxt - 1;
194 tipc_link_build_bc_init_msg(uc_l, xmitq);
195}
196
197void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
198 struct tipc_link *rcv_l,
199 struct sk_buff_head *xmitq)
200{
201 u16 ack = snd_l->snd_nxt - 1;
202
203 snd_l->ackers--;
204 tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
205 tipc_link_reset(rcv_l);
206 rcv_l->state = LINK_RESET;
207 if (!snd_l->ackers) {
208 tipc_link_reset(snd_l);
209 __skb_queue_purge(xmitq);
210 }
211}
212
213int tipc_link_bc_peers(struct tipc_link *l)
214{
215 return l->ackers;
216}
217
218void tipc_link_set_mtu(struct tipc_link *l, int mtu)
219{
220 l->mtu = mtu;
221}
222
223int tipc_link_mtu(struct tipc_link *l)
224{
225 return l->mtu;
158} 226}
159 227
160static u32 link_own_addr(struct tipc_link *l) 228static u32 link_own_addr(struct tipc_link *l)
@@ -165,57 +233,72 @@ static u32 link_own_addr(struct tipc_link *l)
165/** 233/**
166 * tipc_link_create - create a new link 234 * tipc_link_create - create a new link
167 * @n: pointer to associated node 235 * @n: pointer to associated node
168 * @b: pointer to associated bearer 236 * @if_name: associated interface name
237 * @bearer_id: id (index) of associated bearer
238 * @tolerance: link tolerance to be used by link
239 * @net_plane: network plane (A,B,c..) this link belongs to
240 * @mtu: mtu to be advertised by link
241 * @priority: priority to be used by link
242 * @window: send window to be used by link
243 * @session: session to be used by link
169 * @ownnode: identity of own node 244 * @ownnode: identity of own node
170 * @peer: identity of peer node 245 * @peer: node id of peer node
171 * @maddr: media address to be used 246 * @peer_caps: bitmap describing peer node capabilities
247 * @bc_sndlink: the namespace global link used for broadcast sending
248 * @bc_rcvlink: the peer specific link used for broadcast reception
172 * @inputq: queue to put messages ready for delivery 249 * @inputq: queue to put messages ready for delivery
173 * @namedq: queue to put binding table update messages ready for delivery 250 * @namedq: queue to put binding table update messages ready for delivery
174 * @link: return value, pointer to put the created link 251 * @link: return value, pointer to put the created link
175 * 252 *
176 * Returns true if link was created, otherwise false 253 * Returns true if link was created, otherwise false
177 */ 254 */
178bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, 255bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
179 u32 ownnode, u32 peer, struct tipc_media_addr *maddr, 256 int tolerance, char net_plane, u32 mtu, int priority,
180 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 257 int window, u32 session, u32 ownnode, u32 peer,
258 u16 peer_caps,
259 struct tipc_link *bc_sndlink,
260 struct tipc_link *bc_rcvlink,
261 struct sk_buff_head *inputq,
262 struct sk_buff_head *namedq,
181 struct tipc_link **link) 263 struct tipc_link **link)
182{ 264{
183 struct tipc_link *l; 265 struct tipc_link *l;
184 struct tipc_msg *hdr; 266 struct tipc_msg *hdr;
185 char *if_name;
186 267
187 l = kzalloc(sizeof(*l), GFP_ATOMIC); 268 l = kzalloc(sizeof(*l), GFP_ATOMIC);
188 if (!l) 269 if (!l)
189 return false; 270 return false;
190 *link = l; 271 *link = l;
272 l->pmsg = (struct tipc_msg *)&l->proto_msg;
273 hdr = l->pmsg;
274 tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer);
275 msg_set_size(hdr, sizeof(l->proto_msg));
276 msg_set_session(hdr, session);
277 msg_set_bearer_id(hdr, l->bearer_id);
191 278
192 /* Note: peer i/f name is completed by reset/activate message */ 279 /* Note: peer i/f name is completed by reset/activate message */
193 if_name = strchr(b->name, ':') + 1;
194 sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown", 280 sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
195 tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode), 281 tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode),
196 if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); 282 if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
283 strcpy((char *)msg_data(hdr), if_name);
197 284
198 l->addr = peer; 285 l->addr = peer;
199 l->media_addr = maddr; 286 l->peer_caps = peer_caps;
200 l->owner = n; 287 l->net = net;
201 l->peer_session = WILDCARD_SESSION; 288 l->peer_session = WILDCARD_SESSION;
202 l->bearer_id = b->identity; 289 l->bearer_id = bearer_id;
203 l->tolerance = b->tolerance; 290 l->tolerance = tolerance;
204 l->net_plane = b->net_plane; 291 l->net_plane = net_plane;
205 l->advertised_mtu = b->mtu; 292 l->advertised_mtu = mtu;
206 l->mtu = b->mtu; 293 l->mtu = mtu;
207 l->priority = b->priority; 294 l->priority = priority;
208 tipc_link_set_queue_limits(l, b->window); 295 tipc_link_set_queue_limits(l, window);
296 l->ackers = 1;
297 l->bc_sndlink = bc_sndlink;
298 l->bc_rcvlink = bc_rcvlink;
209 l->inputq = inputq; 299 l->inputq = inputq;
210 l->namedq = namedq; 300 l->namedq = namedq;
211 l->state = LINK_RESETTING; 301 l->state = LINK_RESETTING;
212 l->pmsg = (struct tipc_msg *)&l->proto_msg;
213 hdr = l->pmsg;
214 tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer);
215 msg_set_size(hdr, sizeof(l->proto_msg));
216 msg_set_session(hdr, session);
217 msg_set_bearer_id(hdr, l->bearer_id);
218 strcpy((char *)msg_data(hdr), if_name);
219 __skb_queue_head_init(&l->transmq); 302 __skb_queue_head_init(&l->transmq);
220 __skb_queue_head_init(&l->backlogq); 303 __skb_queue_head_init(&l->backlogq);
221 __skb_queue_head_init(&l->deferdq); 304 __skb_queue_head_init(&l->deferdq);
@@ -224,27 +307,43 @@ bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session,
224 return true; 307 return true;
225} 308}
226 309
227/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. 310/**
311 * tipc_link_bc_create - create new link to be used for broadcast
312 * @n: pointer to associated node
313 * @mtu: mtu to be used
314 * @window: send window to be used
315 * @inputq: queue to put messages ready for delivery
316 * @namedq: queue to put binding table update messages ready for delivery
317 * @link: return value, pointer to put the created link
228 * 318 *
229 * Give a newly added peer node the sequence number where it should 319 * Returns true if link was created, otherwise false
230 * start receiving and acking broadcast packets.
231 */ 320 */
232void tipc_link_build_bcast_sync_msg(struct tipc_link *l, 321bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
233 struct sk_buff_head *xmitq) 322 int mtu, int window, u16 peer_caps,
323 struct sk_buff_head *inputq,
324 struct sk_buff_head *namedq,
325 struct tipc_link *bc_sndlink,
326 struct tipc_link **link)
234{ 327{
235 struct sk_buff *skb; 328 struct tipc_link *l;
236 struct sk_buff_head list;
237 u16 last_sent;
238 329
239 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, 330 if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
240 0, l->addr, link_own_addr(l), 0, 0, 0); 331 0, ownnode, peer, peer_caps, bc_sndlink,
241 if (!skb) 332 NULL, inputq, namedq, link))
242 return; 333 return false;
243 last_sent = tipc_bclink_get_last_sent(l->owner->net); 334
244 msg_set_last_bcast(buf_msg(skb), last_sent); 335 l = *link;
245 __skb_queue_head_init(&list); 336 strcpy(l->name, tipc_bclink_name);
246 __skb_queue_tail(&list, skb); 337 tipc_link_reset(l);
247 tipc_link_xmit(l, &list, xmitq); 338 l->state = LINK_RESET;
339 l->ackers = 0;
340 l->bc_rcvlink = l;
341
342 /* Broadcast send link is always up */
343 if (link_is_bc_sndlink(l))
344 l->state = LINK_ESTABLISHED;
345
346 return true;
248} 347}
249 348
250/** 349/**
@@ -451,12 +550,17 @@ static void link_profile_stats(struct tipc_link *l)
451 550
452/* tipc_link_timeout - perform periodic task as instructed from node timeout 551/* tipc_link_timeout - perform periodic task as instructed from node timeout
453 */ 552 */
553/* tipc_link_timeout - perform periodic task as instructed from node timeout
554 */
454int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) 555int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
455{ 556{
456 int rc = 0; 557 int rc = 0;
457 int mtyp = STATE_MSG; 558 int mtyp = STATE_MSG;
458 bool xmit = false; 559 bool xmit = false;
459 bool prb = false; 560 bool prb = false;
561 u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
562 u16 bc_acked = l->bc_rcvlink->acked;
563 bool bc_up = link_is_up(l->bc_rcvlink);
460 564
461 link_profile_stats(l); 565 link_profile_stats(l);
462 566
@@ -464,7 +568,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
464 case LINK_ESTABLISHED: 568 case LINK_ESTABLISHED:
465 case LINK_SYNCHING: 569 case LINK_SYNCHING:
466 if (!l->silent_intv_cnt) { 570 if (!l->silent_intv_cnt) {
467 if (tipc_bclink_acks_missing(l->owner)) 571 if (bc_up && (bc_acked != bc_snt))
468 xmit = true; 572 xmit = true;
469 } else if (l->silent_intv_cnt <= l->abort_limit) { 573 } else if (l->silent_intv_cnt <= l->abort_limit) {
470 xmit = true; 574 xmit = true;
@@ -555,38 +659,6 @@ void link_prepare_wakeup(struct tipc_link *l)
555 } 659 }
556} 660}
557 661
558/**
559 * tipc_link_reset_fragments - purge link's inbound message fragments queue
560 * @l_ptr: pointer to link
561 */
562void tipc_link_reset_fragments(struct tipc_link *l_ptr)
563{
564 kfree_skb(l_ptr->reasm_buf);
565 l_ptr->reasm_buf = NULL;
566}
567
568void tipc_link_purge_backlog(struct tipc_link *l)
569{
570 __skb_queue_purge(&l->backlogq);
571 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
572 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
573 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
574 l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
575 l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
576}
577
578/**
579 * tipc_link_purge_queues - purge all pkt queues associated with link
580 * @l_ptr: pointer to link
581 */
582void tipc_link_purge_queues(struct tipc_link *l_ptr)
583{
584 __skb_queue_purge(&l_ptr->deferdq);
585 __skb_queue_purge(&l_ptr->transmq);
586 tipc_link_purge_backlog(l_ptr);
587 tipc_link_reset_fragments(l_ptr);
588}
589
590void tipc_link_reset(struct tipc_link *l) 662void tipc_link_reset(struct tipc_link *l)
591{ 663{
592 /* Link is down, accept any session */ 664 /* Link is down, accept any session */
@@ -598,12 +670,16 @@ void tipc_link_reset(struct tipc_link *l)
598 /* Prepare for renewed mtu size negotiation */ 670 /* Prepare for renewed mtu size negotiation */
599 l->mtu = l->advertised_mtu; 671 l->mtu = l->advertised_mtu;
600 672
601 /* Clean up all queues: */ 673 /* Clean up all queues and counters: */
602 __skb_queue_purge(&l->transmq); 674 __skb_queue_purge(&l->transmq);
603 __skb_queue_purge(&l->deferdq); 675 __skb_queue_purge(&l->deferdq);
604 skb_queue_splice_init(&l->wakeupq, l->inputq); 676 skb_queue_splice_init(&l->wakeupq, l->inputq);
605 677 __skb_queue_purge(&l->backlogq);
606 tipc_link_purge_backlog(l); 678 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
679 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
680 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
681 l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
682 l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
607 kfree_skb(l->reasm_buf); 683 kfree_skb(l->reasm_buf);
608 kfree_skb(l->failover_reasm_skb); 684 kfree_skb(l->failover_reasm_skb);
609 l->reasm_buf = NULL; 685 l->reasm_buf = NULL;
@@ -611,81 +687,15 @@ void tipc_link_reset(struct tipc_link *l)
611 l->rcv_unacked = 0; 687 l->rcv_unacked = 0;
612 l->snd_nxt = 1; 688 l->snd_nxt = 1;
613 l->rcv_nxt = 1; 689 l->rcv_nxt = 1;
690 l->acked = 0;
614 l->silent_intv_cnt = 0; 691 l->silent_intv_cnt = 0;
615 l->stats.recv_info = 0; 692 l->stats.recv_info = 0;
616 l->stale_count = 0; 693 l->stale_count = 0;
694 l->bc_peer_is_up = false;
617 link_reset_statistics(l); 695 link_reset_statistics(l);
618} 696}
619 697
620/** 698/**
621 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
622 * @link: link to use
623 * @list: chain of buffers containing message
624 *
625 * Consumes the buffer chain, except when returning an error code,
626 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
627 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
628 */
629int __tipc_link_xmit(struct net *net, struct tipc_link *link,
630 struct sk_buff_head *list)
631{
632 struct tipc_msg *msg = buf_msg(skb_peek(list));
633 unsigned int maxwin = link->window;
634 unsigned int i, imp = msg_importance(msg);
635 uint mtu = link->mtu;
636 u16 ack = mod(link->rcv_nxt - 1);
637 u16 seqno = link->snd_nxt;
638 u16 bc_last_in = link->owner->bclink.last_in;
639 struct tipc_media_addr *addr = link->media_addr;
640 struct sk_buff_head *transmq = &link->transmq;
641 struct sk_buff_head *backlogq = &link->backlogq;
642 struct sk_buff *skb, *bskb;
643
644 /* Match msg importance against this and all higher backlog limits: */
645 for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
646 if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
647 return link_schedule_user(link, list);
648 }
649 if (unlikely(msg_size(msg) > mtu))
650 return -EMSGSIZE;
651
652 /* Prepare each packet for sending, and add to relevant queue: */
653 while (skb_queue_len(list)) {
654 skb = skb_peek(list);
655 msg = buf_msg(skb);
656 msg_set_seqno(msg, seqno);
657 msg_set_ack(msg, ack);
658 msg_set_bcast_ack(msg, bc_last_in);
659
660 if (likely(skb_queue_len(transmq) < maxwin)) {
661 __skb_dequeue(list);
662 __skb_queue_tail(transmq, skb);
663 tipc_bearer_send(net, link->bearer_id, skb, addr);
664 link->rcv_unacked = 0;
665 seqno++;
666 continue;
667 }
668 if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) {
669 kfree_skb(__skb_dequeue(list));
670 link->stats.sent_bundled++;
671 continue;
672 }
673 if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) {
674 kfree_skb(__skb_dequeue(list));
675 __skb_queue_tail(backlogq, bskb);
676 link->backlog[msg_importance(buf_msg(bskb))].len++;
677 link->stats.sent_bundled++;
678 link->stats.sent_bundles++;
679 continue;
680 }
681 link->backlog[imp].len += skb_queue_len(list);
682 skb_queue_splice_tail_init(list, backlogq);
683 }
684 link->snd_nxt = seqno;
685 return 0;
686}
687
688/**
689 * tipc_link_xmit(): enqueue buffer list according to queue situation 699 * tipc_link_xmit(): enqueue buffer list according to queue situation
690 * @link: link to use 700 * @link: link to use
691 * @list: chain of buffers containing message 701 * @list: chain of buffers containing message
@@ -705,7 +715,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
705 unsigned int mtu = l->mtu; 715 unsigned int mtu = l->mtu;
706 u16 ack = l->rcv_nxt - 1; 716 u16 ack = l->rcv_nxt - 1;
707 u16 seqno = l->snd_nxt; 717 u16 seqno = l->snd_nxt;
708 u16 bc_last_in = l->owner->bclink.last_in; 718 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
709 struct sk_buff_head *transmq = &l->transmq; 719 struct sk_buff_head *transmq = &l->transmq;
710 struct sk_buff_head *backlogq = &l->backlogq; 720 struct sk_buff_head *backlogq = &l->backlogq;
711 struct sk_buff *skb, *_skb, *bskb; 721 struct sk_buff *skb, *_skb, *bskb;
@@ -724,7 +734,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
724 hdr = buf_msg(skb); 734 hdr = buf_msg(skb);
725 msg_set_seqno(hdr, seqno); 735 msg_set_seqno(hdr, seqno);
726 msg_set_ack(hdr, ack); 736 msg_set_ack(hdr, ack);
727 msg_set_bcast_ack(hdr, bc_last_in); 737 msg_set_bcast_ack(hdr, bc_ack);
728 738
729 if (likely(skb_queue_len(transmq) < maxwin)) { 739 if (likely(skb_queue_len(transmq) < maxwin)) {
730 _skb = skb_clone(skb, GFP_ATOMIC); 740 _skb = skb_clone(skb, GFP_ATOMIC);
@@ -733,6 +743,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
733 __skb_dequeue(list); 743 __skb_dequeue(list);
734 __skb_queue_tail(transmq, skb); 744 __skb_queue_tail(transmq, skb);
735 __skb_queue_tail(xmitq, _skb); 745 __skb_queue_tail(xmitq, _skb);
746 TIPC_SKB_CB(skb)->ackers = l->ackers;
736 l->rcv_unacked = 0; 747 l->rcv_unacked = 0;
737 seqno++; 748 seqno++;
738 continue; 749 continue;
@@ -757,62 +768,13 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
757 return 0; 768 return 0;
758} 769}
759 770
760/*
761 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
762 * Receive the sequence number where we should start receiving and
763 * acking broadcast packets from a newly added peer node, and open
764 * up for reception of such packets.
765 *
766 * Called with node locked
767 */
768static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
769{
770 struct tipc_msg *msg = buf_msg(buf);
771
772 n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
773 n->bclink.recv_permitted = true;
774 kfree_skb(buf);
775}
776
777/*
778 * tipc_link_push_packets - push unsent packets to bearer
779 *
780 * Push out the unsent messages of a link where congestion
781 * has abated. Node is locked.
782 *
783 * Called with node locked
784 */
785void tipc_link_push_packets(struct tipc_link *link)
786{
787 struct sk_buff *skb;
788 struct tipc_msg *msg;
789 u16 seqno = link->snd_nxt;
790 u16 ack = mod(link->rcv_nxt - 1);
791
792 while (skb_queue_len(&link->transmq) < link->window) {
793 skb = __skb_dequeue(&link->backlogq);
794 if (!skb)
795 break;
796 msg = buf_msg(skb);
797 link->backlog[msg_importance(msg)].len--;
798 msg_set_ack(msg, ack);
799 msg_set_seqno(msg, seqno);
800 seqno = mod(seqno + 1);
801 msg_set_bcast_ack(msg, link->owner->bclink.last_in);
802 link->rcv_unacked = 0;
803 __skb_queue_tail(&link->transmq, skb);
804 tipc_bearer_send(link->owner->net, link->bearer_id,
805 skb, link->media_addr);
806 }
807 link->snd_nxt = seqno;
808}
809
810void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) 771void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
811{ 772{
812 struct sk_buff *skb, *_skb; 773 struct sk_buff *skb, *_skb;
813 struct tipc_msg *hdr; 774 struct tipc_msg *hdr;
814 u16 seqno = l->snd_nxt; 775 u16 seqno = l->snd_nxt;
815 u16 ack = l->rcv_nxt - 1; 776 u16 ack = l->rcv_nxt - 1;
777 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
816 778
817 while (skb_queue_len(&l->transmq) < l->window) { 779 while (skb_queue_len(&l->transmq) < l->window) {
818 skb = skb_peek(&l->backlogq); 780 skb = skb_peek(&l->backlogq);
@@ -826,96 +788,35 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
826 l->backlog[msg_importance(hdr)].len--; 788 l->backlog[msg_importance(hdr)].len--;
827 __skb_queue_tail(&l->transmq, skb); 789 __skb_queue_tail(&l->transmq, skb);
828 __skb_queue_tail(xmitq, _skb); 790 __skb_queue_tail(xmitq, _skb);
829 msg_set_ack(hdr, ack); 791 TIPC_SKB_CB(skb)->ackers = l->ackers;
830 msg_set_seqno(hdr, seqno); 792 msg_set_seqno(hdr, seqno);
831 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 793 msg_set_ack(hdr, ack);
794 msg_set_bcast_ack(hdr, bc_ack);
832 l->rcv_unacked = 0; 795 l->rcv_unacked = 0;
833 seqno++; 796 seqno++;
834 } 797 }
835 l->snd_nxt = seqno; 798 l->snd_nxt = seqno;
836} 799}
837 800
838static void link_retransmit_failure(struct tipc_link *l_ptr, 801static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
839 struct sk_buff *buf)
840{
841 struct tipc_msg *msg = buf_msg(buf);
842 struct net *net = l_ptr->owner->net;
843
844 pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
845
846 if (l_ptr->addr) {
847 /* Handle failure on standard link */
848 link_print(l_ptr, "Resetting link ");
849 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
850 msg_user(msg), msg_type(msg), msg_size(msg),
851 msg_errcode(msg));
852 pr_info("sqno %u, prev: %x, src: %x\n",
853 msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
854 } else {
855 /* Handle failure on broadcast link */
856 struct tipc_node *n_ptr;
857 char addr_string[16];
858
859 pr_info("Msg seq number: %u, ", msg_seqno(msg));
860 pr_cont("Outstanding acks: %lu\n",
861 (unsigned long) TIPC_SKB_CB(buf)->handle);
862
863 n_ptr = tipc_bclink_retransmit_to(net);
864
865 tipc_addr_string_fill(addr_string, n_ptr->addr);
866 pr_info("Broadcast link info for %s\n", addr_string);
867 pr_info("Reception permitted: %d, Acked: %u\n",
868 n_ptr->bclink.recv_permitted,
869 n_ptr->bclink.acked);
870 pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
871 n_ptr->bclink.last_in,
872 n_ptr->bclink.oos_state,
873 n_ptr->bclink.last_sent);
874
875 n_ptr->action_flags |= TIPC_BCAST_RESET;
876 l_ptr->stale_count = 0;
877 }
878}
879
880void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
881 u32 retransmits)
882{ 802{
883 struct tipc_msg *msg; 803 struct tipc_msg *hdr = buf_msg(skb);
884
885 if (!skb)
886 return;
887
888 msg = buf_msg(skb);
889
890 /* Detect repeated retransmit failures */
891 if (l_ptr->last_retransm == msg_seqno(msg)) {
892 if (++l_ptr->stale_count > 100) {
893 link_retransmit_failure(l_ptr, skb);
894 return;
895 }
896 } else {
897 l_ptr->last_retransm = msg_seqno(msg);
898 l_ptr->stale_count = 1;
899 }
900 804
901 skb_queue_walk_from(&l_ptr->transmq, skb) { 805 pr_warn("Retransmission failure on link <%s>\n", l->name);
902 if (!retransmits) 806 link_print(l, "Resetting link ");
903 break; 807 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
904 msg = buf_msg(skb); 808 msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
905 msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); 809 pr_info("sqno %u, prev: %x, src: %x\n",
906 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 810 msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
907 tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb,
908 l_ptr->media_addr);
909 retransmits--;
910 l_ptr->stats.retransmitted++;
911 }
912} 811}
913 812
914static int tipc_link_retransm(struct tipc_link *l, int retransm, 813int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
915 struct sk_buff_head *xmitq) 814 struct sk_buff_head *xmitq)
916{ 815{
917 struct sk_buff *_skb, *skb = skb_peek(&l->transmq); 816 struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
918 struct tipc_msg *hdr; 817 struct tipc_msg *hdr;
818 u16 ack = l->rcv_nxt - 1;
819 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
919 820
920 if (!skb) 821 if (!skb)
921 return 0; 822 return 0;
@@ -928,19 +829,25 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
928 link_retransmit_failure(l, skb); 829 link_retransmit_failure(l, skb);
929 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 830 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
930 } 831 }
832
833 /* Move forward to where retransmission should start */
931 skb_queue_walk(&l->transmq, skb) { 834 skb_queue_walk(&l->transmq, skb) {
932 if (!retransm) 835 if (!less(buf_seqno(skb), from))
933 return 0; 836 break;
837 }
838
839 skb_queue_walk_from(&l->transmq, skb) {
840 if (more(buf_seqno(skb), to))
841 break;
934 hdr = buf_msg(skb); 842 hdr = buf_msg(skb);
935 _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); 843 _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
936 if (!_skb) 844 if (!_skb)
937 return 0; 845 return 0;
938 hdr = buf_msg(_skb); 846 hdr = buf_msg(_skb);
939 msg_set_ack(hdr, l->rcv_nxt - 1); 847 msg_set_ack(hdr, ack);
940 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 848 msg_set_bcast_ack(hdr, bc_ack);
941 _skb->priority = TC_PRIO_CONTROL; 849 _skb->priority = TC_PRIO_CONTROL;
942 __skb_queue_tail(xmitq, _skb); 850 __skb_queue_tail(xmitq, _skb);
943 retransm--;
944 l->stats.retransmitted++; 851 l->stats.retransmitted++;
945 } 852 }
946 return 0; 853 return 0;
@@ -951,11 +858,9 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
951 * Consumes buffer if message is of right type 858 * Consumes buffer if message is of right type
952 * Node lock must be held 859 * Node lock must be held
953 */ 860 */
954static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, 861static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
955 struct sk_buff_head *inputq) 862 struct sk_buff_head *inputq)
956{ 863{
957 struct tipc_node *node = link->owner;
958
959 switch (msg_user(buf_msg(skb))) { 864 switch (msg_user(buf_msg(skb))) {
960 case TIPC_LOW_IMPORTANCE: 865 case TIPC_LOW_IMPORTANCE:
961 case TIPC_MEDIUM_IMPORTANCE: 866 case TIPC_MEDIUM_IMPORTANCE:
@@ -965,8 +870,8 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
965 skb_queue_tail(inputq, skb); 870 skb_queue_tail(inputq, skb);
966 return true; 871 return true;
967 case NAME_DISTRIBUTOR: 872 case NAME_DISTRIBUTOR:
968 node->bclink.recv_permitted = true; 873 l->bc_rcvlink->state = LINK_ESTABLISHED;
969 skb_queue_tail(link->namedq, skb); 874 skb_queue_tail(l->namedq, skb);
970 return true; 875 return true;
971 case MSG_BUNDLER: 876 case MSG_BUNDLER:
972 case TUNNEL_PROTOCOL: 877 case TUNNEL_PROTOCOL:
@@ -987,7 +892,6 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
987static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, 892static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
988 struct sk_buff_head *inputq) 893 struct sk_buff_head *inputq)
989{ 894{
990 struct tipc_node *node = l->owner;
991 struct tipc_msg *hdr = buf_msg(skb); 895 struct tipc_msg *hdr = buf_msg(skb);
992 struct sk_buff **reasm_skb = &l->reasm_buf; 896 struct sk_buff **reasm_skb = &l->reasm_buf;
993 struct sk_buff *iskb; 897 struct sk_buff *iskb;
@@ -1028,13 +932,15 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1028 if (tipc_buf_append(reasm_skb, &skb)) { 932 if (tipc_buf_append(reasm_skb, &skb)) {
1029 l->stats.recv_fragmented++; 933 l->stats.recv_fragmented++;
1030 tipc_data_input(l, skb, inputq); 934 tipc_data_input(l, skb, inputq);
1031 } else if (!*reasm_skb) { 935 } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) {
936 pr_warn_ratelimited("Unable to build fragment list\n");
1032 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 937 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
1033 } 938 }
1034 return 0; 939 return 0;
1035 } else if (usr == BCAST_PROTOCOL) { 940 } else if (usr == BCAST_PROTOCOL) {
1036 tipc_link_sync_rcv(node, skb); 941 tipc_bcast_lock(l->net);
1037 return 0; 942 tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
943 tipc_bcast_unlock(l->net);
1038 } 944 }
1039drop: 945drop:
1040 kfree_skb(skb); 946 kfree_skb(skb);
@@ -1057,12 +963,28 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
1057} 963}
1058 964
1059/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission 965/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission
966 *
967 * Note that sending of broadcast ack is coordinated among nodes, to reduce
968 * risk of ack storms towards the sender
1060 */ 969 */
1061void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) 970int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
1062{ 971{
972 if (!l)
973 return 0;
974
975 /* Broadcast ACK must be sent via a unicast link => defer to caller */
976 if (link_is_bc_rcvlink(l)) {
977 if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf)
978 return 0;
979 l->rcv_unacked = 0;
980 return TIPC_LINK_SND_BC_ACK;
981 }
982
983 /* Unicast ACK */
1063 l->rcv_unacked = 0; 984 l->rcv_unacked = 0;
1064 l->stats.sent_acks++; 985 l->stats.sent_acks++;
1065 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 986 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
987 return 0;
1066} 988}
1067 989
1068/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message 990/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message
@@ -1084,6 +1006,9 @@ static void tipc_link_build_nack_msg(struct tipc_link *l,
1084{ 1006{
1085 u32 def_cnt = ++l->stats.deferred_recv; 1007 u32 def_cnt = ++l->stats.deferred_recv;
1086 1008
1009 if (link_is_bc_rcvlink(l))
1010 return;
1011
1087 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) 1012 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
1088 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1013 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
1089} 1014}
@@ -1144,12 +1069,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
1144 l->rcv_nxt++; 1069 l->rcv_nxt++;
1145 l->stats.recv_info++; 1070 l->stats.recv_info++;
1146 if (!tipc_data_input(l, skb, l->inputq)) 1071 if (!tipc_data_input(l, skb, l->inputq))
1147 rc = tipc_link_input(l, skb, l->inputq); 1072 rc |= tipc_link_input(l, skb, l->inputq);
1148 if (unlikely(rc))
1149 break;
1150 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) 1073 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
1151 tipc_link_build_ack_msg(l, xmitq); 1074 rc |= tipc_link_build_ack_msg(l, xmitq);
1152 1075 if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
1076 break;
1153 } while ((skb = __skb_dequeue(defq))); 1077 } while ((skb = __skb_dequeue(defq)));
1154 1078
1155 return rc; 1079 return rc;
@@ -1158,45 +1082,6 @@ drop:
1158 return rc; 1082 return rc;
1159} 1083}
1160 1084
1161/**
1162 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1163 *
1164 * Returns increase in queue length (i.e. 0 or 1)
1165 */
1166u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1167{
1168 struct sk_buff *skb1;
1169 u16 seq_no = buf_seqno(skb);
1170
1171 /* Empty queue ? */
1172 if (skb_queue_empty(list)) {
1173 __skb_queue_tail(list, skb);
1174 return 1;
1175 }
1176
1177 /* Last ? */
1178 if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1179 __skb_queue_tail(list, skb);
1180 return 1;
1181 }
1182
1183 /* Locate insertion point in queue, then insert; discard if duplicate */
1184 skb_queue_walk(list, skb1) {
1185 u16 curr_seqno = buf_seqno(skb1);
1186
1187 if (seq_no == curr_seqno) {
1188 kfree_skb(skb);
1189 return 0;
1190 }
1191
1192 if (less(seq_no, curr_seqno))
1193 break;
1194 }
1195
1196 __skb_queue_before(list, skb1, skb);
1197 return 1;
1198}
1199
1200/* 1085/*
1201 * Send protocol message to the other endpoint. 1086 * Send protocol message to the other endpoint.
1202 */ 1087 */
@@ -1212,23 +1097,17 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
1212 skb = __skb_dequeue(&xmitq); 1097 skb = __skb_dequeue(&xmitq);
1213 if (!skb) 1098 if (!skb)
1214 return; 1099 return;
1215 tipc_bearer_send(l->owner->net, l->bearer_id, skb, l->media_addr); 1100 tipc_bearer_xmit_skb(l->net, l->bearer_id, skb, l->media_addr);
1216 l->rcv_unacked = 0; 1101 l->rcv_unacked = 0;
1217 kfree_skb(skb);
1218} 1102}
1219 1103
1220/* tipc_link_build_proto_msg: prepare link protocol message for transmission
1221 */
1222static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, 1104static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1223 u16 rcvgap, int tolerance, int priority, 1105 u16 rcvgap, int tolerance, int priority,
1224 struct sk_buff_head *xmitq) 1106 struct sk_buff_head *xmitq)
1225{ 1107{
1226 struct sk_buff *skb = NULL; 1108 struct sk_buff *skb = NULL;
1227 struct tipc_msg *hdr = l->pmsg; 1109 struct tipc_msg *hdr = l->pmsg;
1228 u16 snd_nxt = l->snd_nxt; 1110 bool node_up = link_is_up(l->bc_rcvlink);
1229 u16 rcv_nxt = l->rcv_nxt;
1230 u16 rcv_last = rcv_nxt - 1;
1231 int node_up = l->owner->bclink.recv_permitted;
1232 1111
1233 /* Don't send protocol message during reset or link failover */ 1112 /* Don't send protocol message during reset or link failover */
1234 if (tipc_link_is_blocked(l)) 1113 if (tipc_link_is_blocked(l))
@@ -1236,33 +1115,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1236 1115
1237 msg_set_type(hdr, mtyp); 1116 msg_set_type(hdr, mtyp);
1238 msg_set_net_plane(hdr, l->net_plane); 1117 msg_set_net_plane(hdr, l->net_plane);
1239 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 1118 msg_set_next_sent(hdr, l->snd_nxt);
1240 msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); 1119 msg_set_ack(hdr, l->rcv_nxt - 1);
1120 msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
1121 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1241 msg_set_link_tolerance(hdr, tolerance); 1122 msg_set_link_tolerance(hdr, tolerance);
1242 msg_set_linkprio(hdr, priority); 1123 msg_set_linkprio(hdr, priority);
1243 msg_set_redundant_link(hdr, node_up); 1124 msg_set_redundant_link(hdr, node_up);
1244 msg_set_seq_gap(hdr, 0); 1125 msg_set_seq_gap(hdr, 0);
1245 1126
1246 /* Compatibility: created msg must not be in sequence with pkt flow */ 1127 /* Compatibility: created msg must not be in sequence with pkt flow */
1247 msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); 1128 msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2);
1248 1129
1249 if (mtyp == STATE_MSG) { 1130 if (mtyp == STATE_MSG) {
1250 if (!tipc_link_is_up(l)) 1131 if (!tipc_link_is_up(l))
1251 return; 1132 return;
1252 msg_set_next_sent(hdr, snd_nxt);
1253 1133
1254 /* Override rcvgap if there are packets in deferred queue */ 1134 /* Override rcvgap if there are packets in deferred queue */
1255 if (!skb_queue_empty(&l->deferdq)) 1135 if (!skb_queue_empty(&l->deferdq))
1256 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; 1136 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt;
1257 if (rcvgap) { 1137 if (rcvgap) {
1258 msg_set_seq_gap(hdr, rcvgap); 1138 msg_set_seq_gap(hdr, rcvgap);
1259 l->stats.sent_nacks++; 1139 l->stats.sent_nacks++;
1260 } 1140 }
1261 msg_set_ack(hdr, rcv_last);
1262 msg_set_probe(hdr, probe); 1141 msg_set_probe(hdr, probe);
1263 if (probe) 1142 if (probe)
1264 l->stats.sent_probes++; 1143 l->stats.sent_probes++;
1265 l->stats.sent_states++; 1144 l->stats.sent_states++;
1145 l->rcv_unacked = 0;
1266 } else { 1146 } else {
1267 /* RESET_MSG or ACTIVATE_MSG */ 1147 /* RESET_MSG or ACTIVATE_MSG */
1268 msg_set_max_pkt(hdr, l->advertised_mtu); 1148 msg_set_max_pkt(hdr, l->advertised_mtu);
@@ -1354,7 +1234,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1354{ 1234{
1355 struct tipc_msg *hdr = buf_msg(skb); 1235 struct tipc_msg *hdr = buf_msg(skb);
1356 u16 rcvgap = 0; 1236 u16 rcvgap = 0;
1357 u16 nacked_gap = msg_seq_gap(hdr); 1237 u16 ack = msg_ack(hdr);
1238 u16 gap = msg_seq_gap(hdr);
1358 u16 peers_snd_nxt = msg_next_sent(hdr); 1239 u16 peers_snd_nxt = msg_next_sent(hdr);
1359 u16 peers_tol = msg_link_tolerance(hdr); 1240 u16 peers_tol = msg_link_tolerance(hdr);
1360 u16 peers_prio = msg_linkprio(hdr); 1241 u16 peers_prio = msg_linkprio(hdr);
@@ -1363,7 +1244,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1363 char *if_name; 1244 char *if_name;
1364 int rc = 0; 1245 int rc = 0;
1365 1246
1366 if (tipc_link_is_blocked(l)) 1247 if (tipc_link_is_blocked(l) || !xmitq)
1367 goto exit; 1248 goto exit;
1368 1249
1369 if (link_own_addr(l) > msg_prevnode(hdr)) 1250 if (link_own_addr(l) > msg_prevnode(hdr))
@@ -1433,11 +1314,11 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1433 if (rcvgap || (msg_probe(hdr))) 1314 if (rcvgap || (msg_probe(hdr)))
1434 tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, 1315 tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
1435 0, 0, xmitq); 1316 0, 0, xmitq);
1436 tipc_link_release_pkts(l, msg_ack(hdr)); 1317 tipc_link_release_pkts(l, ack);
1437 1318
1438 /* If NACK, retransmit will now start at right position */ 1319 /* If NACK, retransmit will now start at right position */
1439 if (nacked_gap) { 1320 if (gap) {
1440 rc = tipc_link_retransm(l, nacked_gap, xmitq); 1321 rc = tipc_link_retrans(l, ack + 1, ack + gap, xmitq);
1441 l->stats.recv_nacks++; 1322 l->stats.recv_nacks++;
1442 } 1323 }
1443 1324
@@ -1450,6 +1331,188 @@ exit:
1450 return rc; 1331 return rc;
1451} 1332}
1452 1333
1334/* tipc_link_build_bc_proto_msg() - create broadcast protocol message
1335 */
1336static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast,
1337 u16 peers_snd_nxt,
1338 struct sk_buff_head *xmitq)
1339{
1340 struct sk_buff *skb;
1341 struct tipc_msg *hdr;
1342 struct sk_buff *dfrd_skb = skb_peek(&l->deferdq);
1343 u16 ack = l->rcv_nxt - 1;
1344 u16 gap_to = peers_snd_nxt - 1;
1345
1346 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
1347 0, l->addr, link_own_addr(l), 0, 0, 0);
1348 if (!skb)
1349 return false;
1350 hdr = buf_msg(skb);
1351 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1352 msg_set_bcast_ack(hdr, ack);
1353 msg_set_bcgap_after(hdr, ack);
1354 if (dfrd_skb)
1355 gap_to = buf_seqno(dfrd_skb) - 1;
1356 msg_set_bcgap_to(hdr, gap_to);
1357 msg_set_non_seq(hdr, bcast);
1358 __skb_queue_tail(xmitq, skb);
1359 return true;
1360}
1361
1362/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints.
1363 *
1364 * Give a newly added peer node the sequence number where it should
1365 * start receiving and acking broadcast packets.
1366 */
1367void tipc_link_build_bc_init_msg(struct tipc_link *l,
1368 struct sk_buff_head *xmitq)
1369{
1370 struct sk_buff_head list;
1371
1372 __skb_queue_head_init(&list);
1373 if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list))
1374 return;
1375 tipc_link_xmit(l, &list, xmitq);
1376}
1377
1378/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer
1379 */
1380void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
1381{
1382 int mtyp = msg_type(hdr);
1383 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1384
1385 if (link_is_up(l))
1386 return;
1387
1388 if (msg_user(hdr) == BCAST_PROTOCOL) {
1389 l->rcv_nxt = peers_snd_nxt;
1390 l->state = LINK_ESTABLISHED;
1391 return;
1392 }
1393
1394 if (l->peer_caps & TIPC_BCAST_SYNCH)
1395 return;
1396
1397 if (msg_peer_node_is_up(hdr))
1398 return;
1399
1400 /* Compatibility: accept older, less safe initial synch data */
1401 if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG))
1402 l->rcv_nxt = peers_snd_nxt;
1403}
1404
1405/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
1406 */
1407void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
1408 struct sk_buff_head *xmitq)
1409{
1410 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1411
1412 if (!link_is_up(l))
1413 return;
1414
1415 if (!msg_peer_node_is_up(hdr))
1416 return;
1417
1418 l->bc_peer_is_up = true;
1419
1420 /* Ignore if peers_snd_nxt goes beyond receive window */
1421 if (more(peers_snd_nxt, l->rcv_nxt + l->window))
1422 return;
1423
1424 if (!more(peers_snd_nxt, l->rcv_nxt)) {
1425 l->nack_state = BC_NACK_SND_CONDITIONAL;
1426 return;
1427 }
1428
1429 /* Don't NACK if one was recently sent or peeked */
1430 if (l->nack_state == BC_NACK_SND_SUPPRESS) {
1431 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1432 return;
1433 }
1434
1435 /* Conditionally delay NACK sending until next synch rcv */
1436 if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
1437 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1438 if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
1439 return;
1440 }
1441
1442 /* Send NACK now but suppress next one */
1443 tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
1444 l->nack_state = BC_NACK_SND_SUPPRESS;
1445}
1446
1447void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
1448 struct sk_buff_head *xmitq)
1449{
1450 struct sk_buff *skb, *tmp;
1451 struct tipc_link *snd_l = l->bc_sndlink;
1452
1453 if (!link_is_up(l) || !l->bc_peer_is_up)
1454 return;
1455
1456 if (!more(acked, l->acked))
1457 return;
1458
1459 /* Skip over packets peer has already acked */
1460 skb_queue_walk(&snd_l->transmq, skb) {
1461 if (more(buf_seqno(skb), l->acked))
1462 break;
1463 }
1464
1465 /* Update/release the packets peer is acking now */
1466 skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
1467 if (more(buf_seqno(skb), acked))
1468 break;
1469 if (!--TIPC_SKB_CB(skb)->ackers) {
1470 __skb_unlink(skb, &snd_l->transmq);
1471 kfree_skb(skb);
1472 }
1473 }
1474 l->acked = acked;
1475 tipc_link_advance_backlog(snd_l, xmitq);
1476 if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
1477 link_prepare_wakeup(snd_l);
1478}
1479
1480/* tipc_link_bc_nack_rcv(): receive broadcast nack message
1481 */
1482int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
1483 struct sk_buff_head *xmitq)
1484{
1485 struct tipc_msg *hdr = buf_msg(skb);
1486 u32 dnode = msg_destnode(hdr);
1487 int mtyp = msg_type(hdr);
1488 u16 acked = msg_bcast_ack(hdr);
1489 u16 from = acked + 1;
1490 u16 to = msg_bcgap_to(hdr);
1491 u16 peers_snd_nxt = to + 1;
1492 int rc = 0;
1493
1494 kfree_skb(skb);
1495
1496 if (!tipc_link_is_up(l) || !l->bc_peer_is_up)
1497 return 0;
1498
1499 if (mtyp != STATE_MSG)
1500 return 0;
1501
1502 if (dnode == link_own_addr(l)) {
1503 tipc_link_bc_ack_rcv(l, acked, xmitq);
1504 rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
1505 l->stats.recv_nacks++;
1506 return rc;
1507 }
1508
1509 /* Msg for other node => suppress own NACK at next sync if applicable */
1510 if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from))
1511 l->nack_state = BC_NACK_SND_SUPPRESS;
1512
1513 return 0;
1514}
1515
1453void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) 1516void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1454{ 1517{
1455 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); 1518 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
@@ -1514,7 +1577,7 @@ static void link_reset_statistics(struct tipc_link *l_ptr)
1514static void link_print(struct tipc_link *l, const char *str) 1577static void link_print(struct tipc_link *l, const char *str)
1515{ 1578{
1516 struct sk_buff *hskb = skb_peek(&l->transmq); 1579 struct sk_buff *hskb = skb_peek(&l->transmq);
1517 u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt; 1580 u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt - 1;
1518 u16 tail = l->snd_nxt - 1; 1581 u16 tail = l->snd_nxt - 1;
1519 1582
1520 pr_info("%s Link <%s> state %x\n", str, l->name, l->state); 1583 pr_info("%s Link <%s> state %x\n", str, l->name, l->state);
@@ -1738,7 +1801,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
1738 if (tipc_link_is_up(link)) 1801 if (tipc_link_is_up(link))
1739 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) 1802 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
1740 goto attr_msg_full; 1803 goto attr_msg_full;
1741 if (tipc_link_is_active(link)) 1804 if (link->active)
1742 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE)) 1805 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
1743 goto attr_msg_full; 1806 goto attr_msg_full;
1744 1807
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 0201212cb49a..66d859b66c84 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -66,7 +66,8 @@ enum {
66 */ 66 */
67enum { 67enum {
68 TIPC_LINK_UP_EVT = 1, 68 TIPC_LINK_UP_EVT = 1,
69 TIPC_LINK_DOWN_EVT = (1 << 1) 69 TIPC_LINK_DOWN_EVT = (1 << 1),
70 TIPC_LINK_SND_BC_ACK = (1 << 2)
70}; 71};
71 72
72/* Starting value for maximum packet size negotiation on unicast links 73/* Starting value for maximum packet size negotiation on unicast links
@@ -110,7 +111,7 @@ struct tipc_stats {
110 * @name: link name character string 111 * @name: link name character string
111 * @media_addr: media address to use when sending messages over link 112 * @media_addr: media address to use when sending messages over link
112 * @timer: link timer 113 * @timer: link timer
113 * @owner: pointer to peer node 114 * @net: pointer to namespace struct
114 * @refcnt: reference counter for permanent references (owner node & timer) 115 * @refcnt: reference counter for permanent references (owner node & timer)
115 * @peer_session: link session # being used by peer end of link 116 * @peer_session: link session # being used by peer end of link
116 * @peer_bearer_id: bearer id used by link's peer endpoint 117 * @peer_bearer_id: bearer id used by link's peer endpoint
@@ -119,6 +120,7 @@ struct tipc_stats {
119 * @keepalive_intv: link keepalive timer interval 120 * @keepalive_intv: link keepalive timer interval
120 * @abort_limit: # of unacknowledged continuity probes needed to reset link 121 * @abort_limit: # of unacknowledged continuity probes needed to reset link
121 * @state: current state of link FSM 122 * @state: current state of link FSM
123 * @peer_caps: bitmap describing capabilities of peer node
122 * @silent_intv_cnt: # of timer intervals without any reception from peer 124 * @silent_intv_cnt: # of timer intervals without any reception from peer
123 * @proto_msg: template for control messages generated by link 125 * @proto_msg: template for control messages generated by link
124 * @pmsg: convenience pointer to "proto_msg" field 126 * @pmsg: convenience pointer to "proto_msg" field
@@ -134,6 +136,8 @@ struct tipc_stats {
134 * @snt_nxt: next sequence number to use for outbound messages 136 * @snt_nxt: next sequence number to use for outbound messages
135 * @last_retransmitted: sequence number of most recently retransmitted message 137 * @last_retransmitted: sequence number of most recently retransmitted message
136 * @stale_count: # of identical retransmit requests made by peer 138 * @stale_count: # of identical retransmit requests made by peer
139 * @ackers: # of peers that needs to ack each packet before it can be released
140 * @acked: # last packet acked by a certain peer. Used for broadcast.
137 * @rcv_nxt: next sequence number to expect for inbound messages 141 * @rcv_nxt: next sequence number to expect for inbound messages
138 * @deferred_queue: deferred queue saved OOS b'cast message received from node 142 * @deferred_queue: deferred queue saved OOS b'cast message received from node
139 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer 143 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
@@ -143,13 +147,14 @@ struct tipc_stats {
143 * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate 147 * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
144 * @long_msg_seq_no: next identifier to use for outbound fragmented messages 148 * @long_msg_seq_no: next identifier to use for outbound fragmented messages
145 * @reasm_buf: head of partially reassembled inbound message fragments 149 * @reasm_buf: head of partially reassembled inbound message fragments
150 * @bc_rcvr: marks that this is a broadcast receiver link
146 * @stats: collects statistics regarding link activity 151 * @stats: collects statistics regarding link activity
147 */ 152 */
148struct tipc_link { 153struct tipc_link {
149 u32 addr; 154 u32 addr;
150 char name[TIPC_MAX_LINK_NAME]; 155 char name[TIPC_MAX_LINK_NAME];
151 struct tipc_media_addr *media_addr; 156 struct tipc_media_addr *media_addr;
152 struct tipc_node *owner; 157 struct net *net;
153 158
154 /* Management and link supervision data */ 159 /* Management and link supervision data */
155 u32 peer_session; 160 u32 peer_session;
@@ -159,6 +164,8 @@ struct tipc_link {
159 unsigned long keepalive_intv; 164 unsigned long keepalive_intv;
160 u32 abort_limit; 165 u32 abort_limit;
161 u32 state; 166 u32 state;
167 u16 peer_caps;
168 bool active;
162 u32 silent_intv_cnt; 169 u32 silent_intv_cnt;
163 struct { 170 struct {
164 unchar hdr[INT_H_SIZE]; 171 unchar hdr[INT_H_SIZE];
@@ -201,18 +208,35 @@ struct tipc_link {
201 /* Fragmentation/reassembly */ 208 /* Fragmentation/reassembly */
202 struct sk_buff *reasm_buf; 209 struct sk_buff *reasm_buf;
203 210
211 /* Broadcast */
212 u16 ackers;
213 u16 acked;
214 struct tipc_link *bc_rcvlink;
215 struct tipc_link *bc_sndlink;
216 int nack_state;
217 bool bc_peer_is_up;
218
204 /* Statistics */ 219 /* Statistics */
205 struct tipc_stats stats; 220 struct tipc_stats stats;
206}; 221};
207 222
208bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, 223bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
209 u32 ownnode, u32 peer, struct tipc_media_addr *maddr, 224 int tolerance, char net_plane, u32 mtu, int priority,
210 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 225 int window, u32 session, u32 ownnode, u32 peer,
226 u16 peer_caps,
227 struct tipc_link *bc_sndlink,
228 struct tipc_link *bc_rcvlink,
229 struct sk_buff_head *inputq,
230 struct sk_buff_head *namedq,
211 struct tipc_link **link); 231 struct tipc_link **link);
232bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
233 int mtu, int window, u16 peer_caps,
234 struct sk_buff_head *inputq,
235 struct sk_buff_head *namedq,
236 struct tipc_link *bc_sndlink,
237 struct tipc_link **link);
212void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, 238void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
213 int mtyp, struct sk_buff_head *xmitq); 239 int mtyp, struct sk_buff_head *xmitq);
214void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
215 struct sk_buff_head *xmitq);
216void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq); 240void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
217int tipc_link_fsm_evt(struct tipc_link *l, int evt); 241int tipc_link_fsm_evt(struct tipc_link *l, int evt);
218void tipc_link_reset_fragments(struct tipc_link *l_ptr); 242void tipc_link_reset_fragments(struct tipc_link *l_ptr);
@@ -223,23 +247,11 @@ bool tipc_link_is_establishing(struct tipc_link *l);
223bool tipc_link_is_synching(struct tipc_link *l); 247bool tipc_link_is_synching(struct tipc_link *l);
224bool tipc_link_is_failingover(struct tipc_link *l); 248bool tipc_link_is_failingover(struct tipc_link *l);
225bool tipc_link_is_blocked(struct tipc_link *l); 249bool tipc_link_is_blocked(struct tipc_link *l);
226int tipc_link_is_active(struct tipc_link *l_ptr); 250void tipc_link_set_active(struct tipc_link *l, bool active);
227void tipc_link_purge_queues(struct tipc_link *l_ptr);
228void tipc_link_purge_backlog(struct tipc_link *l);
229void tipc_link_reset(struct tipc_link *l_ptr); 251void tipc_link_reset(struct tipc_link *l_ptr);
230int __tipc_link_xmit(struct net *net, struct tipc_link *link,
231 struct sk_buff_head *list);
232int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, 252int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
233 struct sk_buff_head *xmitq); 253 struct sk_buff_head *xmitq);
234void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, 254void tipc_link_set_queue_limits(struct tipc_link *l, u32 window);
235 u32 gap, u32 tolerance, u32 priority);
236void tipc_link_push_packets(struct tipc_link *l_ptr);
237u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf);
238void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
239void tipc_link_retransmit(struct tipc_link *l_ptr,
240 struct sk_buff *start, u32 retransmits);
241struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
242 const struct sk_buff *skb);
243 255
244int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb); 256int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb);
245int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info); 257int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
@@ -249,5 +261,23 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
249int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); 261int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
250int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, 262int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
251 struct sk_buff_head *xmitq); 263 struct sk_buff_head *xmitq);
252 264int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
265void tipc_link_add_bc_peer(struct tipc_link *snd_l,
266 struct tipc_link *uc_l,
267 struct sk_buff_head *xmitq);
268void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
269 struct tipc_link *rcv_l,
270 struct sk_buff_head *xmitq);
271int tipc_link_bc_peers(struct tipc_link *l);
272void tipc_link_set_mtu(struct tipc_link *l, int mtu);
273int tipc_link_mtu(struct tipc_link *l);
274void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
275 struct sk_buff_head *xmitq);
276void tipc_link_build_bc_sync_msg(struct tipc_link *l,
277 struct sk_buff_head *xmitq);
278void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
279void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
280 struct sk_buff_head *xmitq);
281int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
282 struct sk_buff_head *xmitq);
253#endif 283#endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 26d38b3d8760..8740930f0787 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -182,7 +182,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
182 *buf = NULL; 182 *buf = NULL;
183 return 0; 183 return 0;
184err: 184err:
185 pr_warn_ratelimited("Unable to build fragment list\n");
186 kfree_skb(*buf); 185 kfree_skb(*buf);
187 kfree_skb(*headbuf); 186 kfree_skb(*headbuf);
188 *buf = *headbuf = NULL; 187 *buf = *headbuf = NULL;
@@ -565,18 +564,22 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
565/* tipc_msg_reassemble() - clone a buffer chain of fragments and 564/* tipc_msg_reassemble() - clone a buffer chain of fragments and
566 * reassemble the clones into one message 565 * reassemble the clones into one message
567 */ 566 */
568struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list) 567bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
569{ 568{
570 struct sk_buff *skb; 569 struct sk_buff *skb, *_skb;
571 struct sk_buff *frag = NULL; 570 struct sk_buff *frag = NULL;
572 struct sk_buff *head = NULL; 571 struct sk_buff *head = NULL;
573 int hdr_sz; 572 int hdr_len;
574 573
575 /* Copy header if single buffer */ 574 /* Copy header if single buffer */
576 if (skb_queue_len(list) == 1) { 575 if (skb_queue_len(list) == 1) {
577 skb = skb_peek(list); 576 skb = skb_peek(list);
578 hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); 577 hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
579 return __pskb_copy(skb, hdr_sz, GFP_ATOMIC); 578 _skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC);
579 if (!_skb)
580 return false;
581 __skb_queue_tail(rcvq, _skb);
582 return true;
580 } 583 }
581 584
582 /* Clone all fragments and reassemble */ 585 /* Clone all fragments and reassemble */
@@ -590,11 +593,12 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
590 if (!head) 593 if (!head)
591 goto error; 594 goto error;
592 } 595 }
593 return frag; 596 __skb_queue_tail(rcvq, frag);
597 return true;
594error: 598error:
595 pr_warn("Failed do clone local mcast rcv buffer\n"); 599 pr_warn("Failed do clone local mcast rcv buffer\n");
596 kfree_skb(head); 600 kfree_skb(head);
597 return NULL; 601 return false;
598} 602}
599 603
600/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number 604/* tipc_skb_queue_sorted(); sort pkt into list according to sequence number
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 9f0ef54be612..55778a0aebf3 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -112,6 +112,7 @@ struct tipc_skb_cb {
112 bool wakeup_pending; 112 bool wakeup_pending;
113 u16 chain_sz; 113 u16 chain_sz;
114 u16 chain_imp; 114 u16 chain_imp;
115 u16 ackers;
115}; 116};
116 117
117#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) 118#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
@@ -600,6 +601,11 @@ static inline u32 msg_last_bcast(struct tipc_msg *m)
600 return msg_bits(m, 4, 16, 0xffff); 601 return msg_bits(m, 4, 16, 0xffff);
601} 602}
602 603
604static inline u32 msg_bc_snd_nxt(struct tipc_msg *m)
605{
606 return msg_last_bcast(m) + 1;
607}
608
603static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) 609static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
604{ 610{
605 msg_set_bits(m, 4, 16, 0xffff, n); 611 msg_set_bits(m, 4, 16, 0xffff, n);
@@ -789,7 +795,7 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
789int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 795int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
790 int offset, int dsz, int mtu, struct sk_buff_head *list); 796 int offset, int dsz, int mtu, struct sk_buff_head *list);
791bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 797bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
792struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 798bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
793void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, 799void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
794 struct sk_buff *skb); 800 struct sk_buff *skb);
795 801
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index e6018b7eb197..c07612bab95c 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
102 if (!oskb) 102 if (!oskb)
103 break; 103 break;
104 msg_set_destnode(buf_msg(oskb), dnode); 104 msg_set_destnode(buf_msg(oskb), dnode);
105 tipc_node_xmit_skb(net, oskb, dnode, dnode); 105 tipc_node_xmit_skb(net, oskb, dnode, 0);
106 } 106 }
107 rcu_read_unlock(); 107 rcu_read_unlock();
108 108
@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
223 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]); 223 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
224 rcu_read_unlock(); 224 rcu_read_unlock();
225 225
226 tipc_node_xmit(net, &head, dnode, dnode); 226 tipc_node_xmit(net, &head, dnode, 0);
227} 227}
228 228
229static void tipc_publ_subscribe(struct net *net, struct publication *publ, 229static void tipc_publ_subscribe(struct net *net, struct publication *publ,
diff --git a/net/tipc/net.c b/net/tipc/net.c
index d6d1399ae229..77bf9113c7a7 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -112,14 +112,11 @@ int tipc_net_start(struct net *net, u32 addr)
112{ 112{
113 struct tipc_net *tn = net_generic(net, tipc_net_id); 113 struct tipc_net *tn = net_generic(net, tipc_net_id);
114 char addr_string[16]; 114 char addr_string[16];
115 int res;
116 115
117 tn->own_addr = addr; 116 tn->own_addr = addr;
118 tipc_named_reinit(net); 117 tipc_named_reinit(net);
119 tipc_sk_reinit(net); 118 tipc_sk_reinit(net);
120 res = tipc_bclink_init(net); 119 tipc_bcast_reinit(net);
121 if (res)
122 return res;
123 120
124 tipc_nametbl_publish(net, TIPC_CFG_SRV, tn->own_addr, tn->own_addr, 121 tipc_nametbl_publish(net, TIPC_CFG_SRV, tn->own_addr, tn->own_addr,
125 TIPC_ZONE_SCOPE, 0, tn->own_addr); 122 TIPC_ZONE_SCOPE, 0, tn->own_addr);
@@ -142,7 +139,6 @@ void tipc_net_stop(struct net *net)
142 tn->own_addr); 139 tn->own_addr);
143 rtnl_lock(); 140 rtnl_lock();
144 tipc_bearer_stop(net); 141 tipc_bearer_stop(net);
145 tipc_bclink_stop(net);
146 tipc_node_stop(net); 142 tipc_node_stop(net);
147 rtnl_unlock(); 143 rtnl_unlock();
148 144
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 2670751d0e2e..7493506b069b 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -72,7 +72,6 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
72static void tipc_node_link_down(struct tipc_node *n, int bearer_id, 72static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
73 bool delete); 73 bool delete);
74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); 74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
75static void node_established_contact(struct tipc_node *n_ptr);
76static void tipc_node_delete(struct tipc_node *node); 75static void tipc_node_delete(struct tipc_node *node);
77static void tipc_node_timeout(unsigned long data); 76static void tipc_node_timeout(unsigned long data);
78static void tipc_node_fsm_evt(struct tipc_node *n, int evt); 77static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
@@ -165,8 +164,10 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
165 INIT_LIST_HEAD(&n_ptr->list); 164 INIT_LIST_HEAD(&n_ptr->list);
166 INIT_LIST_HEAD(&n_ptr->publ_list); 165 INIT_LIST_HEAD(&n_ptr->publ_list);
167 INIT_LIST_HEAD(&n_ptr->conn_sks); 166 INIT_LIST_HEAD(&n_ptr->conn_sks);
168 skb_queue_head_init(&n_ptr->bclink.namedq); 167 skb_queue_head_init(&n_ptr->bc_entry.namedq);
169 __skb_queue_head_init(&n_ptr->bclink.deferdq); 168 skb_queue_head_init(&n_ptr->bc_entry.inputq1);
169 __skb_queue_head_init(&n_ptr->bc_entry.arrvq);
170 skb_queue_head_init(&n_ptr->bc_entry.inputq2);
170 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 171 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
171 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 172 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
172 if (n_ptr->addr < temp_node->addr) 173 if (n_ptr->addr < temp_node->addr)
@@ -177,6 +178,18 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
177 n_ptr->signature = INVALID_NODE_SIG; 178 n_ptr->signature = INVALID_NODE_SIG;
178 n_ptr->active_links[0] = INVALID_BEARER_ID; 179 n_ptr->active_links[0] = INVALID_BEARER_ID;
179 n_ptr->active_links[1] = INVALID_BEARER_ID; 180 n_ptr->active_links[1] = INVALID_BEARER_ID;
181 if (!tipc_link_bc_create(net, tipc_own_addr(net), n_ptr->addr,
182 U16_MAX, tipc_bc_sndlink(net)->window,
183 n_ptr->capabilities,
184 &n_ptr->bc_entry.inputq1,
185 &n_ptr->bc_entry.namedq,
186 tipc_bc_sndlink(net),
187 &n_ptr->bc_entry.link)) {
188 pr_warn("Broadcast rcv link creation failed, no memory\n");
189 kfree(n_ptr);
190 n_ptr = NULL;
191 goto exit;
192 }
180 tipc_node_get(n_ptr); 193 tipc_node_get(n_ptr);
181 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); 194 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
182 n_ptr->keepalive_intv = U32_MAX; 195 n_ptr->keepalive_intv = U32_MAX;
@@ -203,6 +216,7 @@ static void tipc_node_delete(struct tipc_node *node)
203{ 216{
204 list_del_rcu(&node->list); 217 list_del_rcu(&node->list);
205 hlist_del_rcu(&node->hash); 218 hlist_del_rcu(&node->hash);
219 kfree(node->bc_entry.link);
206 kfree_rcu(node, rcu); 220 kfree_rcu(node, rcu);
207} 221}
208 222
@@ -332,6 +346,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
332 n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; 346 n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
333 347
334 tipc_bearer_add_dest(n->net, bearer_id, n->addr); 348 tipc_bearer_add_dest(n->net, bearer_id, n->addr);
349 tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id);
335 350
336 pr_debug("Established link <%s> on network plane %c\n", 351 pr_debug("Established link <%s> on network plane %c\n",
337 nl->name, nl->net_plane); 352 nl->name, nl->net_plane);
@@ -340,8 +355,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
340 if (!ol) { 355 if (!ol) {
341 *slot0 = bearer_id; 356 *slot0 = bearer_id;
342 *slot1 = bearer_id; 357 *slot1 = bearer_id;
343 tipc_link_build_bcast_sync_msg(nl, xmitq); 358 tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
344 node_established_contact(n); 359 n->action_flags |= TIPC_NOTIFY_NODE_UP;
360 tipc_bcast_add_peer(n->net, nl, xmitq);
345 return; 361 return;
346 } 362 }
347 363
@@ -350,8 +366,11 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
350 pr_debug("Old link <%s> becomes standby\n", ol->name); 366 pr_debug("Old link <%s> becomes standby\n", ol->name);
351 *slot0 = bearer_id; 367 *slot0 = bearer_id;
352 *slot1 = bearer_id; 368 *slot1 = bearer_id;
369 tipc_link_set_active(nl, true);
370 tipc_link_set_active(ol, false);
353 } else if (nl->priority == ol->priority) { 371 } else if (nl->priority == ol->priority) {
354 *slot0 = bearer_id; 372 tipc_link_set_active(nl, true);
373 *slot1 = bearer_id;
355 } else { 374 } else {
356 pr_debug("New link <%s> is standby\n", nl->name); 375 pr_debug("New link <%s> is standby\n", nl->name);
357 } 376 }
@@ -428,8 +447,10 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
428 tipc_link_build_reset_msg(l, xmitq); 447 tipc_link_build_reset_msg(l, xmitq);
429 *maddr = &n->links[*bearer_id].maddr; 448 *maddr = &n->links[*bearer_id].maddr;
430 node_lost_contact(n, &le->inputq); 449 node_lost_contact(n, &le->inputq);
450 tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
431 return; 451 return;
432 } 452 }
453 tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
433 454
434 /* There is still a working link => initiate failover */ 455 /* There is still a working link => initiate failover */
435 tnl = node_active_link(n, 0); 456 tnl = node_active_link(n, 0);
@@ -493,6 +514,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
493 bool link_up = false; 514 bool link_up = false;
494 bool accept_addr = false; 515 bool accept_addr = false;
495 bool reset = true; 516 bool reset = true;
517 char *if_name;
496 518
497 *dupl_addr = false; 519 *dupl_addr = false;
498 *respond = false; 520 *respond = false;
@@ -579,9 +601,15 @@ void tipc_node_check_dest(struct net *net, u32 onode,
579 pr_warn("Cannot establish 3rd link to %x\n", n->addr); 601 pr_warn("Cannot establish 3rd link to %x\n", n->addr);
580 goto exit; 602 goto exit;
581 } 603 }
582 if (!tipc_link_create(n, b, mod(tipc_net(net)->random), 604 if_name = strchr(b->name, ':') + 1;
583 tipc_own_addr(net), onode, &le->maddr, 605 if (!tipc_link_create(net, if_name, b->identity, b->tolerance,
584 &le->inputq, &n->bclink.namedq, &l)) { 606 b->net_plane, b->mtu, b->priority,
607 b->window, mod(tipc_net(net)->random),
608 tipc_own_addr(net), onode,
609 n->capabilities,
610 tipc_bc_sndlink(n->net), n->bc_entry.link,
611 &le->inputq,
612 &n->bc_entry.namedq, &l)) {
585 *respond = false; 613 *respond = false;
586 goto exit; 614 goto exit;
587 } 615 }
@@ -824,58 +852,36 @@ bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
824 return true; 852 return true;
825} 853}
826 854
827static void node_established_contact(struct tipc_node *n_ptr) 855static void node_lost_contact(struct tipc_node *n,
828{
829 tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
830 n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
831 n_ptr->bclink.oos_state = 0;
832 n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
833 tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
834}
835
836static void node_lost_contact(struct tipc_node *n_ptr,
837 struct sk_buff_head *inputq) 856 struct sk_buff_head *inputq)
838{ 857{
839 char addr_string[16]; 858 char addr_string[16];
840 struct tipc_sock_conn *conn, *safe; 859 struct tipc_sock_conn *conn, *safe;
841 struct tipc_link *l; 860 struct tipc_link *l;
842 struct list_head *conns = &n_ptr->conn_sks; 861 struct list_head *conns = &n->conn_sks;
843 struct sk_buff *skb; 862 struct sk_buff *skb;
844 struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
845 uint i; 863 uint i;
846 864
847 pr_debug("Lost contact with %s\n", 865 pr_debug("Lost contact with %s\n",
848 tipc_addr_string_fill(addr_string, n_ptr->addr)); 866 tipc_addr_string_fill(addr_string, n->addr));
849 867
850 /* Flush broadcast link info associated with lost node */ 868 /* Clean up broadcast state */
851 if (n_ptr->bclink.recv_permitted) { 869 tipc_bcast_remove_peer(n->net, n->bc_entry.link);
852 __skb_queue_purge(&n_ptr->bclink.deferdq);
853
854 if (n_ptr->bclink.reasm_buf) {
855 kfree_skb(n_ptr->bclink.reasm_buf);
856 n_ptr->bclink.reasm_buf = NULL;
857 }
858
859 tipc_bclink_remove_node(n_ptr->net, n_ptr->addr);
860 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
861
862 n_ptr->bclink.recv_permitted = false;
863 }
864 870
865 /* Abort any ongoing link failover */ 871 /* Abort any ongoing link failover */
866 for (i = 0; i < MAX_BEARERS; i++) { 872 for (i = 0; i < MAX_BEARERS; i++) {
867 l = n_ptr->links[i].link; 873 l = n->links[i].link;
868 if (l) 874 if (l)
869 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); 875 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
870 } 876 }
871 877
872 /* Notify publications from this node */ 878 /* Notify publications from this node */
873 n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; 879 n->action_flags |= TIPC_NOTIFY_NODE_DOWN;
874 880
875 /* Notify sockets connected to node */ 881 /* Notify sockets connected to node */
876 list_for_each_entry_safe(conn, safe, conns, list) { 882 list_for_each_entry_safe(conn, safe, conns, list) {
877 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 883 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
878 SHORT_H_SIZE, 0, tn->own_addr, 884 SHORT_H_SIZE, 0, tipc_own_addr(n->net),
879 conn->peer_node, conn->port, 885 conn->peer_node, conn->port,
880 conn->peer_port, TIPC_ERR_NO_NODE); 886 conn->peer_port, TIPC_ERR_NO_NODE);
881 if (likely(skb)) 887 if (likely(skb))
@@ -937,18 +943,13 @@ void tipc_node_unlock(struct tipc_node *node)
937 publ_list = &node->publ_list; 943 publ_list = &node->publ_list;
938 944
939 node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | 945 node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
940 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP | 946 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
941 TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
942 TIPC_BCAST_RESET);
943 947
944 spin_unlock_bh(&node->lock); 948 spin_unlock_bh(&node->lock);
945 949
946 if (flags & TIPC_NOTIFY_NODE_DOWN) 950 if (flags & TIPC_NOTIFY_NODE_DOWN)
947 tipc_publ_notify(net, publ_list, addr); 951 tipc_publ_notify(net, publ_list, addr);
948 952
949 if (flags & TIPC_WAKEUP_BCAST_USERS)
950 tipc_bclink_wakeup_users(net);
951
952 if (flags & TIPC_NOTIFY_NODE_UP) 953 if (flags & TIPC_NOTIFY_NODE_UP)
953 tipc_named_node_up(net, addr); 954 tipc_named_node_up(net, addr);
954 955
@@ -960,11 +961,6 @@ void tipc_node_unlock(struct tipc_node *node)
960 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, 961 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
961 link_id, addr); 962 link_id, addr);
962 963
963 if (flags & TIPC_BCAST_MSG_EVT)
964 tipc_bclink_input(net);
965
966 if (flags & TIPC_BCAST_RESET)
967 tipc_node_reset_links(node);
968} 964}
969 965
970/* Caller should hold node lock for the passed node */ 966/* Caller should hold node lock for the passed node */
@@ -1080,6 +1076,67 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1080} 1076}
1081 1077
1082/** 1078/**
1079 * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
1080 * @net: the applicable net namespace
1081 * @skb: TIPC packet
1082 * @bearer_id: id of bearer message arrived on
1083 *
1084 * Invoked with no locks held.
1085 */
1086void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id)
1087{
1088 int rc;
1089 struct sk_buff_head xmitq;
1090 struct tipc_bclink_entry *be;
1091 struct tipc_link_entry *le;
1092 struct tipc_msg *hdr = buf_msg(skb);
1093 int usr = msg_user(hdr);
1094 u32 dnode = msg_destnode(hdr);
1095 struct tipc_node *n;
1096
1097 __skb_queue_head_init(&xmitq);
1098
1099 /* If NACK for other node, let rcv link for that node peek into it */
1100 if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net)))
1101 n = tipc_node_find(net, dnode);
1102 else
1103 n = tipc_node_find(net, msg_prevnode(hdr));
1104 if (!n) {
1105 kfree_skb(skb);
1106 return;
1107 }
1108 be = &n->bc_entry;
1109 le = &n->links[bearer_id];
1110
1111 rc = tipc_bcast_rcv(net, be->link, skb);
1112
1113 /* Broadcast link reset may happen at reassembly failure */
1114 if (rc & TIPC_LINK_DOWN_EVT)
1115 tipc_node_reset_links(n);
1116
1117 /* Broadcast ACKs are sent on a unicast link */
1118 if (rc & TIPC_LINK_SND_BC_ACK) {
1119 tipc_node_lock(n);
1120 tipc_link_build_ack_msg(le->link, &xmitq);
1121 tipc_node_unlock(n);
1122 }
1123
1124 if (!skb_queue_empty(&xmitq))
1125 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1126
1127 /* Deliver. 'arrvq' is under inputq2's lock protection */
1128 if (!skb_queue_empty(&be->inputq1)) {
1129 spin_lock_bh(&be->inputq2.lock);
1130 spin_lock_bh(&be->inputq1.lock);
1131 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1132 spin_unlock_bh(&be->inputq1.lock);
1133 spin_unlock_bh(&be->inputq2.lock);
1134 tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
1135 }
1136 tipc_node_put(n);
1137}
1138
1139/**
1083 * tipc_node_check_state - check and if necessary update node state 1140 * tipc_node_check_state - check and if necessary update node state
1084 * @skb: TIPC packet 1141 * @skb: TIPC packet
1085 * @bearer_id: identity of bearer delivering the packet 1142 * @bearer_id: identity of bearer delivering the packet
@@ -1221,6 +1278,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1221 int usr = msg_user(hdr); 1278 int usr = msg_user(hdr);
1222 int bearer_id = b->identity; 1279 int bearer_id = b->identity;
1223 struct tipc_link_entry *le; 1280 struct tipc_link_entry *le;
1281 u16 bc_ack = msg_bcast_ack(hdr);
1224 int rc = 0; 1282 int rc = 0;
1225 1283
1226 __skb_queue_head_init(&xmitq); 1284 __skb_queue_head_init(&xmitq);
@@ -1229,13 +1287,12 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1229 if (unlikely(!tipc_msg_validate(skb))) 1287 if (unlikely(!tipc_msg_validate(skb)))
1230 goto discard; 1288 goto discard;
1231 1289
1232 /* Handle arrival of a non-unicast link packet */ 1290 /* Handle arrival of discovery or broadcast packet */
1233 if (unlikely(msg_non_seq(hdr))) { 1291 if (unlikely(msg_non_seq(hdr))) {
1234 if (usr == LINK_CONFIG) 1292 if (unlikely(usr == LINK_CONFIG))
1235 tipc_disc_rcv(net, skb, b); 1293 return tipc_disc_rcv(net, skb, b);
1236 else 1294 else
1237 tipc_bclink_rcv(net, skb); 1295 return tipc_node_bc_rcv(net, skb, bearer_id);
1238 return;
1239 } 1296 }
1240 1297
1241 /* Locate neighboring node that sent packet */ 1298 /* Locate neighboring node that sent packet */
@@ -1244,19 +1301,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1244 goto discard; 1301 goto discard;
1245 le = &n->links[bearer_id]; 1302 le = &n->links[bearer_id];
1246 1303
1304 /* Ensure broadcast reception is in synch with peer's send state */
1305 if (unlikely(usr == LINK_PROTOCOL))
1306 tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
1307 else if (unlikely(n->bc_entry.link->acked != bc_ack))
1308 tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
1309
1247 tipc_node_lock(n); 1310 tipc_node_lock(n);
1248 1311
1249 /* Is reception permitted at the moment ? */ 1312 /* Is reception permitted at the moment ? */
1250 if (!tipc_node_filter_pkt(n, hdr)) 1313 if (!tipc_node_filter_pkt(n, hdr))
1251 goto unlock; 1314 goto unlock;
1252 1315
1253 if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1254 tipc_bclink_sync_state(n, hdr);
1255
1256 /* Release acked broadcast packets */
1257 if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
1258 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
1259
1260 /* Check and if necessary update node state */ 1316 /* Check and if necessary update node state */
1261 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { 1317 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
1262 rc = tipc_link_rcv(le->link, skb, &xmitq); 1318 rc = tipc_link_rcv(le->link, skb, &xmitq);
@@ -1271,8 +1327,8 @@ unlock:
1271 if (unlikely(rc & TIPC_LINK_DOWN_EVT)) 1327 if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1272 tipc_node_link_down(n, bearer_id, false); 1328 tipc_node_link_down(n, bearer_id, false);
1273 1329
1274 if (unlikely(!skb_queue_empty(&n->bclink.namedq))) 1330 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1275 tipc_named_rcv(net, &n->bclink.namedq); 1331 tipc_named_rcv(net, &n->bc_entry.namedq);
1276 1332
1277 if (!skb_queue_empty(&le->inputq)) 1333 if (!skb_queue_empty(&le->inputq))
1278 tipc_sk_rcv(net, &le->inputq); 1334 tipc_sk_rcv(net, &le->inputq);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 344b3e7594fd..6734562d3c6e 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -55,36 +55,18 @@
55enum { 55enum {
56 TIPC_NOTIFY_NODE_DOWN = (1 << 3), 56 TIPC_NOTIFY_NODE_DOWN = (1 << 3),
57 TIPC_NOTIFY_NODE_UP = (1 << 4), 57 TIPC_NOTIFY_NODE_UP = (1 << 4),
58 TIPC_WAKEUP_BCAST_USERS = (1 << 5),
59 TIPC_NOTIFY_LINK_UP = (1 << 6), 58 TIPC_NOTIFY_LINK_UP = (1 << 6),
60 TIPC_NOTIFY_LINK_DOWN = (1 << 7), 59 TIPC_NOTIFY_LINK_DOWN = (1 << 7)
61 TIPC_BCAST_MSG_EVT = (1 << 9),
62 TIPC_BCAST_RESET = (1 << 10)
63}; 60};
64 61
65/** 62/* Optional capabilities supported by this code version
66 * struct tipc_node_bclink - TIPC node bclink structure
67 * @acked: sequence # of last outbound b'cast message acknowledged by node
68 * @last_in: sequence # of last in-sequence b'cast message received from node
69 * @last_sent: sequence # of last b'cast message sent by node
70 * @oos_state: state tracker for handling OOS b'cast messages
71 * @deferred_queue: deferred queue saved OOS b'cast message received from node
72 * @reasm_buf: broadcast reassembly queue head from node
73 * @inputq_map: bitmap indicating which inqueues should be kicked
74 * @recv_permitted: true if node is allowed to receive b'cast messages
75 */ 63 */
76struct tipc_node_bclink { 64enum {
77 u32 acked; 65 TIPC_BCAST_SYNCH = (1 << 1)
78 u32 last_in;
79 u32 last_sent;
80 u32 oos_state;
81 u32 deferred_size;
82 struct sk_buff_head deferdq;
83 struct sk_buff *reasm_buf;
84 struct sk_buff_head namedq;
85 bool recv_permitted;
86}; 66};
87 67
68#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
69
88struct tipc_link_entry { 70struct tipc_link_entry {
89 struct tipc_link *link; 71 struct tipc_link *link;
90 u32 mtu; 72 u32 mtu;
@@ -92,6 +74,14 @@ struct tipc_link_entry {
92 struct tipc_media_addr maddr; 74 struct tipc_media_addr maddr;
93}; 75};
94 76
77struct tipc_bclink_entry {
78 struct tipc_link *link;
79 struct sk_buff_head inputq1;
80 struct sk_buff_head arrvq;
81 struct sk_buff_head inputq2;
82 struct sk_buff_head namedq;
83};
84
95/** 85/**
96 * struct tipc_node - TIPC node structure 86 * struct tipc_node - TIPC node structure
97 * @addr: network address of node 87 * @addr: network address of node
@@ -104,7 +94,6 @@ struct tipc_link_entry {
104 * @active_links: bearer ids of active links, used as index into links[] array 94 * @active_links: bearer ids of active links, used as index into links[] array
105 * @links: array containing references to all links to node 95 * @links: array containing references to all links to node
106 * @action_flags: bit mask of different types of node actions 96 * @action_flags: bit mask of different types of node actions
107 * @bclink: broadcast-related info
108 * @state: connectivity state vs peer node 97 * @state: connectivity state vs peer node
109 * @sync_point: sequence number where synch/failover is finished 98 * @sync_point: sequence number where synch/failover is finished
110 * @list: links to adjacent nodes in sorted list of cluster's nodes 99 * @list: links to adjacent nodes in sorted list of cluster's nodes
@@ -124,8 +113,8 @@ struct tipc_node {
124 struct hlist_node hash; 113 struct hlist_node hash;
125 int active_links[2]; 114 int active_links[2];
126 struct tipc_link_entry links[MAX_BEARERS]; 115 struct tipc_link_entry links[MAX_BEARERS];
116 struct tipc_bclink_entry bc_entry;
127 int action_flags; 117 int action_flags;
128 struct tipc_node_bclink bclink;
129 struct list_head list; 118 struct list_head list;
130 int state; 119 int state;
131 u16 sync_point; 120 u16 sync_point;
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1060d52ff23e..552dbaba9cf3 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -689,13 +689,13 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
689 msg_set_hdr_sz(mhdr, MCAST_H_SIZE); 689 msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
690 690
691new_mtu: 691new_mtu:
692 mtu = tipc_bclink_get_mtu(); 692 mtu = tipc_bcast_get_mtu(net);
693 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain); 693 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
694 if (unlikely(rc < 0)) 694 if (unlikely(rc < 0))
695 return rc; 695 return rc;
696 696
697 do { 697 do {
698 rc = tipc_bclink_xmit(net, pktchain); 698 rc = tipc_bcast_xmit(net, pktchain);
699 if (likely(!rc)) 699 if (likely(!rc))
700 return dsz; 700 return dsz;
701 701
diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c
index 0021c01dec17..816914ef228d 100644
--- a/net/tipc/udp_media.c
+++ b/net/tipc/udp_media.c
@@ -155,14 +155,12 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
155 struct udp_bearer *ub; 155 struct udp_bearer *ub;
156 struct udp_media_addr *dst = (struct udp_media_addr *)&dest->value; 156 struct udp_media_addr *dst = (struct udp_media_addr *)&dest->value;
157 struct udp_media_addr *src = (struct udp_media_addr *)&b->addr.value; 157 struct udp_media_addr *src = (struct udp_media_addr *)&b->addr.value;
158 struct sk_buff *clone;
159 struct rtable *rt; 158 struct rtable *rt;
160 159
161 if (skb_headroom(skb) < UDP_MIN_HEADROOM) 160 if (skb_headroom(skb) < UDP_MIN_HEADROOM)
162 pskb_expand_head(skb, UDP_MIN_HEADROOM, 0, GFP_ATOMIC); 161 pskb_expand_head(skb, UDP_MIN_HEADROOM, 0, GFP_ATOMIC);
163 162
164 clone = skb_clone(skb, GFP_ATOMIC); 163 skb_set_inner_protocol(skb, htons(ETH_P_TIPC));
165 skb_set_inner_protocol(clone, htons(ETH_P_TIPC));
166 ub = rcu_dereference_rtnl(b->media_ptr); 164 ub = rcu_dereference_rtnl(b->media_ptr);
167 if (!ub) { 165 if (!ub) {
168 err = -ENODEV; 166 err = -ENODEV;
@@ -172,7 +170,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
172 struct flowi4 fl = { 170 struct flowi4 fl = {
173 .daddr = dst->ipv4.s_addr, 171 .daddr = dst->ipv4.s_addr,
174 .saddr = src->ipv4.s_addr, 172 .saddr = src->ipv4.s_addr,
175 .flowi4_mark = clone->mark, 173 .flowi4_mark = skb->mark,
176 .flowi4_proto = IPPROTO_UDP 174 .flowi4_proto = IPPROTO_UDP
177 }; 175 };
178 rt = ip_route_output_key(net, &fl); 176 rt = ip_route_output_key(net, &fl);
@@ -181,7 +179,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
181 goto tx_error; 179 goto tx_error;
182 } 180 }
183 ttl = ip4_dst_hoplimit(&rt->dst); 181 ttl = ip4_dst_hoplimit(&rt->dst);
184 err = udp_tunnel_xmit_skb(rt, ub->ubsock->sk, clone, 182 err = udp_tunnel_xmit_skb(rt, ub->ubsock->sk, skb,
185 src->ipv4.s_addr, 183 src->ipv4.s_addr,
186 dst->ipv4.s_addr, 0, ttl, 0, 184 dst->ipv4.s_addr, 0, ttl, 0,
187 src->udp_port, dst->udp_port, 185 src->udp_port, dst->udp_port,
@@ -204,7 +202,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
204 if (err) 202 if (err)
205 goto tx_error; 203 goto tx_error;
206 ttl = ip6_dst_hoplimit(ndst); 204 ttl = ip6_dst_hoplimit(ndst);
207 err = udp_tunnel6_xmit_skb(ndst, ub->ubsock->sk, clone, 205 err = udp_tunnel6_xmit_skb(ndst, ub->ubsock->sk, skb,
208 ndst->dev, &src->ipv6, 206 ndst->dev, &src->ipv6,
209 &dst->ipv6, 0, ttl, src->udp_port, 207 &dst->ipv6, 0, ttl, src->udp_port,
210 dst->udp_port, false); 208 dst->udp_port, false);
@@ -213,7 +211,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
213 return err; 211 return err;
214 212
215tx_error: 213tx_error:
216 kfree_skb(clone); 214 kfree_skb(skb);
217 return err; 215 return err;
218} 216}
219 217