aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c194
1 files changed, 119 insertions, 75 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 95ab5ef92920..26631679a1fa 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -71,7 +71,7 @@ struct tipc_bcbearer_pair {
71 * Note: The fields labelled "temporary" are incorporated into the bearer 71 * Note: The fields labelled "temporary" are incorporated into the bearer
72 * to avoid consuming potentially limited stack space through the use of 72 * to avoid consuming potentially limited stack space through the use of
73 * large local variables within multicast routines. Concurrent access is 73 * large local variables within multicast routines. Concurrent access is
74 * prevented through use of the spinlock "bc_lock". 74 * prevented through use of the spinlock "bclink_lock".
75 */ 75 */
76struct tipc_bcbearer { 76struct tipc_bcbearer {
77 struct tipc_bearer bearer; 77 struct tipc_bearer bearer;
@@ -84,34 +84,64 @@ struct tipc_bcbearer {
84 84
85/** 85/**
86 * struct tipc_bclink - link used for broadcast messages 86 * struct tipc_bclink - link used for broadcast messages
87 * @lock: spinlock governing access to structure
87 * @link: (non-standard) broadcast link structure 88 * @link: (non-standard) broadcast link structure
88 * @node: (non-standard) node structure representing b'cast link's peer node 89 * @node: (non-standard) node structure representing b'cast link's peer node
90 * @flags: represent bclink states
89 * @bcast_nodes: map of broadcast-capable nodes 91 * @bcast_nodes: map of broadcast-capable nodes
90 * @retransmit_to: node that most recently requested a retransmit 92 * @retransmit_to: node that most recently requested a retransmit
91 * 93 *
92 * Handles sequence numbering, fragmentation, bundling, etc. 94 * Handles sequence numbering, fragmentation, bundling, etc.
93 */ 95 */
94struct tipc_bclink { 96struct tipc_bclink {
97 spinlock_t lock;
95 struct tipc_link link; 98 struct tipc_link link;
96 struct tipc_node node; 99 struct tipc_node node;
100 unsigned int flags;
97 struct tipc_node_map bcast_nodes; 101 struct tipc_node_map bcast_nodes;
98 struct tipc_node *retransmit_to; 102 struct tipc_node *retransmit_to;
99}; 103};
100 104
101static struct tipc_bcbearer bcast_bearer; 105static struct tipc_bcbearer *bcbearer;
102static struct tipc_bclink bcast_link; 106static struct tipc_bclink *bclink;
103 107static struct tipc_link *bcl;
104static struct tipc_bcbearer *bcbearer = &bcast_bearer;
105static struct tipc_bclink *bclink = &bcast_link;
106static struct tipc_link *bcl = &bcast_link.link;
107
108static DEFINE_SPINLOCK(bc_lock);
109 108
110const char tipc_bclink_name[] = "broadcast-link"; 109const char tipc_bclink_name[] = "broadcast-link";
111 110
112static void tipc_nmap_diff(struct tipc_node_map *nm_a, 111static void tipc_nmap_diff(struct tipc_node_map *nm_a,
113 struct tipc_node_map *nm_b, 112 struct tipc_node_map *nm_b,
114 struct tipc_node_map *nm_diff); 113 struct tipc_node_map *nm_diff);
114static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
115static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
116
117static void tipc_bclink_lock(void)
118{
119 spin_lock_bh(&bclink->lock);
120}
121
122static void tipc_bclink_unlock(void)
123{
124 struct tipc_node *node = NULL;
125
126 if (likely(!bclink->flags)) {
127 spin_unlock_bh(&bclink->lock);
128 return;
129 }
130
131 if (bclink->flags & TIPC_BCLINK_RESET) {
132 bclink->flags &= ~TIPC_BCLINK_RESET;
133 node = tipc_bclink_retransmit_to();
134 }
135 spin_unlock_bh(&bclink->lock);
136
137 if (node)
138 tipc_link_reset_all(node);
139}
140
141void tipc_bclink_set_flags(unsigned int flags)
142{
143 bclink->flags |= flags;
144}
115 145
116static u32 bcbuf_acks(struct sk_buff *buf) 146static u32 bcbuf_acks(struct sk_buff *buf)
117{ 147{
@@ -130,16 +160,16 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
130 160
131void tipc_bclink_add_node(u32 addr) 161void tipc_bclink_add_node(u32 addr)
132{ 162{
133 spin_lock_bh(&bc_lock); 163 tipc_bclink_lock();
134 tipc_nmap_add(&bclink->bcast_nodes, addr); 164 tipc_nmap_add(&bclink->bcast_nodes, addr);
135 spin_unlock_bh(&bc_lock); 165 tipc_bclink_unlock();
136} 166}
137 167
138void tipc_bclink_remove_node(u32 addr) 168void tipc_bclink_remove_node(u32 addr)
139{ 169{
140 spin_lock_bh(&bc_lock); 170 tipc_bclink_lock();
141 tipc_nmap_remove(&bclink->bcast_nodes, addr); 171 tipc_nmap_remove(&bclink->bcast_nodes, addr);
142 spin_unlock_bh(&bc_lock); 172 tipc_bclink_unlock();
143} 173}
144 174
145static void bclink_set_last_sent(void) 175static void bclink_set_last_sent(void)
@@ -165,7 +195,7 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
165/** 195/**
166 * tipc_bclink_retransmit_to - get most recent node to request retransmission 196 * tipc_bclink_retransmit_to - get most recent node to request retransmission
167 * 197 *
168 * Called with bc_lock locked 198 * Called with bclink_lock locked
169 */ 199 */
170struct tipc_node *tipc_bclink_retransmit_to(void) 200struct tipc_node *tipc_bclink_retransmit_to(void)
171{ 201{
@@ -177,7 +207,7 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
177 * @after: sequence number of last packet to *not* retransmit 207 * @after: sequence number of last packet to *not* retransmit
178 * @to: sequence number of last packet to retransmit 208 * @to: sequence number of last packet to retransmit
179 * 209 *
180 * Called with bc_lock locked 210 * Called with bclink_lock locked
181 */ 211 */
182static void bclink_retransmit_pkt(u32 after, u32 to) 212static void bclink_retransmit_pkt(u32 after, u32 to)
183{ 213{
@@ -194,7 +224,7 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
194 * @n_ptr: node that sent acknowledgement info 224 * @n_ptr: node that sent acknowledgement info
195 * @acked: broadcast sequence # that has been acknowledged 225 * @acked: broadcast sequence # that has been acknowledged
196 * 226 *
197 * Node is locked, bc_lock unlocked. 227 * Node is locked, bclink_lock unlocked.
198 */ 228 */
199void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) 229void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
200{ 230{
@@ -202,8 +232,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
202 struct sk_buff *next; 232 struct sk_buff *next;
203 unsigned int released = 0; 233 unsigned int released = 0;
204 234
205 spin_lock_bh(&bc_lock); 235 tipc_bclink_lock();
206
207 /* Bail out if tx queue is empty (no clean up is required) */ 236 /* Bail out if tx queue is empty (no clean up is required) */
208 crs = bcl->first_out; 237 crs = bcl->first_out;
209 if (!crs) 238 if (!crs)
@@ -267,13 +296,13 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
267 if (unlikely(released && !list_empty(&bcl->waiting_ports))) 296 if (unlikely(released && !list_empty(&bcl->waiting_ports)))
268 tipc_link_wakeup_ports(bcl, 0); 297 tipc_link_wakeup_ports(bcl, 0);
269exit: 298exit:
270 spin_unlock_bh(&bc_lock); 299 tipc_bclink_unlock();
271} 300}
272 301
273/** 302/**
274 * tipc_bclink_update_link_state - update broadcast link state 303 * tipc_bclink_update_link_state - update broadcast link state
275 * 304 *
276 * tipc_net_lock and node lock set 305 * RCU and node lock set
277 */ 306 */
278void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent) 307void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
279{ 308{
@@ -320,10 +349,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
320 ? buf_seqno(n_ptr->bclink.deferred_head) - 1 349 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
321 : n_ptr->bclink.last_sent); 350 : n_ptr->bclink.last_sent);
322 351
323 spin_lock_bh(&bc_lock); 352 tipc_bclink_lock();
324 tipc_bearer_send(&bcbearer->bearer, buf, NULL); 353 tipc_bearer_send(MAX_BEARERS, buf, NULL);
325 bcl->stats.sent_nacks++; 354 bcl->stats.sent_nacks++;
326 spin_unlock_bh(&bc_lock); 355 tipc_bclink_unlock();
327 kfree_skb(buf); 356 kfree_skb(buf);
328 357
329 n_ptr->bclink.oos_state++; 358 n_ptr->bclink.oos_state++;
@@ -335,8 +364,6 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
335 * 364 *
336 * Delay any upcoming NACK by this node if another node has already 365 * Delay any upcoming NACK by this node if another node has already
337 * requested the first message this node is going to ask for. 366 * requested the first message this node is going to ask for.
338 *
339 * Only tipc_net_lock set.
340 */ 367 */
341static void bclink_peek_nack(struct tipc_msg *msg) 368static void bclink_peek_nack(struct tipc_msg *msg)
342{ 369{
@@ -362,7 +389,7 @@ int tipc_bclink_xmit(struct sk_buff *buf)
362{ 389{
363 int res; 390 int res;
364 391
365 spin_lock_bh(&bc_lock); 392 tipc_bclink_lock();
366 393
367 if (!bclink->bcast_nodes.count) { 394 if (!bclink->bcast_nodes.count) {
368 res = msg_data_sz(buf_msg(buf)); 395 res = msg_data_sz(buf_msg(buf));
@@ -377,14 +404,14 @@ int tipc_bclink_xmit(struct sk_buff *buf)
377 bcl->stats.accu_queue_sz += bcl->out_queue_size; 404 bcl->stats.accu_queue_sz += bcl->out_queue_size;
378 } 405 }
379exit: 406exit:
380 spin_unlock_bh(&bc_lock); 407 tipc_bclink_unlock();
381 return res; 408 return res;
382} 409}
383 410
384/** 411/**
385 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 412 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
386 * 413 *
387 * Called with both sending node's lock and bc_lock taken. 414 * Called with both sending node's lock and bclink_lock taken.
388 */ 415 */
389static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) 416static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
390{ 417{
@@ -408,7 +435,7 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
408/** 435/**
409 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards 436 * tipc_bclink_rcv - receive a broadcast packet, and deliver upwards
410 * 437 *
411 * tipc_net_lock is read_locked, no other locks set 438 * RCU is locked, no other locks set
412 */ 439 */
413void tipc_bclink_rcv(struct sk_buff *buf) 440void tipc_bclink_rcv(struct sk_buff *buf)
414{ 441{
@@ -439,12 +466,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
439 if (msg_destnode(msg) == tipc_own_addr) { 466 if (msg_destnode(msg) == tipc_own_addr) {
440 tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); 467 tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
441 tipc_node_unlock(node); 468 tipc_node_unlock(node);
442 spin_lock_bh(&bc_lock); 469 tipc_bclink_lock();
443 bcl->stats.recv_nacks++; 470 bcl->stats.recv_nacks++;
444 bclink->retransmit_to = node; 471 bclink->retransmit_to = node;
445 bclink_retransmit_pkt(msg_bcgap_after(msg), 472 bclink_retransmit_pkt(msg_bcgap_after(msg),
446 msg_bcgap_to(msg)); 473 msg_bcgap_to(msg));
447 spin_unlock_bh(&bc_lock); 474 tipc_bclink_unlock();
448 } else { 475 } else {
449 tipc_node_unlock(node); 476 tipc_node_unlock(node);
450 bclink_peek_nack(msg); 477 bclink_peek_nack(msg);
@@ -462,51 +489,47 @@ receive:
462 /* Deliver message to destination */ 489 /* Deliver message to destination */
463 490
464 if (likely(msg_isdata(msg))) { 491 if (likely(msg_isdata(msg))) {
465 spin_lock_bh(&bc_lock); 492 tipc_bclink_lock();
466 bclink_accept_pkt(node, seqno); 493 bclink_accept_pkt(node, seqno);
467 spin_unlock_bh(&bc_lock); 494 tipc_bclink_unlock();
468 tipc_node_unlock(node); 495 tipc_node_unlock(node);
469 if (likely(msg_mcast(msg))) 496 if (likely(msg_mcast(msg)))
470 tipc_port_mcast_rcv(buf, NULL); 497 tipc_port_mcast_rcv(buf, NULL);
471 else 498 else
472 kfree_skb(buf); 499 kfree_skb(buf);
473 } else if (msg_user(msg) == MSG_BUNDLER) { 500 } else if (msg_user(msg) == MSG_BUNDLER) {
474 spin_lock_bh(&bc_lock); 501 tipc_bclink_lock();
475 bclink_accept_pkt(node, seqno); 502 bclink_accept_pkt(node, seqno);
476 bcl->stats.recv_bundles++; 503 bcl->stats.recv_bundles++;
477 bcl->stats.recv_bundled += msg_msgcnt(msg); 504 bcl->stats.recv_bundled += msg_msgcnt(msg);
478 spin_unlock_bh(&bc_lock); 505 tipc_bclink_unlock();
479 tipc_node_unlock(node); 506 tipc_node_unlock(node);
480 tipc_link_bundle_rcv(buf); 507 tipc_link_bundle_rcv(buf);
481 } else if (msg_user(msg) == MSG_FRAGMENTER) { 508 } else if (msg_user(msg) == MSG_FRAGMENTER) {
482 int ret; 509 tipc_buf_append(&node->bclink.reasm_buf, &buf);
483 ret = tipc_link_frag_rcv(&node->bclink.reasm_head, 510 if (unlikely(!buf && !node->bclink.reasm_buf))
484 &node->bclink.reasm_tail,
485 &buf);
486 if (ret == LINK_REASM_ERROR)
487 goto unlock; 511 goto unlock;
488 spin_lock_bh(&bc_lock); 512 tipc_bclink_lock();
489 bclink_accept_pkt(node, seqno); 513 bclink_accept_pkt(node, seqno);
490 bcl->stats.recv_fragments++; 514 bcl->stats.recv_fragments++;
491 if (ret == LINK_REASM_COMPLETE) { 515 if (buf) {
492 bcl->stats.recv_fragmented++; 516 bcl->stats.recv_fragmented++;
493 /* Point msg to inner header */
494 msg = buf_msg(buf); 517 msg = buf_msg(buf);
495 spin_unlock_bh(&bc_lock); 518 tipc_bclink_unlock();
496 goto receive; 519 goto receive;
497 } 520 }
498 spin_unlock_bh(&bc_lock); 521 tipc_bclink_unlock();
499 tipc_node_unlock(node); 522 tipc_node_unlock(node);
500 } else if (msg_user(msg) == NAME_DISTRIBUTOR) { 523 } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
501 spin_lock_bh(&bc_lock); 524 tipc_bclink_lock();
502 bclink_accept_pkt(node, seqno); 525 bclink_accept_pkt(node, seqno);
503 spin_unlock_bh(&bc_lock); 526 tipc_bclink_unlock();
504 tipc_node_unlock(node); 527 tipc_node_unlock(node);
505 tipc_named_rcv(buf); 528 tipc_named_rcv(buf);
506 } else { 529 } else {
507 spin_lock_bh(&bc_lock); 530 tipc_bclink_lock();
508 bclink_accept_pkt(node, seqno); 531 bclink_accept_pkt(node, seqno);
509 spin_unlock_bh(&bc_lock); 532 tipc_bclink_unlock();
510 tipc_node_unlock(node); 533 tipc_node_unlock(node);
511 kfree_skb(buf); 534 kfree_skb(buf);
512 } 535 }
@@ -552,14 +575,14 @@ receive:
552 } else 575 } else
553 deferred = 0; 576 deferred = 0;
554 577
555 spin_lock_bh(&bc_lock); 578 tipc_bclink_lock();
556 579
557 if (deferred) 580 if (deferred)
558 bcl->stats.deferred_recv++; 581 bcl->stats.deferred_recv++;
559 else 582 else
560 bcl->stats.duplicates++; 583 bcl->stats.duplicates++;
561 584
562 spin_unlock_bh(&bc_lock); 585 tipc_bclink_unlock();
563 586
564unlock: 587unlock:
565 tipc_node_unlock(node); 588 tipc_node_unlock(node);
@@ -627,13 +650,13 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
627 650
628 if (bp_index == 0) { 651 if (bp_index == 0) {
629 /* Use original buffer for first bearer */ 652 /* Use original buffer for first bearer */
630 tipc_bearer_send(b, buf, &b->bcast_addr); 653 tipc_bearer_send(b->identity, buf, &b->bcast_addr);
631 } else { 654 } else {
632 /* Avoid concurrent buffer access */ 655 /* Avoid concurrent buffer access */
633 tbuf = pskb_copy(buf, GFP_ATOMIC); 656 tbuf = pskb_copy_for_clone(buf, GFP_ATOMIC);
634 if (!tbuf) 657 if (!tbuf)
635 break; 658 break;
636 tipc_bearer_send(b, tbuf, &b->bcast_addr); 659 tipc_bearer_send(b->identity, tbuf, &b->bcast_addr);
637 kfree_skb(tbuf); /* Bearer keeps a clone */ 660 kfree_skb(tbuf); /* Bearer keeps a clone */
638 } 661 }
639 662
@@ -655,20 +678,27 @@ static int tipc_bcbearer_send(struct sk_buff *buf, struct tipc_bearer *unused1,
655/** 678/**
656 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer 679 * tipc_bcbearer_sort - create sets of bearer pairs used by broadcast bearer
657 */ 680 */
658void tipc_bcbearer_sort(void) 681void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
659{ 682{
660 struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp; 683 struct tipc_bcbearer_pair *bp_temp = bcbearer->bpairs_temp;
661 struct tipc_bcbearer_pair *bp_curr; 684 struct tipc_bcbearer_pair *bp_curr;
685 struct tipc_bearer *b;
662 int b_index; 686 int b_index;
663 int pri; 687 int pri;
664 688
665 spin_lock_bh(&bc_lock); 689 tipc_bclink_lock();
690
691 if (action)
692 tipc_nmap_add(nm_ptr, node);
693 else
694 tipc_nmap_remove(nm_ptr, node);
666 695
667 /* Group bearers by priority (can assume max of two per priority) */ 696 /* Group bearers by priority (can assume max of two per priority) */
668 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp)); 697 memset(bp_temp, 0, sizeof(bcbearer->bpairs_temp));
669 698
699 rcu_read_lock();
670 for (b_index = 0; b_index < MAX_BEARERS; b_index++) { 700 for (b_index = 0; b_index < MAX_BEARERS; b_index++) {
671 struct tipc_bearer *b = bearer_list[b_index]; 701 b = rcu_dereference_rtnl(bearer_list[b_index]);
672 if (!b || !b->nodes.count) 702 if (!b || !b->nodes.count)
673 continue; 703 continue;
674 704
@@ -677,6 +707,7 @@ void tipc_bcbearer_sort(void)
677 else 707 else
678 bp_temp[b->priority].secondary = b; 708 bp_temp[b->priority].secondary = b;
679 } 709 }
710 rcu_read_unlock();
680 711
681 /* Create array of bearer pairs for broadcasting */ 712 /* Create array of bearer pairs for broadcasting */
682 bp_curr = bcbearer->bpairs; 713 bp_curr = bcbearer->bpairs;
@@ -702,7 +733,7 @@ void tipc_bcbearer_sort(void)
702 bp_curr++; 733 bp_curr++;
703 } 734 }
704 735
705 spin_unlock_bh(&bc_lock); 736 tipc_bclink_unlock();
706} 737}
707 738
708 739
@@ -714,7 +745,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
714 if (!bcl) 745 if (!bcl)
715 return 0; 746 return 0;
716 747
717 spin_lock_bh(&bc_lock); 748 tipc_bclink_lock();
718 749
719 s = &bcl->stats; 750 s = &bcl->stats;
720 751
@@ -743,7 +774,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
743 s->queue_sz_counts ? 774 s->queue_sz_counts ?
744 (s->accu_queue_sz / s->queue_sz_counts) : 0); 775 (s->accu_queue_sz / s->queue_sz_counts) : 0);
745 776
746 spin_unlock_bh(&bc_lock); 777 tipc_bclink_unlock();
747 return ret; 778 return ret;
748} 779}
749 780
@@ -752,9 +783,9 @@ int tipc_bclink_reset_stats(void)
752 if (!bcl) 783 if (!bcl)
753 return -ENOPROTOOPT; 784 return -ENOPROTOOPT;
754 785
755 spin_lock_bh(&bc_lock); 786 tipc_bclink_lock();
756 memset(&bcl->stats, 0, sizeof(bcl->stats)); 787 memset(&bcl->stats, 0, sizeof(bcl->stats));
757 spin_unlock_bh(&bc_lock); 788 tipc_bclink_unlock();
758 return 0; 789 return 0;
759} 790}
760 791
@@ -765,46 +796,59 @@ int tipc_bclink_set_queue_limits(u32 limit)
765 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) 796 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
766 return -EINVAL; 797 return -EINVAL;
767 798
768 spin_lock_bh(&bc_lock); 799 tipc_bclink_lock();
769 tipc_link_set_queue_limits(bcl, limit); 800 tipc_link_set_queue_limits(bcl, limit);
770 spin_unlock_bh(&bc_lock); 801 tipc_bclink_unlock();
771 return 0; 802 return 0;
772} 803}
773 804
774void tipc_bclink_init(void) 805int tipc_bclink_init(void)
775{ 806{
807 bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
808 if (!bcbearer)
809 return -ENOMEM;
810
811 bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
812 if (!bclink) {
813 kfree(bcbearer);
814 return -ENOMEM;
815 }
816
817 bcl = &bclink->link;
776 bcbearer->bearer.media = &bcbearer->media; 818 bcbearer->bearer.media = &bcbearer->media;
777 bcbearer->media.send_msg = tipc_bcbearer_send; 819 bcbearer->media.send_msg = tipc_bcbearer_send;
778 sprintf(bcbearer->media.name, "tipc-broadcast"); 820 sprintf(bcbearer->media.name, "tipc-broadcast");
779 821
822 spin_lock_init(&bclink->lock);
780 INIT_LIST_HEAD(&bcl->waiting_ports); 823 INIT_LIST_HEAD(&bcl->waiting_ports);
781 bcl->next_out_no = 1; 824 bcl->next_out_no = 1;
782 spin_lock_init(&bclink->node.lock); 825 spin_lock_init(&bclink->node.lock);
783 bcl->owner = &bclink->node; 826 bcl->owner = &bclink->node;
784 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 827 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
785 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); 828 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
786 bcl->b_ptr = &bcbearer->bearer; 829 bcl->bearer_id = MAX_BEARERS;
787 bearer_list[BCBEARER] = &bcbearer->bearer; 830 rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
788 bcl->state = WORKING_WORKING; 831 bcl->state = WORKING_WORKING;
789 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); 832 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
833 return 0;
790} 834}
791 835
792void tipc_bclink_stop(void) 836void tipc_bclink_stop(void)
793{ 837{
794 spin_lock_bh(&bc_lock); 838 tipc_bclink_lock();
795 tipc_link_purge_queues(bcl); 839 tipc_link_purge_queues(bcl);
796 spin_unlock_bh(&bc_lock); 840 tipc_bclink_unlock();
797 841
798 bearer_list[BCBEARER] = NULL; 842 RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
799 memset(bclink, 0, sizeof(*bclink)); 843 synchronize_net();
800 memset(bcbearer, 0, sizeof(*bcbearer)); 844 kfree(bcbearer);
845 kfree(bclink);
801} 846}
802 847
803
804/** 848/**
805 * tipc_nmap_add - add a node to a node map 849 * tipc_nmap_add - add a node to a node map
806 */ 850 */
807void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node) 851static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
808{ 852{
809 int n = tipc_node(node); 853 int n = tipc_node(node);
810 int w = n / WSIZE; 854 int w = n / WSIZE;
@@ -819,7 +863,7 @@ void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node)
819/** 863/**
820 * tipc_nmap_remove - remove a node from a node map 864 * tipc_nmap_remove - remove a node from a node map
821 */ 865 */
822void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node) 866static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node)
823{ 867{
824 int n = tipc_node(node); 868 int n = tipc_node(node);
825 int w = n / WSIZE; 869 int w = n / WSIZE;