author		David S. Miller <davem@davemloft.net>	2015-10-24 09:56:54 -0400
committer	David S. Miller <davem@davemloft.net>	2015-10-24 09:56:54 -0400
commit		687f079addba1ac7f97ce97080c2291bbe8c8dce (patch)
tree		5da9c2e91de35b9111a3badb947416deba5083d8
parent		ba3e2084f268bdfed7627046e58a2218037e15af (diff)
parent		2af5ae372a4b6d6e2d3314af0e9c865d6d64f8d3 (diff)
Merge branch 'tipc-next'
Jon Maloy says:

====================
tipc: improve broadcast implementation

The TIPC broadcast link implementation is currently complex and hard to
follow. It also incurs some amount of code and structure duplication,
something that can be reduced significantly with a little effort. This
commit series introduces a number of improvements which address the
locking structure, the code/structure duplication issue, and the
overall readability of the code.

The series consists of three main parts:

1-7: Adaptation to the new link structure, and preparation for the next
step. In particular, we want the broadcast transmission link to have a
life cycle that is longer than any of its potential users (unicast and
broadcast receive links). This eliminates the need to always test for
the presence of this link before accessing it.

8-10: This is what is really new in this series. Commit #9 is by far
the largest and most important one, because it moves most of the
broadcast functionality into link.c, partially reusing the fields and
functionality of the unicast link. The removal of the "node_map"
infrastructure in commit #10 is also an important achievement.

11-16: Some improvements leveraging the changes made in the previous
commits.

The series needs commit 53387c4e22ac ("tipc: extend broadcast link
window size") and commit e53567948f82 ("tipc: conditionally expand
buffer headroom over udp tunnel"), which are both present in 'net' but
not yet in 'net-next', to apply cleanly.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
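[Editorial note] The core idea that replaces the node_map — a per-packet count
of peers ("ackers") that still have to acknowledge a broadcast buffer — is
easiest to see in isolation. The following is only a small, self-contained C
model under assumptions drawn from the diff below (the 'ackers' counter named
in the tipc_bcbase_xmit() comment); bc_pkt and bc_ack_rcv() are hypothetical
illustration names, not kernel code:

#include <stdio.h>
#include <stdlib.h>

/* Hypothetical model: each queued broadcast packet carries a count of
 * peers that still have to acknowledge it. */
struct bc_pkt {
	struct bc_pkt *next;
	unsigned short seqno;
	int ackers;
};

/* Modulo-2^16 "greater than", as used for TIPC sequence numbers */
static int more(unsigned short a, unsigned short b)
{
	return (short)(a - b) > 0;
}

/* One peer acknowledged everything up to and including 'acked':
 * decrement each covered packet and free those nobody still needs. */
static void bc_ack_rcv(struct bc_pkt **txq, unsigned short acked)
{
	struct bc_pkt *p;

	while ((p = *txq) && !more(p->seqno, acked)) {
		if (--p->ackers == 0) {
			*txq = p->next;	/* all peers have seen it */
			free(p);
		} else {
			txq = &p->next;	/* someone may still need a retransmit */
		}
	}
}

int main(void)
{
	struct bc_pkt *q = NULL, **tail = &q;
	unsigned short s;

	for (s = 1; s <= 3; s++) {	/* three packets, two peers each */
		struct bc_pkt *p = malloc(sizeof(*p));
		p->next = NULL;
		p->seqno = s;
		p->ackers = 2;
		*tail = p;
		tail = &p->next;
	}
	bc_ack_rcv(&q, 2);		/* peer A acks 1..2 */
	bc_ack_rcv(&q, 2);		/* peer B acks 1..2: packets 1 and 2 freed */
	printf("head seqno now: %u\n", q->seqno);	/* prints 3 */
	return 0;
}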
-rw-r--r--	net/tipc/bcast.c	988
-rw-r--r--	net/tipc/bcast.h	122
-rw-r--r--	net/tipc/bearer.c	94
-rw-r--r--	net/tipc/bearer.h	9
-rw-r--r--	net/tipc/core.c	9
-rw-r--r--	net/tipc/core.h	12
-rw-r--r--	net/tipc/discover.c	28
-rw-r--r--	net/tipc/link.c	775
-rw-r--r--	net/tipc/link.h	76
-rw-r--r--	net/tipc/msg.c	20
-rw-r--r--	net/tipc/msg.h	8
-rw-r--r--	net/tipc/name_distr.c	4
-rw-r--r--	net/tipc/net.c	6
-rw-r--r--	net/tipc/node.c	184
-rw-r--r--	net/tipc/node.h	41
-rw-r--r--	net/tipc/socket.c	4
-rw-r--r--	net/tipc/udp_media.c	12
17 files changed, 1005 insertions, 1387 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index eadba62afa85..9dc239dfe192 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -35,742 +35,301 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */
 
+#include <linux/tipc_config.h>
 #include "socket.h"
 #include "msg.h"
 #include "bcast.h"
 #include "name_distr.h"
-#include "core.h"
+#include "link.h"
+#include "node.h"
 
-#define MAX_PKT_DEFAULT_MCAST	1500	/* bcast link max packet size (fixed) */
 #define BCLINK_WIN_DEFAULT	50	/* bcast link window size (default) */
 #define BCLINK_WIN_MIN		32	/* bcast minimum link window size */
 
 const char tipc_bclink_name[] = "broadcast-link";
 
-static void tipc_nmap_diff(struct tipc_node_map *nm_a,
-			   struct tipc_node_map *nm_b,
-			   struct tipc_node_map *nm_diff);
-static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
-static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
-
-static void tipc_bclink_lock(struct net *net)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	spin_lock_bh(&tn->bclink->lock);
-}
-
-static void tipc_bclink_unlock(struct net *net)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	spin_unlock_bh(&tn->bclink->lock);
-}
-
-void tipc_bclink_input(struct net *net)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
-}
-
-uint tipc_bclink_get_mtu(void)
-{
-	return MAX_PKT_DEFAULT_MCAST;
-}
-
-static u32 bcbuf_acks(struct sk_buff *buf)
-{
-	return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
-}
-
-static void bcbuf_set_acks(struct sk_buff *buf, u32 acks)
-{
-	TIPC_SKB_CB(buf)->handle = (void *)(unsigned long)acks;
-}
-
-static void bcbuf_decr_acks(struct sk_buff *buf)
-{
-	bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
-}
+/**
+ * struct tipc_bc_base - base structure for keeping broadcast send state
+ * @link: broadcast send link structure
+ * @inputq: data input queue; will only carry SOCK_WAKEUP messages
+ * @dest: array keeping number of reachable destinations per bearer
+ * @primary_bearer: a bearer having links to all broadcast destinations, if any
+ */
+struct tipc_bc_base {
+	struct tipc_link *link;
+	struct sk_buff_head inputq;
+	int dests[MAX_BEARERS];
+	int primary_bearer;
+};
 
-void tipc_bclink_add_node(struct net *net, u32 addr)
+static struct tipc_bc_base *tipc_bc_base(struct net *net)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	tipc_bclink_lock(net);
-	tipc_nmap_add(&tn->bclink->bcast_nodes, addr);
-	tipc_bclink_unlock(net);
+	return tipc_net(net)->bcbase;
 }
 
-void tipc_bclink_remove_node(struct net *net, u32 addr)
+int tipc_bcast_get_mtu(struct net *net)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	tipc_bclink_lock(net);
-	tipc_nmap_remove(&tn->bclink->bcast_nodes, addr);
-
-	/* Last node? => reset backlog queue */
-	if (!tn->bclink->bcast_nodes.count)
-		tipc_link_purge_backlog(&tn->bclink->link);
-
-	tipc_bclink_unlock(net);
+	return tipc_link_mtu(tipc_bc_sndlink(net));
}
 
-static void bclink_set_last_sent(struct net *net)
+/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
+ * if any, and make it primary bearer
+ */
+static void tipc_bcbase_select_primary(struct net *net)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_link *bcl = tn->bcl;
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+	int all_dests = tipc_link_bc_peers(bb->link);
+	int i, mtu;
 
-	bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
-}
+	bb->primary_bearer = INVALID_BEARER_ID;
 
-u32 tipc_bclink_get_last_sent(struct net *net)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
+	if (!all_dests)
+		return;
 
-	return tn->bcl->silent_intv_cnt;
-}
+	for (i = 0; i < MAX_BEARERS; i++) {
+		if (!bb->dests[i])
+			continue;
 
-static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
-{
-	node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
-						seqno : node->bclink.last_sent;
-}
+		mtu = tipc_bearer_mtu(net, i);
+		if (mtu < tipc_link_mtu(bb->link))
+			tipc_link_set_mtu(bb->link, mtu);
 
-/**
- * tipc_bclink_retransmit_to - get most recent node to request retransmission
- *
- * Called with bclink_lock locked
- */
-struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	return tn->bclink->retransmit_to;
-}
+		if (bb->dests[i] < all_dests)
+			continue;
 
-/**
- * bclink_retransmit_pkt - retransmit broadcast packets
- * @after: sequence number of last packet to *not* retransmit
- * @to: sequence number of last packet to retransmit
- *
- * Called with bclink_lock locked
- */
-static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
-{
-	struct sk_buff *skb;
-	struct tipc_link *bcl = tn->bcl;
+		bb->primary_bearer = i;
 
-	skb_queue_walk(&bcl->transmq, skb) {
-		if (more(buf_seqno(skb), after)) {
-			tipc_link_retransmit(bcl, skb, mod(to - after));
+		/* Reduce risk that all nodes select same primary */
+		if ((i ^ tipc_own_addr(net)) & 1)
 			break;
-		}
 	}
 }
 
-/**
- * bclink_prepare_wakeup - prepare users for wakeup after congestion
- * @bcl: broadcast link
- * @resultq: queue for users which can be woken up
- * Move a number of waiting users, as permitted by available space in
- * the send queue, from link wait queue to specified queue for wakeup
- */
-static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
+void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id)
 {
-	int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
-	int imp, lim;
-	struct sk_buff *skb, *tmp;
-
-	skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
-		imp = TIPC_SKB_CB(skb)->chain_imp;
-		lim = bcl->window + bcl->backlog[imp].limit;
-		pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
-		if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
-			continue;
-		skb_unlink(skb, &bcl->wakeupq);
-		skb_queue_tail(resultq, skb);
-	}
-}
+	struct tipc_bc_base *bb = tipc_bc_base(net);
 
-/**
- * tipc_bclink_wakeup_users - wake up pending users
- *
- * Called with no locks taken
- */
-void tipc_bclink_wakeup_users(struct net *net)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_link *bcl = tn->bcl;
-	struct sk_buff_head resultq;
-
-	skb_queue_head_init(&resultq);
-	bclink_prepare_wakeup(bcl, &resultq);
-	tipc_sk_rcv(net, &resultq);
+	tipc_bcast_lock(net);
+	bb->dests[bearer_id]++;
+	tipc_bcbase_select_primary(net);
+	tipc_bcast_unlock(net);
 }
 
-/**
- * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
- * @n_ptr: node that sent acknowledgement info
- * @acked: broadcast sequence # that has been acknowledged
- *
- * Node is locked, bclink_lock unlocked.
- */
-void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
+void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
 {
-	struct sk_buff *skb, *tmp;
-	unsigned int released = 0;
-	struct net *net = n_ptr->net;
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	if (unlikely(!n_ptr->bclink.recv_permitted))
-		return;
+	struct tipc_bc_base *bb = tipc_bc_base(net);
 
-	tipc_bclink_lock(net);
-
-	/* Bail out if tx queue is empty (no clean up is required) */
-	skb = skb_peek(&tn->bcl->transmq);
-	if (!skb)
-		goto exit;
-
-	/* Determine which messages need to be acknowledged */
-	if (acked == INVALID_LINK_SEQ) {
-		/*
-		 * Contact with specified node has been lost, so need to
-		 * acknowledge sent messages only (if other nodes still exist)
-		 * or both sent and unsent messages (otherwise)
-		 */
-		if (tn->bclink->bcast_nodes.count)
-			acked = tn->bcl->silent_intv_cnt;
-		else
-			acked = tn->bcl->snd_nxt;
-	} else {
-		/*
-		 * Bail out if specified sequence number does not correspond
-		 * to a message that has been sent and not yet acknowledged
-		 */
-		if (less(acked, buf_seqno(skb)) ||
-		    less(tn->bcl->silent_intv_cnt, acked) ||
-		    less_eq(acked, n_ptr->bclink.acked))
-			goto exit;
-	}
-
-	/* Skip over packets that node has previously acknowledged */
-	skb_queue_walk(&tn->bcl->transmq, skb) {
-		if (more(buf_seqno(skb), n_ptr->bclink.acked))
-			break;
-	}
-
-	/* Update packets that node is now acknowledging */
-	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
-		if (more(buf_seqno(skb), acked))
-			break;
-		bcbuf_decr_acks(skb);
-		bclink_set_last_sent(net);
-		if (bcbuf_acks(skb) == 0) {
-			__skb_unlink(skb, &tn->bcl->transmq);
-			kfree_skb(skb);
-			released = 1;
-		}
-	}
-	n_ptr->bclink.acked = acked;
-
-	/* Try resolving broadcast link congestion, if necessary */
-	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
-		tipc_link_push_packets(tn->bcl);
-		bclink_set_last_sent(net);
-	}
-	if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
-		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
-exit:
-	tipc_bclink_unlock(net);
+	tipc_bcast_lock(net);
+	bb->dests[bearer_id]--;
+	tipc_bcbase_select_primary(net);
+	tipc_bcast_unlock(net);
 }
 
-/**
- * tipc_bclink_update_link_state - update broadcast link state
+/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
  *
- * RCU and node lock set
+ * Note that number of reachable destinations, as indicated in the dests[]
+ * array, may transitionally differ from the number of destinations indicated
+ * in each sent buffer. We can sustain this. Excess destination nodes will
+ * drop and never acknowledge the unexpected packets, and missing destinations
+ * will either require retransmission (if they are just about to be added to
+ * the bearer), or be removed from the buffer's 'ackers' counter (if they
+ * just went down)
  */
-void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
-				   u32 last_sent)
+static void tipc_bcbase_xmit(struct net *net, struct sk_buff_head *xmitq)
 {
-	struct sk_buff *buf;
-	struct net *net = n_ptr->net;
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	/* Ignore "stale" link state info */
-	if (less_eq(last_sent, n_ptr->bclink.last_in))
-		return;
+	int bearer_id;
+	struct tipc_bc_base *bb = tipc_bc_base(net);
+	struct sk_buff *skb, *_skb;
+	struct sk_buff_head _xmitq;
 
-	/* Update link synchronization state; quit if in sync */
-	bclink_update_last_sent(n_ptr, last_sent);
-
-	if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
+	if (skb_queue_empty(xmitq))
 		return;
 
-	/* Update out-of-sync state; quit if loss is still unconfirmed */
-	if ((++n_ptr->bclink.oos_state) == 1) {
-		if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
-			return;
-		n_ptr->bclink.oos_state++;
-	}
-
-	/* Don't NACK if one has been recently sent (or seen) */
-	if (n_ptr->bclink.oos_state & 0x1)
+	/* The typical case: at least one bearer has links to all nodes */
+	bearer_id = bb->primary_bearer;
+	if (bearer_id >= 0) {
+		tipc_bearer_bc_xmit(net, bearer_id, xmitq);
 		return;
-
-	/* Send NACK */
-	buf = tipc_buf_acquire(INT_H_SIZE);
-	if (buf) {
-		struct tipc_msg *msg = buf_msg(buf);
-		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
-		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
-
-		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
-			      INT_H_SIZE, n_ptr->addr);
-		msg_set_non_seq(msg, 1);
-		msg_set_mc_netid(msg, tn->net_id);
-		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
-		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
-		msg_set_bcgap_to(msg, to);
-
-		tipc_bclink_lock(net);
-		tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
-		tn->bcl->stats.sent_nacks++;
-		tipc_bclink_unlock(net);
-		kfree_skb(buf);
-
-		n_ptr->bclink.oos_state++;
 	}
-}
 
-void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
-{
-	u16 last = msg_last_bcast(hdr);
-	int mtyp = msg_type(hdr);
+	/* We have to transmit across all bearers */
+	skb_queue_head_init(&_xmitq);
+	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
+		if (!bb->dests[bearer_id])
+			continue;
 
-	if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
-		return;
-	if (mtyp == STATE_MSG) {
-		tipc_bclink_update_link_state(n, last);
-		return;
+		skb_queue_walk(xmitq, skb) {
+			_skb = pskb_copy_for_clone(skb, GFP_ATOMIC);
+			if (!_skb)
+				break;
+			__skb_queue_tail(&_xmitq, _skb);
+		}
+		tipc_bearer_bc_xmit(net, bearer_id, &_xmitq);
 	}
-	/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
-	 * and transfer synch info in LINK_PROTOCOL messages.
-	 */
-	if (tipc_node_is_up(n))
-		return;
-	if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
-		return;
-	n->bclink.last_sent = last;
-	n->bclink.last_in = last;
-	n->bclink.oos_state = 0;
+	__skb_queue_purge(xmitq);
+	__skb_queue_purge(&_xmitq);
 }
 
-/**
- * bclink_peek_nack - monitor retransmission requests sent by other nodes
- *
- * Delay any upcoming NACK by this node if another node has already
- * requested the first message this node is going to ask for.
- */
-static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
-{
-	struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
-
-	if (unlikely(!n_ptr))
-		return;
-
-	tipc_node_lock(n_ptr);
-	if (n_ptr->bclink.recv_permitted &&
-	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
-	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
-		n_ptr->bclink.oos_state = 2;
-	tipc_node_unlock(n_ptr);
-	tipc_node_put(n_ptr);
-}
-
-/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
+/* tipc_bcast_xmit - deliver buffer chain to all nodes in cluster
  * and to identified node local sockets
  * @net: the applicable net namespace
  * @list: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
+int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_link *bcl = tn->bcl;
-	struct tipc_bclink *bclink = tn->bclink;
+	struct tipc_link *l = tipc_bc_sndlink(net);
+	struct sk_buff_head xmitq, inputq, rcvq;
 	int rc = 0;
-	int bc = 0;
-	struct sk_buff *skb;
-	struct sk_buff_head arrvq;
-	struct sk_buff_head inputq;
 
-	/* Prepare clone of message for local node */
-	skb = tipc_msg_reassemble(list);
-	if (unlikely(!skb))
-		return -EHOSTUNREACH;
+	__skb_queue_head_init(&rcvq);
+	__skb_queue_head_init(&xmitq);
+	skb_queue_head_init(&inputq);
 
-	/* Broadcast to all nodes */
-	if (likely(bclink)) {
-		tipc_bclink_lock(net);
-		if (likely(bclink->bcast_nodes.count)) {
-			rc = __tipc_link_xmit(net, bcl, list);
-			if (likely(!rc)) {
-				u32 len = skb_queue_len(&bcl->transmq);
-
-				bclink_set_last_sent(net);
-				bcl->stats.queue_sz_counts++;
-				bcl->stats.accu_queue_sz += len;
-			}
-			bc = 1;
-		}
-		tipc_bclink_unlock(net);
-	}
+	/* Prepare message clone for local node */
+	if (unlikely(!tipc_msg_reassemble(list, &rcvq)))
+		return -EHOSTUNREACH;
 
-	if (unlikely(!bc))
-		__skb_queue_purge(list);
+	tipc_bcast_lock(net);
+	if (tipc_link_bc_peers(l))
+		rc = tipc_link_xmit(l, list, &xmitq);
+	tipc_bcast_unlock(net);
 
+	/* Don't send to local node if adding to link failed */
 	if (unlikely(rc)) {
-		kfree_skb(skb);
+		__skb_queue_purge(&rcvq);
 		return rc;
 	}
-	/* Deliver message clone */
-	__skb_queue_head_init(&arrvq);
-	skb_queue_head_init(&inputq);
-	__skb_queue_tail(&arrvq, skb);
-	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
-	return rc;
-}
 
-/**
- * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
- *
- * Called with both sending node's lock and bclink_lock taken.
- */
-static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
-{
-	struct tipc_net *tn = net_generic(node->net, tipc_net_id);
-
-	bclink_update_last_sent(node, seqno);
-	node->bclink.last_in = seqno;
-	node->bclink.oos_state = 0;
-	tn->bcl->stats.recv_info++;
-
-	/*
-	 * Unicast an ACK periodically, ensuring that
-	 * all nodes in the cluster don't ACK at the same time
-	 */
-	if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
-		tipc_link_proto_xmit(node_active_link(node, node->addr),
-				     STATE_MSG, 0, 0, 0, 0);
-		tn->bcl->stats.sent_acks++;
-	}
+	/* Broadcast to all nodes, inluding local node */
+	tipc_bcbase_xmit(net, &xmitq);
+	tipc_sk_mcast_rcv(net, &rcvq, &inputq);
+	__skb_queue_purge(list);
+	return 0;
 }
 
-/**
- * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
+/* tipc_bcast_rcv - receive a broadcast packet, and deliver to rcv link
  *
  * RCU is locked, no other locks set
  */
-void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
+int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_link *bcl = tn->bcl;
-	struct tipc_msg *msg = buf_msg(buf);
-	struct tipc_node *node;
-	u32 next_in;
-	u32 seqno;
-	int deferred = 0;
-	int pos = 0;
-	struct sk_buff *iskb;
-	struct sk_buff_head *arrvq, *inputq;
-
-	/* Screen out unwanted broadcast messages */
-	if (msg_mc_netid(msg) != tn->net_id)
-		goto exit;
-
-	node = tipc_node_find(net, msg_prevnode(msg));
-	if (unlikely(!node))
-		goto exit;
-
-	tipc_node_lock(node);
-	if (unlikely(!node->bclink.recv_permitted))
-		goto unlock;
-
-	/* Handle broadcast protocol message */
-	if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
-		if (msg_type(msg) != STATE_MSG)
-			goto unlock;
-		if (msg_destnode(msg) == tn->own_addr) {
-			tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
-			tipc_bclink_lock(net);
-			bcl->stats.recv_nacks++;
-			tn->bclink->retransmit_to = node;
-			bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
-					      msg_bcgap_to(msg));
-			tipc_bclink_unlock(net);
-			tipc_node_unlock(node);
-		} else {
-			tipc_node_unlock(node);
-			bclink_peek_nack(net, msg);
-		}
-		tipc_node_put(node);
-		goto exit;
-	}
-
-	/* Handle in-sequence broadcast message */
-	seqno = msg_seqno(msg);
-	next_in = mod(node->bclink.last_in + 1);
-	arrvq = &tn->bclink->arrvq;
-	inputq = &tn->bclink->inputq;
-
-	if (likely(seqno == next_in)) {
-receive:
-		/* Deliver message to destination */
-		if (likely(msg_isdata(msg))) {
-			tipc_bclink_lock(net);
-			bclink_accept_pkt(node, seqno);
-			spin_lock_bh(&inputq->lock);
-			__skb_queue_tail(arrvq, buf);
-			spin_unlock_bh(&inputq->lock);
-			node->action_flags |= TIPC_BCAST_MSG_EVT;
-			tipc_bclink_unlock(net);
-			tipc_node_unlock(node);
-		} else if (msg_user(msg) == MSG_BUNDLER) {
-			tipc_bclink_lock(net);
-			bclink_accept_pkt(node, seqno);
-			bcl->stats.recv_bundles++;
-			bcl->stats.recv_bundled += msg_msgcnt(msg);
-			pos = 0;
-			while (tipc_msg_extract(buf, &iskb, &pos)) {
-				spin_lock_bh(&inputq->lock);
-				__skb_queue_tail(arrvq, iskb);
-				spin_unlock_bh(&inputq->lock);
-			}
-			node->action_flags |= TIPC_BCAST_MSG_EVT;
-			tipc_bclink_unlock(net);
-			tipc_node_unlock(node);
-		} else if (msg_user(msg) == MSG_FRAGMENTER) {
-			tipc_bclink_lock(net);
-			bclink_accept_pkt(node, seqno);
-			tipc_buf_append(&node->bclink.reasm_buf, &buf);
-			if (unlikely(!buf && !node->bclink.reasm_buf)) {
-				tipc_bclink_unlock(net);
-				goto unlock;
-			}
-			bcl->stats.recv_fragments++;
-			if (buf) {
-				bcl->stats.recv_fragmented++;
-				msg = buf_msg(buf);
-				tipc_bclink_unlock(net);
-				goto receive;
-			}
-			tipc_bclink_unlock(net);
-			tipc_node_unlock(node);
-		} else {
-			tipc_bclink_lock(net);
-			bclink_accept_pkt(node, seqno);
-			tipc_bclink_unlock(net);
-			tipc_node_unlock(node);
-			kfree_skb(buf);
-		}
-		buf = NULL;
+	struct tipc_msg *hdr = buf_msg(skb);
+	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+	struct sk_buff_head xmitq;
+	int rc;
 
-		/* Determine new synchronization state */
-		tipc_node_lock(node);
-		if (unlikely(!tipc_node_is_up(node)))
-			goto unlock;
+	__skb_queue_head_init(&xmitq);
 
-		if (node->bclink.last_in == node->bclink.last_sent)
-			goto unlock;
-
-		if (skb_queue_empty(&node->bclink.deferdq)) {
-			node->bclink.oos_state = 1;
-			goto unlock;
-		}
-
-		msg = buf_msg(skb_peek(&node->bclink.deferdq));
-		seqno = msg_seqno(msg);
-		next_in = mod(next_in + 1);
-		if (seqno != next_in)
-			goto unlock;
-
-		/* Take in-sequence message from deferred queue & deliver it */
-		buf = __skb_dequeue(&node->bclink.deferdq);
-		goto receive;
-	}
-
-	/* Handle out-of-sequence broadcast message */
-	if (less(next_in, seqno)) {
-		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
-					       buf);
-		bclink_update_last_sent(node, seqno);
-		buf = NULL;
+	if (msg_mc_netid(hdr) != tipc_netid(net) || !tipc_link_is_up(l)) {
+		kfree_skb(skb);
+		return 0;
 	}
 
-	tipc_bclink_lock(net);
-
-	if (deferred)
-		bcl->stats.deferred_recv++;
+	tipc_bcast_lock(net);
+	if (msg_user(hdr) == BCAST_PROTOCOL)
+		rc = tipc_link_bc_nack_rcv(l, skb, &xmitq);
 	else
-		bcl->stats.duplicates++;
+		rc = tipc_link_rcv(l, skb, NULL);
+	tipc_bcast_unlock(net);
 
-	tipc_bclink_unlock(net);
+	tipc_bcbase_xmit(net, &xmitq);
 
-unlock:
-	tipc_node_unlock(node);
-	tipc_node_put(node);
-exit:
-	kfree_skb(buf);
-}
+	/* Any socket wakeup messages ? */
+	if (!skb_queue_empty(inputq))
+		tipc_sk_rcv(net, inputq);
 
-u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
-{
-	return (n_ptr->bclink.recv_permitted &&
-		(tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
+	return rc;
 }
 
-
-/**
- * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
+/* tipc_bcast_ack_rcv - receive and handle a broadcast acknowledge
  *
- * Send packet over as many bearers as necessary to reach all nodes
- * that have joined the broadcast link.
- *
- * Returns 0 (packet sent successfully) under all circumstances,
- * since the broadcast link's pseudo-bearer never blocks
+ * RCU is locked, no other locks set
  */
-static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
-			      struct tipc_bearer *unused1,
-			      struct tipc_media_addr *unused2)
+void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked)
 {
-	int bp_index;
-	struct tipc_msg *msg = buf_msg(buf);
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_bcbearer *bcbearer = tn->bcbearer;
-	struct tipc_bclink *bclink = tn->bclink;
-
-	/* Prepare broadcast link message for reliable transmission,
-	 * if first time trying to send it;
-	 * preparation is skipped for broadcast link protocol messages
-	 * since they are sent in an unreliable manner and don't need it
-	 */
-	if (likely(!msg_non_seq(buf_msg(buf)))) {
-		bcbuf_set_acks(buf, bclink->bcast_nodes.count);
-		msg_set_non_seq(msg, 1);
-		msg_set_mc_netid(msg, tn->net_id);
-		tn->bcl->stats.sent_info++;
-		if (WARN_ON(!bclink->bcast_nodes.count)) {
-			dump_stack();
-			return 0;
-		}
-	}
+	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+	struct sk_buff_head xmitq;
 
-	/* Send buffer over bearers until all targets reached */
-	bcbearer->remains = bclink->bcast_nodes;
-
-	for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
-		struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
-		struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
-		struct tipc_bearer *bp[2] = {p, s};
-		struct tipc_bearer *b = bp[msg_link_selector(msg)];
-		struct sk_buff *tbuf;
-
-		if (!p)
-			break; /* No more bearers to try */
-		if (!b)
-			b = p;
-		tipc_nmap_diff(&bcbearer->remains, &b->nodes,
-			       &bcbearer->remains_new);
-		if (bcbearer->remains_new.count == bcbearer->remains.count)
-			continue; /* Nothing added by bearer pair */
-
-		if (bp_index == 0) {
-			/* Use original buffer for first bearer */
-			tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
-		} else {
-			/* Avoid concurrent buffer access */
-			tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
-			if (!tbuf)
-				break;
-			tipc_bearer_send(net, b->identity, tbuf,
-					 &b->bcast_addr);
-			kfree_skb(tbuf); /* Bearer keeps a clone */
-		}
-		if (bcbearer->remains_new.count == 0)
-			break; /* All targets reached */
+	__skb_queue_head_init(&xmitq);
 
-		bcbearer->remains = bcbearer->remains_new;
-	}
+	tipc_bcast_lock(net);
+	tipc_link_bc_ack_rcv(l, acked, &xmitq);
+	tipc_bcast_unlock(net);
 
-	return 0;
+	tipc_bcbase_xmit(net, &xmitq);
+
+	/* Any socket wakeup messages ? */
+	if (!skb_queue_empty(inputq))
+		tipc_sk_rcv(net, inputq);
 }
 
-/**
- * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
+/* tipc_bcast_synch_rcv - check and update rcv link with peer's send state
+ *
+ * RCU is locked, no other locks set
  */
-void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
-			u32 node, bool action)
+void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
+			 struct tipc_msg *hdr)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_bcbearer *bcbearer = tn->bcbearer;
-	struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
-	struct tipc_bcbearer_pair *bp_curr;
-	struct tipc_bearer *b;
-	int b_index;
-	int pri;
-
-	tipc_bclink_lock(net);
+	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+	struct sk_buff_head xmitq;
 
-	if (action)
-		tipc_nmap_add(nm_ptr, node);
-	else
-		tipc_nmap_remove(nm_ptr, node);
+	__skb_queue_head_init(&xmitq);
 
-	/* Group bearers by priority (can assume max of two per priority) */
-	memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
+	tipc_bcast_lock(net);
+	if (msg_type(hdr) == STATE_MSG) {
+		tipc_link_bc_ack_rcv(l, msg_bcast_ack(hdr), &xmitq);
+		tipc_link_bc_sync_rcv(l, hdr, &xmitq);
+	} else {
+		tipc_link_bc_init_rcv(l, hdr);
+	}
+	tipc_bcast_unlock(net);
 
-	rcu_read_lock();
-	for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
-		b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
-		if (!b || !b->nodes.count)
-			continue;
+	tipc_bcbase_xmit(net, &xmitq);
 
-		if (!bp_temp[b->priority].primary)
-			bp_temp[b->priority].primary = b;
-		else
-			bp_temp[b->priority].secondary = b;
-	}
-	rcu_read_unlock();
+	/* Any socket wakeup messages ? */
+	if (!skb_queue_empty(inputq))
+		tipc_sk_rcv(net, inputq);
+}
 
-	/* Create array of bearer pairs for broadcasting */
-	bp_curr = bcbearer->bpairs;
-	memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));
+/* tipc_bcast_add_peer - add a peer node to broadcast link and bearer
+ *
+ * RCU is locked, node lock is set
+ */
+void tipc_bcast_add_peer(struct net *net, struct tipc_link *uc_l,
+			 struct sk_buff_head *xmitq)
+{
+	struct tipc_link *snd_l = tipc_bc_sndlink(net);
 
-	for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
+	tipc_bcast_lock(net);
+	tipc_link_add_bc_peer(snd_l, uc_l, xmitq);
+	tipc_bcbase_select_primary(net);
+	tipc_bcast_unlock(net);
+}
 
-		if (!bp_temp[pri].primary)
-			continue;
+/* tipc_bcast_remove_peer - remove a peer node from broadcast link and bearer
+ *
+ * RCU is locked, node lock is set
+ */
+void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
+{
+	struct tipc_link *snd_l = tipc_bc_sndlink(net);
+	struct sk_buff_head *inputq = &tipc_bc_base(net)->inputq;
+	struct sk_buff_head xmitq;
 
-		bp_curr->primary = bp_temp[pri].primary;
+	__skb_queue_head_init(&xmitq);
 
-		if (bp_temp[pri].secondary) {
-			if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
-					    &bp_temp[pri].secondary->nodes)) {
-				bp_curr->secondary = bp_temp[pri].secondary;
-			} else {
-				bp_curr++;
-				bp_curr->primary = bp_temp[pri].secondary;
-			}
-		}
+	tipc_bcast_lock(net);
+	tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);
+	tipc_bcbase_select_primary(net);
+	tipc_bcast_unlock(net);
 
-		bp_curr++;
-	}
+	tipc_bcbase_xmit(net, &xmitq);
 
-	tipc_bclink_unlock(net);
+	/* Any socket wakeup messages ? */
+	if (!skb_queue_empty(inputq))
+		tipc_sk_rcv(net, inputq);
 }
 
 static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
@@ -836,7 +395,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
 	if (!bcl)
 		return 0;
 
-	tipc_bclink_lock(net);
+	tipc_bcast_lock(net);
 
 	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
 			  NLM_F_MULTI, TIPC_NL_LINK_GET);
@@ -871,7 +430,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
 	if (err)
 		goto attr_msg_full;
 
-	tipc_bclink_unlock(net);
+	tipc_bcast_unlock(net);
 	nla_nest_end(msg->skb, attrs);
 	genlmsg_end(msg->skb, hdr);
 
@@ -882,7 +441,7 @@ prop_msg_full:
 attr_msg_full:
 	nla_nest_cancel(msg->skb, attrs);
 msg_full:
-	tipc_bclink_unlock(net);
+	tipc_bcast_unlock(net);
 	genlmsg_cancel(msg->skb, hdr);
 
 	return -EMSGSIZE;
@@ -896,26 +455,25 @@ int tipc_bclink_reset_stats(struct net *net)
 	if (!bcl)
 		return -ENOPROTOOPT;
 
-	tipc_bclink_lock(net);
+	tipc_bcast_lock(net);
 	memset(&bcl->stats, 0, sizeof(bcl->stats));
-	tipc_bclink_unlock(net);
+	tipc_bcast_unlock(net);
 	return 0;
 }
 
-int tipc_bclink_set_queue_limits(struct net *net, u32 limit)
+static int tipc_bc_link_set_queue_limits(struct net *net, u32 limit)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_link *bcl = tn->bcl;
+	struct tipc_link *l = tipc_bc_sndlink(net);
 
-	if (!bcl)
+	if (!l)
 		return -ENOPROTOOPT;
 	if (limit < BCLINK_WIN_MIN)
 		limit = BCLINK_WIN_MIN;
 	if (limit > TIPC_MAX_LINK_WIN)
 		return -EINVAL;
-	tipc_bclink_lock(net);
-	tipc_link_set_queue_limits(bcl, limit);
-	tipc_bclink_unlock(net);
+	tipc_bcast_lock(net);
+	tipc_link_set_queue_limits(l, limit);
+	tipc_bcast_unlock(net);
 	return 0;
 }
 
@@ -937,123 +495,51 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
 
 	win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
 
-	return tipc_bclink_set_queue_limits(net, win);
+	return tipc_bc_link_set_queue_limits(net, win);
 }
 
-int tipc_bclink_init(struct net *net)
+int tipc_bcast_init(struct net *net)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_bcbearer *bcbearer;
-	struct tipc_bclink *bclink;
-	struct tipc_link *bcl;
-
-	bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
-	if (!bcbearer)
-		return -ENOMEM;
-
-	bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
-	if (!bclink) {
-		kfree(bcbearer);
-		return -ENOMEM;
-	}
+	struct tipc_net *tn = tipc_net(net);
+	struct tipc_bc_base *bb = NULL;
+	struct tipc_link *l = NULL;
 
-	bcl = &bclink->link;
-	bcbearer->bearer.media = &bcbearer->media;
-	bcbearer->media.send_msg = tipc_bcbearer_send;
-	sprintf(bcbearer->media.name, "tipc-broadcast");
-
-	spin_lock_init(&bclink->lock);
-	__skb_queue_head_init(&bcl->transmq);
-	__skb_queue_head_init(&bcl->backlogq);
-	__skb_queue_head_init(&bcl->deferdq);
-	skb_queue_head_init(&bcl->wakeupq);
-	bcl->snd_nxt = 1;
-	spin_lock_init(&bclink->node.lock);
-	__skb_queue_head_init(&bclink->arrvq);
-	skb_queue_head_init(&bclink->inputq);
-	bcl->owner = &bclink->node;
-	bcl->owner->net = net;
-	bcl->mtu = MAX_PKT_DEFAULT_MCAST;
-	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
-	bcl->bearer_id = MAX_BEARERS;
-	rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcbearer->bearer);
-	bcl->pmsg = (struct tipc_msg *)&bcl->proto_msg;
-	msg_set_prevnode(bcl->pmsg, tn->own_addr);
-	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
-	tn->bcbearer = bcbearer;
-	tn->bclink = bclink;
-	tn->bcl = bcl;
-	return 0;
-}
+	bb = kzalloc(sizeof(*bb), GFP_ATOMIC);
+	if (!bb)
+		goto enomem;
+	tn->bcbase = bb;
+	spin_lock_init(&tipc_net(net)->bclock);
 
-void tipc_bclink_stop(struct net *net)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-
-	tipc_bclink_lock(net);
-	tipc_link_purge_queues(tn->bcl);
-	tipc_bclink_unlock(net);
-
-	RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
-	synchronize_net();
-	kfree(tn->bcbearer);
-	kfree(tn->bclink);
+	if (!tipc_link_bc_create(net, 0, 0,
+				 U16_MAX,
+				 BCLINK_WIN_DEFAULT,
+				 0,
+				 &bb->inputq,
+				 NULL,
+				 NULL,
+				 &l))
+		goto enomem;
+	bb->link = l;
+	tn->bcl = l;
+	return 0;
+enomem:
+	kfree(bb);
+	kfree(l);
+	return -ENOMEM;
 }
 
-/**
- * tipc_nmap_add - add a node to a node map
- */
-static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
+void tipc_bcast_reinit(struct net *net)
 {
-	int n = tipc_node(node);
-	int w = n / WSIZE;
-	u32 mask = (1 << (n % WSIZE));
+	struct tipc_bc_base *b = tipc_bc_base(net);
 
-	if ((nm_ptr->map[w] & mask) == 0) {
-		nm_ptr->count++;
-		nm_ptr->map[w] |= mask;
-	}
+	msg_set_prevnode(b->link->pmsg, tipc_own_addr(net));
 }
 
-/**
- * tipc_nmap_remove - remove a node from a node map
- */
-static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
+void tipc_bcast_stop(struct net *net)
 {
-	int n = tipc_node(node);
-	int w = n / WSIZE;
-	u32 mask = (1 << (n % WSIZE));
-
-	if ((nm_ptr->map[w] & mask) != 0) {
-		nm_ptr->map[w] &= ~mask;
-		nm_ptr->count--;
-	}
-}
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
 
-/**
- * tipc_nmap_diff - find differences between node maps
- * @nm_a: input node map A
- * @nm_b: input node map B
- * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
- */
-static void tipc_nmap_diff(struct tipc_node_map *nm_a,
-			   struct tipc_node_map *nm_b,
-			   struct tipc_node_map *nm_diff)
-{
-	int stop = ARRAY_SIZE(nm_a->map);
-	int w;
-	int b;
-	u32 map;
-
-	memset(nm_diff, 0, sizeof(*nm_diff));
-	for (w = 0; w < stop; w++) {
-		map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
-		nm_diff->map[w] = map;
-		if (map != 0) {
-			for (b = 0 ; b < WSIZE; b++) {
-				if (map & (1 << b))
-					nm_diff->count++;
-			}
-		}
-	}
+	synchronize_net();
+	kfree(tn->bcbase);
+	kfree(tn->bcl);
 }
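[Editorial note] The bearer-selection rule in tipc_bcbase_select_primary()
above can be restated as a tiny standalone function. This is a simplified
model, not the kernel code itself, under the assumption (documented in the
struct tipc_bc_base comment) that dests[] counts reachable peers per bearer;
select_primary() is a hypothetical name:

#include <stdio.h>

#define MAX_BEARERS 3
#define INVALID_BEARER_ID (-1)

/* Pick a bearer that reaches every peer; XOR with the node's own
 * address so that different nodes tend to pick different bearers. */
static int select_primary(const int dests[MAX_BEARERS], int all_dests,
			  unsigned int own_addr)
{
	int i, primary = INVALID_BEARER_ID;

	if (!all_dests)
		return INVALID_BEARER_ID;
	for (i = 0; i < MAX_BEARERS; i++) {
		if (!dests[i] || dests[i] < all_dests)
			continue;	/* bearer does not reach every peer */
		primary = i;
		if ((i ^ own_addr) & 1)
			break;		/* spread the load across bearers */
	}
	return primary;
}

int main(void)
{
	int dests[MAX_BEARERS] = { 4, 4, 2 };	/* bearers 0 and 1 reach all 4 peers */

	printf("node 1001 picks bearer %d\n", select_primary(dests, 4, 1001));
	printf("node 1002 picks bearer %d\n", select_primary(dests, 4, 1002));
	return 0;
}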
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index d74c69bcf60b..2855b9356a15 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -37,102 +37,44 @@
 #ifndef _TIPC_BCAST_H
 #define _TIPC_BCAST_H
 
-#include <linux/tipc_config.h>
-#include "link.h"
-#include "node.h"
+#include "core.h"
 
-/**
- * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link
- * @primary: pointer to primary bearer
- * @secondary: pointer to secondary bearer
- *
- * Bearers must have same priority and same set of reachable destinations
- * to be paired.
- */
-
-struct tipc_bcbearer_pair {
-	struct tipc_bearer *primary;
-	struct tipc_bearer *secondary;
-};
-
-#define BCBEARER MAX_BEARERS
-
-/**
- * struct tipc_bcbearer - bearer used by broadcast link
- * @bearer: (non-standard) broadcast bearer structure
- * @media: (non-standard) broadcast media structure
- * @bpairs: array of bearer pairs
- * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
- * @remains: temporary node map used by tipc_bcbearer_send()
- * @remains_new: temporary node map used tipc_bcbearer_send()
- *
- * Note: The fields labelled "temporary" are incorporated into the bearer
- * to avoid consuming potentially limited stack space through the use of
- * large local variables within multicast routines. Concurrent access is
- * prevented through use of the spinlock "bclink_lock".
- */
-struct tipc_bcbearer {
-	struct tipc_bearer bearer;
-	struct tipc_media media;
-	struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
-	struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
-	struct tipc_node_map remains;
-	struct tipc_node_map remains_new;
-};
+struct tipc_node;
+struct tipc_msg;
+struct tipc_nl_msg;
+struct tipc_node_map;
 
-/**
- * struct tipc_bclink - link used for broadcast messages
- * @lock: spinlock governing access to structure
- * @link: (non-standard) broadcast link structure
- * @node: (non-standard) node structure representing b'cast link's peer node
- * @bcast_nodes: map of broadcast-capable nodes
- * @retransmit_to: node that most recently requested a retransmit
- *
- * Handles sequence numbering, fragmentation, bundling, etc.
- */
-struct tipc_bclink {
-	spinlock_t lock;
-	struct tipc_link link;
-	struct tipc_node node;
-	struct sk_buff_head arrvq;
-	struct sk_buff_head inputq;
-	struct tipc_node_map bcast_nodes;
-	struct tipc_node *retransmit_to;
-};
+int tipc_bcast_init(struct net *net);
+void tipc_bcast_reinit(struct net *net);
+void tipc_bcast_stop(struct net *net);
+void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
+			 struct sk_buff_head *xmitq);
+void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
+void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
+void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
+int tipc_bcast_get_mtu(struct net *net);
+int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
+int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
+void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
+void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
+			 struct tipc_msg *hdr);
+int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
+int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
+int tipc_bclink_reset_stats(struct net *net);
 
-struct tipc_node;
-extern const char tipc_bclink_name[];
+static inline void tipc_bcast_lock(struct net *net)
+{
+	spin_lock_bh(&tipc_net(net)->bclock);
+}
 
-/**
- * tipc_nmap_equal - test for equality of node maps
- */
-static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
-				  struct tipc_node_map *nm_b)
+static inline void tipc_bcast_unlock(struct net *net)
 {
-	return !memcmp(nm_a, nm_b, sizeof(*nm_a));
+	spin_unlock_bh(&tipc_net(net)->bclock);
 }
 
-int tipc_bclink_init(struct net *net);
-void tipc_bclink_stop(struct net *net);
-void tipc_bclink_add_node(struct net *net, u32 addr);
-void tipc_bclink_remove_node(struct net *net, u32 addr);
-struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
-void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
-void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
-u32 tipc_bclink_get_last_sent(struct net *net);
-u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
-void tipc_bclink_update_link_state(struct tipc_node *node,
-				   u32 last_sent);
-int tipc_bclink_reset_stats(struct net *net);
-int tipc_bclink_set_queue_limits(struct net *net, u32 limit);
-void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
-			u32 node, bool action);
-uint tipc_bclink_get_mtu(void);
-int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
-void tipc_bclink_wakeup_users(struct net *net);
-int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
-int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
-void tipc_bclink_input(struct net *net);
-void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);
+static inline struct tipc_link *tipc_bc_sndlink(struct net *net)
+{
+	return tipc_net(net)->bcl;
+}
 
 #endif
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 82b278668ab7..648f2a67f314 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -193,10 +193,8 @@ void tipc_bearer_add_dest(struct net *net, u32 bearer_id, u32 dest)
 
 	rcu_read_lock();
 	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
-	if (b_ptr) {
-		tipc_bcbearer_sort(net, &b_ptr->nodes, dest, true);
+	if (b_ptr)
 		tipc_disc_add_dest(b_ptr->link_req);
-	}
 	rcu_read_unlock();
 }
 
@@ -207,10 +205,8 @@ void tipc_bearer_remove_dest(struct net *net, u32 bearer_id, u32 dest)
 
 	rcu_read_lock();
 	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
-	if (b_ptr) {
-		tipc_bcbearer_sort(net, &b_ptr->nodes, dest, false);
+	if (b_ptr)
 		tipc_disc_remove_dest(b_ptr->link_req);
-	}
 	rcu_read_unlock();
 }
 
@@ -418,10 +414,9 @@ void tipc_disable_l2_media(struct tipc_bearer *b)
  * @b_ptr: the bearer through which the packet is to be sent
  * @dest: peer destination address
  */
-int tipc_l2_send_msg(struct net *net, struct sk_buff *buf,
+int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
 		     struct tipc_bearer *b, struct tipc_media_addr *dest)
 {
-	struct sk_buff *clone;
 	struct net_device *dev;
 	int delta;
 
@@ -429,42 +424,48 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *buf,
 	if (!dev)
 		return 0;
 
-	clone = skb_clone(buf, GFP_ATOMIC);
-	if (!clone)
-		return 0;
-
-	delta = dev->hard_header_len - skb_headroom(buf);
+	delta = dev->hard_header_len - skb_headroom(skb);
 	if ((delta > 0) &&
-	    pskb_expand_head(clone, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) {
-		kfree_skb(clone);
+	    pskb_expand_head(skb, SKB_DATA_ALIGN(delta), 0, GFP_ATOMIC)) {
+		kfree_skb(skb);
 		return 0;
 	}
 
-	skb_reset_network_header(clone);
-	clone->dev = dev;
-	clone->protocol = htons(ETH_P_TIPC);
-	dev_hard_header(clone, dev, ETH_P_TIPC, dest->value,
-			dev->dev_addr, clone->len);
-	dev_queue_xmit(clone);
+	skb_reset_network_header(skb);
+	skb->dev = dev;
+	skb->protocol = htons(ETH_P_TIPC);
+	dev_hard_header(skb, dev, ETH_P_TIPC, dest->value,
+			dev->dev_addr, skb->len);
+	dev_queue_xmit(skb);
 	return 0;
 }
 
-/* tipc_bearer_send- sends buffer to destination over bearer
- *
- * IMPORTANT:
- * The media send routine must not alter the buffer being passed in
- * as it may be needed for later retransmission!
+int tipc_bearer_mtu(struct net *net, u32 bearer_id)
+{
+	int mtu = 0;
+	struct tipc_bearer *b;
+
+	rcu_read_lock();
+	b = rcu_dereference_rtnl(tipc_net(net)->bearer_list[bearer_id]);
+	if (b)
+		mtu = b->mtu;
+	rcu_read_unlock();
+	return mtu;
+}
+
+/* tipc_bearer_xmit_skb - sends buffer to destination over bearer
  */
-void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
-		      struct tipc_media_addr *dest)
+void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
+			  struct sk_buff *skb,
+			  struct tipc_media_addr *dest)
 {
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct tipc_bearer *b_ptr;
+	struct tipc_net *tn = tipc_net(net);
+	struct tipc_bearer *b;
 
 	rcu_read_lock();
-	b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
-	if (likely(b_ptr))
-		b_ptr->media->send_msg(net, buf, b_ptr, dest);
+	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+	if (likely(b))
+		b->media->send_msg(net, skb, b, dest);
 	rcu_read_unlock();
 }
 
@@ -487,8 +488,31 @@ void tipc_bearer_xmit(struct net *net, u32 bearer_id,
 	skb_queue_walk_safe(xmitq, skb, tmp) {
 		__skb_dequeue(xmitq);
 		b->media->send_msg(net, skb, b, dst);
-		/* Until we remove cloning in tipc_l2_send_msg(): */
-		kfree_skb(skb);
+		}
+	}
+	rcu_read_unlock();
+}
+
+/* tipc_bearer_bc_xmit() - broadcast buffers to all destinations
+ */
+void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
+			 struct sk_buff_head *xmitq)
+{
+	struct tipc_net *tn = tipc_net(net);
+	int net_id = tn->net_id;
+	struct tipc_bearer *b;
+	struct sk_buff *skb, *tmp;
+	struct tipc_msg *hdr;
+
+	rcu_read_lock();
+	b = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
+	if (likely(b)) {
+		skb_queue_walk_safe(xmitq, skb, tmp) {
+			hdr = buf_msg(skb);
+			msg_set_non_seq(hdr, 1);
+			msg_set_mc_netid(hdr, net_id);
+			__skb_dequeue(xmitq);
+			b->media->send_msg(net, skb, b, &b->bcast_addr);
 		}
 	}
 	rcu_read_unlock();
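[Editorial note] One convention introduced above is worth spelling out:
tipc_bearer_xmit_skb() consumes the buffer it is given, whereas the removed
tipc_l2_send_msg()/tipc_bearer_send() path cloned internally. A sketch of the
resulting caller-side pattern, under the assumption that it mirrors the
discover.c changes later in this patch (req->buf must survive for later
retransmissions, so the bearer is handed a clone):

	/* sketch only: keep req->buf, let the bearer consume a clone */
	skb = skb_clone(req->buf, GFP_ATOMIC);
	if (skb)
		tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest);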
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 6426f242f626..552185bc4773 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -163,6 +163,7 @@ struct tipc_bearer {
 	u32 identity;
 	struct tipc_link_req *link_req;
 	char net_plane;
+	int node_cnt;
 	struct tipc_node_map nodes;
 };
 
@@ -215,10 +216,14 @@ struct tipc_media *tipc_media_find(const char *name);
 int tipc_bearer_setup(void);
 void tipc_bearer_cleanup(void);
 void tipc_bearer_stop(struct net *net);
-void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
-		      struct tipc_media_addr *dest);
+int tipc_bearer_mtu(struct net *net, u32 bearer_id);
+void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
+			  struct sk_buff *skb,
+			  struct tipc_media_addr *dest);
 void tipc_bearer_xmit(struct net *net, u32 bearer_id,
 		      struct sk_buff_head *xmitq,
 		      struct tipc_media_addr *dst);
+void tipc_bearer_bc_xmit(struct net *net, u32 bearer_id,
+			 struct sk_buff_head *xmitq);
 
 #endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 005ba5eb0ea4..03a842870c52 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -42,6 +42,7 @@
 #include "bearer.h"
 #include "net.h"
 #include "socket.h"
+#include "bcast.h"
 
 #include <linux/module.h>
 
@@ -71,8 +72,15 @@ static int __net_init tipc_init_net(struct net *net)
 	err = tipc_topsrv_start(net);
 	if (err)
 		goto out_subscr;
+
+	err = tipc_bcast_init(net);
+	if (err)
+		goto out_bclink;
+
 	return 0;
 
+out_bclink:
+	tipc_bcast_stop(net);
 out_subscr:
 	tipc_nametbl_stop(net);
 out_nametbl:
@@ -85,6 +93,7 @@ static void __net_exit tipc_exit_net(struct net *net)
 {
 	tipc_topsrv_stop(net);
 	tipc_net_stop(net);
+	tipc_bcast_stop(net);
 	tipc_nametbl_stop(net);
 	tipc_sk_rht_destroy(net);
 }
diff --git a/net/tipc/core.h b/net/tipc/core.h
index b96b41eabf12..18e95a8020cd 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -62,8 +62,7 @@
 
 struct tipc_node;
 struct tipc_bearer;
-struct tipc_bcbearer;
-struct tipc_bclink;
+struct tipc_bc_base;
 struct tipc_link;
 struct tipc_name_table;
 struct tipc_server;
@@ -93,8 +92,8 @@ struct tipc_net {
 	struct tipc_bearer __rcu *bearer_list[MAX_BEARERS + 1];
 
 	/* Broadcast link */
-	struct tipc_bcbearer *bcbearer;
-	struct tipc_bclink *bclink;
+	spinlock_t bclock;
+	struct tipc_bc_base *bcbase;
 	struct tipc_link *bcl;
 
 	/* Socket hash table */
@@ -114,6 +113,11 @@ static inline struct tipc_net *tipc_net(struct net *net)
 	return net_generic(net, tipc_net_id);
 }
 
+static inline int tipc_netid(struct net *net)
+{
+	return tipc_net(net)->net_id;
+}
+
 static inline u16 mod(u16 x)
 {
 	return x & 0xffffu;
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index d14e0a4aa9af..afe8c47c4085 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -89,7 +89,7 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,
 		      MAX_H_SIZE, dest_domain);
 	msg_set_non_seq(msg, 1);
 	msg_set_node_sig(msg, tn->random);
-	msg_set_node_capabilities(msg, 0);
+	msg_set_node_capabilities(msg, TIPC_NODE_CAPABILITIES);
 	msg_set_dest_domain(msg, dest_domain);
 	msg_set_bc_netid(msg, tn->net_id);
 	b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr);
@@ -167,11 +167,10 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *skb,
 	/* Send response, if necessary */
 	if (respond && (mtyp == DSC_REQ_MSG)) {
 		rskb = tipc_buf_acquire(MAX_H_SIZE);
-		if (rskb) {
-			tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer);
-			tipc_bearer_send(net, bearer->identity, rskb, &maddr);
-			kfree_skb(rskb);
-		}
+		if (!rskb)
+			return;
+		tipc_disc_init_msg(net, rskb, DSC_RESP_MSG, bearer);
+		tipc_bearer_xmit_skb(net, bearer->identity, rskb, &maddr);
 	}
 }
 
@@ -225,6 +224,7 @@ void tipc_disc_remove_dest(struct tipc_link_req *req)
 static void disc_timeout(unsigned long data)
 {
 	struct tipc_link_req *req = (struct tipc_link_req *)data;
+	struct sk_buff *skb;
 	int max_delay;
 
 	spin_lock_bh(&req->lock);
@@ -242,9 +242,9 @@ static void disc_timeout(unsigned long data)
 	 * hold at fast polling rate if don't have any associated nodes,
 	 * otherwise hold at slow polling rate
 	 */
-	tipc_bearer_send(req->net, req->bearer_id, req->buf, &req->dest);
-
-
+	skb = skb_clone(req->buf, GFP_ATOMIC);
+	if (skb)
+		tipc_bearer_xmit_skb(req->net, req->bearer_id, skb, &req->dest);
 	req->timer_intv *= 2;
 	if (req->num_nodes)
 		max_delay = TIPC_LINK_REQ_SLOW;
@@ -271,6 +271,7 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr,
 			    struct tipc_media_addr *dest)
 {
 	struct tipc_link_req *req;
+	struct sk_buff *skb;
 
 	req = kmalloc(sizeof(*req), GFP_ATOMIC);
 	if (!req)
@@ -292,7 +293,9 @@ int tipc_disc_create(struct net *net, struct tipc_bearer *b_ptr,
292 setup_timer(&req->timer, disc_timeout, (unsigned long)req); 293 setup_timer(&req->timer, disc_timeout, (unsigned long)req);
293 mod_timer(&req->timer, jiffies + req->timer_intv); 294 mod_timer(&req->timer, jiffies + req->timer_intv);
294 b_ptr->link_req = req; 295 b_ptr->link_req = req;
295 tipc_bearer_send(net, req->bearer_id, req->buf, &req->dest); 296 skb = skb_clone(req->buf, GFP_ATOMIC);
297 if (skb)
298 tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest);
296 return 0; 299 return 0;
297} 300}
298 301
@@ -316,6 +319,7 @@ void tipc_disc_delete(struct tipc_link_req *req)
316void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr) 319void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr)
317{ 320{
318 struct tipc_link_req *req = b_ptr->link_req; 321 struct tipc_link_req *req = b_ptr->link_req;
322 struct sk_buff *skb;
319 323
320 spin_lock_bh(&req->lock); 324 spin_lock_bh(&req->lock);
321 tipc_disc_init_msg(net, req->buf, DSC_REQ_MSG, b_ptr); 325 tipc_disc_init_msg(net, req->buf, DSC_REQ_MSG, b_ptr);
@@ -325,6 +329,8 @@ void tipc_disc_reset(struct net *net, struct tipc_bearer *b_ptr)
325 req->num_nodes = 0; 329 req->num_nodes = 0;
326 req->timer_intv = TIPC_LINK_REQ_INIT; 330 req->timer_intv = TIPC_LINK_REQ_INIT;
327 mod_timer(&req->timer, jiffies + req->timer_intv); 331 mod_timer(&req->timer, jiffies + req->timer_intv);
328 tipc_bearer_send(net, req->bearer_id, req->buf, &req->dest); 332 skb = skb_clone(req->buf, GFP_ATOMIC);
333 if (skb)
334 tipc_bearer_xmit_skb(net, req->bearer_id, skb, &req->dest);
329 spin_unlock_bh(&req->lock); 335 spin_unlock_bh(&req->lock);
330} 336}
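
The discover.c changes above also reflect an ownership change in the bearer API: tipc_bearer_send() left the buffer with the caller, while the new tipc_bearer_xmit_skb() consumes it. Since req->buf is a long-lived template reused on every timer tick, each transmission now clones it first and silently skips the send if the clone fails. A minimal standalone sketch of that clone-before-consuming-send pattern (illustrative names, not TIPC's real API):

/* Simplified userspace model: a consuming send forces the caller to
 * clone a long-lived template before each transmit, mirroring the
 * skb_clone()/tipc_bearer_xmit_skb() pairing in the hunks above.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

struct buf { char data[64]; };

/* Consuming send: frees its argument, like tipc_bearer_xmit_skb() */
static void xmit_consume(struct buf *b)
{
	printf("sent: %s\n", b->data);
	free(b);
}

static struct buf *buf_clone(const struct buf *src)
{
	struct buf *b = malloc(sizeof(*b));

	if (b)
		memcpy(b, src, sizeof(*b));
	return b;			/* may be NULL on allocation failure */
}

int main(void)
{
	struct buf tmpl = { "DSC_REQ template" };	/* like req->buf */
	int i;

	for (i = 0; i < 3; i++) {	/* periodic discovery timer ticks */
		struct buf *skb = buf_clone(&tmpl);

		if (skb)		/* on failure: skip, try next tick */
			xmit_consume(skb);
	}
	return 0;			/* the template itself is never consumed */
}
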
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ff9b0b92e62e..4449fa01e232 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -50,6 +50,7 @@
50 */ 50 */
51static const char *link_co_err = "Link tunneling error, "; 51static const char *link_co_err = "Link tunneling error, ";
52static const char *link_rst_msg = "Resetting link "; 52static const char *link_rst_msg = "Resetting link ";
53static const char tipc_bclink_name[] = "broadcast-link";
53 54
54static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = { 55static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
55 [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC }, 56 [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC },
@@ -75,6 +76,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
75 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } 76 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
76}; 77};
77 78
79/* Send states for broadcast NACKs
80 */
81enum {
82 BC_NACK_SND_CONDITIONAL,
83 BC_NACK_SND_UNCONDITIONAL,
84 BC_NACK_SND_SUPPRESS,
85};
86
78/* 87/*
79 * Interval between NACKs when packets arrive out of order 88 * Interval between NACKs when packets arrive out of order
80 */ 89 */
@@ -110,7 +119,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
110 struct sk_buff_head *xmitq); 119 struct sk_buff_head *xmitq);
111static void link_reset_statistics(struct tipc_link *l_ptr); 120static void link_reset_statistics(struct tipc_link *l_ptr);
112static void link_print(struct tipc_link *l_ptr, const char *str); 121static void link_print(struct tipc_link *l_ptr, const char *str);
113static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 122static void tipc_link_build_nack_msg(struct tipc_link *l,
123 struct sk_buff_head *xmitq);
124static void tipc_link_build_bc_init_msg(struct tipc_link *l,
125 struct sk_buff_head *xmitq);
126static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
114 127
115/* 128/*
116 * Simple non-static link routines (i.e. referenced outside this file) 129 * Simple non-static link routines (i.e. referenced outside this file)
@@ -150,11 +163,66 @@ bool tipc_link_is_blocked(struct tipc_link *l)
150 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); 163 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER);
151} 164}
152 165
166bool link_is_bc_sndlink(struct tipc_link *l)
167{
168 return !l->bc_sndlink;
169}
170
171bool link_is_bc_rcvlink(struct tipc_link *l)
172{
173 return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l));
174}
175
153int tipc_link_is_active(struct tipc_link *l) 176int tipc_link_is_active(struct tipc_link *l)
154{ 177{
155 struct tipc_node *n = l->owner; 178 return l->active;
179}
180
181void tipc_link_set_active(struct tipc_link *l, bool active)
182{
183 l->active = active;
184}
185
186void tipc_link_add_bc_peer(struct tipc_link *snd_l,
187 struct tipc_link *uc_l,
188 struct sk_buff_head *xmitq)
189{
190 struct tipc_link *rcv_l = uc_l->bc_rcvlink;
156 191
157 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); 192 snd_l->ackers++;
193 rcv_l->acked = snd_l->snd_nxt - 1;
194 tipc_link_build_bc_init_msg(uc_l, xmitq);
195}
196
197void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
198 struct tipc_link *rcv_l,
199 struct sk_buff_head *xmitq)
200{
201 u16 ack = snd_l->snd_nxt - 1;
202
203 snd_l->ackers--;
204 tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
205 tipc_link_reset(rcv_l);
206 rcv_l->state = LINK_RESET;
207 if (!snd_l->ackers) {
208 tipc_link_reset(snd_l);
209 __skb_queue_purge(xmitq);
210 }
211}
212
213int tipc_link_bc_peers(struct tipc_link *l)
214{
215 return l->ackers;
216}
217
218void tipc_link_set_mtu(struct tipc_link *l, int mtu)
219{
220 l->mtu = mtu;
221}
222
223int tipc_link_mtu(struct tipc_link *l)
224{
225 return l->mtu;
158} 226}
159 227
160static u32 link_own_addr(struct tipc_link *l) 228static u32 link_own_addr(struct tipc_link *l)
@@ -165,57 +233,72 @@ static u32 link_own_addr(struct tipc_link *l)
165/** 233/**
166 * tipc_link_create - create a new link 234 * tipc_link_create - create a new link
167 * @n: pointer to associated node 235 * @n: pointer to associated node
168 * @b: pointer to associated bearer 236 * @if_name: associated interface name
237 * @bearer_id: id (index) of associated bearer
238 * @tolerance: link tolerance to be used by link
239 * @net_plane: network plane (A,B,c..) this link belongs to
240 * @mtu: mtu to be advertised by link
241 * @priority: priority to be used by link
242 * @window: send window to be used by link
243 * @session: session to be used by link
169 * @ownnode: identity of own node 244 * @ownnode: identity of own node
170 * @peer: identity of peer node 245 * @peer: node id of peer node
171 * @maddr: media address to be used 246 * @peer_caps: bitmap describing peer node capabilities
247 * @bc_sndlink: the namespace global link used for broadcast sending
248 * @bc_rcvlink: the peer specific link used for broadcast reception
172 * @inputq: queue to put messages ready for delivery 249 * @inputq: queue to put messages ready for delivery
173 * @namedq: queue to put binding table update messages ready for delivery 250 * @namedq: queue to put binding table update messages ready for delivery
174 * @link: return value, pointer to put the created link 251 * @link: return value, pointer to put the created link
175 * 252 *
176 * Returns true if link was created, otherwise false 253 * Returns true if link was created, otherwise false
177 */ 254 */
178bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, 255bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
179 u32 ownnode, u32 peer, struct tipc_media_addr *maddr, 256 int tolerance, char net_plane, u32 mtu, int priority,
180 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 257 int window, u32 session, u32 ownnode, u32 peer,
258 u16 peer_caps,
259 struct tipc_link *bc_sndlink,
260 struct tipc_link *bc_rcvlink,
261 struct sk_buff_head *inputq,
262 struct sk_buff_head *namedq,
181 struct tipc_link **link) 263 struct tipc_link **link)
182{ 264{
183 struct tipc_link *l; 265 struct tipc_link *l;
184 struct tipc_msg *hdr; 266 struct tipc_msg *hdr;
185 char *if_name;
186 267
187 l = kzalloc(sizeof(*l), GFP_ATOMIC); 268 l = kzalloc(sizeof(*l), GFP_ATOMIC);
188 if (!l) 269 if (!l)
189 return false; 270 return false;
190 *link = l; 271 *link = l;
272 l->pmsg = (struct tipc_msg *)&l->proto_msg;
273 hdr = l->pmsg;
274 tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer);
275 msg_set_size(hdr, sizeof(l->proto_msg));
276 msg_set_session(hdr, session);
277 msg_set_bearer_id(hdr, l->bearer_id);
191 278
192 /* Note: peer i/f name is completed by reset/activate message */ 279 /* Note: peer i/f name is completed by reset/activate message */
193 if_name = strchr(b->name, ':') + 1;
194 sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown", 280 sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
195 tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode), 281 tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode),
196 if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); 282 if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
283 strcpy((char *)msg_data(hdr), if_name);
197 284
198 l->addr = peer; 285 l->addr = peer;
199 l->media_addr = maddr; 286 l->peer_caps = peer_caps;
200 l->owner = n; 287 l->net = net;
201 l->peer_session = WILDCARD_SESSION; 288 l->peer_session = WILDCARD_SESSION;
202 l->bearer_id = b->identity; 289 l->bearer_id = bearer_id;
203 l->tolerance = b->tolerance; 290 l->tolerance = tolerance;
204 l->net_plane = b->net_plane; 291 l->net_plane = net_plane;
205 l->advertised_mtu = b->mtu; 292 l->advertised_mtu = mtu;
206 l->mtu = b->mtu; 293 l->mtu = mtu;
207 l->priority = b->priority; 294 l->priority = priority;
208 tipc_link_set_queue_limits(l, b->window); 295 tipc_link_set_queue_limits(l, window);
296 l->ackers = 1;
297 l->bc_sndlink = bc_sndlink;
298 l->bc_rcvlink = bc_rcvlink;
209 l->inputq = inputq; 299 l->inputq = inputq;
210 l->namedq = namedq; 300 l->namedq = namedq;
211 l->state = LINK_RESETTING; 301 l->state = LINK_RESETTING;
212 l->pmsg = (struct tipc_msg *)&l->proto_msg;
213 hdr = l->pmsg;
214 tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer);
215 msg_set_size(hdr, sizeof(l->proto_msg));
216 msg_set_session(hdr, session);
217 msg_set_bearer_id(hdr, l->bearer_id);
218 strcpy((char *)msg_data(hdr), if_name);
219 __skb_queue_head_init(&l->transmq); 302 __skb_queue_head_init(&l->transmq);
220 __skb_queue_head_init(&l->backlogq); 303 __skb_queue_head_init(&l->backlogq);
221 __skb_queue_head_init(&l->deferdq); 304 __skb_queue_head_init(&l->deferdq);
@@ -224,27 +307,43 @@ bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session,
224 return true; 307 return true;
225} 308}
226 309
227/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. 310/**
311 * tipc_link_bc_create - create new link to be used for broadcast
312 * @n: pointer to associated node
313 * @mtu: mtu to be used
314 * @window: send window to be used
315 * @inputq: queue to put messages ready for delivery
316 * @namedq: queue to put binding table update messages ready for delivery
317 * @link: return value, pointer to put the created link
228 * 318 *
229 * Give a newly added peer node the sequence number where it should 319 * Returns true if link was created, otherwise false
230 * start receiving and acking broadcast packets.
231 */ 320 */
232void tipc_link_build_bcast_sync_msg(struct tipc_link *l, 321bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
233 struct sk_buff_head *xmitq) 322 int mtu, int window, u16 peer_caps,
323 struct sk_buff_head *inputq,
324 struct sk_buff_head *namedq,
325 struct tipc_link *bc_sndlink,
326 struct tipc_link **link)
234{ 327{
235 struct sk_buff *skb; 328 struct tipc_link *l;
236 struct sk_buff_head list;
237 u16 last_sent;
238 329
239 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, 330 if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
240 0, l->addr, link_own_addr(l), 0, 0, 0); 331 0, ownnode, peer, peer_caps, bc_sndlink,
241 if (!skb) 332 NULL, inputq, namedq, link))
242 return; 333 return false;
243 last_sent = tipc_bclink_get_last_sent(l->owner->net); 334
244 msg_set_last_bcast(buf_msg(skb), last_sent); 335 l = *link;
245 __skb_queue_head_init(&list); 336 strcpy(l->name, tipc_bclink_name);
246 __skb_queue_tail(&list, skb); 337 tipc_link_reset(l);
247 tipc_link_xmit(l, &list, xmitq); 338 l->state = LINK_RESET;
339 l->ackers = 0;
340 l->bc_rcvlink = l;
341
342 /* Broadcast send link is always up */
343 if (link_is_bc_sndlink(l))
344 l->state = LINK_ESTABLISHED;
345
346 return true;
248} 347}
249 348
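
tipc_link_bc_create() above derives both broadcast endpoint types from the unicast constructor and distinguishes them purely by pointer conventions: the namespace-global send link has bc_sndlink == NULL (and is forced to LINK_ESTABLISHED, since it is always up), while a per-peer receive link points bc_rcvlink at itself. A toy model of those identity tests (hypothetical minimal struct, not the kernel one):

#include <assert.h>
#include <stddef.h>

struct link {
	struct link *bc_sndlink;	/* NULL only on the send link itself */
	struct link *bc_rcvlink;	/* self-pointer on broadcast links   */
};

static int is_bc_sndlink(const struct link *l) { return !l->bc_sndlink; }

static int is_bc_rcvlink(const struct link *l)
{
	return l->bc_rcvlink == l && !is_bc_sndlink(l);
}

int main(void)
{
	struct link snd = { NULL, NULL };
	struct link rcv = { &snd, NULL };

	snd.bc_rcvlink = &snd;		/* cf. l->bc_rcvlink = l above */
	rcv.bc_rcvlink = &rcv;
	assert(is_bc_sndlink(&snd) && !is_bc_rcvlink(&snd));
	assert(is_bc_rcvlink(&rcv) && !is_bc_sndlink(&rcv));
	return 0;
}
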
250/** 349/**
@@ -451,12 +550,17 @@ static void link_profile_stats(struct tipc_link *l)
451 550
452/* tipc_link_timeout - perform periodic task as instructed from node timeout 551/* tipc_link_timeout - perform periodic task as instructed from node timeout
453 */ 552 */
454int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) 555int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
455{ 556{
456 int rc = 0; 557 int rc = 0;
457 int mtyp = STATE_MSG; 558 int mtyp = STATE_MSG;
458 bool xmit = false; 559 bool xmit = false;
459 bool prb = false; 560 bool prb = false;
561 u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
562 u16 bc_acked = l->bc_rcvlink->acked;
563 bool bc_up = link_is_up(l->bc_rcvlink);
460 564
461 link_profile_stats(l); 565 link_profile_stats(l);
462 566
@@ -464,7 +568,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
464 case LINK_ESTABLISHED: 568 case LINK_ESTABLISHED:
465 case LINK_SYNCHING: 569 case LINK_SYNCHING:
466 if (!l->silent_intv_cnt) { 570 if (!l->silent_intv_cnt) {
467 if (tipc_bclink_acks_missing(l->owner)) 571 if (bc_up && (bc_acked != bc_snt))
468 xmit = true; 572 xmit = true;
469 } else if (l->silent_intv_cnt <= l->abort_limit) { 573 } else if (l->silent_intv_cnt <= l->abort_limit) {
470 xmit = true; 574 xmit = true;
@@ -555,38 +659,6 @@ void link_prepare_wakeup(struct tipc_link *l)
555 } 659 }
556} 660}
557 661
558/**
559 * tipc_link_reset_fragments - purge link's inbound message fragments queue
560 * @l_ptr: pointer to link
561 */
562void tipc_link_reset_fragments(struct tipc_link *l_ptr)
563{
564 kfree_skb(l_ptr->reasm_buf);
565 l_ptr->reasm_buf = NULL;
566}
567
568void tipc_link_purge_backlog(struct tipc_link *l)
569{
570 __skb_queue_purge(&l->backlogq);
571 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
572 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
573 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
574 l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
575 l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
576}
577
578/**
579 * tipc_link_purge_queues - purge all pkt queues associated with link
580 * @l_ptr: pointer to link
581 */
582void tipc_link_purge_queues(struct tipc_link *l_ptr)
583{
584 __skb_queue_purge(&l_ptr->deferdq);
585 __skb_queue_purge(&l_ptr->transmq);
586 tipc_link_purge_backlog(l_ptr);
587 tipc_link_reset_fragments(l_ptr);
588}
589
590void tipc_link_reset(struct tipc_link *l) 662void tipc_link_reset(struct tipc_link *l)
591{ 663{
592 /* Link is down, accept any session */ 664 /* Link is down, accept any session */
@@ -598,12 +670,16 @@ void tipc_link_reset(struct tipc_link *l)
598 /* Prepare for renewed mtu size negotiation */ 670 /* Prepare for renewed mtu size negotiation */
599 l->mtu = l->advertised_mtu; 671 l->mtu = l->advertised_mtu;
600 672
601 /* Clean up all queues: */ 673 /* Clean up all queues and counters: */
602 __skb_queue_purge(&l->transmq); 674 __skb_queue_purge(&l->transmq);
603 __skb_queue_purge(&l->deferdq); 675 __skb_queue_purge(&l->deferdq);
604 skb_queue_splice_init(&l->wakeupq, l->inputq); 676 skb_queue_splice_init(&l->wakeupq, l->inputq);
605 677 __skb_queue_purge(&l->backlogq);
606 tipc_link_purge_backlog(l); 678 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
679 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
680 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
681 l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
682 l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
607 kfree_skb(l->reasm_buf); 683 kfree_skb(l->reasm_buf);
608 kfree_skb(l->failover_reasm_skb); 684 kfree_skb(l->failover_reasm_skb);
609 l->reasm_buf = NULL; 685 l->reasm_buf = NULL;
@@ -611,81 +687,15 @@ void tipc_link_reset(struct tipc_link *l)
611 l->rcv_unacked = 0; 687 l->rcv_unacked = 0;
612 l->snd_nxt = 1; 688 l->snd_nxt = 1;
613 l->rcv_nxt = 1; 689 l->rcv_nxt = 1;
690 l->acked = 0;
614 l->silent_intv_cnt = 0; 691 l->silent_intv_cnt = 0;
615 l->stats.recv_info = 0; 692 l->stats.recv_info = 0;
616 l->stale_count = 0; 693 l->stale_count = 0;
694 l->bc_peer_is_up = false;
617 link_reset_statistics(l); 695 link_reset_statistics(l);
618} 696}
619 697
620/** 698/**
621 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
622 * @link: link to use
623 * @list: chain of buffers containing message
624 *
625 * Consumes the buffer chain, except when returning an error code,
626 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
627 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
628 */
629int __tipc_link_xmit(struct net *net, struct tipc_link *link,
630 struct sk_buff_head *list)
631{
632 struct tipc_msg *msg = buf_msg(skb_peek(list));
633 unsigned int maxwin = link->window;
634 unsigned int i, imp = msg_importance(msg);
635 uint mtu = link->mtu;
636 u16 ack = mod(link->rcv_nxt - 1);
637 u16 seqno = link->snd_nxt;
638 u16 bc_last_in = link->owner->bclink.last_in;
639 struct tipc_media_addr *addr = link->media_addr;
640 struct sk_buff_head *transmq = &link->transmq;
641 struct sk_buff_head *backlogq = &link->backlogq;
642 struct sk_buff *skb, *bskb;
643
644 /* Match msg importance against this and all higher backlog limits: */
645 for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
646 if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
647 return link_schedule_user(link, list);
648 }
649 if (unlikely(msg_size(msg) > mtu))
650 return -EMSGSIZE;
651
652 /* Prepare each packet for sending, and add to relevant queue: */
653 while (skb_queue_len(list)) {
654 skb = skb_peek(list);
655 msg = buf_msg(skb);
656 msg_set_seqno(msg, seqno);
657 msg_set_ack(msg, ack);
658 msg_set_bcast_ack(msg, bc_last_in);
659
660 if (likely(skb_queue_len(transmq) < maxwin)) {
661 __skb_dequeue(list);
662 __skb_queue_tail(transmq, skb);
663 tipc_bearer_send(net, link->bearer_id, skb, addr);
664 link->rcv_unacked = 0;
665 seqno++;
666 continue;
667 }
668 if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) {
669 kfree_skb(__skb_dequeue(list));
670 link->stats.sent_bundled++;
671 continue;
672 }
673 if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) {
674 kfree_skb(__skb_dequeue(list));
675 __skb_queue_tail(backlogq, bskb);
676 link->backlog[msg_importance(buf_msg(bskb))].len++;
677 link->stats.sent_bundled++;
678 link->stats.sent_bundles++;
679 continue;
680 }
681 link->backlog[imp].len += skb_queue_len(list);
682 skb_queue_splice_tail_init(list, backlogq);
683 }
684 link->snd_nxt = seqno;
685 return 0;
686}
687
688/**
689 * tipc_link_xmit(): enqueue buffer list according to queue situation 699 * tipc_link_xmit(): enqueue buffer list according to queue situation
690 * @link: link to use 700 * @link: link to use
691 * @list: chain of buffers containing message 701 * @list: chain of buffers containing message
@@ -705,7 +715,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
705 unsigned int mtu = l->mtu; 715 unsigned int mtu = l->mtu;
706 u16 ack = l->rcv_nxt - 1; 716 u16 ack = l->rcv_nxt - 1;
707 u16 seqno = l->snd_nxt; 717 u16 seqno = l->snd_nxt;
708 u16 bc_last_in = l->owner->bclink.last_in; 718 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
709 struct sk_buff_head *transmq = &l->transmq; 719 struct sk_buff_head *transmq = &l->transmq;
710 struct sk_buff_head *backlogq = &l->backlogq; 720 struct sk_buff_head *backlogq = &l->backlogq;
711 struct sk_buff *skb, *_skb, *bskb; 721 struct sk_buff *skb, *_skb, *bskb;
@@ -724,7 +734,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
724 hdr = buf_msg(skb); 734 hdr = buf_msg(skb);
725 msg_set_seqno(hdr, seqno); 735 msg_set_seqno(hdr, seqno);
726 msg_set_ack(hdr, ack); 736 msg_set_ack(hdr, ack);
727 msg_set_bcast_ack(hdr, bc_last_in); 737 msg_set_bcast_ack(hdr, bc_ack);
728 738
729 if (likely(skb_queue_len(transmq) < maxwin)) { 739 if (likely(skb_queue_len(transmq) < maxwin)) {
730 _skb = skb_clone(skb, GFP_ATOMIC); 740 _skb = skb_clone(skb, GFP_ATOMIC);
@@ -733,6 +743,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
733 __skb_dequeue(list); 743 __skb_dequeue(list);
734 __skb_queue_tail(transmq, skb); 744 __skb_queue_tail(transmq, skb);
735 __skb_queue_tail(xmitq, _skb); 745 __skb_queue_tail(xmitq, _skb);
746 TIPC_SKB_CB(skb)->ackers = l->ackers;
736 l->rcv_unacked = 0; 747 l->rcv_unacked = 0;
737 seqno++; 748 seqno++;
738 continue; 749 continue;
@@ -757,62 +768,13 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
757 return 0; 768 return 0;
758} 769}
759 770
760/*
761 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
762 * Receive the sequence number where we should start receiving and
763 * acking broadcast packets from a newly added peer node, and open
764 * up for reception of such packets.
765 *
766 * Called with node locked
767 */
768static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
769{
770 struct tipc_msg *msg = buf_msg(buf);
771
772 n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
773 n->bclink.recv_permitted = true;
774 kfree_skb(buf);
775}
776
777/*
778 * tipc_link_push_packets - push unsent packets to bearer
779 *
780 * Push out the unsent messages of a link where congestion
781 * has abated. Node is locked.
782 *
783 * Called with node locked
784 */
785void tipc_link_push_packets(struct tipc_link *link)
786{
787 struct sk_buff *skb;
788 struct tipc_msg *msg;
789 u16 seqno = link->snd_nxt;
790 u16 ack = mod(link->rcv_nxt - 1);
791
792 while (skb_queue_len(&link->transmq) < link->window) {
793 skb = __skb_dequeue(&link->backlogq);
794 if (!skb)
795 break;
796 msg = buf_msg(skb);
797 link->backlog[msg_importance(msg)].len--;
798 msg_set_ack(msg, ack);
799 msg_set_seqno(msg, seqno);
800 seqno = mod(seqno + 1);
801 msg_set_bcast_ack(msg, link->owner->bclink.last_in);
802 link->rcv_unacked = 0;
803 __skb_queue_tail(&link->transmq, skb);
804 tipc_bearer_send(link->owner->net, link->bearer_id,
805 skb, link->media_addr);
806 }
807 link->snd_nxt = seqno;
808}
809
810void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) 771void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
811{ 772{
812 struct sk_buff *skb, *_skb; 773 struct sk_buff *skb, *_skb;
813 struct tipc_msg *hdr; 774 struct tipc_msg *hdr;
814 u16 seqno = l->snd_nxt; 775 u16 seqno = l->snd_nxt;
815 u16 ack = l->rcv_nxt - 1; 776 u16 ack = l->rcv_nxt - 1;
777 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
816 778
817 while (skb_queue_len(&l->transmq) < l->window) { 779 while (skb_queue_len(&l->transmq) < l->window) {
818 skb = skb_peek(&l->backlogq); 780 skb = skb_peek(&l->backlogq);
@@ -826,96 +788,35 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
826 l->backlog[msg_importance(hdr)].len--; 788 l->backlog[msg_importance(hdr)].len--;
827 __skb_queue_tail(&l->transmq, skb); 789 __skb_queue_tail(&l->transmq, skb);
828 __skb_queue_tail(xmitq, _skb); 790 __skb_queue_tail(xmitq, _skb);
829 msg_set_ack(hdr, ack); 791 TIPC_SKB_CB(skb)->ackers = l->ackers;
830 msg_set_seqno(hdr, seqno); 792 msg_set_seqno(hdr, seqno);
831 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 793 msg_set_ack(hdr, ack);
794 msg_set_bcast_ack(hdr, bc_ack);
832 l->rcv_unacked = 0; 795 l->rcv_unacked = 0;
833 seqno++; 796 seqno++;
834 } 797 }
835 l->snd_nxt = seqno; 798 l->snd_nxt = seqno;
836} 799}
837 800
838static void link_retransmit_failure(struct tipc_link *l_ptr, 801static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
839 struct sk_buff *buf)
840{
841 struct tipc_msg *msg = buf_msg(buf);
842 struct net *net = l_ptr->owner->net;
843
844 pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
845
846 if (l_ptr->addr) {
847 /* Handle failure on standard link */
848 link_print(l_ptr, "Resetting link ");
849 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
850 msg_user(msg), msg_type(msg), msg_size(msg),
851 msg_errcode(msg));
852 pr_info("sqno %u, prev: %x, src: %x\n",
853 msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
854 } else {
855 /* Handle failure on broadcast link */
856 struct tipc_node *n_ptr;
857 char addr_string[16];
858
859 pr_info("Msg seq number: %u, ", msg_seqno(msg));
860 pr_cont("Outstanding acks: %lu\n",
861 (unsigned long) TIPC_SKB_CB(buf)->handle);
862
863 n_ptr = tipc_bclink_retransmit_to(net);
864
865 tipc_addr_string_fill(addr_string, n_ptr->addr);
866 pr_info("Broadcast link info for %s\n", addr_string);
867 pr_info("Reception permitted: %d, Acked: %u\n",
868 n_ptr->bclink.recv_permitted,
869 n_ptr->bclink.acked);
870 pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
871 n_ptr->bclink.last_in,
872 n_ptr->bclink.oos_state,
873 n_ptr->bclink.last_sent);
874
875 n_ptr->action_flags |= TIPC_BCAST_RESET;
876 l_ptr->stale_count = 0;
877 }
878}
879
880void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
881 u32 retransmits)
882{ 802{
883 struct tipc_msg *msg; 803 struct tipc_msg *hdr = buf_msg(skb);
884
885 if (!skb)
886 return;
887
888 msg = buf_msg(skb);
889
890 /* Detect repeated retransmit failures */
891 if (l_ptr->last_retransm == msg_seqno(msg)) {
892 if (++l_ptr->stale_count > 100) {
893 link_retransmit_failure(l_ptr, skb);
894 return;
895 }
896 } else {
897 l_ptr->last_retransm = msg_seqno(msg);
898 l_ptr->stale_count = 1;
899 }
900 804
901 skb_queue_walk_from(&l_ptr->transmq, skb) { 805 pr_warn("Retransmission failure on link <%s>\n", l->name);
902 if (!retransmits) 806 link_print(l, "Resetting link ");
903 break; 807 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
904 msg = buf_msg(skb); 808 msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
905 msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); 809 pr_info("sqno %u, prev: %x, src: %x\n",
906 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 810 msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
907 tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb,
908 l_ptr->media_addr);
909 retransmits--;
910 l_ptr->stats.retransmitted++;
911 }
912} 811}
913 812
914static int tipc_link_retransm(struct tipc_link *l, int retransm, 813int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
915 struct sk_buff_head *xmitq) 814 struct sk_buff_head *xmitq)
916{ 815{
917 struct sk_buff *_skb, *skb = skb_peek(&l->transmq); 816 struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
918 struct tipc_msg *hdr; 817 struct tipc_msg *hdr;
818 u16 ack = l->rcv_nxt - 1;
819 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
919 820
920 if (!skb) 821 if (!skb)
921 return 0; 822 return 0;
@@ -928,19 +829,25 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
928 link_retransmit_failure(l, skb); 829 link_retransmit_failure(l, skb);
929 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 830 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
930 } 831 }
832
833 /* Move forward to where retransmission should start */
931 skb_queue_walk(&l->transmq, skb) { 834 skb_queue_walk(&l->transmq, skb) {
932 if (!retransm) 835 if (!less(buf_seqno(skb), from))
933 return 0; 836 break;
837 }
838
839 skb_queue_walk_from(&l->transmq, skb) {
840 if (more(buf_seqno(skb), to))
841 break;
934 hdr = buf_msg(skb); 842 hdr = buf_msg(skb);
935 _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); 843 _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
936 if (!_skb) 844 if (!_skb)
937 return 0; 845 return 0;
938 hdr = buf_msg(_skb); 846 hdr = buf_msg(_skb);
939 msg_set_ack(hdr, l->rcv_nxt - 1); 847 msg_set_ack(hdr, ack);
940 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 848 msg_set_bcast_ack(hdr, bc_ack);
941 _skb->priority = TC_PRIO_CONTROL; 849 _skb->priority = TC_PRIO_CONTROL;
942 __skb_queue_tail(xmitq, _skb); 850 __skb_queue_tail(xmitq, _skb);
943 retransm--;
944 l->stats.retransmitted++; 851 l->stats.retransmitted++;
945 } 852 }
946 return 0; 853 return 0;
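
The new tipc_link_retrans() above replaces the old count-based retransmit with an explicit [from, to] sequence range, walked with wraparound-safe u16 comparisons. A standalone model of that arithmetic; less() and more() here mirror the helpers TIPC defines in msg.h:

#include <assert.h>
#include <stdint.h>

static int less(uint16_t a, uint16_t b) { return (int16_t)(a - b) < 0; }
static int more(uint16_t a, uint16_t b) { return (int16_t)(a - b) > 0; }

int main(void)
{
	/* retransmit range [from, to] chosen to span the 16-bit wrap */
	uint16_t from = 65530, to = 3;

	assert(less(65529, from));	/* before the range: skipped over   */
	assert(!less(65530, from));	/* first packet to be retransmitted */
	assert(!more(3, to));		/* last packet in the range         */
	assert(more(4, to));		/* past the range: stop the walk    */
	return 0;
}
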
@@ -951,11 +858,9 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
951 * Consumes buffer if message is of right type 858 * Consumes buffer if message is of right type
952 * Node lock must be held 859 * Node lock must be held
953 */ 860 */
954static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, 861static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
955 struct sk_buff_head *inputq) 862 struct sk_buff_head *inputq)
956{ 863{
957 struct tipc_node *node = link->owner;
958
959 switch (msg_user(buf_msg(skb))) { 864 switch (msg_user(buf_msg(skb))) {
960 case TIPC_LOW_IMPORTANCE: 865 case TIPC_LOW_IMPORTANCE:
961 case TIPC_MEDIUM_IMPORTANCE: 866 case TIPC_MEDIUM_IMPORTANCE:
@@ -965,8 +870,8 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
965 skb_queue_tail(inputq, skb); 870 skb_queue_tail(inputq, skb);
966 return true; 871 return true;
967 case NAME_DISTRIBUTOR: 872 case NAME_DISTRIBUTOR:
968 node->bclink.recv_permitted = true; 873 l->bc_rcvlink->state = LINK_ESTABLISHED;
969 skb_queue_tail(link->namedq, skb); 874 skb_queue_tail(l->namedq, skb);
970 return true; 875 return true;
971 case MSG_BUNDLER: 876 case MSG_BUNDLER:
972 case TUNNEL_PROTOCOL: 877 case TUNNEL_PROTOCOL:
@@ -987,7 +892,6 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
987static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, 892static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
988 struct sk_buff_head *inputq) 893 struct sk_buff_head *inputq)
989{ 894{
990 struct tipc_node *node = l->owner;
991 struct tipc_msg *hdr = buf_msg(skb); 895 struct tipc_msg *hdr = buf_msg(skb);
992 struct sk_buff **reasm_skb = &l->reasm_buf; 896 struct sk_buff **reasm_skb = &l->reasm_buf;
993 struct sk_buff *iskb; 897 struct sk_buff *iskb;
@@ -1028,13 +932,15 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1028 if (tipc_buf_append(reasm_skb, &skb)) { 932 if (tipc_buf_append(reasm_skb, &skb)) {
1029 l->stats.recv_fragmented++; 933 l->stats.recv_fragmented++;
1030 tipc_data_input(l, skb, inputq); 934 tipc_data_input(l, skb, inputq);
1031 } else if (!*reasm_skb) { 935 } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) {
936 pr_warn_ratelimited("Unable to build fragment list\n");
1032 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 937 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
1033 } 938 }
1034 return 0; 939 return 0;
1035 } else if (usr == BCAST_PROTOCOL) { 940 } else if (usr == BCAST_PROTOCOL) {
1036 tipc_link_sync_rcv(node, skb); 941 tipc_bcast_lock(l->net);
1037 return 0; 942 tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
943 tipc_bcast_unlock(l->net);
1038 } 944 }
1039drop: 945drop:
1040 kfree_skb(skb); 946 kfree_skb(skb);
@@ -1057,12 +963,28 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
1057} 963}
1058 964
1059/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission 965/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission
966 *
967 * Note that sending of broadcast ack is coordinated among nodes, to reduce
968 * risk of ack storms towards the sender
1060 */ 969 */
1061void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) 970int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
1062{ 971{
972 if (!l)
973 return 0;
974
975 /* Broadcast ACK must be sent via a unicast link => defer to caller */
976 if (link_is_bc_rcvlink(l)) {
977 if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf)
978 return 0;
979 l->rcv_unacked = 0;
980 return TIPC_LINK_SND_BC_ACK;
981 }
982
983 /* Unicast ACK */
1063 l->rcv_unacked = 0; 984 l->rcv_unacked = 0;
1064 l->stats.sent_acks++; 985 l->stats.sent_acks++;
1065 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 986 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
987 return 0;
1066} 988}
1067 989
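
The broadcast branch of tipc_link_build_ack_msg() above encodes the coordination mentioned in its comment: a node asks for a broadcast ack only when the low four bits of rcv_nxt XOR its own address are all ones, so each node acks roughly every 16th packet at an address-dependent offset, spreading acks over time instead of having every receiver reply at once. A quick standalone check of that property (the node addresses are made-up samples):

#include <stdio.h>
#include <stdint.h>

/* Mirrors the test ((l->rcv_nxt ^ link_own_addr(l)) & 0xf) == 0xf */
static int should_ack(uint16_t rcv_nxt, uint32_t own_addr)
{
	return ((rcv_nxt ^ own_addr) & 0xf) == 0xf;
}

int main(void)
{
	uint32_t addrs[] = { 0x1001001, 0x1001002, 0x1001003 };
	uint16_t seq;
	int i;

	for (seq = 0; seq < 32; seq++)
		for (i = 0; i < 3; i++)
			if (should_ack(seq, addrs[i]))
				printf("seq %u: node %d acks\n", seq, i);
	return 0;	/* each node acks exactly twice over 32 packets */
}
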
1068/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message 990/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message
@@ -1084,6 +1006,9 @@ static void tipc_link_build_nack_msg(struct tipc_link *l,
1084{ 1006{
1085 u32 def_cnt = ++l->stats.deferred_recv; 1007 u32 def_cnt = ++l->stats.deferred_recv;
1086 1008
1009 if (link_is_bc_rcvlink(l))
1010 return;
1011
1087 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) 1012 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
1088 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1013 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
1089} 1014}
@@ -1144,12 +1069,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
1144 l->rcv_nxt++; 1069 l->rcv_nxt++;
1145 l->stats.recv_info++; 1070 l->stats.recv_info++;
1146 if (!tipc_data_input(l, skb, l->inputq)) 1071 if (!tipc_data_input(l, skb, l->inputq))
1147 rc = tipc_link_input(l, skb, l->inputq); 1072 rc |= tipc_link_input(l, skb, l->inputq);
1148 if (unlikely(rc))
1149 break;
1150 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) 1073 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
1151 tipc_link_build_ack_msg(l, xmitq); 1074 rc |= tipc_link_build_ack_msg(l, xmitq);
1152 1075 if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
1076 break;
1153 } while ((skb = __skb_dequeue(defq))); 1077 } while ((skb = __skb_dequeue(defq)));
1154 1078
1155 return rc; 1079 return rc;
@@ -1158,45 +1082,6 @@ drop:
1158 return rc; 1082 return rc;
1159} 1083}
1160 1084
1161/**
1162 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1163 *
1164 * Returns increase in queue length (i.e. 0 or 1)
1165 */
1166u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1167{
1168 struct sk_buff *skb1;
1169 u16 seq_no = buf_seqno(skb);
1170
1171 /* Empty queue ? */
1172 if (skb_queue_empty(list)) {
1173 __skb_queue_tail(list, skb);
1174 return 1;
1175 }
1176
1177 /* Last ? */
1178 if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1179 __skb_queue_tail(list, skb);
1180 return 1;
1181 }
1182
1183 /* Locate insertion point in queue, then insert; discard if duplicate */
1184 skb_queue_walk(list, skb1) {
1185 u16 curr_seqno = buf_seqno(skb1);
1186
1187 if (seq_no == curr_seqno) {
1188 kfree_skb(skb);
1189 return 0;
1190 }
1191
1192 if (less(seq_no, curr_seqno))
1193 break;
1194 }
1195
1196 __skb_queue_before(list, skb1, skb);
1197 return 1;
1198}
1199
1200/* 1085/*
1201 * Send protocol message to the other endpoint. 1086 * Send protocol message to the other endpoint.
1202 */ 1087 */
@@ -1212,23 +1097,17 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
1212 skb = __skb_dequeue(&xmitq); 1097 skb = __skb_dequeue(&xmitq);
1213 if (!skb) 1098 if (!skb)
1214 return; 1099 return;
1215 tipc_bearer_send(l->owner->net, l->bearer_id, skb, l->media_addr); 1100 tipc_bearer_xmit_skb(l->net, l->bearer_id, skb, l->media_addr);
1216 l->rcv_unacked = 0; 1101 l->rcv_unacked = 0;
1217 kfree_skb(skb);
1218} 1102}
1219 1103
1220/* tipc_link_build_proto_msg: prepare link protocol message for transmission
1221 */
1222static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, 1104static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1223 u16 rcvgap, int tolerance, int priority, 1105 u16 rcvgap, int tolerance, int priority,
1224 struct sk_buff_head *xmitq) 1106 struct sk_buff_head *xmitq)
1225{ 1107{
1226 struct sk_buff *skb = NULL; 1108 struct sk_buff *skb = NULL;
1227 struct tipc_msg *hdr = l->pmsg; 1109 struct tipc_msg *hdr = l->pmsg;
1228 u16 snd_nxt = l->snd_nxt; 1110 bool node_up = link_is_up(l->bc_rcvlink);
1229 u16 rcv_nxt = l->rcv_nxt;
1230 u16 rcv_last = rcv_nxt - 1;
1231 int node_up = l->owner->bclink.recv_permitted;
1232 1111
1233 /* Don't send protocol message during reset or link failover */ 1112 /* Don't send protocol message during reset or link failover */
1234 if (tipc_link_is_blocked(l)) 1113 if (tipc_link_is_blocked(l))
@@ -1236,33 +1115,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1236 1115
1237 msg_set_type(hdr, mtyp); 1116 msg_set_type(hdr, mtyp);
1238 msg_set_net_plane(hdr, l->net_plane); 1117 msg_set_net_plane(hdr, l->net_plane);
1239 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 1118 msg_set_next_sent(hdr, l->snd_nxt);
1240 msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); 1119 msg_set_ack(hdr, l->rcv_nxt - 1);
1120 msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
1121 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1241 msg_set_link_tolerance(hdr, tolerance); 1122 msg_set_link_tolerance(hdr, tolerance);
1242 msg_set_linkprio(hdr, priority); 1123 msg_set_linkprio(hdr, priority);
1243 msg_set_redundant_link(hdr, node_up); 1124 msg_set_redundant_link(hdr, node_up);
1244 msg_set_seq_gap(hdr, 0); 1125 msg_set_seq_gap(hdr, 0);
1245 1126
1246 /* Compatibility: created msg must not be in sequence with pkt flow */ 1127 /* Compatibility: created msg must not be in sequence with pkt flow */
1247 msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); 1128 msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2);
1248 1129
1249 if (mtyp == STATE_MSG) { 1130 if (mtyp == STATE_MSG) {
1250 if (!tipc_link_is_up(l)) 1131 if (!tipc_link_is_up(l))
1251 return; 1132 return;
1252 msg_set_next_sent(hdr, snd_nxt);
1253 1133
1254 /* Override rcvgap if there are packets in deferred queue */ 1134 /* Override rcvgap if there are packets in deferred queue */
1255 if (!skb_queue_empty(&l->deferdq)) 1135 if (!skb_queue_empty(&l->deferdq))
1256 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; 1136 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt;
1257 if (rcvgap) { 1137 if (rcvgap) {
1258 msg_set_seq_gap(hdr, rcvgap); 1138 msg_set_seq_gap(hdr, rcvgap);
1259 l->stats.sent_nacks++; 1139 l->stats.sent_nacks++;
1260 } 1140 }
1261 msg_set_ack(hdr, rcv_last);
1262 msg_set_probe(hdr, probe); 1141 msg_set_probe(hdr, probe);
1263 if (probe) 1142 if (probe)
1264 l->stats.sent_probes++; 1143 l->stats.sent_probes++;
1265 l->stats.sent_states++; 1144 l->stats.sent_states++;
1145 l->rcv_unacked = 0;
1266 } else { 1146 } else {
1267 /* RESET_MSG or ACTIVATE_MSG */ 1147 /* RESET_MSG or ACTIVATE_MSG */
1268 msg_set_max_pkt(hdr, l->advertised_mtu); 1148 msg_set_max_pkt(hdr, l->advertised_mtu);
@@ -1354,7 +1234,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1354{ 1234{
1355 struct tipc_msg *hdr = buf_msg(skb); 1235 struct tipc_msg *hdr = buf_msg(skb);
1356 u16 rcvgap = 0; 1236 u16 rcvgap = 0;
1357 u16 nacked_gap = msg_seq_gap(hdr); 1237 u16 ack = msg_ack(hdr);
1238 u16 gap = msg_seq_gap(hdr);
1358 u16 peers_snd_nxt = msg_next_sent(hdr); 1239 u16 peers_snd_nxt = msg_next_sent(hdr);
1359 u16 peers_tol = msg_link_tolerance(hdr); 1240 u16 peers_tol = msg_link_tolerance(hdr);
1360 u16 peers_prio = msg_linkprio(hdr); 1241 u16 peers_prio = msg_linkprio(hdr);
@@ -1363,7 +1244,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1363 char *if_name; 1244 char *if_name;
1364 int rc = 0; 1245 int rc = 0;
1365 1246
1366 if (tipc_link_is_blocked(l)) 1247 if (tipc_link_is_blocked(l) || !xmitq)
1367 goto exit; 1248 goto exit;
1368 1249
1369 if (link_own_addr(l) > msg_prevnode(hdr)) 1250 if (link_own_addr(l) > msg_prevnode(hdr))
@@ -1433,11 +1314,11 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1433 if (rcvgap || (msg_probe(hdr))) 1314 if (rcvgap || (msg_probe(hdr)))
1434 tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, 1315 tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
1435 0, 0, xmitq); 1316 0, 0, xmitq);
1436 tipc_link_release_pkts(l, msg_ack(hdr)); 1317 tipc_link_release_pkts(l, ack);
1437 1318
1438 /* If NACK, retransmit will now start at right position */ 1319 /* If NACK, retransmit will now start at right position */
1439 if (nacked_gap) { 1320 if (gap) {
1440 rc = tipc_link_retransm(l, nacked_gap, xmitq); 1321 rc = tipc_link_retrans(l, ack + 1, ack + gap, xmitq);
1441 l->stats.recv_nacks++; 1322 l->stats.recv_nacks++;
1442 } 1323 }
1443 1324
@@ -1450,6 +1331,188 @@ exit:
1450 return rc; 1331 return rc;
1451} 1332}
1452 1333
1334/* tipc_link_build_bc_proto_msg() - create broadcast protocol message
1335 */
1336static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast,
1337 u16 peers_snd_nxt,
1338 struct sk_buff_head *xmitq)
1339{
1340 struct sk_buff *skb;
1341 struct tipc_msg *hdr;
1342 struct sk_buff *dfrd_skb = skb_peek(&l->deferdq);
1343 u16 ack = l->rcv_nxt - 1;
1344 u16 gap_to = peers_snd_nxt - 1;
1345
1346 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
1347 0, l->addr, link_own_addr(l), 0, 0, 0);
1348 if (!skb)
1349 return false;
1350 hdr = buf_msg(skb);
1351 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1352 msg_set_bcast_ack(hdr, ack);
1353 msg_set_bcgap_after(hdr, ack);
1354 if (dfrd_skb)
1355 gap_to = buf_seqno(dfrd_skb) - 1;
1356 msg_set_bcgap_to(hdr, gap_to);
1357 msg_set_non_seq(hdr, bcast);
1358 __skb_queue_tail(xmitq, skb);
1359 return true;
1360}
1361
1362/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints.
1363 *
1364 * Give a newly added peer node the sequence number where it should
1365 * start receiving and acking broadcast packets.
1366 */
1367void tipc_link_build_bc_init_msg(struct tipc_link *l,
1368 struct sk_buff_head *xmitq)
1369{
1370 struct sk_buff_head list;
1371
1372 __skb_queue_head_init(&list);
1373 if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list))
1374 return;
1375 tipc_link_xmit(l, &list, xmitq);
1376}
1377
1378/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer
1379 */
1380void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
1381{
1382 int mtyp = msg_type(hdr);
1383 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1384
1385 if (link_is_up(l))
1386 return;
1387
1388 if (msg_user(hdr) == BCAST_PROTOCOL) {
1389 l->rcv_nxt = peers_snd_nxt;
1390 l->state = LINK_ESTABLISHED;
1391 return;
1392 }
1393
1394 if (l->peer_caps & TIPC_BCAST_SYNCH)
1395 return;
1396
1397 if (msg_peer_node_is_up(hdr))
1398 return;
1399
1400 /* Compatibility: accept older, less safe initial synch data */
1401 if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG))
1402 l->rcv_nxt = peers_snd_nxt;
1403}
1404
1405/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
1406 */
1407void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
1408 struct sk_buff_head *xmitq)
1409{
1410 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1411
1412 if (!link_is_up(l))
1413 return;
1414
1415 if (!msg_peer_node_is_up(hdr))
1416 return;
1417
1418 l->bc_peer_is_up = true;
1419
1420 /* Ignore if peers_snd_nxt goes beyond receive window */
1421 if (more(peers_snd_nxt, l->rcv_nxt + l->window))
1422 return;
1423
1424 if (!more(peers_snd_nxt, l->rcv_nxt)) {
1425 l->nack_state = BC_NACK_SND_CONDITIONAL;
1426 return;
1427 }
1428
1429 /* Don't NACK if one was recently sent or peeked */
1430 if (l->nack_state == BC_NACK_SND_SUPPRESS) {
1431 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1432 return;
1433 }
1434
1435 /* Conditionally delay NACK sending until next synch rcv */
1436 if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
1437 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1438 if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
1439 return;
1440 }
1441
1442 /* Send NACK now but suppress next one */
1443 tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
1444 l->nack_state = BC_NACK_SND_SUPPRESS;
1445}
1446
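
tipc_link_bc_sync_rcv() above throttles broadcast NACKs with a small state machine: a fresh gap arms BC_NACK_SND_CONDITIONAL so a transient loss gets one synch interval to heal, a gap still present on the next synch escalates to UNCONDITIONAL and sends, and sending parks the link in SUPPRESS so the immediately following opportunity is skipped (an overheard NACK from another node has the same effect, see tipc_link_bc_nack_rcv() further down). A condensed userspace model of those transitions, with wraparound handled by 16-bit casts and the minimum window hard-coded:

#include <assert.h>
#include <stdint.h>

enum { SND_COND, SND_UNCOND, SND_SUPPRESS };

#define MIN_WIN 4	/* stands in for TIPC_MIN_LINK_WIN */

/* Returns 1 when a NACK should be sent now */
static int bc_sync_nack(int *st, uint16_t rcv_nxt, uint16_t peers_snd_nxt)
{
	if ((int16_t)(peers_snd_nxt - rcv_nxt) <= 0) {
		*st = SND_COND;		/* nothing missing */
		return 0;
	}
	if (*st == SND_SUPPRESS) {	/* a NACK was just sent or overheard */
		*st = SND_UNCOND;
		return 0;
	}
	if (*st == SND_COND) {		/* small gap: give it one more round */
		*st = SND_UNCOND;
		if ((uint16_t)(peers_snd_nxt - rcv_nxt) < MIN_WIN)
			return 0;
	}
	*st = SND_SUPPRESS;		/* send now, suppress the next one */
	return 1;
}

int main(void)
{
	int st = SND_COND;

	assert(!bc_sync_nack(&st, 10, 10));	/* in sync: stay conditional */
	assert(!bc_sync_nack(&st, 10, 12));	/* small gap: deferred once  */
	assert(bc_sync_nack(&st, 10, 12));	/* still open: NACK goes out */
	assert(!bc_sync_nack(&st, 10, 12));	/* next chance is suppressed */
	return 0;
}
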
1447void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
1448 struct sk_buff_head *xmitq)
1449{
1450 struct sk_buff *skb, *tmp;
1451 struct tipc_link *snd_l = l->bc_sndlink;
1452
1453 if (!link_is_up(l) || !l->bc_peer_is_up)
1454 return;
1455
1456 if (!more(acked, l->acked))
1457 return;
1458
1459 /* Skip over packets peer has already acked */
1460 skb_queue_walk(&snd_l->transmq, skb) {
1461 if (more(buf_seqno(skb), l->acked))
1462 break;
1463 }
1464
1465 /* Update/release the packets peer is acking now */
1466 skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
1467 if (more(buf_seqno(skb), acked))
1468 break;
1469 if (!--TIPC_SKB_CB(skb)->ackers) {
1470 __skb_unlink(skb, &snd_l->transmq);
1471 kfree_skb(skb);
1472 }
1473 }
1474 l->acked = acked;
1475 tipc_link_advance_backlog(snd_l, xmitq);
1476 if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
1477 link_prepare_wakeup(snd_l);
1478}
1479
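
tipc_link_bc_ack_rcv() above is where the new 'ackers' field pays off: every broadcast packet is stamped with the current peer count at transmit time (TIPC_SKB_CB(skb)->ackers = l->ackers in the xmit hunks earlier), each peer's ack decrements the count across the newly covered range, and a packet leaves the shared send queue only when its count hits zero. A self-contained model of that reference-counted release; the caller passes each peer's previous ack explicitly here, where TIPC tracks it as l->acked on the per-peer receive link:

#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>

struct pkt {
	uint16_t seqno;
	int ackers;		/* peers still expected to ack this packet */
	struct pkt *next;
};

static struct pkt *transmq;	/* shared send queue, lowest seqno first */

static void bc_xmit(uint16_t seqno, int npeers)
{
	struct pkt **p = &transmq;
	struct pkt *n = malloc(sizeof(*n));

	if (!n)
		return;
	n->seqno = seqno;
	n->ackers = npeers;	/* stamped like TIPC_SKB_CB(skb)->ackers */
	n->next = NULL;
	while (*p)
		p = &(*p)->next;
	*p = n;
}

/* One peer acks every packet in (prev_acked, acked] */
static void bc_ack_rcv(uint16_t prev_acked, uint16_t acked)
{
	struct pkt **p = &transmq;

	while (*p && (int16_t)((*p)->seqno - acked) <= 0) {
		struct pkt *cur = *p;

		if ((int16_t)(cur->seqno - prev_acked) <= 0) {
			p = &cur->next;		/* already acked by this peer */
			continue;
		}
		if (--cur->ackers == 0) {	/* last outstanding peer */
			*p = cur->next;
			printf("released seqno %u\n", cur->seqno);
			free(cur);
		} else {
			p = &cur->next;
		}
	}
}

int main(void)
{
	bc_xmit(1, 2);		/* two peers must ack each packet */
	bc_xmit(2, 2);
	bc_ack_rcv(0, 2);	/* peer A: counts drop to 1, nothing freed */
	bc_ack_rcv(0, 2);	/* peer B: both packets released */
	return 0;
}
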
1480/* tipc_link_bc_nack_rcv(): receive broadcast nack message
1481 */
1482int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
1483 struct sk_buff_head *xmitq)
1484{
1485 struct tipc_msg *hdr = buf_msg(skb);
1486 u32 dnode = msg_destnode(hdr);
1487 int mtyp = msg_type(hdr);
1488 u16 acked = msg_bcast_ack(hdr);
1489 u16 from = acked + 1;
1490 u16 to = msg_bcgap_to(hdr);
1491 u16 peers_snd_nxt = to + 1;
1492 int rc = 0;
1493
1494 kfree_skb(skb);
1495
1496 if (!tipc_link_is_up(l) || !l->bc_peer_is_up)
1497 return 0;
1498
1499 if (mtyp != STATE_MSG)
1500 return 0;
1501
1502 if (dnode == link_own_addr(l)) {
1503 tipc_link_bc_ack_rcv(l, acked, xmitq);
1504 rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
1505 l->stats.recv_nacks++;
1506 return rc;
1507 }
1508
1509 /* Msg for other node => suppress own NACK at next sync if applicable */
1510 if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from))
1511 l->nack_state = BC_NACK_SND_SUPPRESS;
1512
1513 return 0;
1514}
1515
1453void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) 1516void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1454{ 1517{
1455 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); 1518 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
@@ -1514,7 +1577,7 @@ static void link_reset_statistics(struct tipc_link *l_ptr)
1514static void link_print(struct tipc_link *l, const char *str) 1577static void link_print(struct tipc_link *l, const char *str)
1515{ 1578{
1516 struct sk_buff *hskb = skb_peek(&l->transmq); 1579 struct sk_buff *hskb = skb_peek(&l->transmq);
1517 u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt; 1580 u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt - 1;
1518 u16 tail = l->snd_nxt - 1; 1581 u16 tail = l->snd_nxt - 1;
1519 1582
1520 pr_info("%s Link <%s> state %x\n", str, l->name, l->state); 1583 pr_info("%s Link <%s> state %x\n", str, l->name, l->state);
@@ -1738,7 +1801,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
1738 if (tipc_link_is_up(link)) 1801 if (tipc_link_is_up(link))
1739 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) 1802 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
1740 goto attr_msg_full; 1803 goto attr_msg_full;
1741 if (tipc_link_is_active(link)) 1804 if (link->active)
1742 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE)) 1805 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
1743 goto attr_msg_full; 1806 goto attr_msg_full;
1744 1807
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 0201212cb49a..66d859b66c84 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -66,7 +66,8 @@ enum {
66 */ 66 */
67enum { 67enum {
68 TIPC_LINK_UP_EVT = 1, 68 TIPC_LINK_UP_EVT = 1,
69 TIPC_LINK_DOWN_EVT = (1 << 1) 69 TIPC_LINK_DOWN_EVT = (1 << 1),
70 TIPC_LINK_SND_BC_ACK = (1 << 2)
70}; 71};
71 72
72/* Starting value for maximum packet size negotiation on unicast links 73/* Starting value for maximum packet size negotiation on unicast links
@@ -110,7 +111,7 @@ struct tipc_stats {
110 * @name: link name character string 111 * @name: link name character string
111 * @media_addr: media address to use when sending messages over link 112 * @media_addr: media address to use when sending messages over link
112 * @timer: link timer 113 * @timer: link timer
113 * @owner: pointer to peer node 114 * @net: pointer to namespace struct
114 * @refcnt: reference counter for permanent references (owner node & timer) 115 * @refcnt: reference counter for permanent references (owner node & timer)
115 * @peer_session: link session # being used by peer end of link 116 * @peer_session: link session # being used by peer end of link
116 * @peer_bearer_id: bearer id used by link's peer endpoint 117 * @peer_bearer_id: bearer id used by link's peer endpoint
@@ -119,6 +120,7 @@ struct tipc_stats {
119 * @keepalive_intv: link keepalive timer interval 120 * @keepalive_intv: link keepalive timer interval
120 * @abort_limit: # of unacknowledged continuity probes needed to reset link 121 * @abort_limit: # of unacknowledged continuity probes needed to reset link
121 * @state: current state of link FSM 122 * @state: current state of link FSM
123 * @peer_caps: bitmap describing capabilities of peer node
122 * @silent_intv_cnt: # of timer intervals without any reception from peer 124 * @silent_intv_cnt: # of timer intervals without any reception from peer
123 * @proto_msg: template for control messages generated by link 125 * @proto_msg: template for control messages generated by link
124 * @pmsg: convenience pointer to "proto_msg" field 126 * @pmsg: convenience pointer to "proto_msg" field
@@ -134,6 +136,8 @@ struct tipc_stats {
134 * @snt_nxt: next sequence number to use for outbound messages 136 * @snt_nxt: next sequence number to use for outbound messages
135 * @last_retransmitted: sequence number of most recently retransmitted message 137 * @last_retransmitted: sequence number of most recently retransmitted message
136 * @stale_count: # of identical retransmit requests made by peer 138 * @stale_count: # of identical retransmit requests made by peer
139 * @ackers: # of peers that need to ack each packet before it can be released
140 * @acked: seqno of last packet acked by a certain peer. Used for broadcast.
137 * @rcv_nxt: next sequence number to expect for inbound messages 141 * @rcv_nxt: next sequence number to expect for inbound messages
138 * @deferred_queue: deferred queue saved OOS b'cast message received from node 142 * @deferred_queue: deferred queue saved OOS b'cast message received from node
139 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer 143 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
@@ -143,13 +147,14 @@ struct tipc_stats {
143 * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate 147 * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
144 * @long_msg_seq_no: next identifier to use for outbound fragmented messages 148 * @long_msg_seq_no: next identifier to use for outbound fragmented messages
145 * @reasm_buf: head of partially reassembled inbound message fragments 149 * @reasm_buf: head of partially reassembled inbound message fragments
150 * @bc_rcvr: marks that this is a broadcast receiver link
146 * @stats: collects statistics regarding link activity 151 * @stats: collects statistics regarding link activity
147 */ 152 */
148struct tipc_link { 153struct tipc_link {
149 u32 addr; 154 u32 addr;
150 char name[TIPC_MAX_LINK_NAME]; 155 char name[TIPC_MAX_LINK_NAME];
151 struct tipc_media_addr *media_addr; 156 struct tipc_media_addr *media_addr;
152 struct tipc_node *owner; 157 struct net *net;
153 158
154 /* Management and link supervision data */ 159 /* Management and link supervision data */
155 u32 peer_session; 160 u32 peer_session;
@@ -159,6 +164,8 @@ struct tipc_link {
159 unsigned long keepalive_intv; 164 unsigned long keepalive_intv;
160 u32 abort_limit; 165 u32 abort_limit;
161 u32 state; 166 u32 state;
167 u16 peer_caps;
168 bool active;
162 u32 silent_intv_cnt; 169 u32 silent_intv_cnt;
163 struct { 170 struct {
164 unchar hdr[INT_H_SIZE]; 171 unchar hdr[INT_H_SIZE];
@@ -201,18 +208,35 @@ struct tipc_link {
201 /* Fragmentation/reassembly */ 208 /* Fragmentation/reassembly */
202 struct sk_buff *reasm_buf; 209 struct sk_buff *reasm_buf;
203 210
211 /* Broadcast */
212 u16 ackers;
213 u16 acked;
214 struct tipc_link *bc_rcvlink;
215 struct tipc_link *bc_sndlink;
216 int nack_state;
217 bool bc_peer_is_up;
218
204 /* Statistics */ 219 /* Statistics */
205 struct tipc_stats stats; 220 struct tipc_stats stats;
206}; 221};
207 222
208bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, 223bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
209 u32 ownnode, u32 peer, struct tipc_media_addr *maddr, 224 int tolerance, char net_plane, u32 mtu, int priority,
210 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 225 int window, u32 session, u32 ownnode, u32 peer,
226 u16 peer_caps,
227 struct tipc_link *bc_sndlink,
228 struct tipc_link *bc_rcvlink,
229 struct sk_buff_head *inputq,
230 struct sk_buff_head *namedq,
211 struct tipc_link **link); 231 struct tipc_link **link);
232bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
233 int mtu, int window, u16 peer_caps,
234 struct sk_buff_head *inputq,
235 struct sk_buff_head *namedq,
236 struct tipc_link *bc_sndlink,
237 struct tipc_link **link);
212void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl, 238void tipc_link_tnl_prepare(struct tipc_link *l, struct tipc_link *tnl,
213 int mtyp, struct sk_buff_head *xmitq); 239 int mtyp, struct sk_buff_head *xmitq);
214void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
215 struct sk_buff_head *xmitq);
216void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq); 240void tipc_link_build_reset_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
217int tipc_link_fsm_evt(struct tipc_link *l, int evt); 241int tipc_link_fsm_evt(struct tipc_link *l, int evt);
218void tipc_link_reset_fragments(struct tipc_link *l_ptr); 242void tipc_link_reset_fragments(struct tipc_link *l_ptr);
@@ -223,23 +247,11 @@ bool tipc_link_is_establishing(struct tipc_link *l);
223bool tipc_link_is_synching(struct tipc_link *l); 247bool tipc_link_is_synching(struct tipc_link *l);
224bool tipc_link_is_failingover(struct tipc_link *l); 248bool tipc_link_is_failingover(struct tipc_link *l);
225bool tipc_link_is_blocked(struct tipc_link *l); 249bool tipc_link_is_blocked(struct tipc_link *l);
226int tipc_link_is_active(struct tipc_link *l_ptr); 250void tipc_link_set_active(struct tipc_link *l, bool active);
227void tipc_link_purge_queues(struct tipc_link *l_ptr);
228void tipc_link_purge_backlog(struct tipc_link *l);
229void tipc_link_reset(struct tipc_link *l_ptr); 251void tipc_link_reset(struct tipc_link *l_ptr);
230int __tipc_link_xmit(struct net *net, struct tipc_link *link,
231 struct sk_buff_head *list);
232int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, 252int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
233 struct sk_buff_head *xmitq); 253 struct sk_buff_head *xmitq);
234void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, 254void tipc_link_set_queue_limits(struct tipc_link *l, u32 window);
235 u32 gap, u32 tolerance, u32 priority);
236void tipc_link_push_packets(struct tipc_link *l_ptr);
237u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf);
238void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
239void tipc_link_retransmit(struct tipc_link *l_ptr,
240 struct sk_buff *start, u32 retransmits);
241struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
242 const struct sk_buff *skb);
243 255
244int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb); 256int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb);
245int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info); 257int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
@@ -249,5 +261,23 @@ int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
249int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq); 261int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
250int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb, 262int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
251 struct sk_buff_head *xmitq); 263 struct sk_buff_head *xmitq);
252 264int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq);
265void tipc_link_add_bc_peer(struct tipc_link *snd_l,
266 struct tipc_link *uc_l,
267 struct sk_buff_head *xmitq);
268void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
269 struct tipc_link *rcv_l,
270 struct sk_buff_head *xmitq);
271int tipc_link_bc_peers(struct tipc_link *l);
272void tipc_link_set_mtu(struct tipc_link *l, int mtu);
273int tipc_link_mtu(struct tipc_link *l);
274void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
275 struct sk_buff_head *xmitq);
276void tipc_link_build_bc_sync_msg(struct tipc_link *l,
277 struct sk_buff_head *xmitq);
278void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr);
279void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
280 struct sk_buff_head *xmitq);
281int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
282 struct sk_buff_head *xmitq);
253#endif 283#endif
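
The reworked link.h above replaces the old per-call bclink helpers with a peer-tracking API on the shared broadcast send link. A minimal sketch of how a caller might drive it when a peer's broadcast receive link comes and goes; the function name and the bearer_id/maddr plumbing are illustrative assumptions, not part of this series:

/* Sketch only: couple/decouple a per-peer broadcast receive link (rcv_l)
 * to/from the shared send link (snd_l), then flush whatever protocol
 * messages the two operations queued onto xmitq.
 */
static void example_bc_peer_cycle(struct net *net, struct tipc_link *snd_l,
				  struct tipc_link *rcv_l, int bearer_id,
				  struct tipc_media_addr *maddr)
{
	struct sk_buff_head xmitq;

	__skb_queue_head_init(&xmitq);

	tipc_link_add_bc_peer(snd_l, rcv_l, &xmitq);
	pr_debug("bc peers now: %d\n", tipc_link_bc_peers(snd_l));

	/* ... later, when contact with the peer is lost ... */
	tipc_link_remove_bc_peer(snd_l, rcv_l, &xmitq);

	if (!skb_queue_empty(&xmitq))
		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
}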
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 26d38b3d8760..8740930f0787 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -182,7 +182,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
182 *buf = NULL; 182 *buf = NULL;
183 return 0; 183 return 0;
184err: 184err:
185 pr_warn_ratelimited("Unable to build fragment list\n");
186 kfree_skb(*buf); 185 kfree_skb(*buf);
187 kfree_skb(*headbuf); 186 kfree_skb(*headbuf);
188 *buf = *headbuf = NULL; 187 *buf = *headbuf = NULL;
@@ -565,18 +564,22 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
565/* tipc_msg_reassemble() - clone a buffer chain of fragments and 564/* tipc_msg_reassemble() - clone a buffer chain of fragments and
566 * reassemble the clones into one message 565 * reassemble the clones into one message
567 */ 566 */
568struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list) 567bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq)
569{ 568{
570 struct sk_buff *skb; 569 struct sk_buff *skb, *_skb;
571 struct sk_buff *frag = NULL; 570 struct sk_buff *frag = NULL;
572 struct sk_buff *head = NULL; 571 struct sk_buff *head = NULL;
573 int hdr_sz; 572 int hdr_len;
574 573
575 /* Copy header if single buffer */ 574 /* Copy header if single buffer */
576 if (skb_queue_len(list) == 1) { 575 if (skb_queue_len(list) == 1) {
577 skb = skb_peek(list); 576 skb = skb_peek(list);
578 hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb)); 577 hdr_len = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
579 return __pskb_copy(skb, hdr_sz, GFP_ATOMIC); 578 _skb = __pskb_copy(skb, hdr_len, GFP_ATOMIC);
579 if (!_skb)
580 return false;
581 __skb_queue_tail(rcvq, _skb);
582 return true;
580 } 583 }
581 584
582 /* Clone all fragments and reassemble */ 585 /* Clone all fragments and reassemble */
@@ -590,11 +593,12 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
590 if (!head) 593 if (!head)
591 goto error; 594 goto error;
592 } 595 }
593 return frag; 596 __skb_queue_tail(rcvq, frag);
597 return true;
594error: 598error:
595 pr_warn("Failed to clone local mcast rcv buffer\n"); 599 pr_warn("Failed to clone local mcast rcv buffer\n");
596 kfree_skb(head); 600 kfree_skb(head);
597 return NULL; 601 return false;
598} 602}
599 603
600/* tipc_skb_queue_sorted(): sort pkt into list according to sequence number 604/* tipc_skb_queue_sorted(): sort pkt into list according to sequence number
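
After this signature change, tipc_msg_reassemble() queues the rebuilt clone on a caller-supplied list and only reports success, so call sites no longer manage a returned skb pointer. A minimal sketch of the resulting pattern; 'pktchain' and 'inputq' stand in for whatever queues the real caller holds:

/* Sketch only: loop back a local copy of an outgoing multicast chain */
struct sk_buff_head rcvq;

__skb_queue_head_init(&rcvq);
if (tipc_msg_reassemble(pktchain, &rcvq))
	tipc_sk_mcast_rcv(net, &rcvq, inputq);
/* on failure nothing was queued, so there is nothing to free here */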
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 9f0ef54be612..55778a0aebf3 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -112,6 +112,7 @@ struct tipc_skb_cb {
112 bool wakeup_pending; 112 bool wakeup_pending;
113 u16 chain_sz; 113 u16 chain_sz;
114 u16 chain_imp; 114 u16 chain_imp;
115 u16 ackers;
115}; 116};
116 117
117#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) 118#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
@@ -600,6 +601,11 @@ static inline u32 msg_last_bcast(struct tipc_msg *m)
600 return msg_bits(m, 4, 16, 0xffff); 601 return msg_bits(m, 4, 16, 0xffff);
601} 602}
602 603
604static inline u32 msg_bc_snd_nxt(struct tipc_msg *m)
605{
606 return msg_last_bcast(m) + 1;
607}
608
603static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n) 609static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
604{ 610{
605 msg_set_bits(m, 4, 16, 0xffff, n); 611 msg_set_bits(m, 4, 16, 0xffff, n);
@@ -789,7 +795,7 @@ bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
789int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 795int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
790 int offset, int dsz, int mtu, struct sk_buff_head *list); 796 int offset, int dsz, int mtu, struct sk_buff_head *list);
791bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err); 797bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err);
792struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 798bool tipc_msg_reassemble(struct sk_buff_head *list, struct sk_buff_head *rcvq);
793void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno, 799void __tipc_skb_queue_sorted(struct sk_buff_head *list, u16 seqno,
794 struct sk_buff *skb); 800 struct sk_buff *skb);
795 801
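
The new msg_bc_snd_nxt() accessor simply maps the advertised last-sent broadcast number to the peer's next expected one, while the 'ackers' field gives each buffered broadcast packet its own countdown of outstanding acknowledgements, replacing the removed node-map bookkeeping. A minimal sketch of the release logic this enables; the helper name and the bare queue walk are illustrative assumptions:

/* Sketch only: one peer has acked everything in transmq; drop each
 * packet's ackers count and free those no peer still needs.
 */
static void example_release_acked(struct sk_buff_head *transmq)
{
	struct sk_buff *skb, *tmp;

	skb_queue_walk_safe(transmq, skb, tmp) {
		if (--TIPC_SKB_CB(skb)->ackers)
			continue;	/* still unacked by some peer */
		__skb_unlink(skb, transmq);
		kfree_skb(skb);
	}
}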
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index e6018b7eb197..c07612bab95c 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -102,7 +102,7 @@ void named_cluster_distribute(struct net *net, struct sk_buff *skb)
102 if (!oskb) 102 if (!oskb)
103 break; 103 break;
104 msg_set_destnode(buf_msg(oskb), dnode); 104 msg_set_destnode(buf_msg(oskb), dnode);
105 tipc_node_xmit_skb(net, oskb, dnode, dnode); 105 tipc_node_xmit_skb(net, oskb, dnode, 0);
106 } 106 }
107 rcu_read_unlock(); 107 rcu_read_unlock();
108 108
@@ -223,7 +223,7 @@ void tipc_named_node_up(struct net *net, u32 dnode)
223 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]); 223 &tn->nametbl->publ_list[TIPC_ZONE_SCOPE]);
224 rcu_read_unlock(); 224 rcu_read_unlock();
225 225
226 tipc_node_xmit(net, &head, dnode, dnode); 226 tipc_node_xmit(net, &head, dnode, 0);
227} 227}
228 228
229static void tipc_publ_subscribe(struct net *net, struct publication *publ, 229static void tipc_publ_subscribe(struct net *net, struct publication *publ,
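
Both call sites above switch the link selector from the destination address to a constant 0. The selector only picks one of the two active link slots, so pinning it keeps the whole name-table update stream on a single slot and preserves ordering between bulk distribution and later updates. A one-line sketch of the selection, assuming the two-slot active_links[] scheme from node.h; the helper is illustrative:

/* Sketch only: any selector collapses to one of the two active slots */
static inline int example_pick_bearer(struct tipc_node *n, u32 selector)
{
	return n->active_links[selector & 1];
}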
diff --git a/net/tipc/net.c b/net/tipc/net.c
index d6d1399ae229..77bf9113c7a7 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -112,14 +112,11 @@ int tipc_net_start(struct net *net, u32 addr)
112{ 112{
113 struct tipc_net *tn = net_generic(net, tipc_net_id); 113 struct tipc_net *tn = net_generic(net, tipc_net_id);
114 char addr_string[16]; 114 char addr_string[16];
115 int res;
116 115
117 tn->own_addr = addr; 116 tn->own_addr = addr;
118 tipc_named_reinit(net); 117 tipc_named_reinit(net);
119 tipc_sk_reinit(net); 118 tipc_sk_reinit(net);
120 res = tipc_bclink_init(net); 119 tipc_bcast_reinit(net);
121 if (res)
122 return res;
123 120
124 tipc_nametbl_publish(net, TIPC_CFG_SRV, tn->own_addr, tn->own_addr, 121 tipc_nametbl_publish(net, TIPC_CFG_SRV, tn->own_addr, tn->own_addr,
125 TIPC_ZONE_SCOPE, 0, tn->own_addr); 122 TIPC_ZONE_SCOPE, 0, tn->own_addr);
@@ -142,7 +139,6 @@ void tipc_net_stop(struct net *net)
142 tn->own_addr); 139 tn->own_addr);
143 rtnl_lock(); 140 rtnl_lock();
144 tipc_bearer_stop(net); 141 tipc_bearer_stop(net);
145 tipc_bclink_stop(net);
146 tipc_node_stop(net); 142 tipc_node_stop(net);
147 rtnl_unlock(); 143 rtnl_unlock();
148 144
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 2670751d0e2e..7493506b069b 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -72,7 +72,6 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
72static void tipc_node_link_down(struct tipc_node *n, int bearer_id, 72static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
73 bool delete); 73 bool delete);
74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); 74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
75static void node_established_contact(struct tipc_node *n_ptr);
76static void tipc_node_delete(struct tipc_node *node); 75static void tipc_node_delete(struct tipc_node *node);
77static void tipc_node_timeout(unsigned long data); 76static void tipc_node_timeout(unsigned long data);
78static void tipc_node_fsm_evt(struct tipc_node *n, int evt); 77static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
@@ -165,8 +164,10 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
165 INIT_LIST_HEAD(&n_ptr->list); 164 INIT_LIST_HEAD(&n_ptr->list);
166 INIT_LIST_HEAD(&n_ptr->publ_list); 165 INIT_LIST_HEAD(&n_ptr->publ_list);
167 INIT_LIST_HEAD(&n_ptr->conn_sks); 166 INIT_LIST_HEAD(&n_ptr->conn_sks);
168 skb_queue_head_init(&n_ptr->bclink.namedq); 167 skb_queue_head_init(&n_ptr->bc_entry.namedq);
169 __skb_queue_head_init(&n_ptr->bclink.deferdq); 168 skb_queue_head_init(&n_ptr->bc_entry.inputq1);
169 __skb_queue_head_init(&n_ptr->bc_entry.arrvq);
170 skb_queue_head_init(&n_ptr->bc_entry.inputq2);
170 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 171 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
171 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 172 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
172 if (n_ptr->addr < temp_node->addr) 173 if (n_ptr->addr < temp_node->addr)
@@ -177,6 +178,18 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
177 n_ptr->signature = INVALID_NODE_SIG; 178 n_ptr->signature = INVALID_NODE_SIG;
178 n_ptr->active_links[0] = INVALID_BEARER_ID; 179 n_ptr->active_links[0] = INVALID_BEARER_ID;
179 n_ptr->active_links[1] = INVALID_BEARER_ID; 180 n_ptr->active_links[1] = INVALID_BEARER_ID;
181 if (!tipc_link_bc_create(net, tipc_own_addr(net), n_ptr->addr,
182 U16_MAX, tipc_bc_sndlink(net)->window,
183 n_ptr->capabilities,
184 &n_ptr->bc_entry.inputq1,
185 &n_ptr->bc_entry.namedq,
186 tipc_bc_sndlink(net),
187 &n_ptr->bc_entry.link)) {
188 pr_warn("Broadcast rcv link creation failed, no memory\n");
189 kfree(n_ptr);
190 n_ptr = NULL;
191 goto exit;
192 }
180 tipc_node_get(n_ptr); 193 tipc_node_get(n_ptr);
181 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); 194 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
182 n_ptr->keepalive_intv = U32_MAX; 195 n_ptr->keepalive_intv = U32_MAX;
@@ -203,6 +216,7 @@ static void tipc_node_delete(struct tipc_node *node)
203{ 216{
204 list_del_rcu(&node->list); 217 list_del_rcu(&node->list);
205 hlist_del_rcu(&node->hash); 218 hlist_del_rcu(&node->hash);
219 kfree(node->bc_entry.link);
206 kfree_rcu(node, rcu); 220 kfree_rcu(node, rcu);
207} 221}
208 222
@@ -332,6 +346,7 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
332 n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE; 346 n->links[bearer_id].mtu = nl->mtu - INT_H_SIZE;
333 347
334 tipc_bearer_add_dest(n->net, bearer_id, n->addr); 348 tipc_bearer_add_dest(n->net, bearer_id, n->addr);
349 tipc_bcast_inc_bearer_dst_cnt(n->net, bearer_id);
335 350
336 pr_debug("Established link <%s> on network plane %c\n", 351 pr_debug("Established link <%s> on network plane %c\n",
337 nl->name, nl->net_plane); 352 nl->name, nl->net_plane);
@@ -340,8 +355,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
340 if (!ol) { 355 if (!ol) {
341 *slot0 = bearer_id; 356 *slot0 = bearer_id;
342 *slot1 = bearer_id; 357 *slot1 = bearer_id;
343 tipc_link_build_bcast_sync_msg(nl, xmitq); 358 tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
344 node_established_contact(n); 359 n->action_flags |= TIPC_NOTIFY_NODE_UP;
360 tipc_bcast_add_peer(n->net, nl, xmitq);
345 return; 361 return;
346 } 362 }
347 363
@@ -350,8 +366,11 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
350 pr_debug("Old link <%s> becomes standby\n", ol->name); 366 pr_debug("Old link <%s> becomes standby\n", ol->name);
351 *slot0 = bearer_id; 367 *slot0 = bearer_id;
352 *slot1 = bearer_id; 368 *slot1 = bearer_id;
369 tipc_link_set_active(nl, true);
370 tipc_link_set_active(ol, false);
353 } else if (nl->priority == ol->priority) { 371 } else if (nl->priority == ol->priority) {
354 *slot0 = bearer_id; 372 tipc_link_set_active(nl, true);
373 *slot1 = bearer_id;
355 } else { 374 } else {
356 pr_debug("New link <%s> is standby\n", nl->name); 375 pr_debug("New link <%s> is standby\n", nl->name);
357 } 376 }
@@ -428,8 +447,10 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
428 tipc_link_build_reset_msg(l, xmitq); 447 tipc_link_build_reset_msg(l, xmitq);
429 *maddr = &n->links[*bearer_id].maddr; 448 *maddr = &n->links[*bearer_id].maddr;
430 node_lost_contact(n, &le->inputq); 449 node_lost_contact(n, &le->inputq);
450 tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
431 return; 451 return;
432 } 452 }
453 tipc_bcast_dec_bearer_dst_cnt(n->net, *bearer_id);
433 454
434 /* There is still a working link => initiate failover */ 455 /* There is still a working link => initiate failover */
435 tnl = node_active_link(n, 0); 456 tnl = node_active_link(n, 0);
@@ -493,6 +514,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
493 bool link_up = false; 514 bool link_up = false;
494 bool accept_addr = false; 515 bool accept_addr = false;
495 bool reset = true; 516 bool reset = true;
517 char *if_name;
496 518
497 *dupl_addr = false; 519 *dupl_addr = false;
498 *respond = false; 520 *respond = false;
@@ -579,9 +601,15 @@ void tipc_node_check_dest(struct net *net, u32 onode,
579 pr_warn("Cannot establish 3rd link to %x\n", n->addr); 601 pr_warn("Cannot establish 3rd link to %x\n", n->addr);
580 goto exit; 602 goto exit;
581 } 603 }
582 if (!tipc_link_create(n, b, mod(tipc_net(net)->random), 604 if_name = strchr(b->name, ':') + 1;
583 tipc_own_addr(net), onode, &le->maddr, 605 if (!tipc_link_create(net, if_name, b->identity, b->tolerance,
584 &le->inputq, &n->bclink.namedq, &l)) { 606 b->net_plane, b->mtu, b->priority,
607 b->window, mod(tipc_net(net)->random),
608 tipc_own_addr(net), onode,
609 n->capabilities,
610 tipc_bc_sndlink(n->net), n->bc_entry.link,
611 &le->inputq,
612 &n->bc_entry.namedq, &l)) {
585 *respond = false; 613 *respond = false;
586 goto exit; 614 goto exit;
587 } 615 }
@@ -824,58 +852,36 @@ bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
824 return true; 852 return true;
825} 853}
826 854
827static void node_established_contact(struct tipc_node *n_ptr) 855static void node_lost_contact(struct tipc_node *n,
828{
829 tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
830 n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
831 n_ptr->bclink.oos_state = 0;
832 n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
833 tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
834}
835
836static void node_lost_contact(struct tipc_node *n_ptr,
837 struct sk_buff_head *inputq) 856 struct sk_buff_head *inputq)
838{ 857{
839 char addr_string[16]; 858 char addr_string[16];
840 struct tipc_sock_conn *conn, *safe; 859 struct tipc_sock_conn *conn, *safe;
841 struct tipc_link *l; 860 struct tipc_link *l;
842 struct list_head *conns = &n_ptr->conn_sks; 861 struct list_head *conns = &n->conn_sks;
843 struct sk_buff *skb; 862 struct sk_buff *skb;
844 struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
845 uint i; 863 uint i;
846 864
847 pr_debug("Lost contact with %s\n", 865 pr_debug("Lost contact with %s\n",
848 tipc_addr_string_fill(addr_string, n_ptr->addr)); 866 tipc_addr_string_fill(addr_string, n->addr));
849 867
850 /* Flush broadcast link info associated with lost node */ 868 /* Clean up broadcast state */
851 if (n_ptr->bclink.recv_permitted) { 869 tipc_bcast_remove_peer(n->net, n->bc_entry.link);
852 __skb_queue_purge(&n_ptr->bclink.deferdq);
853
854 if (n_ptr->bclink.reasm_buf) {
855 kfree_skb(n_ptr->bclink.reasm_buf);
856 n_ptr->bclink.reasm_buf = NULL;
857 }
858
859 tipc_bclink_remove_node(n_ptr->net, n_ptr->addr);
860 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
861
862 n_ptr->bclink.recv_permitted = false;
863 }
864 870
865 /* Abort any ongoing link failover */ 871 /* Abort any ongoing link failover */
866 for (i = 0; i < MAX_BEARERS; i++) { 872 for (i = 0; i < MAX_BEARERS; i++) {
867 l = n_ptr->links[i].link; 873 l = n->links[i].link;
868 if (l) 874 if (l)
869 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); 875 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
870 } 876 }
871 877
872 /* Notify publications from this node */ 878 /* Notify publications from this node */
873 n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; 879 n->action_flags |= TIPC_NOTIFY_NODE_DOWN;
874 880
875 /* Notify sockets connected to node */ 881 /* Notify sockets connected to node */
876 list_for_each_entry_safe(conn, safe, conns, list) { 882 list_for_each_entry_safe(conn, safe, conns, list) {
877 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 883 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
878 SHORT_H_SIZE, 0, tn->own_addr, 884 SHORT_H_SIZE, 0, tipc_own_addr(n->net),
879 conn->peer_node, conn->port, 885 conn->peer_node, conn->port,
880 conn->peer_port, TIPC_ERR_NO_NODE); 886 conn->peer_port, TIPC_ERR_NO_NODE);
881 if (likely(skb)) 887 if (likely(skb))
@@ -937,18 +943,13 @@ void tipc_node_unlock(struct tipc_node *node)
937 publ_list = &node->publ_list; 943 publ_list = &node->publ_list;
938 944
939 node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | 945 node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
940 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP | 946 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
941 TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
942 TIPC_BCAST_RESET);
943 947
944 spin_unlock_bh(&node->lock); 948 spin_unlock_bh(&node->lock);
945 949
946 if (flags & TIPC_NOTIFY_NODE_DOWN) 950 if (flags & TIPC_NOTIFY_NODE_DOWN)
947 tipc_publ_notify(net, publ_list, addr); 951 tipc_publ_notify(net, publ_list, addr);
948 952
949 if (flags & TIPC_WAKEUP_BCAST_USERS)
950 tipc_bclink_wakeup_users(net);
951
952 if (flags & TIPC_NOTIFY_NODE_UP) 953 if (flags & TIPC_NOTIFY_NODE_UP)
953 tipc_named_node_up(net, addr); 954 tipc_named_node_up(net, addr);
954 955
@@ -960,11 +961,6 @@ void tipc_node_unlock(struct tipc_node *node)
960 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, 961 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
961 link_id, addr); 962 link_id, addr);
962 963
963 if (flags & TIPC_BCAST_MSG_EVT)
964 tipc_bclink_input(net);
965
966 if (flags & TIPC_BCAST_RESET)
967 tipc_node_reset_links(node);
968} 964}
969 965
970/* Caller should hold node lock for the passed node */ 966/* Caller should hold node lock for the passed node */
@@ -1080,6 +1076,67 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1080} 1076}
1081 1077
1082/** 1078/**
1079 * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
1080 * @net: the applicable net namespace
1081 * @skb: TIPC packet
1082 * @bearer_id: id of bearer message arrived on
1083 *
1084 * Invoked with no locks held.
1085 */
1086void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id)
1087{
1088 int rc;
1089 struct sk_buff_head xmitq;
1090 struct tipc_bclink_entry *be;
1091 struct tipc_link_entry *le;
1092 struct tipc_msg *hdr = buf_msg(skb);
1093 int usr = msg_user(hdr);
1094 u32 dnode = msg_destnode(hdr);
1095 struct tipc_node *n;
1096
1097 __skb_queue_head_init(&xmitq);
1098
1099 /* If NACK for other node, let rcv link for that node peek into it */
1100 if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net)))
1101 n = tipc_node_find(net, dnode);
1102 else
1103 n = tipc_node_find(net, msg_prevnode(hdr));
1104 if (!n) {
1105 kfree_skb(skb);
1106 return;
1107 }
1108 be = &n->bc_entry;
1109 le = &n->links[bearer_id];
1110
1111 rc = tipc_bcast_rcv(net, be->link, skb);
1112
1113 /* Broadcast link reset may happen at reassembly failure */
1114 if (rc & TIPC_LINK_DOWN_EVT)
1115 tipc_node_reset_links(n);
1116
1117 /* Broadcast ACKs are sent on a unicast link */
1118 if (rc & TIPC_LINK_SND_BC_ACK) {
1119 tipc_node_lock(n);
1120 tipc_link_build_ack_msg(le->link, &xmitq);
1121 tipc_node_unlock(n);
1122 }
1123
1124 if (!skb_queue_empty(&xmitq))
1125 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1126
1127 /* Deliver. 'arrvq' is under inputq2's lock protection */
1128 if (!skb_queue_empty(&be->inputq1)) {
1129 spin_lock_bh(&be->inputq2.lock);
1130 spin_lock_bh(&be->inputq1.lock);
1131 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1132 spin_unlock_bh(&be->inputq1.lock);
1133 spin_unlock_bh(&be->inputq2.lock);
1134 tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
1135 }
1136 tipc_node_put(n);
1137}
1138
1139/**
1083 * tipc_node_check_state - check and if necessary update node state 1140 * tipc_node_check_state - check and if necessary update node state
1084 * @skb: TIPC packet 1141 * @skb: TIPC packet
1085 * @bearer_id: identity of bearer delivering the packet 1142 * @bearer_id: identity of bearer delivering the packet
@@ -1221,6 +1278,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1221 int usr = msg_user(hdr); 1278 int usr = msg_user(hdr);
1222 int bearer_id = b->identity; 1279 int bearer_id = b->identity;
1223 struct tipc_link_entry *le; 1280 struct tipc_link_entry *le;
1281 u16 bc_ack = msg_bcast_ack(hdr);
1224 int rc = 0; 1282 int rc = 0;
1225 1283
1226 __skb_queue_head_init(&xmitq); 1284 __skb_queue_head_init(&xmitq);
@@ -1229,13 +1287,12 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1229 if (unlikely(!tipc_msg_validate(skb))) 1287 if (unlikely(!tipc_msg_validate(skb)))
1230 goto discard; 1288 goto discard;
1231 1289
1232 /* Handle arrival of a non-unicast link packet */ 1290 /* Handle arrival of discovery or broadcast packet */
1233 if (unlikely(msg_non_seq(hdr))) { 1291 if (unlikely(msg_non_seq(hdr))) {
1234 if (usr == LINK_CONFIG) 1292 if (unlikely(usr == LINK_CONFIG))
1235 tipc_disc_rcv(net, skb, b); 1293 return tipc_disc_rcv(net, skb, b);
1236 else 1294 else
1237 tipc_bclink_rcv(net, skb); 1295 return tipc_node_bc_rcv(net, skb, bearer_id);
1238 return;
1239 } 1296 }
1240 1297
1241 /* Locate neighboring node that sent packet */ 1298 /* Locate neighboring node that sent packet */
@@ -1244,19 +1301,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1244 goto discard; 1301 goto discard;
1245 le = &n->links[bearer_id]; 1302 le = &n->links[bearer_id];
1246 1303
1304 /* Ensure broadcast reception is in synch with peer's send state */
1305 if (unlikely(usr == LINK_PROTOCOL))
1306 tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
1307 else if (unlikely(n->bc_entry.link->acked != bc_ack))
1308 tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
1309
1247 tipc_node_lock(n); 1310 tipc_node_lock(n);
1248 1311
1249 /* Is reception permitted at the moment ? */ 1312 /* Is reception permitted at the moment ? */
1250 if (!tipc_node_filter_pkt(n, hdr)) 1313 if (!tipc_node_filter_pkt(n, hdr))
1251 goto unlock; 1314 goto unlock;
1252 1315
1253 if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1254 tipc_bclink_sync_state(n, hdr);
1255
1256 /* Release acked broadcast packets */
1257 if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
1258 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
1259
1260 /* Check and if necessary update node state */ 1316 /* Check and if necessary update node state */
1261 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { 1317 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
1262 rc = tipc_link_rcv(le->link, skb, &xmitq); 1318 rc = tipc_link_rcv(le->link, skb, &xmitq);
@@ -1271,8 +1327,8 @@ unlock:
1271 if (unlikely(rc & TIPC_LINK_DOWN_EVT)) 1327 if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1272 tipc_node_link_down(n, bearer_id, false); 1328 tipc_node_link_down(n, bearer_id, false);
1273 1329
1274 if (unlikely(!skb_queue_empty(&n->bclink.namedq))) 1330 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1275 tipc_named_rcv(net, &n->bclink.namedq); 1331 tipc_named_rcv(net, &n->bc_entry.namedq);
1276 1332
1277 if (!skb_queue_empty(&le->inputq)) 1333 if (!skb_queue_empty(&le->inputq))
1278 tipc_sk_rcv(net, &le->inputq); 1334 tipc_sk_rcv(net, &le->inputq);
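
The delivery tail of tipc_node_bc_rcv() above deserves a second look: buffers arrive on inputq1 under the broadcast receive link, are spliced onto arrvq, and are then filtered into inputq2 for the sockets. As the in-code comment notes, arrvq has no lock of its own and borrows inputq2's, hence the nested locking around the splice. A condensed sketch of just that handoff, with the helper name as an illustrative assumption:

/* Sketch only: the inputq1 -> arrvq -> inputq2 handoff */
static void example_bc_deliver(struct net *net, struct tipc_bclink_entry *be)
{
	if (skb_queue_empty(&be->inputq1))
		return;
	spin_lock_bh(&be->inputq2.lock);	/* also protects arrvq */
	spin_lock_bh(&be->inputq1.lock);
	skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
	spin_unlock_bh(&be->inputq1.lock);
	spin_unlock_bh(&be->inputq2.lock);
	tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
}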
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 344b3e7594fd..6734562d3c6e 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -55,36 +55,18 @@
55enum { 55enum {
56 TIPC_NOTIFY_NODE_DOWN = (1 << 3), 56 TIPC_NOTIFY_NODE_DOWN = (1 << 3),
57 TIPC_NOTIFY_NODE_UP = (1 << 4), 57 TIPC_NOTIFY_NODE_UP = (1 << 4),
58 TIPC_WAKEUP_BCAST_USERS = (1 << 5),
59 TIPC_NOTIFY_LINK_UP = (1 << 6), 58 TIPC_NOTIFY_LINK_UP = (1 << 6),
60 TIPC_NOTIFY_LINK_DOWN = (1 << 7), 59 TIPC_NOTIFY_LINK_DOWN = (1 << 7)
61 TIPC_BCAST_MSG_EVT = (1 << 9),
62 TIPC_BCAST_RESET = (1 << 10)
63}; 60};
64 61
65/** 62/* Optional capabilities supported by this code version
66 * struct tipc_node_bclink - TIPC node bclink structure
67 * @acked: sequence # of last outbound b'cast message acknowledged by node
68 * @last_in: sequence # of last in-sequence b'cast message received from node
69 * @last_sent: sequence # of last b'cast message sent by node
70 * @oos_state: state tracker for handling OOS b'cast messages
71 * @deferred_queue: deferred queue saved OOS b'cast message received from node
72 * @reasm_buf: broadcast reassembly queue head from node
73 * @inputq_map: bitmap indicating which inqueues should be kicked
74 * @recv_permitted: true if node is allowed to receive b'cast messages
75 */ 63 */
76struct tipc_node_bclink { 64enum {
77 u32 acked; 65 TIPC_BCAST_SYNCH = (1 << 1)
78 u32 last_in;
79 u32 last_sent;
80 u32 oos_state;
81 u32 deferred_size;
82 struct sk_buff_head deferdq;
83 struct sk_buff *reasm_buf;
84 struct sk_buff_head namedq;
85 bool recv_permitted;
86}; 66};
87 67
68#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
69
88struct tipc_link_entry { 70struct tipc_link_entry {
89 struct tipc_link *link; 71 struct tipc_link *link;
90 u32 mtu; 72 u32 mtu;
@@ -92,6 +74,14 @@ struct tipc_link_entry {
92 struct tipc_media_addr maddr; 74 struct tipc_media_addr maddr;
93}; 75};
94 76
77struct tipc_bclink_entry {
78 struct tipc_link *link;
79 struct sk_buff_head inputq1;
80 struct sk_buff_head arrvq;
81 struct sk_buff_head inputq2;
82 struct sk_buff_head namedq;
83};
84
95/** 85/**
96 * struct tipc_node - TIPC node structure 86 * struct tipc_node - TIPC node structure
97 * @addr: network address of node 87 * @addr: network address of node
@@ -104,7 +94,6 @@ struct tipc_link_entry {
104 * @active_links: bearer ids of active links, used as index into links[] array 94 * @active_links: bearer ids of active links, used as index into links[] array
105 * @links: array containing references to all links to node 95 * @links: array containing references to all links to node
106 * @action_flags: bit mask of different types of node actions 96 * @action_flags: bit mask of different types of node actions
107 * @bclink: broadcast-related info
108 * @state: connectivity state vs peer node 97 * @state: connectivity state vs peer node
109 * @sync_point: sequence number where synch/failover is finished 98 * @sync_point: sequence number where synch/failover is finished
110 * @list: links to adjacent nodes in sorted list of cluster's nodes 99 * @list: links to adjacent nodes in sorted list of cluster's nodes
@@ -124,8 +113,8 @@ struct tipc_node {
124 struct hlist_node hash; 113 struct hlist_node hash;
125 int active_links[2]; 114 int active_links[2];
126 struct tipc_link_entry links[MAX_BEARERS]; 115 struct tipc_link_entry links[MAX_BEARERS];
116 struct tipc_bclink_entry bc_entry;
127 int action_flags; 117 int action_flags;
128 struct tipc_node_bclink bclink;
129 struct list_head list; 118 struct list_head list;
130 int state; 119 int state;
131 u16 sync_point; 120 u16 sync_point;
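
With the old tipc_node_bclink block gone, per-peer broadcast state shrinks to the tipc_bclink_entry above plus a capability word, so newer broadcast behaviour can be negotiated per peer. A minimal sketch of such a test, assuming n->capabilities holds the bits the peer advertised; the helper is illustrative:

/* Sketch only: use the synch protocol only with peers that support it */
static inline bool example_peer_has_bc_synch(struct tipc_node *n)
{
	return !!(n->capabilities & TIPC_BCAST_SYNCH);
}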
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1060d52ff23e..552dbaba9cf3 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -689,13 +689,13 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
689 msg_set_hdr_sz(mhdr, MCAST_H_SIZE); 689 msg_set_hdr_sz(mhdr, MCAST_H_SIZE);
690 690
691new_mtu: 691new_mtu:
692 mtu = tipc_bclink_get_mtu(); 692 mtu = tipc_bcast_get_mtu(net);
693 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain); 693 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, pktchain);
694 if (unlikely(rc < 0)) 694 if (unlikely(rc < 0))
695 return rc; 695 return rc;
696 696
697 do { 697 do {
698 rc = tipc_bclink_xmit(net, pktchain); 698 rc = tipc_bcast_xmit(net, pktchain);
699 if (likely(!rc)) 699 if (likely(!rc))
700 return dsz; 700 return dsz;
701 701
diff --git a/net/tipc/udp_media.c b/net/tipc/udp_media.c
index 0021c01dec17..816914ef228d 100644
--- a/net/tipc/udp_media.c
+++ b/net/tipc/udp_media.c
@@ -155,14 +155,12 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
155 struct udp_bearer *ub; 155 struct udp_bearer *ub;
156 struct udp_media_addr *dst = (struct udp_media_addr *)&dest->value; 156 struct udp_media_addr *dst = (struct udp_media_addr *)&dest->value;
157 struct udp_media_addr *src = (struct udp_media_addr *)&b->addr.value; 157 struct udp_media_addr *src = (struct udp_media_addr *)&b->addr.value;
158 struct sk_buff *clone;
159 struct rtable *rt; 158 struct rtable *rt;
160 159
161 if (skb_headroom(skb) < UDP_MIN_HEADROOM) 160 if (skb_headroom(skb) < UDP_MIN_HEADROOM)
162 pskb_expand_head(skb, UDP_MIN_HEADROOM, 0, GFP_ATOMIC); 161 pskb_expand_head(skb, UDP_MIN_HEADROOM, 0, GFP_ATOMIC);
163 162
164 clone = skb_clone(skb, GFP_ATOMIC); 163 skb_set_inner_protocol(skb, htons(ETH_P_TIPC));
165 skb_set_inner_protocol(clone, htons(ETH_P_TIPC));
166 ub = rcu_dereference_rtnl(b->media_ptr); 164 ub = rcu_dereference_rtnl(b->media_ptr);
167 if (!ub) { 165 if (!ub) {
168 err = -ENODEV; 166 err = -ENODEV;
@@ -172,7 +170,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
172 struct flowi4 fl = { 170 struct flowi4 fl = {
173 .daddr = dst->ipv4.s_addr, 171 .daddr = dst->ipv4.s_addr,
174 .saddr = src->ipv4.s_addr, 172 .saddr = src->ipv4.s_addr,
175 .flowi4_mark = clone->mark, 173 .flowi4_mark = skb->mark,
176 .flowi4_proto = IPPROTO_UDP 174 .flowi4_proto = IPPROTO_UDP
177 }; 175 };
178 rt = ip_route_output_key(net, &fl); 176 rt = ip_route_output_key(net, &fl);
@@ -181,7 +179,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
181 goto tx_error; 179 goto tx_error;
182 } 180 }
183 ttl = ip4_dst_hoplimit(&rt->dst); 181 ttl = ip4_dst_hoplimit(&rt->dst);
184 err = udp_tunnel_xmit_skb(rt, ub->ubsock->sk, clone, 182 err = udp_tunnel_xmit_skb(rt, ub->ubsock->sk, skb,
185 src->ipv4.s_addr, 183 src->ipv4.s_addr,
186 dst->ipv4.s_addr, 0, ttl, 0, 184 dst->ipv4.s_addr, 0, ttl, 0,
187 src->udp_port, dst->udp_port, 185 src->udp_port, dst->udp_port,
@@ -204,7 +202,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
204 if (err) 202 if (err)
205 goto tx_error; 203 goto tx_error;
206 ttl = ip6_dst_hoplimit(ndst); 204 ttl = ip6_dst_hoplimit(ndst);
207 err = udp_tunnel6_xmit_skb(ndst, ub->ubsock->sk, clone, 205 err = udp_tunnel6_xmit_skb(ndst, ub->ubsock->sk, skb,
208 ndst->dev, &src->ipv6, 206 ndst->dev, &src->ipv6,
209 &dst->ipv6, 0, ttl, src->udp_port, 207 &dst->ipv6, 0, ttl, src->udp_port,
210 dst->udp_port, false); 208 dst->udp_port, false);
@@ -213,7 +211,7 @@ static int tipc_udp_send_msg(struct net *net, struct sk_buff *skb,
213 return err; 211 return err;
214 212
215tx_error: 213tx_error:
216 kfree_skb(clone); 214 kfree_skb(skb);
217 return err; 215 return err;
218} 216}
219 217