aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-10-22 08:51:48 -0400
committerDavid S. Miller <davem@davemloft.net>2015-10-24 09:56:47 -0400
commit2af5ae372a4b6d6e2d3314af0e9c865d6d64f8d3 (patch)
tree5da9c2e91de35b9111a3badb947416deba5083d8 /net/tipc
parentc49a0a84391bcc313b3dc2a9ceee6de684e07655 (diff)
tipc: clean up unused code and structures
After the previous changes in this series, we can now remove some unused code and structures, both in the broadcast, link aggregation and link code. There are no functional changes in this commit. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c815
-rw-r--r--net/tipc/bcast.h15
-rw-r--r--net/tipc/bearer.c19
-rw-r--r--net/tipc/bearer.h2
-rw-r--r--net/tipc/core.h2
-rw-r--r--net/tipc/link.c222
-rw-r--r--net/tipc/link.h15
-rw-r--r--net/tipc/node.c14
-rw-r--r--net/tipc/node.h30
9 files changed, 30 insertions, 1104 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index c012ce9e93c7..9dc239dfe192 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -43,72 +43,23 @@
43#include "link.h" 43#include "link.h"
44#include "node.h" 44#include "node.h"
45 45
46#define MAX_PKT_DEFAULT_MCAST 1500 /* bcast link max packet size (fixed) */
47#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */ 46#define BCLINK_WIN_DEFAULT 50 /* bcast link window size (default) */
48#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */ 47#define BCLINK_WIN_MIN 32 /* bcast minimum link window size */
49 48
50const char tipc_bclink_name[] = "broadcast-link"; 49const char tipc_bclink_name[] = "broadcast-link";
51 50
52/** 51/**
53 * struct tipc_bcbearer_pair - a pair of bearers used by broadcast link 52 * struct tipc_bc_base - base structure for keeping broadcast send state
54 * @primary: pointer to primary bearer
55 * @secondary: pointer to secondary bearer
56 *
57 * Bearers must have same priority and same set of reachable destinations
58 * to be paired.
59 */
60
61struct tipc_bcbearer_pair {
62 struct tipc_bearer *primary;
63 struct tipc_bearer *secondary;
64};
65
66#define BCBEARER MAX_BEARERS
67
68/**
69 * struct tipc_bcbearer - bearer used by broadcast link
70 * @bearer: (non-standard) broadcast bearer structure
71 * @media: (non-standard) broadcast media structure
72 * @bpairs: array of bearer pairs
73 * @bpairs_temp: temporary array of bearer pairs used by tipc_bcbearer_sort()
74 * @remains: temporary node map used by tipc_bcbearer_send()
75 * @remains_new: temporary node map used tipc_bcbearer_send()
76 *
77 * Note: The fields labelled "temporary" are incorporated into the bearer
78 * to avoid consuming potentially limited stack space through the use of
79 * large local variables within multicast routines. Concurrent access is
80 * prevented through use of the spinlock "bcast_lock".
81 */
82struct tipc_bcbearer {
83 struct tipc_bearer bearer;
84 struct tipc_media media;
85 struct tipc_bcbearer_pair bpairs[MAX_BEARERS];
86 struct tipc_bcbearer_pair bpairs_temp[TIPC_MAX_LINK_PRI + 1];
87 struct tipc_node_map remains;
88 struct tipc_node_map remains_new;
89};
90
91/**
92 * struct tipc_bc_base - link used for broadcast messages
93 * @link: broadcast send link structure 53 * @link: broadcast send link structure
94 * @node: (non-standard) node structure representing b'cast link's peer node 54 * @inputq: data input queue; will only carry SOCK_WAKEUP messages
95 * @bcast_nodes: map of broadcast-capable nodes 55 * @dest: array keeping number of reachable destinations per bearer
96 * @retransmit_to: node that most recently requested a retransmit 56 * @primary_bearer: a bearer having links to all broadcast destinations, if any
97 * @dest_nnt: array indicating number of reachable destinations per bearer
98 * @bearers: array of bearers, sorted by number of reachable destinations
99 *
100 * Handles sequence numbering, fragmentation, bundling, etc.
101 */ 57 */
102struct tipc_bc_base { 58struct tipc_bc_base {
103 struct tipc_link *link; 59 struct tipc_link *link;
104 struct tipc_node node;
105 struct sk_buff_head arrvq;
106 struct sk_buff_head inputq; 60 struct sk_buff_head inputq;
107 struct sk_buff_head namedq;
108 int dests[MAX_BEARERS]; 61 int dests[MAX_BEARERS];
109 int primary_bearer; 62 int primary_bearer;
110 struct tipc_node_map bcast_nodes;
111 struct tipc_node *retransmit_to;
112}; 63};
113 64
114static struct tipc_bc_base *tipc_bc_base(struct net *net) 65static struct tipc_bc_base *tipc_bc_base(struct net *net)
@@ -116,58 +67,11 @@ static struct tipc_bc_base *tipc_bc_base(struct net *net)
116 return tipc_net(net)->bcbase; 67 return tipc_net(net)->bcbase;
117} 68}
118 69
119/**
120 * tipc_nmap_equal - test for equality of node maps
121 */
122static int tipc_nmap_equal(struct tipc_node_map *nm_a,
123 struct tipc_node_map *nm_b)
124{
125 return !memcmp(nm_a, nm_b, sizeof(*nm_a));
126}
127
128static void tipc_bcbearer_xmit(struct net *net, struct sk_buff_head *xmitq);
129static void tipc_nmap_diff(struct tipc_node_map *nm_a,
130 struct tipc_node_map *nm_b,
131 struct tipc_node_map *nm_diff);
132static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
133static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
134static void tipc_bclink_lock(struct net *net)
135{
136 tipc_bcast_lock(net);
137}
138
139static void tipc_bclink_unlock(struct net *net)
140{
141 tipc_bcast_unlock(net);
142}
143
144void tipc_bclink_input(struct net *net)
145{
146 struct tipc_net *tn = net_generic(net, tipc_net_id);
147
148 tipc_sk_mcast_rcv(net, &tn->bcbase->arrvq, &tn->bcbase->inputq);
149}
150
151int tipc_bcast_get_mtu(struct net *net) 70int tipc_bcast_get_mtu(struct net *net)
152{ 71{
153 return tipc_link_mtu(tipc_bc_sndlink(net)); 72 return tipc_link_mtu(tipc_bc_sndlink(net));
154} 73}
155 74
156static u16 bcbuf_acks(struct sk_buff *skb)
157{
158 return TIPC_SKB_CB(skb)->ackers;
159}
160
161static void bcbuf_set_acks(struct sk_buff *buf, u16 ackers)
162{
163 TIPC_SKB_CB(buf)->ackers = ackers;
164}
165
166static void bcbuf_decr_acks(struct sk_buff *buf)
167{
168 bcbuf_set_acks(buf, bcbuf_acks(buf) - 1);
169}
170
171/* tipc_bcbase_select_primary(): find a bearer with links to all destinations, 75/* tipc_bcbase_select_primary(): find a bearer with links to all destinations,
172 * if any, and make it primary bearer 76 * if any, and make it primary bearer
173 */ 77 */
@@ -221,281 +125,6 @@ void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id)
221 tipc_bcast_unlock(net); 125 tipc_bcast_unlock(net);
222} 126}
223 127
224static void bclink_set_last_sent(struct net *net)
225{
226 struct tipc_net *tn = net_generic(net, tipc_net_id);
227 struct tipc_link *bcl = tn->bcl;
228
229 bcl->silent_intv_cnt = mod(bcl->snd_nxt - 1);
230}
231
232u32 tipc_bclink_get_last_sent(struct net *net)
233{
234 struct tipc_net *tn = net_generic(net, tipc_net_id);
235
236 return tn->bcl->silent_intv_cnt;
237}
238
239static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
240{
241 node->bclink.last_sent = less_eq(node->bclink.last_sent, seqno) ?
242 seqno : node->bclink.last_sent;
243}
244
245/**
246 * tipc_bclink_retransmit_to - get most recent node to request retransmission
247 *
248 * Called with bclink_lock locked
249 */
250struct tipc_node *tipc_bclink_retransmit_to(struct net *net)
251{
252 struct tipc_net *tn = net_generic(net, tipc_net_id);
253
254 return tn->bcbase->retransmit_to;
255}
256
257/**
258 * bclink_retransmit_pkt - retransmit broadcast packets
259 * @after: sequence number of last packet to *not* retransmit
260 * @to: sequence number of last packet to retransmit
261 *
262 * Called with bclink_lock locked
263 */
264static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
265{
266 struct sk_buff *skb;
267 struct tipc_link *bcl = tn->bcl;
268
269 skb_queue_walk(&bcl->transmq, skb) {
270 if (more(buf_seqno(skb), after)) {
271 tipc_link_retransmit(bcl, skb, mod(to - after));
272 break;
273 }
274 }
275}
276
277/**
278 * bclink_prepare_wakeup - prepare users for wakeup after congestion
279 * @bcl: broadcast link
280 * @resultq: queue for users which can be woken up
281 * Move a number of waiting users, as permitted by available space in
282 * the send queue, from link wait queue to specified queue for wakeup
283 */
284static void bclink_prepare_wakeup(struct tipc_link *bcl, struct sk_buff_head *resultq)
285{
286 int pnd[TIPC_SYSTEM_IMPORTANCE + 1] = {0,};
287 int imp, lim;
288 struct sk_buff *skb, *tmp;
289
290 skb_queue_walk_safe(&bcl->wakeupq, skb, tmp) {
291 imp = TIPC_SKB_CB(skb)->chain_imp;
292 lim = bcl->window + bcl->backlog[imp].limit;
293 pnd[imp] += TIPC_SKB_CB(skb)->chain_sz;
294 if ((pnd[imp] + bcl->backlog[imp].len) >= lim)
295 continue;
296 skb_unlink(skb, &bcl->wakeupq);
297 skb_queue_tail(resultq, skb);
298 }
299}
300
301/**
302 * tipc_bclink_wakeup_users - wake up pending users
303 *
304 * Called with no locks taken
305 */
306void tipc_bclink_wakeup_users(struct net *net)
307{
308 struct tipc_net *tn = net_generic(net, tipc_net_id);
309 struct tipc_link *bcl = tn->bcl;
310 struct sk_buff_head resultq;
311
312 skb_queue_head_init(&resultq);
313 bclink_prepare_wakeup(bcl, &resultq);
314 tipc_sk_rcv(net, &resultq);
315}
316
317/**
318 * tipc_bclink_acknowledge - handle acknowledgement of broadcast packets
319 * @n_ptr: node that sent acknowledgement info
320 * @acked: broadcast sequence # that has been acknowledged
321 *
322 * Node is locked, bclink_lock unlocked.
323 */
324void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
325{
326 struct sk_buff *skb, *tmp;
327 unsigned int released = 0;
328 struct net *net = n_ptr->net;
329 struct tipc_net *tn = net_generic(net, tipc_net_id);
330
331 if (unlikely(!n_ptr->bclink.recv_permitted))
332 return;
333 tipc_bclink_lock(net);
334
335 /* Bail out if tx queue is empty (no clean up is required) */
336 skb = skb_peek(&tn->bcl->transmq);
337 if (!skb)
338 goto exit;
339
340 /* Determine which messages need to be acknowledged */
341 if (acked == INVALID_LINK_SEQ) {
342 /*
343 * Contact with specified node has been lost, so need to
344 * acknowledge sent messages only (if other nodes still exist)
345 * or both sent and unsent messages (otherwise)
346 */
347 if (tn->bcbase->bcast_nodes.count)
348 acked = tn->bcl->silent_intv_cnt;
349 else
350 acked = tn->bcl->snd_nxt;
351 } else {
352 /*
353 * Bail out if specified sequence number does not correspond
354 * to a message that has been sent and not yet acknowledged
355 */
356 if (less(acked, buf_seqno(skb)) ||
357 less(tn->bcl->silent_intv_cnt, acked) ||
358 less_eq(acked, n_ptr->bclink.acked))
359 goto exit;
360 }
361 /* Skip over packets that node has previously acknowledged */
362 skb_queue_walk(&tn->bcl->transmq, skb) {
363 if (more(buf_seqno(skb), n_ptr->bclink.acked))
364 break;
365 }
366 /* Update packets that node is now acknowledging */
367 skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
368 if (more(buf_seqno(skb), acked))
369 break;
370 bcbuf_decr_acks(skb);
371 bclink_set_last_sent(net);
372 if (bcbuf_acks(skb) == 0) {
373 __skb_unlink(skb, &tn->bcl->transmq);
374 kfree_skb(skb);
375 released = 1;
376 }
377 }
378 n_ptr->bclink.acked = acked;
379
380 /* Try resolving broadcast link congestion, if necessary */
381 if (unlikely(skb_peek(&tn->bcl->backlogq))) {
382 tipc_link_push_packets(tn->bcl);
383 bclink_set_last_sent(net);
384 }
385 if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
386 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
387exit:
388 tipc_bclink_unlock(net);
389}
390
391/**
392 * tipc_bclink_update_link_state - update broadcast link state
393 *
394 * RCU and node lock set
395 */
396void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
397 u32 last_sent)
398{
399 struct sk_buff *buf;
400 struct net *net = n_ptr->net;
401 struct tipc_net *tn = net_generic(net, tipc_net_id);
402 struct tipc_link *bcl = tn->bcl;
403
404 /* Ignore "stale" link state info */
405 if (less_eq(last_sent, n_ptr->bclink.last_in))
406 return;
407
408 /* Update link synchronization state; quit if in sync */
409 bclink_update_last_sent(n_ptr, last_sent);
410
411 /* This is a good location for statistical profiling */
412 bcl->stats.queue_sz_counts++;
413 bcl->stats.accu_queue_sz += skb_queue_len(&bcl->transmq);
414
415 if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
416 return;
417
418 /* Update out-of-sync state; quit if loss is still unconfirmed */
419 if ((++n_ptr->bclink.oos_state) == 1) {
420 if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
421 return;
422 n_ptr->bclink.oos_state++;
423 }
424
425 /* Don't NACK if one has been recently sent (or seen) */
426 if (n_ptr->bclink.oos_state & 0x1)
427 return;
428
429 /* Send NACK */
430 buf = tipc_buf_acquire(INT_H_SIZE);
431 if (buf) {
432 struct tipc_msg *msg = buf_msg(buf);
433 struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
434 u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
435
436 tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
437 INT_H_SIZE, n_ptr->addr);
438 msg_set_non_seq(msg, 1);
439 msg_set_mc_netid(msg, tn->net_id);
440 msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
441 msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
442 msg_set_bcgap_to(msg, to);
443
444 tipc_bclink_lock(net);
445 tipc_bearer_send(net, MAX_BEARERS, buf, NULL);
446 tn->bcl->stats.sent_nacks++;
447 tipc_bclink_unlock(net);
448 kfree_skb(buf);
449
450 n_ptr->bclink.oos_state++;
451 }
452}
453
454void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
455{
456 u16 last = msg_last_bcast(hdr);
457 int mtyp = msg_type(hdr);
458
459 if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
460 return;
461 if (mtyp == STATE_MSG) {
462 tipc_bclink_update_link_state(n, last);
463 return;
464 }
465 /* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
466 * and transfer synch info in LINK_PROTOCOL messages.
467 */
468 if (tipc_node_is_up(n))
469 return;
470 if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
471 return;
472 n->bclink.last_sent = last;
473 n->bclink.last_in = last;
474 n->bclink.oos_state = 0;
475}
476
477/**
478 * bclink_peek_nack - monitor retransmission requests sent by other nodes
479 *
480 * Delay any upcoming NACK by this node if another node has already
481 * requested the first message this node is going to ask for.
482 */
483static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
484{
485 struct tipc_node *n_ptr = tipc_node_find(net, msg_destnode(msg));
486
487 if (unlikely(!n_ptr))
488 return;
489
490 tipc_node_lock(n_ptr);
491 if (n_ptr->bclink.recv_permitted &&
492 (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
493 (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
494 n_ptr->bclink.oos_state = 2;
495 tipc_node_unlock(n_ptr);
496 tipc_node_put(n_ptr);
497}
498
499/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers 128/* tipc_bcbase_xmit - broadcast a packet queue across one or more bearers
500 * 129 *
501 * Note that number of reachable destinations, as indicated in the dests[] 130 * Note that number of reachable destinations, as indicated in the dests[]
@@ -703,333 +332,6 @@ void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_l)
703 tipc_sk_rcv(net, inputq); 332 tipc_sk_rcv(net, inputq);
704} 333}
705 334
706/**
707 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
708 *
709 * Called with both sending node's lock and bclink_lock taken.
710 */
711static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
712{
713 struct tipc_net *tn = net_generic(node->net, tipc_net_id);
714
715 bclink_update_last_sent(node, seqno);
716 node->bclink.last_in = seqno;
717 node->bclink.oos_state = 0;
718 tn->bcl->stats.recv_info++;
719
720 /*
721 * Unicast an ACK periodically, ensuring that
722 * all nodes in the cluster don't ACK at the same time
723 */
724 if (((seqno - tn->own_addr) % TIPC_MIN_LINK_WIN) == 0) {
725 tipc_link_proto_xmit(node_active_link(node, node->addr),
726 STATE_MSG, 0, 0, 0, 0);
727 tn->bcl->stats.sent_acks++;
728 }
729}
730
731/**
732 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
733 *
734 * RCU is locked, no other locks set
735 */
736void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
737{
738 struct tipc_net *tn = net_generic(net, tipc_net_id);
739 struct tipc_link *bcl = tn->bcl;
740 struct tipc_msg *msg = buf_msg(buf);
741 struct tipc_node *node;
742 u32 next_in;
743 u32 seqno;
744 int deferred = 0;
745 int pos = 0;
746 struct sk_buff *iskb;
747 struct sk_buff_head *arrvq, *inputq;
748
749 /* Screen out unwanted broadcast messages */
750 if (msg_mc_netid(msg) != tn->net_id)
751 goto exit;
752
753 node = tipc_node_find(net, msg_prevnode(msg));
754 if (unlikely(!node))
755 goto exit;
756 tipc_node_lock(node);
757 if (unlikely(!node->bclink.recv_permitted))
758 goto unlock;
759
760 /* Handle broadcast protocol message */
761 if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
762 if (msg_type(msg) != STATE_MSG)
763 goto unlock;
764 if (msg_destnode(msg) == tn->own_addr) {
765 tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
766 tipc_bclink_lock(net);
767 bcl->stats.recv_nacks++;
768 tn->bcbase->retransmit_to = node;
769 bclink_retransmit_pkt(tn, msg_bcgap_after(msg),
770 msg_bcgap_to(msg));
771 tipc_bclink_unlock(net);
772 tipc_node_unlock(node);
773 } else {
774 tipc_node_unlock(node);
775 bclink_peek_nack(net, msg);
776 }
777 tipc_node_put(node);
778 goto exit;
779 }
780 /* Handle in-sequence broadcast message */
781 seqno = msg_seqno(msg);
782 next_in = mod(node->bclink.last_in + 1);
783 arrvq = &tn->bcbase->arrvq;
784 inputq = &tn->bcbase->inputq;
785
786 if (likely(seqno == next_in)) {
787receive:
788 /* Deliver message to destination */
789 if (likely(msg_isdata(msg))) {
790 tipc_bclink_lock(net);
791 bclink_accept_pkt(node, seqno);
792 spin_lock_bh(&inputq->lock);
793 __skb_queue_tail(arrvq, buf);
794 spin_unlock_bh(&inputq->lock);
795 node->action_flags |= TIPC_BCAST_MSG_EVT;
796 tipc_bclink_unlock(net);
797 tipc_node_unlock(node);
798 } else if (msg_user(msg) == MSG_BUNDLER) {
799 tipc_bclink_lock(net);
800 bclink_accept_pkt(node, seqno);
801 bcl->stats.recv_bundles++;
802 bcl->stats.recv_bundled += msg_msgcnt(msg);
803 pos = 0;
804 while (tipc_msg_extract(buf, &iskb, &pos)) {
805 spin_lock_bh(&inputq->lock);
806 __skb_queue_tail(arrvq, iskb);
807 spin_unlock_bh(&inputq->lock);
808 }
809 node->action_flags |= TIPC_BCAST_MSG_EVT;
810 tipc_bclink_unlock(net);
811 tipc_node_unlock(node);
812 } else if (msg_user(msg) == MSG_FRAGMENTER) {
813 tipc_bclink_lock(net);
814 bclink_accept_pkt(node, seqno);
815 tipc_buf_append(&node->bclink.reasm_buf, &buf);
816 if (unlikely(!buf && !node->bclink.reasm_buf)) {
817 tipc_bclink_unlock(net);
818 goto unlock;
819 }
820 bcl->stats.recv_fragments++;
821 if (buf) {
822 bcl->stats.recv_fragmented++;
823 msg = buf_msg(buf);
824 tipc_bclink_unlock(net);
825 goto receive;
826 }
827 tipc_bclink_unlock(net);
828 tipc_node_unlock(node);
829 } else {
830 tipc_bclink_lock(net);
831 bclink_accept_pkt(node, seqno);
832 tipc_bclink_unlock(net);
833 tipc_node_unlock(node);
834 kfree_skb(buf);
835 }
836 buf = NULL;
837
838 /* Determine new synchronization state */
839 tipc_node_lock(node);
840 if (unlikely(!tipc_node_is_up(node)))
841 goto unlock;
842
843 if (node->bclink.last_in == node->bclink.last_sent)
844 goto unlock;
845
846 if (skb_queue_empty(&node->bclink.deferdq)) {
847 node->bclink.oos_state = 1;
848 goto unlock;
849 }
850
851 msg = buf_msg(skb_peek(&node->bclink.deferdq));
852 seqno = msg_seqno(msg);
853 next_in = mod(next_in + 1);
854 if (seqno != next_in)
855 goto unlock;
856
857 /* Take in-sequence message from deferred queue & deliver it */
858 buf = __skb_dequeue(&node->bclink.deferdq);
859 goto receive;
860 }
861
862 /* Handle out-of-sequence broadcast message */
863 if (less(next_in, seqno)) {
864 deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
865 buf);
866 bclink_update_last_sent(node, seqno);
867 buf = NULL;
868 }
869
870 tipc_bclink_lock(net);
871
872 if (deferred)
873 bcl->stats.deferred_recv++;
874 else
875 bcl->stats.duplicates++;
876
877 tipc_bclink_unlock(net);
878
879unlock:
880 tipc_node_unlock(node);
881 tipc_node_put(node);
882exit:
883 kfree_skb(buf);
884}
885
886u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
887{
888 return (n_ptr->bclink.recv_permitted &&
889 (tipc_bclink_get_last_sent(n_ptr->net) != n_ptr->bclink.acked));
890}
891
892
893/**
894 * tipc_bcbearer_send - send a packet through the broadcast pseudo-bearer
895 *
896 * Send packet over as many bearers as necessary to reach all nodes
897 * that have joined the broadcast link.
898 *
899 * Returns 0 (packet sent successfully) under all circumstances,
900 * since the broadcast link's pseudo-bearer never blocks
901 */
902static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
903 struct tipc_bearer *unused1,
904 struct tipc_media_addr *unused2)
905{
906 int bp_index;
907 struct tipc_msg *msg = buf_msg(buf);
908 struct tipc_net *tn = net_generic(net, tipc_net_id);
909 struct tipc_bcbearer *bcbearer = tn->bcbearer;
910 struct tipc_bc_base *bclink = tn->bcbase;
911
912 /* Prepare broadcast link message for reliable transmission,
913 * if first time trying to send it;
914 * preparation is skipped for broadcast link protocol messages
915 * since they are sent in an unreliable manner and don't need it
916 */
917 if (likely(!msg_non_seq(buf_msg(buf)))) {
918 bcbuf_set_acks(buf, bclink->bcast_nodes.count);
919 msg_set_non_seq(msg, 1);
920 msg_set_mc_netid(msg, tn->net_id);
921 tn->bcl->stats.sent_info++;
922 if (WARN_ON(!bclink->bcast_nodes.count)) {
923 dump_stack();
924 return 0;
925 }
926 }
927 msg_set_mc_netid(msg, tn->net_id);
928
929 /* Send buffer over bearers until all targets reached */
930 bcbearer->remains = bclink->bcast_nodes;
931
932 for (bp_index = 0; bp_index < MAX_BEARERS; bp_index++) {
933 struct tipc_bearer *p = bcbearer->bpairs[bp_index].primary;
934 struct tipc_bearer *s = bcbearer->bpairs[bp_index].secondary;
935 struct tipc_bearer *bp[2] = {p, s};
936 struct tipc_bearer *b = bp[msg_link_selector(msg)];
937 struct sk_buff *tbuf;
938
939 if (!p)
940 break; /* No more bearers to try */
941 if (!b)
942 b = p;
943 tipc_nmap_diff(&bcbearer->remains, &b->nodes,
944 &bcbearer->remains_new);
945 if (bcbearer->remains_new.count == bcbearer->remains.count)
946 continue; /* Nothing added by bearer pair */
947
948 if (bp_index == 0) {
949 /* Use original buffer for first bearer */
950 tipc_bearer_send(net, b->identity, buf, &b->bcast_addr);
951 } else {
952 /* Avoid concurrent buffer access */
953 tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
954 if (!tbuf)
955 break;
956 tipc_bearer_send(net, b->identity, tbuf,
957 &b->bcast_addr);
958 kfree_skb(tbuf); /* Bearer keeps a clone */
959 }
960 if (bcbearer->remains_new.count == 0)
961 break; /* All targets reached */
962
963 bcbearer->remains = bcbearer->remains_new;
964 }
965
966 return 0;
967}
968
969/**
970 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
971 */
972void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
973 u32 node, bool action)
974{
975 struct tipc_net *tn = net_generic(net, tipc_net_id);
976 struct tipc_bcbearer *bcbearer = tn->bcbearer;
977 struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
978 struct tipc_bcbearer_pair *bp_curr;
979 struct tipc_bearer *b;
980 int b_index;
981 int pri;
982
983 tipc_bclink_lock(net);
984
985 if (action)
986 tipc_nmap_add(nm_ptr, node);
987 else
988 tipc_nmap_remove(nm_ptr, node);
989
990 /* Group bearers by priority (can assume max of two per priority) */
991 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
992
993 rcu_read_lock();
994 for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
995 b = rcu_dereference_rtnl(tn->bearer_list[b_index]);
996 if (!b || !b->nodes.count)
997 continue;
998
999 if (!bp_temp[b->priority].primary)
1000 bp_temp[b->priority].primary = b;
1001 else
1002 bp_temp[b->priority].secondary = b;
1003 }
1004 rcu_read_unlock();
1005
1006 /* Create array of bearer pairs for broadcasting */
1007 bp_curr = bcbearer->bpairs;
1008 memset(bcbearer->bpairs, 0, sizeof(bcbearer->bpairs));
1009
1010 for (pri = TIPC_MAX_LINK_PRI; pri >= 0; pri--) {
1011
1012 if (!bp_temp[pri].primary)
1013 continue;
1014
1015 bp_curr->primary = bp_temp[pri].primary;
1016
1017 if (bp_temp[pri].secondary) {
1018 if (tipc_nmap_equal(&bp_temp[pri].primary->nodes,
1019 &bp_temp[pri].secondary->nodes)) {
1020 bp_curr->secondary = bp_temp[pri].secondary;
1021 } else {
1022 bp_curr++;
1023 bp_curr->primary = bp_temp[pri].secondary;
1024 }
1025 }
1026
1027 bp_curr++;
1028 }
1029
1030 tipc_bclink_unlock(net);
1031}
1032
1033static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb, 335static int __tipc_nl_add_bc_link_stat(struct sk_buff *skb,
1034 struct tipc_stats *stats) 336 struct tipc_stats *stats)
1035{ 337{
@@ -1093,7 +395,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
1093 if (!bcl) 395 if (!bcl)
1094 return 0; 396 return 0;
1095 397
1096 tipc_bclink_lock(net); 398 tipc_bcast_lock(net);
1097 399
1098 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family, 400 hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_family,
1099 NLM_F_MULTI, TIPC_NL_LINK_GET); 401 NLM_F_MULTI, TIPC_NL_LINK_GET);
@@ -1128,7 +430,7 @@ int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg)
1128 if (err) 430 if (err)
1129 goto attr_msg_full; 431 goto attr_msg_full;
1130 432
1131 tipc_bclink_unlock(net); 433 tipc_bcast_unlock(net);
1132 nla_nest_end(msg->skb, attrs); 434 nla_nest_end(msg->skb, attrs);
1133 genlmsg_end(msg->skb, hdr); 435 genlmsg_end(msg->skb, hdr);
1134 436
@@ -1139,7 +441,7 @@ prop_msg_full:
1139attr_msg_full: 441attr_msg_full:
1140 nla_nest_cancel(msg->skb, attrs); 442 nla_nest_cancel(msg->skb, attrs);
1141msg_full: 443msg_full:
1142 tipc_bclink_unlock(net); 444 tipc_bcast_unlock(net);
1143 genlmsg_cancel(msg->skb, hdr); 445 genlmsg_cancel(msg->skb, hdr);
1144 446
1145 return -EMSGSIZE; 447 return -EMSGSIZE;
@@ -1153,26 +455,25 @@ int tipc_bclink_reset_stats(struct net *net)
1153 if (!bcl) 455 if (!bcl)
1154 return -ENOPROTOOPT; 456 return -ENOPROTOOPT;
1155 457
1156 tipc_bclink_lock(net); 458 tipc_bcast_lock(net);
1157 memset(&bcl->stats, 0, sizeof(bcl->stats)); 459 memset(&bcl->stats, 0, sizeof(bcl->stats));
1158 tipc_bclink_unlock(net); 460 tipc_bcast_unlock(net);
1159 return 0; 461 return 0;
1160} 462}
1161 463
1162int tipc_bclink_set_queue_limits(struct net *net, u32 limit) 464static int tipc_bc_link_set_queue_limits(struct net *net, u32 limit)
1163{ 465{
1164 struct tipc_net *tn = net_generic(net, tipc_net_id); 466 struct tipc_link *l = tipc_bc_sndlink(net);
1165 struct tipc_link *bcl = tn->bcl;
1166 467
1167 if (!bcl) 468 if (!l)
1168 return -ENOPROTOOPT; 469 return -ENOPROTOOPT;
1169 if (limit < BCLINK_WIN_MIN) 470 if (limit < BCLINK_WIN_MIN)
1170 limit = BCLINK_WIN_MIN; 471 limit = BCLINK_WIN_MIN;
1171 if (limit > TIPC_MAX_LINK_WIN) 472 if (limit > TIPC_MAX_LINK_WIN)
1172 return -EINVAL; 473 return -EINVAL;
1173 tipc_bclink_lock(net); 474 tipc_bcast_lock(net);
1174 tipc_link_set_queue_limits(bcl, limit); 475 tipc_link_set_queue_limits(l, limit);
1175 tipc_bclink_unlock(net); 476 tipc_bcast_unlock(net);
1176 return 0; 477 return 0;
1177} 478}
1178 479
@@ -1194,53 +495,34 @@ int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[])
1194 495
1195 win = nla_get_u32(props[TIPC_NLA_PROP_WIN]); 496 win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
1196 497
1197 return tipc_bclink_set_queue_limits(net, win); 498 return tipc_bc_link_set_queue_limits(net, win);
1198} 499}
1199 500
1200int tipc_bcast_init(struct net *net) 501int tipc_bcast_init(struct net *net)
1201{ 502{
1202 struct tipc_net *tn = tipc_net(net); 503 struct tipc_net *tn = tipc_net(net);
1203 struct tipc_bcbearer *bcb = NULL;
1204 struct tipc_bc_base *bb = NULL; 504 struct tipc_bc_base *bb = NULL;
1205 struct tipc_link *l = NULL; 505 struct tipc_link *l = NULL;
1206 506
1207 bcb = kzalloc(sizeof(*bcb), GFP_ATOMIC);
1208 if (!bcb)
1209 goto enomem;
1210 tn->bcbearer = bcb;
1211
1212 bcb->bearer.window = BCLINK_WIN_DEFAULT;
1213 bcb->bearer.mtu = MAX_PKT_DEFAULT_MCAST;
1214 bcb->bearer.identity = MAX_BEARERS;
1215
1216 bcb->bearer.media = &bcb->media;
1217 bcb->media.send_msg = tipc_bcbearer_send;
1218 sprintf(bcb->media.name, "tipc-broadcast");
1219 strcpy(bcb->bearer.name, bcb->media.name);
1220
1221 bb = kzalloc(sizeof(*bb), GFP_ATOMIC); 507 bb = kzalloc(sizeof(*bb), GFP_ATOMIC);
1222 if (!bb) 508 if (!bb)
1223 goto enomem; 509 goto enomem;
1224 tn->bcbase = bb; 510 tn->bcbase = bb;
1225 __skb_queue_head_init(&bb->arrvq);
1226 spin_lock_init(&tipc_net(net)->bclock); 511 spin_lock_init(&tipc_net(net)->bclock);
1227 bb->node.net = net;
1228 512
1229 if (!tipc_link_bc_create(net, 0, 0, 513 if (!tipc_link_bc_create(net, 0, 0,
1230 U16_MAX, 514 U16_MAX,
1231 BCLINK_WIN_DEFAULT, 515 BCLINK_WIN_DEFAULT,
1232 0, 516 0,
1233 &bb->inputq, 517 &bb->inputq,
1234 &bb->namedq, 518 NULL,
1235 NULL, 519 NULL,
1236 &l)) 520 &l))
1237 goto enomem; 521 goto enomem;
1238 bb->link = l; 522 bb->link = l;
1239 tn->bcl = l; 523 tn->bcl = l;
1240 rcu_assign_pointer(tn->bearer_list[MAX_BEARERS], &bcb->bearer);
1241 return 0; 524 return 0;
1242enomem: 525enomem:
1243 kfree(bcb);
1244 kfree(bb); 526 kfree(bb);
1245 kfree(l); 527 kfree(l);
1246 return -ENOMEM; 528 return -ENOMEM;
@@ -1257,70 +539,7 @@ void tipc_bcast_stop(struct net *net)
1257{ 539{
1258 struct tipc_net *tn = net_generic(net, tipc_net_id); 540 struct tipc_net *tn = net_generic(net, tipc_net_id);
1259 541
1260 tipc_bclink_lock(net);
1261 tipc_link_purge_queues(tn->bcl);
1262 tipc_bclink_unlock(net);
1263 RCU_INIT_POINTER(tn->bearer_list[BCBEARER], NULL);
1264 synchronize_net(); 542 synchronize_net();
1265 kfree(tn->bcbearer);
1266 kfree(tn->bcbase); 543 kfree(tn->bcbase);
1267 kfree(tn->bcl); 544 kfree(tn->bcl);
1268} 545}
1269
1270/**
1271 * tipc_nmap_add - add a node to a node map
1272 */
1273static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
1274{
1275 int n = tipc_node(node);
1276 int w = n / WSIZE;
1277 u32 mask = (1 << (n % WSIZE));
1278
1279 if ((nm_ptr->map[w] & mask) == 0) {
1280 nm_ptr->count++;
1281 nm_ptr->map[w] |= mask;
1282 }
1283}
1284
1285/**
1286 * tipc_nmap_remove - remove a node from a node map
1287 */
1288static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
1289{
1290 int n = tipc_node(node);
1291 int w = n / WSIZE;
1292 u32 mask = (1 << (n % WSIZE));
1293
1294 if ((nm_ptr->map[w] & mask) != 0) {
1295 nm_ptr->map[w] &= ~mask;
1296 nm_ptr->count--;
1297 }
1298}
1299
1300/**
1301 * tipc_nmap_diff - find differences between node maps
1302 * @nm_a: input node map A
1303 * @nm_b: input node map B
1304 * @nm_diff: output node map A-B (i.e. nodes of A that are not in B)
1305 */
1306static void tipc_nmap_diff(struct tipc_node_map *nm_a,
1307 struct tipc_node_map *nm_b,
1308 struct tipc_node_map *nm_diff)
1309{
1310 int stop = ARRAY_SIZE(nm_a->map);
1311 int w;
1312 int b;
1313 u32 map;
1314
1315 memset(nm_diff, 0, sizeof(*nm_diff));
1316 for (w = 0; w < stop; w++) {
1317 map = nm_a->map[w] ^ (nm_a->map[w] & nm_b->map[w]);
1318 nm_diff->map[w] = map;
1319 if (map != 0) {
1320 for (b = 0 ; b < WSIZE; b++) {
1321 if (map & (1 << b))
1322 nm_diff->count++;
1323 }
1324 }
1325 }
1326}
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 0cc72200f1cd..2855b9356a15 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -52,28 +52,15 @@ void tipc_bcast_add_peer(struct net *net, struct tipc_link *l,
52void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl); 52void tipc_bcast_remove_peer(struct net *net, struct tipc_link *rcv_bcl);
53void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id); 53void tipc_bcast_inc_bearer_dst_cnt(struct net *net, int bearer_id);
54void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id); 54void tipc_bcast_dec_bearer_dst_cnt(struct net *net, int bearer_id);
55struct tipc_node *tipc_bclink_retransmit_to(struct net *tn);
56void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked);
57void tipc_bclink_rcv(struct net *net, struct sk_buff *buf);
58u32 tipc_bclink_get_last_sent(struct net *net);
59u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr);
60void tipc_bclink_update_link_state(struct tipc_node *node,
61 u32 last_sent);
62void tipc_bcbearer_sort(struct net *net, struct tipc_node_map *nm_ptr,
63 u32 node, bool action);
64int tipc_bclink_reset_stats(struct net *net);
65int tipc_bclink_set_queue_limits(struct net *net, u32 limit);
66int tipc_bcast_get_mtu(struct net *net); 55int tipc_bcast_get_mtu(struct net *net);
67int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list); 56int tipc_bcast_xmit(struct net *net, struct sk_buff_head *list);
68int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb); 57int tipc_bcast_rcv(struct net *net, struct tipc_link *l, struct sk_buff *skb);
69void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked); 58void tipc_bcast_ack_rcv(struct net *net, struct tipc_link *l, u32 acked);
70void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l, 59void tipc_bcast_sync_rcv(struct net *net, struct tipc_link *l,
71 struct tipc_msg *hdr); 60 struct tipc_msg *hdr);
72void tipc_bclink_wakeup_users(struct net *net);
73int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); 61int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
74int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]); 62int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
75void tipc_bclink_input(struct net *net); 63int tipc_bclink_reset_stats(struct net *net);
76void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);
77 64
78static inline void tipc_bcast_lock(struct net *net) 65static inline void tipc_bcast_lock(struct net *net)
79{ 66{
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index c3fa13a92ac8..648f2a67f314 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -440,25 +440,6 @@ int tipc_l2_send_msg(struct net *net, struct sk_buff *skb,
440 return 0; 440 return 0;
441} 441}
442 442
443/* tipc_bearer_send- sends buffer to destination over bearer
444 *
445 * IMPORTANT:
446 * The media send routine must not alter the buffer being passed in
447 * as it may be needed for later retransmission!
448 */
449void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
450 struct tipc_media_addr *dest)
451{
452 struct tipc_net *tn = net_generic(net, tipc_net_id);
453 struct tipc_bearer *b_ptr;
454
455 rcu_read_lock();
456 b_ptr = rcu_dereference_rtnl(tn->bearer_list[bearer_id]);
457 if (likely(b_ptr))
458 b_ptr->media->send_msg(net, buf, b_ptr, dest);
459 rcu_read_unlock();
460}
461
462int tipc_bearer_mtu(struct net *net, u32 bearer_id) 443int tipc_bearer_mtu(struct net *net, u32 bearer_id)
463{ 444{
464 int mtu = 0; 445 int mtu = 0;
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index 5a600e1b197f..552185bc4773 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -216,8 +216,6 @@ struct tipc_media *tipc_media_find(const char *name);
216int tipc_bearer_setup(void); 216int tipc_bearer_setup(void);
217void tipc_bearer_cleanup(void); 217void tipc_bearer_cleanup(void);
218void tipc_bearer_stop(struct net *net); 218void tipc_bearer_stop(struct net *net);
219void tipc_bearer_send(struct net *net, u32 bearer_id, struct sk_buff *buf,
220 struct tipc_media_addr *dest);
221int tipc_bearer_mtu(struct net *net, u32 bearer_id); 219int tipc_bearer_mtu(struct net *net, u32 bearer_id);
222void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id, 220void tipc_bearer_xmit_skb(struct net *net, u32 bearer_id,
223 struct sk_buff *skb, 221 struct sk_buff *skb,
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 6d589aaad054..18e95a8020cd 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -62,7 +62,6 @@
62 62
63struct tipc_node; 63struct tipc_node;
64struct tipc_bearer; 64struct tipc_bearer;
65struct tipc_bcbearer;
66struct tipc_bc_base; 65struct tipc_bc_base;
67struct tipc_link; 66struct tipc_link;
68struct tipc_name_table; 67struct tipc_name_table;
@@ -94,7 +93,6 @@ struct tipc_net {
94 93
95 /* Broadcast link */ 94 /* Broadcast link */
96 spinlock_t bclock; 95 spinlock_t bclock;
97 struct tipc_bcbearer *bcbearer;
98 struct tipc_bc_base *bcbase; 96 struct tipc_bc_base *bcbase;
99 struct tipc_link *bcl; 97 struct tipc_link *bcl;
100 98
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 819fb7163fa2..4449fa01e232 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -244,7 +244,6 @@ static u32 link_own_addr(struct tipc_link *l)
244 * @ownnode: identity of own node 244 * @ownnode: identity of own node
245 * @peer: node id of peer node 245 * @peer: node id of peer node
246 * @peer_caps: bitmap describing peer node capabilities 246 * @peer_caps: bitmap describing peer node capabilities
247 * @maddr: media address to be used
248 * @bc_sndlink: the namespace global link used for broadcast sending 247 * @bc_sndlink: the namespace global link used for broadcast sending
249 * @bc_rcvlink: the peer specific link used for broadcast reception 248 * @bc_rcvlink: the peer specific link used for broadcast reception
250 * @inputq: queue to put messages ready for delivery 249 * @inputq: queue to put messages ready for delivery
@@ -257,7 +256,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
257 int tolerance, char net_plane, u32 mtu, int priority, 256 int tolerance, char net_plane, u32 mtu, int priority,
258 int window, u32 session, u32 ownnode, u32 peer, 257 int window, u32 session, u32 ownnode, u32 peer,
259 u16 peer_caps, 258 u16 peer_caps,
260 struct tipc_media_addr *maddr,
261 struct tipc_link *bc_sndlink, 259 struct tipc_link *bc_sndlink,
262 struct tipc_link *bc_rcvlink, 260 struct tipc_link *bc_rcvlink,
263 struct sk_buff_head *inputq, 261 struct sk_buff_head *inputq,
@@ -286,7 +284,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
286 284
287 l->addr = peer; 285 l->addr = peer;
288 l->peer_caps = peer_caps; 286 l->peer_caps = peer_caps;
289 l->media_addr = maddr;
290 l->net = net; 287 l->net = net;
291 l->peer_session = WILDCARD_SESSION; 288 l->peer_session = WILDCARD_SESSION;
292 l->bearer_id = bearer_id; 289 l->bearer_id = bearer_id;
@@ -331,7 +328,7 @@ bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
331 struct tipc_link *l; 328 struct tipc_link *l;
332 329
333 if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, 330 if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
334 0, ownnode, peer, peer_caps, NULL, bc_sndlink, 331 0, ownnode, peer, peer_caps, bc_sndlink,
335 NULL, inputq, namedq, link)) 332 NULL, inputq, namedq, link))
336 return false; 333 return false;
337 334
@@ -662,38 +659,6 @@ void link_prepare_wakeup(struct tipc_link *l)
662 } 659 }
663} 660}
664 661
665/**
666 * tipc_link_reset_fragments - purge link's inbound message fragments queue
667 * @l_ptr: pointer to link
668 */
669void tipc_link_reset_fragments(struct tipc_link *l_ptr)
670{
671 kfree_skb(l_ptr->reasm_buf);
672 l_ptr->reasm_buf = NULL;
673}
674
675void tipc_link_purge_backlog(struct tipc_link *l)
676{
677 __skb_queue_purge(&l->backlogq);
678 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
679 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
680 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
681 l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
682 l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
683}
684
685/**
686 * tipc_link_purge_queues - purge all pkt queues associated with link
687 * @l_ptr: pointer to link
688 */
689void tipc_link_purge_queues(struct tipc_link *l_ptr)
690{
691 __skb_queue_purge(&l_ptr->deferdq);
692 __skb_queue_purge(&l_ptr->transmq);
693 tipc_link_purge_backlog(l_ptr);
694 tipc_link_reset_fragments(l_ptr);
695}
696
697void tipc_link_reset(struct tipc_link *l) 662void tipc_link_reset(struct tipc_link *l)
698{ 663{
699 /* Link is down, accept any session */ 664 /* Link is down, accept any session */
@@ -705,12 +670,16 @@ void tipc_link_reset(struct tipc_link *l)
705 /* Prepare for renewed mtu size negotiation */ 670 /* Prepare for renewed mtu size negotiation */
706 l->mtu = l->advertised_mtu; 671 l->mtu = l->advertised_mtu;
707 672
708 /* Clean up all queues: */ 673 /* Clean up all queues and counters: */
709 __skb_queue_purge(&l->transmq); 674 __skb_queue_purge(&l->transmq);
710 __skb_queue_purge(&l->deferdq); 675 __skb_queue_purge(&l->deferdq);
711 skb_queue_splice_init(&l->wakeupq, l->inputq); 676 skb_queue_splice_init(&l->wakeupq, l->inputq);
712 677 __skb_queue_purge(&l->backlogq);
713 tipc_link_purge_backlog(l); 678 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
679 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
680 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
681 l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
682 l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
714 kfree_skb(l->reasm_buf); 683 kfree_skb(l->reasm_buf);
715 kfree_skb(l->failover_reasm_skb); 684 kfree_skb(l->failover_reasm_skb);
716 l->reasm_buf = NULL; 685 l->reasm_buf = NULL;
@@ -727,74 +696,6 @@ void tipc_link_reset(struct tipc_link *l)
727} 696}
728 697
729/** 698/**
730 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
731 * @link: link to use
732 * @list: chain of buffers containing message
733 *
734 * Consumes the buffer chain, except when returning an error code,
735 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
736 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
737 */
738int __tipc_link_xmit(struct net *net, struct tipc_link *link,
739 struct sk_buff_head *list)
740{
741 struct tipc_msg *msg = buf_msg(skb_peek(list));
742 unsigned int maxwin = link->window;
743 unsigned int i, imp = msg_importance(msg);
744 uint mtu = link->mtu;
745 u16 ack = mod(link->rcv_nxt - 1);
746 u16 seqno = link->snd_nxt;
747 u16 bc_ack = link->bc_rcvlink->rcv_nxt - 1;
748 struct tipc_media_addr *addr = link->media_addr;
749 struct sk_buff_head *transmq = &link->transmq;
750 struct sk_buff_head *backlogq = &link->backlogq;
751 struct sk_buff *skb, *bskb;
752
753 /* Match msg importance against this and all higher backlog limits: */
754 for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
755 if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
756 return link_schedule_user(link, list);
757 }
758 if (unlikely(msg_size(msg) > mtu))
759 return -EMSGSIZE;
760
761 /* Prepare each packet for sending, and add to relevant queue: */
762 while (skb_queue_len(list)) {
763 skb = skb_peek(list);
764 msg = buf_msg(skb);
765 msg_set_seqno(msg, seqno);
766 msg_set_ack(msg, ack);
767 msg_set_bcast_ack(msg, bc_ack);
768
769 if (likely(skb_queue_len(transmq) < maxwin)) {
770 __skb_dequeue(list);
771 __skb_queue_tail(transmq, skb);
772 tipc_bearer_send(net, link->bearer_id, skb, addr);
773 link->rcv_unacked = 0;
774 seqno++;
775 continue;
776 }
777 if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) {
778 kfree_skb(__skb_dequeue(list));
779 link->stats.sent_bundled++;
780 continue;
781 }
782 if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) {
783 kfree_skb(__skb_dequeue(list));
784 __skb_queue_tail(backlogq, bskb);
785 link->backlog[msg_importance(buf_msg(bskb))].len++;
786 link->stats.sent_bundled++;
787 link->stats.sent_bundles++;
788 continue;
789 }
790 link->backlog[imp].len += skb_queue_len(list);
791 skb_queue_splice_tail_init(list, backlogq);
792 }
793 link->snd_nxt = seqno;
794 return 0;
795}
796
797/**
798 * tipc_link_xmit(): enqueue buffer list according to queue situation 699 * tipc_link_xmit(): enqueue buffer list according to queue situation
799 * @link: link to use 700 * @link: link to use
800 * @list: chain of buffers containing message 701 * @list: chain of buffers containing message
@@ -867,40 +768,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
867 return 0; 768 return 0;
868} 769}
869 770
870/*
871 * tipc_link_push_packets - push unsent packets to bearer
872 *
873 * Push out the unsent messages of a link where congestion
874 * has abated. Node is locked.
875 *
876 * Called with node locked
877 */
878void tipc_link_push_packets(struct tipc_link *link)
879{
880 struct sk_buff *skb;
881 struct tipc_msg *msg;
882 u16 seqno = link->snd_nxt;
883 u16 ack = mod(link->rcv_nxt - 1);
884
885 while (skb_queue_len(&link->transmq) < link->window) {
886 skb = __skb_dequeue(&link->backlogq);
887 if (!skb)
888 break;
889 TIPC_SKB_CB(skb)->ackers = link->ackers;
890 msg = buf_msg(skb);
891 link->backlog[msg_importance(msg)].len--;
892 msg_set_ack(msg, ack);
893 msg_set_seqno(msg, seqno);
894 seqno = mod(seqno + 1);
895 /* msg_set_bcast_ack(msg, link->owner->bclink.last_in); */
896 link->rcv_unacked = 0;
897 __skb_queue_tail(&link->transmq, skb);
898 tipc_bearer_send(link->net, link->bearer_id,
899 skb, link->media_addr);
900 }
901 link->snd_nxt = seqno;
902}
903
904void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) 771void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
905{ 772{
906 struct sk_buff *skb, *_skb; 773 struct sk_buff *skb, *_skb;
@@ -943,40 +810,6 @@ static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
943 msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr)); 810 msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
944} 811}
945 812
946void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
947 u32 retransmits)
948{
949 struct tipc_msg *msg;
950
951 if (!skb)
952 return;
953
954 msg = buf_msg(skb);
955
956 /* Detect repeated retransmit failures */
957 if (l_ptr->last_retransm == msg_seqno(msg)) {
958 if (++l_ptr->stale_count > 100) {
959 link_retransmit_failure(l_ptr, skb);
960 return;
961 }
962 } else {
963 l_ptr->last_retransm = msg_seqno(msg);
964 l_ptr->stale_count = 1;
965 }
966
967 skb_queue_walk_from(&l_ptr->transmq, skb) {
968 if (!retransmits)
969 break;
970 msg = buf_msg(skb);
971 msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1));
972 /* msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); */
973 tipc_bearer_send(l_ptr->net, l_ptr->bearer_id, skb,
974 l_ptr->media_addr);
975 retransmits--;
976 l_ptr->stats.retransmitted++;
977 }
978}
979
980int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to, 813int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
981 struct sk_buff_head *xmitq) 814 struct sk_buff_head *xmitq)
982{ 815{
@@ -1249,45 +1082,6 @@ drop:
1249 return rc; 1082 return rc;
1250} 1083}
1251 1084
1252/**
1253 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1254 *
1255 * Returns increase in queue length (i.e. 0 or 1)
1256 */
1257u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1258{
1259 struct sk_buff *skb1;
1260 u16 seq_no = buf_seqno(skb);
1261
1262 /* Empty queue ? */
1263 if (skb_queue_empty(list)) {
1264 __skb_queue_tail(list, skb);
1265 return 1;
1266 }
1267
1268 /* Last ? */
1269 if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1270 __skb_queue_tail(list, skb);
1271 return 1;
1272 }
1273
1274 /* Locate insertion point in queue, then insert; discard if duplicate */
1275 skb_queue_walk(list, skb1) {
1276 u16 curr_seqno = buf_seqno(skb1);
1277
1278 if (seq_no == curr_seqno) {
1279 kfree_skb(skb);
1280 return 0;
1281 }
1282
1283 if (less(seq_no, curr_seqno))
1284 break;
1285 }
1286
1287 __skb_queue_before(list, skb1, skb);
1288 return 1;
1289}
1290
1291/* 1085/*
1292 * Send protocol message to the other endpoint. 1086 * Send protocol message to the other endpoint.
1293 */ 1087 */
diff --git a/net/tipc/link.h b/net/tipc/link.h
index eb1d7f9d5522..66d859b66c84 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -224,7 +224,6 @@ bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
224 int tolerance, char net_plane, u32 mtu, int priority, 224 int tolerance, char net_plane, u32 mtu, int priority,
225 int window, u32 session, u32 ownnode, u32 peer, 225 int window, u32 session, u32 ownnode, u32 peer,
226 u16 peer_caps, 226 u16 peer_caps,
227 struct tipc_media_addr *maddr,
228 struct tipc_link *bc_sndlink, 227 struct tipc_link *bc_sndlink,
229 struct tipc_link *bc_rcvlink, 228 struct tipc_link *bc_rcvlink,
230 struct sk_buff_head *inputq, 229 struct sk_buff_head *inputq,
@@ -249,22 +248,10 @@ bool tipc_link_is_synching(struct tipc_link *l);
249bool tipc_link_is_failingover(struct tipc_link *l); 248bool tipc_link_is_failingover(struct tipc_link *l);
250bool tipc_link_is_blocked(struct tipc_link *l); 249bool tipc_link_is_blocked(struct tipc_link *l);
251void tipc_link_set_active(struct tipc_link *l, bool active); 250void tipc_link_set_active(struct tipc_link *l, bool active);
252void tipc_link_purge_queues(struct tipc_link *l_ptr);
253void tipc_link_purge_backlog(struct tipc_link *l);
254void tipc_link_reset(struct tipc_link *l_ptr); 251void tipc_link_reset(struct tipc_link *l_ptr);
255int __tipc_link_xmit(struct net *net, struct tipc_link *link,
256 struct sk_buff_head *list);
257int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list, 252int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
258 struct sk_buff_head *xmitq); 253 struct sk_buff_head *xmitq);
259void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, 254void tipc_link_set_queue_limits(struct tipc_link *l, u32 window);
260 u32 gap, u32 tolerance, u32 priority);
261void tipc_link_push_packets(struct tipc_link *l_ptr);
262u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf);
263void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
264void tipc_link_retransmit(struct tipc_link *l_ptr,
265 struct sk_buff *start, u32 retransmits);
266struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
267 const struct sk_buff *skb);
268 255
269int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb); 256int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb);
270int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info); 257int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index f4772f53f41a..7493506b069b 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -606,7 +606,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
606 b->net_plane, b->mtu, b->priority, 606 b->net_plane, b->mtu, b->priority,
607 b->window, mod(tipc_net(net)->random), 607 b->window, mod(tipc_net(net)->random),
608 tipc_own_addr(net), onode, 608 tipc_own_addr(net), onode,
609 n->capabilities, &le->maddr, 609 n->capabilities,
610 tipc_bc_sndlink(n->net), n->bc_entry.link, 610 tipc_bc_sndlink(n->net), n->bc_entry.link,
611 &le->inputq, 611 &le->inputq,
612 &n->bc_entry.namedq, &l)) { 612 &n->bc_entry.namedq, &l)) {
@@ -943,18 +943,13 @@ void tipc_node_unlock(struct tipc_node *node)
943 publ_list = &node->publ_list; 943 publ_list = &node->publ_list;
944 944
945 node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP | 945 node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
946 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP | 946 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
947 TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
948 TIPC_BCAST_RESET);
949 947
950 spin_unlock_bh(&node->lock); 948 spin_unlock_bh(&node->lock);
951 949
952 if (flags & TIPC_NOTIFY_NODE_DOWN) 950 if (flags & TIPC_NOTIFY_NODE_DOWN)
953 tipc_publ_notify(net, publ_list, addr); 951 tipc_publ_notify(net, publ_list, addr);
954 952
955 if (flags & TIPC_WAKEUP_BCAST_USERS)
956 tipc_bclink_wakeup_users(net);
957
958 if (flags & TIPC_NOTIFY_NODE_UP) 953 if (flags & TIPC_NOTIFY_NODE_UP)
959 tipc_named_node_up(net, addr); 954 tipc_named_node_up(net, addr);
960 955
@@ -966,11 +961,6 @@ void tipc_node_unlock(struct tipc_node *node)
966 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, 961 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
967 link_id, addr); 962 link_id, addr);
968 963
969 if (flags & TIPC_BCAST_MSG_EVT)
970 tipc_bclink_input(net);
971
972 if (flags & TIPC_BCAST_RESET)
973 tipc_node_reset_links(node);
974} 964}
975 965
976/* Caller should hold node lock for the passed node */ 966/* Caller should hold node lock for the passed node */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 36a1cd0bc1f1..6734562d3c6e 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -55,11 +55,8 @@
55enum { 55enum {
56 TIPC_NOTIFY_NODE_DOWN = (1 << 3), 56 TIPC_NOTIFY_NODE_DOWN = (1 << 3),
57 TIPC_NOTIFY_NODE_UP = (1 << 4), 57 TIPC_NOTIFY_NODE_UP = (1 << 4),
58 TIPC_WAKEUP_BCAST_USERS = (1 << 5),
59 TIPC_NOTIFY_LINK_UP = (1 << 6), 58 TIPC_NOTIFY_LINK_UP = (1 << 6),
60 TIPC_NOTIFY_LINK_DOWN = (1 << 7), 59 TIPC_NOTIFY_LINK_DOWN = (1 << 7)
61 TIPC_BCAST_MSG_EVT = (1 << 9),
62 TIPC_BCAST_RESET = (1 << 10)
63}; 60};
64 61
65/* Optional capabilities supported by this code version 62/* Optional capabilities supported by this code version
@@ -70,29 +67,6 @@ enum {
70 67
71#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH 68#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
72 69
73/**
74 * struct tipc_node_bclink - TIPC node bclink structure
75 * @acked: sequence # of last outbound b'cast message acknowledged by node
76 * @last_in: sequence # of last in-sequence b'cast message received from node
77 * @last_sent: sequence # of last b'cast message sent by node
78 * @oos_state: state tracker for handling OOS b'cast messages
79 * @deferred_queue: deferred queue saved OOS b'cast message received from node
80 * @reasm_buf: broadcast reassembly queue head from node
81 * @inputq_map: bitmap indicating which inqueues should be kicked
82 * @recv_permitted: true if node is allowed to receive b'cast messages
83 */
84struct tipc_node_bclink {
85 u32 acked;
86 u32 last_in;
87 u32 last_sent;
88 u32 oos_state;
89 u32 deferred_size;
90 struct sk_buff_head deferdq;
91 struct sk_buff *reasm_buf;
92 struct sk_buff_head namedq;
93 bool recv_permitted;
94};
95
96struct tipc_link_entry { 70struct tipc_link_entry {
97 struct tipc_link *link; 71 struct tipc_link *link;
98 u32 mtu; 72 u32 mtu;
@@ -120,7 +94,6 @@ struct tipc_bclink_entry {
120 * @active_links: bearer ids of active links, used as index into links[] array 94 * @active_links: bearer ids of active links, used as index into links[] array
121 * @links: array containing references to all links to node 95 * @links: array containing references to all links to node
122 * @action_flags: bit mask of different types of node actions 96 * @action_flags: bit mask of different types of node actions
123 * @bclink: broadcast-related info
124 * @state: connectivity state vs peer node 97 * @state: connectivity state vs peer node
125 * @sync_point: sequence number where synch/failover is finished 98 * @sync_point: sequence number where synch/failover is finished
126 * @list: links to adjacent nodes in sorted list of cluster's nodes 99 * @list: links to adjacent nodes in sorted list of cluster's nodes
@@ -142,7 +115,6 @@ struct tipc_node {
142 struct tipc_link_entry links[MAX_BEARERS]; 115 struct tipc_link_entry links[MAX_BEARERS];
143 struct tipc_bclink_entry bc_entry; 116 struct tipc_bclink_entry bc_entry;
144 int action_flags; 117 int action_flags;
145 struct tipc_node_bclink bclink;
146 struct list_head list; 118 struct list_head list;
147 int state; 119 int state;
148 u16 sync_point; 120 u16 sync_point;