aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2014-05-05 17:26:54 -0400
committerDavid S. Miller <davem@davemloft.net>2014-05-05 17:26:54 -0400
commit5a50a92784e4630f7b3e163ecd32ac3f783d9b4e (patch)
treef8a32816ff224eddbc8a8b18e5b83cdf8b5b5f81
parent5b579e212fc77b6731e2767a0658ae7b64a67a10 (diff)
parent52ff872055e06af10f94b8853c946f07ed8a0672 (diff)
Merge branch 'tipc-next'
Ying Xue says: ==================== tipc: purge signal handler infrastructure Deferring actions for execution in asynchronous contexts usually adds unnecessary code complexity and makes their behaviour unpredictable and non-deterministic. Moreover, because the signal handler infrastructure is stopped first when the tipc module is removed, it introduces potential risks: for instance, even after the signal handler has been stopped, some tipc components may still submit signal requests to it, which can leave resources unreleased or unfreed. This series therefore converts all actions previously performed asynchronously in tasklet context, via the interface provided by the signal handler infrastructure, to be executed synchronously, thereby allowing the whole signal handler infrastructure to be deleted. ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/Makefile2
-rw-r--r--net/tipc/bcast.c145
-rw-r--r--net/tipc/bcast.h4
-rw-r--r--net/tipc/config.c6
-rw-r--r--net/tipc/core.c7
-rw-r--r--net/tipc/core.h6
-rw-r--r--net/tipc/handler.c134
-rw-r--r--net/tipc/link.c54
-rw-r--r--net/tipc/link.h1
-rw-r--r--net/tipc/name_distr.c58
-rw-r--r--net/tipc/name_distr.h30
-rw-r--r--net/tipc/net.c9
-rw-r--r--net/tipc/net.h2
-rw-r--r--net/tipc/node.c59
-rw-r--r--net/tipc/node.h92
-rw-r--r--net/tipc/node_subscr.c9
-rw-r--r--net/tipc/node_subscr.h2
17 files changed, 268 insertions, 352 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index b282f7130d2b..a080c66d819a 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -5,7 +5,7 @@
5obj-$(CONFIG_TIPC) := tipc.o 5obj-$(CONFIG_TIPC) := tipc.o
6 6
7tipc-y += addr.o bcast.o bearer.o config.o \ 7tipc-y += addr.o bcast.o bearer.o config.o \
8 core.o handler.o link.o discover.o msg.o \ 8 core.o link.o discover.o msg.o \
9 name_distr.o subscr.o name_table.o net.o \ 9 name_distr.o subscr.o name_table.o net.o \
10 netlink.o node.o node_subscr.o port.o ref.o \ 10 netlink.o node.o node_subscr.o port.o ref.o \
11 socket.o log.o eth_media.o server.o 11 socket.o log.o eth_media.o server.o
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 119a59b4bec6..a0978d0890cb 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -71,7 +71,7 @@ struct tipc_bcbearer_pair {
71 * Note: The fields labelled "temporary" are incorporated into the bearer 71 * Note: The fields labelled "temporary" are incorporated into the bearer
72 * to avoid consuming potentially limited stack space through the use of 72 * to avoid consuming potentially limited stack space through the use of
73 * large local variables within multicast routines. Concurrent access is 73 * large local variables within multicast routines. Concurrent access is
74 * prevented through use of the spinlock "bc_lock". 74 * prevented through use of the spinlock "bclink_lock".
75 */ 75 */
76struct tipc_bcbearer { 76struct tipc_bcbearer {
77 struct tipc_bearer bearer; 77 struct tipc_bearer bearer;
@@ -84,28 +84,27 @@ struct tipc_bcbearer {
84 84
85/** 85/**
86 * struct tipc_bclink - link used for broadcast messages 86 * struct tipc_bclink - link used for broadcast messages
87 * @lock: spinlock governing access to structure
87 * @link: (non-standard) broadcast link structure 88 * @link: (non-standard) broadcast link structure
88 * @node: (non-standard) node structure representing b'cast link's peer node 89 * @node: (non-standard) node structure representing b'cast link's peer node
90 * @flags: represent bclink states
89 * @bcast_nodes: map of broadcast-capable nodes 91 * @bcast_nodes: map of broadcast-capable nodes
90 * @retransmit_to: node that most recently requested a retransmit 92 * @retransmit_to: node that most recently requested a retransmit
91 * 93 *
92 * Handles sequence numbering, fragmentation, bundling, etc. 94 * Handles sequence numbering, fragmentation, bundling, etc.
93 */ 95 */
94struct tipc_bclink { 96struct tipc_bclink {
97 spinlock_t lock;
95 struct tipc_link link; 98 struct tipc_link link;
96 struct tipc_node node; 99 struct tipc_node node;
100 unsigned int flags;
97 struct tipc_node_map bcast_nodes; 101 struct tipc_node_map bcast_nodes;
98 struct tipc_node *retransmit_to; 102 struct tipc_node *retransmit_to;
99}; 103};
100 104
101static struct tipc_bcbearer bcast_bearer; 105static struct tipc_bcbearer *bcbearer;
102static struct tipc_bclink bcast_link; 106static struct tipc_bclink *bclink;
103 107static struct tipc_link *bcl;
104static struct tipc_bcbearer *bcbearer = &bcast_bearer;
105static struct tipc_bclink *bclink = &bcast_link;
106static struct tipc_link *bcl = &bcast_link.link;
107
108static DEFINE_SPINLOCK(bc_lock);
109 108
110const char tipc_bclink_name[] = "broadcast-link"; 109const char tipc_bclink_name[] = "broadcast-link";
111 110
@@ -115,6 +114,35 @@ static void tipc_nmap_diff(struct tipc_node_map *nm_a,
115static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node); 114static void tipc_nmap_add(struct tipc_node_map *nm_ptr, u32 node);
116static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node); 115static void tipc_nmap_remove(struct tipc_node_map *nm_ptr, u32 node);
117 116
117static void tipc_bclink_lock(void)
118{
119 spin_lock_bh(&bclink->lock);
120}
121
122static void tipc_bclink_unlock(void)
123{
124 struct tipc_node *node = NULL;
125
126 if (likely(!bclink->flags)) {
127 spin_unlock_bh(&bclink->lock);
128 return;
129 }
130
131 if (bclink->flags & TIPC_BCLINK_RESET) {
132 bclink->flags &= ~TIPC_BCLINK_RESET;
133 node = tipc_bclink_retransmit_to();
134 }
135 spin_unlock_bh(&bclink->lock);
136
137 if (node)
138 tipc_link_reset_all(node);
139}
140
141void tipc_bclink_set_flags(unsigned int flags)
142{
143 bclink->flags |= flags;
144}
145
118static u32 bcbuf_acks(struct sk_buff *buf) 146static u32 bcbuf_acks(struct sk_buff *buf)
119{ 147{
120 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle; 148 return (u32)(unsigned long)TIPC_SKB_CB(buf)->handle;
@@ -132,16 +160,16 @@ static void bcbuf_decr_acks(struct sk_buff *buf)
132 160
133void tipc_bclink_add_node(u32 addr) 161void tipc_bclink_add_node(u32 addr)
134{ 162{
135 spin_lock_bh(&bc_lock); 163 tipc_bclink_lock();
136 tipc_nmap_add(&bclink->bcast_nodes, addr); 164 tipc_nmap_add(&bclink->bcast_nodes, addr);
137 spin_unlock_bh(&bc_lock); 165 tipc_bclink_unlock();
138} 166}
139 167
140void tipc_bclink_remove_node(u32 addr) 168void tipc_bclink_remove_node(u32 addr)
141{ 169{
142 spin_lock_bh(&bc_lock); 170 tipc_bclink_lock();
143 tipc_nmap_remove(&bclink->bcast_nodes, addr); 171 tipc_nmap_remove(&bclink->bcast_nodes, addr);
144 spin_unlock_bh(&bc_lock); 172 tipc_bclink_unlock();
145} 173}
146 174
147static void bclink_set_last_sent(void) 175static void bclink_set_last_sent(void)
@@ -167,7 +195,7 @@ static void bclink_update_last_sent(struct tipc_node *node, u32 seqno)
167/** 195/**
168 * tipc_bclink_retransmit_to - get most recent node to request retransmission 196 * tipc_bclink_retransmit_to - get most recent node to request retransmission
169 * 197 *
170 * Called with bc_lock locked 198 * Called with bclink_lock locked
171 */ 199 */
172struct tipc_node *tipc_bclink_retransmit_to(void) 200struct tipc_node *tipc_bclink_retransmit_to(void)
173{ 201{
@@ -179,7 +207,7 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
179 * @after: sequence number of last packet to *not* retransmit 207 * @after: sequence number of last packet to *not* retransmit
180 * @to: sequence number of last packet to retransmit 208 * @to: sequence number of last packet to retransmit
181 * 209 *
182 * Called with bc_lock locked 210 * Called with bclink_lock locked
183 */ 211 */
184static void bclink_retransmit_pkt(u32 after, u32 to) 212static void bclink_retransmit_pkt(u32 after, u32 to)
185{ 213{
@@ -196,7 +224,7 @@ static void bclink_retransmit_pkt(u32 after, u32 to)
196 * @n_ptr: node that sent acknowledgement info 224 * @n_ptr: node that sent acknowledgement info
197 * @acked: broadcast sequence # that has been acknowledged 225 * @acked: broadcast sequence # that has been acknowledged
198 * 226 *
199 * Node is locked, bc_lock unlocked. 227 * Node is locked, bclink_lock unlocked.
200 */ 228 */
201void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked) 229void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
202{ 230{
@@ -204,8 +232,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
204 struct sk_buff *next; 232 struct sk_buff *next;
205 unsigned int released = 0; 233 unsigned int released = 0;
206 234
207 spin_lock_bh(&bc_lock); 235 tipc_bclink_lock();
208
209 /* Bail out if tx queue is empty (no clean up is required) */ 236 /* Bail out if tx queue is empty (no clean up is required) */
210 crs = bcl->first_out; 237 crs = bcl->first_out;
211 if (!crs) 238 if (!crs)
@@ -269,7 +296,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
269 if (unlikely(released && !list_empty(&bcl->waiting_ports))) 296 if (unlikely(released && !list_empty(&bcl->waiting_ports)))
270 tipc_link_wakeup_ports(bcl, 0); 297 tipc_link_wakeup_ports(bcl, 0);
271exit: 298exit:
272 spin_unlock_bh(&bc_lock); 299 tipc_bclink_unlock();
273} 300}
274 301
275/** 302/**
@@ -322,10 +349,10 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
322 ? buf_seqno(n_ptr->bclink.deferred_head) - 1 349 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
323 : n_ptr->bclink.last_sent); 350 : n_ptr->bclink.last_sent);
324 351
325 spin_lock_bh(&bc_lock); 352 tipc_bclink_lock();
326 tipc_bearer_send(MAX_BEARERS, buf, NULL); 353 tipc_bearer_send(MAX_BEARERS, buf, NULL);
327 bcl->stats.sent_nacks++; 354 bcl->stats.sent_nacks++;
328 spin_unlock_bh(&bc_lock); 355 tipc_bclink_unlock();
329 kfree_skb(buf); 356 kfree_skb(buf);
330 357
331 n_ptr->bclink.oos_state++; 358 n_ptr->bclink.oos_state++;
@@ -362,7 +389,7 @@ int tipc_bclink_xmit(struct sk_buff *buf)
362{ 389{
363 int res; 390 int res;
364 391
365 spin_lock_bh(&bc_lock); 392 tipc_bclink_lock();
366 393
367 if (!bclink->bcast_nodes.count) { 394 if (!bclink->bcast_nodes.count) {
368 res = msg_data_sz(buf_msg(buf)); 395 res = msg_data_sz(buf_msg(buf));
@@ -377,14 +404,14 @@ int tipc_bclink_xmit(struct sk_buff *buf)
377 bcl->stats.accu_queue_sz += bcl->out_queue_size; 404 bcl->stats.accu_queue_sz += bcl->out_queue_size;
378 } 405 }
379exit: 406exit:
380 spin_unlock_bh(&bc_lock); 407 tipc_bclink_unlock();
381 return res; 408 return res;
382} 409}
383 410
384/** 411/**
385 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet 412 * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
386 * 413 *
387 * Called with both sending node's lock and bc_lock taken. 414 * Called with both sending node's lock and bclink_lock taken.
388 */ 415 */
389static void bclink_accept_pkt(struct tipc_node *node, u32 seqno) 416static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
390{ 417{
@@ -439,12 +466,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
439 if (msg_destnode(msg) == tipc_own_addr) { 466 if (msg_destnode(msg) == tipc_own_addr) {
440 tipc_bclink_acknowledge(node, msg_bcast_ack(msg)); 467 tipc_bclink_acknowledge(node, msg_bcast_ack(msg));
441 tipc_node_unlock(node); 468 tipc_node_unlock(node);
442 spin_lock_bh(&bc_lock); 469 tipc_bclink_lock();
443 bcl->stats.recv_nacks++; 470 bcl->stats.recv_nacks++;
444 bclink->retransmit_to = node; 471 bclink->retransmit_to = node;
445 bclink_retransmit_pkt(msg_bcgap_after(msg), 472 bclink_retransmit_pkt(msg_bcgap_after(msg),
446 msg_bcgap_to(msg)); 473 msg_bcgap_to(msg));
447 spin_unlock_bh(&bc_lock); 474 tipc_bclink_unlock();
448 } else { 475 } else {
449 tipc_node_unlock(node); 476 tipc_node_unlock(node);
450 bclink_peek_nack(msg); 477 bclink_peek_nack(msg);
@@ -462,20 +489,20 @@ receive:
462 /* Deliver message to destination */ 489 /* Deliver message to destination */
463 490
464 if (likely(msg_isdata(msg))) { 491 if (likely(msg_isdata(msg))) {
465 spin_lock_bh(&bc_lock); 492 tipc_bclink_lock();
466 bclink_accept_pkt(node, seqno); 493 bclink_accept_pkt(node, seqno);
467 spin_unlock_bh(&bc_lock); 494 tipc_bclink_unlock();
468 tipc_node_unlock(node); 495 tipc_node_unlock(node);
469 if (likely(msg_mcast(msg))) 496 if (likely(msg_mcast(msg)))
470 tipc_port_mcast_rcv(buf, NULL); 497 tipc_port_mcast_rcv(buf, NULL);
471 else 498 else
472 kfree_skb(buf); 499 kfree_skb(buf);
473 } else if (msg_user(msg) == MSG_BUNDLER) { 500 } else if (msg_user(msg) == MSG_BUNDLER) {
474 spin_lock_bh(&bc_lock); 501 tipc_bclink_lock();
475 bclink_accept_pkt(node, seqno); 502 bclink_accept_pkt(node, seqno);
476 bcl->stats.recv_bundles++; 503 bcl->stats.recv_bundles++;
477 bcl->stats.recv_bundled += msg_msgcnt(msg); 504 bcl->stats.recv_bundled += msg_msgcnt(msg);
478 spin_unlock_bh(&bc_lock); 505 tipc_bclink_unlock();
479 tipc_node_unlock(node); 506 tipc_node_unlock(node);
480 tipc_link_bundle_rcv(buf); 507 tipc_link_bundle_rcv(buf);
481 } else if (msg_user(msg) == MSG_FRAGMENTER) { 508 } else if (msg_user(msg) == MSG_FRAGMENTER) {
@@ -485,28 +512,28 @@ receive:
485 &buf); 512 &buf);
486 if (ret == LINK_REASM_ERROR) 513 if (ret == LINK_REASM_ERROR)
487 goto unlock; 514 goto unlock;
488 spin_lock_bh(&bc_lock); 515 tipc_bclink_lock();
489 bclink_accept_pkt(node, seqno); 516 bclink_accept_pkt(node, seqno);
490 bcl->stats.recv_fragments++; 517 bcl->stats.recv_fragments++;
491 if (ret == LINK_REASM_COMPLETE) { 518 if (ret == LINK_REASM_COMPLETE) {
492 bcl->stats.recv_fragmented++; 519 bcl->stats.recv_fragmented++;
493 /* Point msg to inner header */ 520 /* Point msg to inner header */
494 msg = buf_msg(buf); 521 msg = buf_msg(buf);
495 spin_unlock_bh(&bc_lock); 522 tipc_bclink_unlock();
496 goto receive; 523 goto receive;
497 } 524 }
498 spin_unlock_bh(&bc_lock); 525 tipc_bclink_unlock();
499 tipc_node_unlock(node); 526 tipc_node_unlock(node);
500 } else if (msg_user(msg) == NAME_DISTRIBUTOR) { 527 } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
501 spin_lock_bh(&bc_lock); 528 tipc_bclink_lock();
502 bclink_accept_pkt(node, seqno); 529 bclink_accept_pkt(node, seqno);
503 spin_unlock_bh(&bc_lock); 530 tipc_bclink_unlock();
504 tipc_node_unlock(node); 531 tipc_node_unlock(node);
505 tipc_named_rcv(buf); 532 tipc_named_rcv(buf);
506 } else { 533 } else {
507 spin_lock_bh(&bc_lock); 534 tipc_bclink_lock();
508 bclink_accept_pkt(node, seqno); 535 bclink_accept_pkt(node, seqno);
509 spin_unlock_bh(&bc_lock); 536 tipc_bclink_unlock();
510 tipc_node_unlock(node); 537 tipc_node_unlock(node);
511 kfree_skb(buf); 538 kfree_skb(buf);
512 } 539 }
@@ -552,14 +579,14 @@ receive:
552 } else 579 } else
553 deferred = 0; 580 deferred = 0;
554 581
555 spin_lock_bh(&bc_lock); 582 tipc_bclink_lock();
556 583
557 if (deferred) 584 if (deferred)
558 bcl->stats.deferred_recv++; 585 bcl->stats.deferred_recv++;
559 else 586 else
560 bcl->stats.duplicates++; 587 bcl->stats.duplicates++;
561 588
562 spin_unlock_bh(&bc_lock); 589 tipc_bclink_unlock();
563 590
564unlock: 591unlock:
565 tipc_node_unlock(node); 592 tipc_node_unlock(node);
@@ -663,7 +690,7 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
663 int b_index; 690 int b_index;
664 int pri; 691 int pri;
665 692
666 spin_lock_bh(&bc_lock); 693 tipc_bclink_lock();
667 694
668 if (action) 695 if (action)
669 tipc_nmap_add(nm_ptr, node); 696 tipc_nmap_add(nm_ptr, node);
@@ -710,7 +737,7 @@ void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action)
710 bp_curr++; 737 bp_curr++;
711 } 738 }
712 739
713 spin_unlock_bh(&bc_lock); 740 tipc_bclink_unlock();
714} 741}
715 742
716 743
@@ -722,7 +749,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
722 if (!bcl) 749 if (!bcl)
723 return 0; 750 return 0;
724 751
725 spin_lock_bh(&bc_lock); 752 tipc_bclink_lock();
726 753
727 s = &bcl->stats; 754 s = &bcl->stats;
728 755
@@ -751,7 +778,7 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
751 s->queue_sz_counts ? 778 s->queue_sz_counts ?
752 (s->accu_queue_sz / s->queue_sz_counts) : 0); 779 (s->accu_queue_sz / s->queue_sz_counts) : 0);
753 780
754 spin_unlock_bh(&bc_lock); 781 tipc_bclink_unlock();
755 return ret; 782 return ret;
756} 783}
757 784
@@ -760,9 +787,9 @@ int tipc_bclink_reset_stats(void)
760 if (!bcl) 787 if (!bcl)
761 return -ENOPROTOOPT; 788 return -ENOPROTOOPT;
762 789
763 spin_lock_bh(&bc_lock); 790 tipc_bclink_lock();
764 memset(&bcl->stats, 0, sizeof(bcl->stats)); 791 memset(&bcl->stats, 0, sizeof(bcl->stats));
765 spin_unlock_bh(&bc_lock); 792 tipc_bclink_unlock();
766 return 0; 793 return 0;
767} 794}
768 795
@@ -773,18 +800,30 @@ int tipc_bclink_set_queue_limits(u32 limit)
773 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN)) 800 if ((limit < TIPC_MIN_LINK_WIN) || (limit > TIPC_MAX_LINK_WIN))
774 return -EINVAL; 801 return -EINVAL;
775 802
776 spin_lock_bh(&bc_lock); 803 tipc_bclink_lock();
777 tipc_link_set_queue_limits(bcl, limit); 804 tipc_link_set_queue_limits(bcl, limit);
778 spin_unlock_bh(&bc_lock); 805 tipc_bclink_unlock();
779 return 0; 806 return 0;
780} 807}
781 808
782void tipc_bclink_init(void) 809int tipc_bclink_init(void)
783{ 810{
811 bcbearer = kzalloc(sizeof(*bcbearer), GFP_ATOMIC);
812 if (!bcbearer)
813 return -ENOMEM;
814
815 bclink = kzalloc(sizeof(*bclink), GFP_ATOMIC);
816 if (!bclink) {
817 kfree(bcbearer);
818 return -ENOMEM;
819 }
820
821 bcl = &bclink->link;
784 bcbearer->bearer.media = &bcbearer->media; 822 bcbearer->bearer.media = &bcbearer->media;
785 bcbearer->media.send_msg = tipc_bcbearer_send; 823 bcbearer->media.send_msg = tipc_bcbearer_send;
786 sprintf(bcbearer->media.name, "tipc-broadcast"); 824 sprintf(bcbearer->media.name, "tipc-broadcast");
787 825
826 spin_lock_init(&bclink->lock);
788 INIT_LIST_HEAD(&bcl->waiting_ports); 827 INIT_LIST_HEAD(&bcl->waiting_ports);
789 bcl->next_out_no = 1; 828 bcl->next_out_no = 1;
790 spin_lock_init(&bclink->node.lock); 829 spin_lock_init(&bclink->node.lock);
@@ -795,17 +834,19 @@ void tipc_bclink_init(void)
795 rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer); 834 rcu_assign_pointer(bearer_list[MAX_BEARERS], &bcbearer->bearer);
796 bcl->state = WORKING_WORKING; 835 bcl->state = WORKING_WORKING;
797 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME); 836 strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
837 return 0;
798} 838}
799 839
800void tipc_bclink_stop(void) 840void tipc_bclink_stop(void)
801{ 841{
802 spin_lock_bh(&bc_lock); 842 tipc_bclink_lock();
803 tipc_link_purge_queues(bcl); 843 tipc_link_purge_queues(bcl);
804 spin_unlock_bh(&bc_lock); 844 tipc_bclink_unlock();
805 845
806 RCU_INIT_POINTER(bearer_list[BCBEARER], NULL); 846 RCU_INIT_POINTER(bearer_list[BCBEARER], NULL);
807 memset(bclink, 0, sizeof(*bclink)); 847 synchronize_net();
808 memset(bcbearer, 0, sizeof(*bcbearer)); 848 kfree(bcbearer);
849 kfree(bclink);
809} 850}
810 851
811/** 852/**
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 7c1ef1b3d7b3..00330c45df3e 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -39,6 +39,7 @@
39 39
40#define MAX_NODES 4096 40#define MAX_NODES 4096
41#define WSIZE 32 41#define WSIZE 32
42#define TIPC_BCLINK_RESET 1
42 43
43/** 44/**
44 * struct tipc_node_map - set of node identifiers 45 * struct tipc_node_map - set of node identifiers
@@ -81,8 +82,9 @@ static inline int tipc_nmap_equal(struct tipc_node_map *nm_a,
81void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port); 82void tipc_port_list_add(struct tipc_port_list *pl_ptr, u32 port);
82void tipc_port_list_free(struct tipc_port_list *pl_ptr); 83void tipc_port_list_free(struct tipc_port_list *pl_ptr);
83 84
84void tipc_bclink_init(void); 85int tipc_bclink_init(void);
85void tipc_bclink_stop(void); 86void tipc_bclink_stop(void);
87void tipc_bclink_set_flags(unsigned int flags);
86void tipc_bclink_add_node(u32 addr); 88void tipc_bclink_add_node(u32 addr);
87void tipc_bclink_remove_node(u32 addr); 89void tipc_bclink_remove_node(u32 addr);
88struct tipc_node *tipc_bclink_retransmit_to(void); 90struct tipc_node *tipc_bclink_retransmit_to(void);
diff --git a/net/tipc/config.c b/net/tipc/config.c
index 251f5a2028e4..2b42403ad33a 100644
--- a/net/tipc/config.c
+++ b/net/tipc/config.c
@@ -177,8 +177,10 @@ static struct sk_buff *cfg_set_own_addr(void)
177 if (tipc_own_addr) 177 if (tipc_own_addr)
178 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED 178 return tipc_cfg_reply_error_string(TIPC_CFG_NOT_SUPPORTED
179 " (cannot change node address once assigned)"); 179 " (cannot change node address once assigned)");
180 tipc_net_start(addr); 180 if (!tipc_net_start(addr))
181 return tipc_cfg_reply_none(); 181 return tipc_cfg_reply_none();
182
183 return tipc_cfg_reply_error_string("cannot change to network mode");
182} 184}
183 185
184static struct sk_buff *cfg_set_max_ports(void) 186static struct sk_buff *cfg_set_max_ports(void)
diff --git a/net/tipc/core.c b/net/tipc/core.c
index 50d57429ebca..57f8ae9aa466 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -80,7 +80,6 @@ struct sk_buff *tipc_buf_acquire(u32 size)
80 */ 80 */
81static void tipc_core_stop(void) 81static void tipc_core_stop(void)
82{ 82{
83 tipc_handler_stop();
84 tipc_net_stop(); 83 tipc_net_stop();
85 tipc_bearer_cleanup(); 84 tipc_bearer_cleanup();
86 tipc_netlink_stop(); 85 tipc_netlink_stop();
@@ -100,10 +99,6 @@ static int tipc_core_start(void)
100 99
101 get_random_bytes(&tipc_random, sizeof(tipc_random)); 100 get_random_bytes(&tipc_random, sizeof(tipc_random));
102 101
103 err = tipc_handler_start();
104 if (err)
105 goto out_handler;
106
107 err = tipc_ref_table_init(tipc_max_ports, tipc_random); 102 err = tipc_ref_table_init(tipc_max_ports, tipc_random);
108 if (err) 103 if (err)
109 goto out_reftbl; 104 goto out_reftbl;
@@ -146,8 +141,6 @@ out_netlink:
146out_nametbl: 141out_nametbl:
147 tipc_ref_table_stop(); 142 tipc_ref_table_stop();
148out_reftbl: 143out_reftbl:
149 tipc_handler_stop();
150out_handler:
151 return err; 144 return err;
152} 145}
153 146
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 36cbf158845f..ae55d37267e6 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -89,8 +89,6 @@ extern int tipc_random __read_mostly;
89/* 89/*
90 * Routines available to privileged subsystems 90 * Routines available to privileged subsystems
91 */ 91 */
92int tipc_handler_start(void);
93void tipc_handler_stop(void);
94int tipc_netlink_start(void); 92int tipc_netlink_start(void);
95void tipc_netlink_stop(void); 93void tipc_netlink_stop(void);
96int tipc_socket_init(void); 94int tipc_socket_init(void);
@@ -109,12 +107,10 @@ void tipc_unregister_sysctl(void);
109#endif 107#endif
110 108
111/* 109/*
112 * TIPC timer and signal code 110 * TIPC timer code
113 */ 111 */
114typedef void (*Handler) (unsigned long); 112typedef void (*Handler) (unsigned long);
115 113
116u32 tipc_k_signal(Handler routine, unsigned long argument);
117
118/** 114/**
119 * k_init_timer - initialize a timer 115 * k_init_timer - initialize a timer
120 * @timer: pointer to timer structure 116 * @timer: pointer to timer structure
diff --git a/net/tipc/handler.c b/net/tipc/handler.c
deleted file mode 100644
index 1fabf160501f..000000000000
--- a/net/tipc/handler.c
+++ /dev/null
@@ -1,134 +0,0 @@
1/*
2 * net/tipc/handler.c: TIPC signal handling
3 *
4 * Copyright (c) 2000-2006, Ericsson AB
5 * Copyright (c) 2005, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include "core.h"
38
39struct queue_item {
40 struct list_head next_signal;
41 void (*handler) (unsigned long);
42 unsigned long data;
43};
44
45static struct kmem_cache *tipc_queue_item_cache;
46static struct list_head signal_queue_head;
47static DEFINE_SPINLOCK(qitem_lock);
48static int handler_enabled __read_mostly;
49
50static void process_signal_queue(unsigned long dummy);
51
52static DECLARE_TASKLET_DISABLED(tipc_tasklet, process_signal_queue, 0);
53
54
55unsigned int tipc_k_signal(Handler routine, unsigned long argument)
56{
57 struct queue_item *item;
58
59 spin_lock_bh(&qitem_lock);
60 if (!handler_enabled) {
61 spin_unlock_bh(&qitem_lock);
62 return -ENOPROTOOPT;
63 }
64
65 item = kmem_cache_alloc(tipc_queue_item_cache, GFP_ATOMIC);
66 if (!item) {
67 pr_err("Signal queue out of memory\n");
68 spin_unlock_bh(&qitem_lock);
69 return -ENOMEM;
70 }
71 item->handler = routine;
72 item->data = argument;
73 list_add_tail(&item->next_signal, &signal_queue_head);
74 spin_unlock_bh(&qitem_lock);
75 tasklet_schedule(&tipc_tasklet);
76 return 0;
77}
78
79static void process_signal_queue(unsigned long dummy)
80{
81 struct queue_item *__volatile__ item;
82 struct list_head *l, *n;
83
84 spin_lock_bh(&qitem_lock);
85 list_for_each_safe(l, n, &signal_queue_head) {
86 item = list_entry(l, struct queue_item, next_signal);
87 list_del(&item->next_signal);
88 spin_unlock_bh(&qitem_lock);
89 item->handler(item->data);
90 spin_lock_bh(&qitem_lock);
91 kmem_cache_free(tipc_queue_item_cache, item);
92 }
93 spin_unlock_bh(&qitem_lock);
94}
95
96int tipc_handler_start(void)
97{
98 tipc_queue_item_cache =
99 kmem_cache_create("tipc_queue_items", sizeof(struct queue_item),
100 0, SLAB_HWCACHE_ALIGN, NULL);
101 if (!tipc_queue_item_cache)
102 return -ENOMEM;
103
104 INIT_LIST_HEAD(&signal_queue_head);
105 tasklet_enable(&tipc_tasklet);
106 handler_enabled = 1;
107 return 0;
108}
109
110void tipc_handler_stop(void)
111{
112 struct list_head *l, *n;
113 struct queue_item *item;
114
115 spin_lock_bh(&qitem_lock);
116 if (!handler_enabled) {
117 spin_unlock_bh(&qitem_lock);
118 return;
119 }
120 handler_enabled = 0;
121 spin_unlock_bh(&qitem_lock);
122
123 tasklet_kill(&tipc_tasklet);
124
125 spin_lock_bh(&qitem_lock);
126 list_for_each_safe(l, n, &signal_queue_head) {
127 item = list_entry(l, struct queue_item, next_signal);
128 list_del(&item->next_signal);
129 kmem_cache_free(tipc_queue_item_cache, item);
130 }
131 spin_unlock_bh(&qitem_lock);
132
133 kmem_cache_destroy(tipc_queue_item_cache);
134}
diff --git a/net/tipc/link.c b/net/tipc/link.c
index c723ee90219d..dce2bef81720 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -297,14 +297,14 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
297 297
298 rcu_read_lock(); 298 rcu_read_lock();
299 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { 299 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
300 spin_lock_bh(&n_ptr->lock); 300 tipc_node_lock(n_ptr);
301 l_ptr = n_ptr->links[bearer_id]; 301 l_ptr = n_ptr->links[bearer_id];
302 if (l_ptr) { 302 if (l_ptr) {
303 tipc_link_reset(l_ptr); 303 tipc_link_reset(l_ptr);
304 if (shutting_down || !tipc_node_is_up(n_ptr)) { 304 if (shutting_down || !tipc_node_is_up(n_ptr)) {
305 tipc_node_detach_link(l_ptr->owner, l_ptr); 305 tipc_node_detach_link(l_ptr->owner, l_ptr);
306 tipc_link_reset_fragments(l_ptr); 306 tipc_link_reset_fragments(l_ptr);
307 spin_unlock_bh(&n_ptr->lock); 307 tipc_node_unlock(n_ptr);
308 308
309 /* Nobody else can access this link now: */ 309 /* Nobody else can access this link now: */
310 del_timer_sync(&l_ptr->timer); 310 del_timer_sync(&l_ptr->timer);
@@ -312,12 +312,12 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
312 } else { 312 } else {
313 /* Detach/delete when failover is finished: */ 313 /* Detach/delete when failover is finished: */
314 l_ptr->flags |= LINK_STOPPED; 314 l_ptr->flags |= LINK_STOPPED;
315 spin_unlock_bh(&n_ptr->lock); 315 tipc_node_unlock(n_ptr);
316 del_timer_sync(&l_ptr->timer); 316 del_timer_sync(&l_ptr->timer);
317 } 317 }
318 continue; 318 continue;
319 } 319 }
320 spin_unlock_bh(&n_ptr->lock); 320 tipc_node_unlock(n_ptr);
321 } 321 }
322 rcu_read_unlock(); 322 rcu_read_unlock();
323} 323}
@@ -474,11 +474,11 @@ void tipc_link_reset_list(unsigned int bearer_id)
474 474
475 rcu_read_lock(); 475 rcu_read_lock();
476 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { 476 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
477 spin_lock_bh(&n_ptr->lock); 477 tipc_node_lock(n_ptr);
478 l_ptr = n_ptr->links[bearer_id]; 478 l_ptr = n_ptr->links[bearer_id];
479 if (l_ptr) 479 if (l_ptr)
480 tipc_link_reset(l_ptr); 480 tipc_link_reset(l_ptr);
481 spin_unlock_bh(&n_ptr->lock); 481 tipc_node_unlock(n_ptr);
482 } 482 }
483 rcu_read_unlock(); 483 rcu_read_unlock();
484} 484}
@@ -1259,29 +1259,24 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
1259 } while (!res); 1259 } while (!res);
1260} 1260}
1261 1261
1262static void link_reset_all(unsigned long addr) 1262void tipc_link_reset_all(struct tipc_node *node)
1263{ 1263{
1264 struct tipc_node *n_ptr;
1265 char addr_string[16]; 1264 char addr_string[16];
1266 u32 i; 1265 u32 i;
1267 1266
1268 n_ptr = tipc_node_find((u32)addr); 1267 tipc_node_lock(node);
1269 if (!n_ptr)
1270 return; /* node no longer exists */
1271
1272 tipc_node_lock(n_ptr);
1273 1268
1274 pr_warn("Resetting all links to %s\n", 1269 pr_warn("Resetting all links to %s\n",
1275 tipc_addr_string_fill(addr_string, n_ptr->addr)); 1270 tipc_addr_string_fill(addr_string, node->addr));
1276 1271
1277 for (i = 0; i < MAX_BEARERS; i++) { 1272 for (i = 0; i < MAX_BEARERS; i++) {
1278 if (n_ptr->links[i]) { 1273 if (node->links[i]) {
1279 link_print(n_ptr->links[i], "Resetting link\n"); 1274 link_print(node->links[i], "Resetting link\n");
1280 tipc_link_reset(n_ptr->links[i]); 1275 tipc_link_reset(node->links[i]);
1281 } 1276 }
1282 } 1277 }
1283 1278
1284 tipc_node_unlock(n_ptr); 1279 tipc_node_unlock(node);
1285} 1280}
1286 1281
1287static void link_retransmit_failure(struct tipc_link *l_ptr, 1282static void link_retransmit_failure(struct tipc_link *l_ptr,
@@ -1318,10 +1313,9 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
1318 n_ptr->bclink.oos_state, 1313 n_ptr->bclink.oos_state,
1319 n_ptr->bclink.last_sent); 1314 n_ptr->bclink.last_sent);
1320 1315
1321 tipc_k_signal((Handler)link_reset_all, (unsigned long)n_ptr->addr);
1322
1323 tipc_node_unlock(n_ptr); 1316 tipc_node_unlock(n_ptr);
1324 1317
1318 tipc_bclink_set_flags(TIPC_BCLINK_RESET);
1325 l_ptr->stale_count = 0; 1319 l_ptr->stale_count = 0;
1326 } 1320 }
1327} 1321}
@@ -1495,14 +1489,14 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1495 goto unlock_discard; 1489 goto unlock_discard;
1496 1490
1497 /* Verify that communication with node is currently allowed */ 1491 /* Verify that communication with node is currently allowed */
1498 if ((n_ptr->block_setup & WAIT_PEER_DOWN) && 1492 if ((n_ptr->flags & TIPC_NODE_DOWN) &&
1499 msg_user(msg) == LINK_PROTOCOL && 1493 msg_user(msg) == LINK_PROTOCOL &&
1500 (msg_type(msg) == RESET_MSG || 1494 (msg_type(msg) == RESET_MSG ||
1501 msg_type(msg) == ACTIVATE_MSG) && 1495 msg_type(msg) == ACTIVATE_MSG) &&
1502 !msg_redundant_link(msg)) 1496 !msg_redundant_link(msg))
1503 n_ptr->block_setup &= ~WAIT_PEER_DOWN; 1497 n_ptr->flags &= ~TIPC_NODE_DOWN;
1504 1498
1505 if (n_ptr->block_setup) 1499 if (tipc_node_blocked(n_ptr))
1506 goto unlock_discard; 1500 goto unlock_discard;
1507 1501
1508 /* Validate message sequence number info */ 1502 /* Validate message sequence number info */
@@ -1744,7 +1738,7 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
1744 return; 1738 return;
1745 1739
1746 /* Abort non-RESET send if communication with node is prohibited */ 1740 /* Abort non-RESET send if communication with node is prohibited */
1747 if ((l_ptr->owner->block_setup) && (msg_typ != RESET_MSG)) 1741 if ((tipc_node_blocked(l_ptr->owner)) && (msg_typ != RESET_MSG))
1748 return; 1742 return;
1749 1743
1750 /* Create protocol message with "out-of-sequence" sequence number */ 1744 /* Create protocol message with "out-of-sequence" sequence number */
@@ -1859,7 +1853,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
1859 * peer has lost contact -- don't allow peer's links 1853 * peer has lost contact -- don't allow peer's links
1860 * to reactivate before we recognize loss & clean up 1854 * to reactivate before we recognize loss & clean up
1861 */ 1855 */
1862 l_ptr->owner->block_setup = WAIT_NODE_DOWN; 1856 l_ptr->owner->flags = TIPC_NODE_RESET;
1863 } 1857 }
1864 1858
1865 link_state_event(l_ptr, RESET_MSG); 1859 link_state_event(l_ptr, RESET_MSG);
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 4b556c181bae..7ba73fa6b81e 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -230,6 +230,7 @@ struct sk_buff *tipc_link_cmd_show_stats(const void *req_tlv_area,
230 int req_tlv_space); 230 int req_tlv_space);
231struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area, 231struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
232 int req_tlv_space); 232 int req_tlv_space);
233void tipc_link_reset_all(struct tipc_node *node);
233void tipc_link_reset(struct tipc_link *l_ptr); 234void tipc_link_reset(struct tipc_link *l_ptr);
234void tipc_link_reset_list(unsigned int bearer_id); 235void tipc_link_reset_list(unsigned int bearer_id);
235int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); 236int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector);
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 974a73f3d876..8ce730984aa1 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -38,34 +38,6 @@
38#include "link.h" 38#include "link.h"
39#include "name_distr.h" 39#include "name_distr.h"
40 40
41#define ITEM_SIZE sizeof(struct distr_item)
42
43/**
44 * struct distr_item - publication info distributed to other nodes
45 * @type: name sequence type
46 * @lower: name sequence lower bound
47 * @upper: name sequence upper bound
48 * @ref: publishing port reference
49 * @key: publication key
50 *
51 * ===> All fields are stored in network byte order. <===
52 *
53 * First 3 fields identify (name or) name sequence being published.
54 * Reference field uniquely identifies port that published name sequence.
55 * Key field uniquely identifies publication, in the event a port has
56 * multiple publications of the same name sequence.
57 *
58 * Note: There is no field that identifies the publishing node because it is
59 * the same for all items contained within a publication message.
60 */
61struct distr_item {
62 __be32 type;
63 __be32 lower;
64 __be32 upper;
65 __be32 ref;
66 __be32 key;
67};
68
69/** 41/**
70 * struct publ_list - list of publications made by this node 42 * struct publ_list - list of publications made by this node
71 * @list: circular list of publications 43 * @list: circular list of publications
@@ -135,18 +107,18 @@ void named_cluster_distribute(struct sk_buff *buf)
135 107
136 rcu_read_lock(); 108 rcu_read_lock();
137 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) { 109 list_for_each_entry_rcu(n_ptr, &tipc_node_list, list) {
138 spin_lock_bh(&n_ptr->lock); 110 tipc_node_lock(n_ptr);
139 l_ptr = n_ptr->active_links[n_ptr->addr & 1]; 111 l_ptr = n_ptr->active_links[n_ptr->addr & 1];
140 if (l_ptr) { 112 if (l_ptr) {
141 buf_copy = skb_copy(buf, GFP_ATOMIC); 113 buf_copy = skb_copy(buf, GFP_ATOMIC);
142 if (!buf_copy) { 114 if (!buf_copy) {
143 spin_unlock_bh(&n_ptr->lock); 115 tipc_node_unlock(n_ptr);
144 break; 116 break;
145 } 117 }
146 msg_set_destnode(buf_msg(buf_copy), n_ptr->addr); 118 msg_set_destnode(buf_msg(buf_copy), n_ptr->addr);
147 __tipc_link_xmit(l_ptr, buf_copy); 119 __tipc_link_xmit(l_ptr, buf_copy);
148 } 120 }
149 spin_unlock_bh(&n_ptr->lock); 121 tipc_node_unlock(n_ptr);
150 } 122 }
151 rcu_read_unlock(); 123 rcu_read_unlock();
152 124
@@ -239,29 +211,9 @@ static void named_distribute(struct list_head *message_list, u32 node,
239/** 211/**
240 * tipc_named_node_up - tell specified node about all publications by this node 212 * tipc_named_node_up - tell specified node about all publications by this node
241 */ 213 */
242void tipc_named_node_up(unsigned long nodearg) 214void tipc_named_node_up(u32 max_item_buf, u32 node)
243{ 215{
244 struct tipc_node *n_ptr; 216 LIST_HEAD(message_list);
245 struct tipc_link *l_ptr;
246 struct list_head message_list;
247 u32 node = (u32)nodearg;
248 u32 max_item_buf = 0;
249
250 /* compute maximum amount of publication data to send per message */
251 n_ptr = tipc_node_find(node);
252 if (n_ptr) {
253 tipc_node_lock(n_ptr);
254 l_ptr = n_ptr->active_links[0];
255 if (l_ptr)
256 max_item_buf = ((l_ptr->max_pkt - INT_H_SIZE) /
257 ITEM_SIZE) * ITEM_SIZE;
258 tipc_node_unlock(n_ptr);
259 }
260 if (!max_item_buf)
261 return;
262
263 /* create list of publication messages, then send them as a unit */
264 INIT_LIST_HEAD(&message_list);
265 217
266 read_lock_bh(&tipc_nametbl_lock); 218 read_lock_bh(&tipc_nametbl_lock);
267 named_distribute(&message_list, node, &publ_cluster, max_item_buf); 219 named_distribute(&message_list, node, &publ_cluster, max_item_buf);
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index 47ff829f9361..b2eed4ec1526 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -39,10 +39,38 @@
39 39
40#include "name_table.h" 40#include "name_table.h"
41 41
42#define ITEM_SIZE sizeof(struct distr_item)
43
44/**
45 * struct distr_item - publication info distributed to other nodes
46 * @type: name sequence type
47 * @lower: name sequence lower bound
48 * @upper: name sequence upper bound
49 * @ref: publishing port reference
50 * @key: publication key
51 *
52 * ===> All fields are stored in network byte order. <===
53 *
54 * First 3 fields identify (name or) name sequence being published.
55 * Reference field uniquely identifies port that published name sequence.
56 * Key field uniquely identifies publication, in the event a port has
57 * multiple publications of the same name sequence.
58 *
59 * Note: There is no field that identifies the publishing node because it is
60 * the same for all items contained within a publication message.
61 */
62struct distr_item {
63 __be32 type;
64 __be32 lower;
65 __be32 upper;
66 __be32 ref;
67 __be32 key;
68};
69
42struct sk_buff *tipc_named_publish(struct publication *publ); 70struct sk_buff *tipc_named_publish(struct publication *publ);
43struct sk_buff *tipc_named_withdraw(struct publication *publ); 71struct sk_buff *tipc_named_withdraw(struct publication *publ);
44void named_cluster_distribute(struct sk_buff *buf); 72void named_cluster_distribute(struct sk_buff *buf);
45void tipc_named_node_up(unsigned long node); 73void tipc_named_node_up(u32 max_item_buf, u32 node);
46void tipc_named_rcv(struct sk_buff *buf); 74void tipc_named_rcv(struct sk_buff *buf);
47void tipc_named_reinit(void); 75void tipc_named_reinit(void);
48 76
diff --git a/net/tipc/net.c b/net/tipc/net.c
index 75bb39025d53..f8fc95d58c0d 100644
--- a/net/tipc/net.c
+++ b/net/tipc/net.c
@@ -164,20 +164,25 @@ void tipc_net_route_msg(struct sk_buff *buf)
164 tipc_link_xmit(buf, dnode, msg_link_selector(msg)); 164 tipc_link_xmit(buf, dnode, msg_link_selector(msg));
165} 165}
166 166
167void tipc_net_start(u32 addr) 167int tipc_net_start(u32 addr)
168{ 168{
169 char addr_string[16]; 169 char addr_string[16];
170 int res;
170 171
171 tipc_own_addr = addr; 172 tipc_own_addr = addr;
172 tipc_named_reinit(); 173 tipc_named_reinit();
173 tipc_port_reinit(); 174 tipc_port_reinit();
174 tipc_bclink_init(); 175 res = tipc_bclink_init();
176 if (res)
177 return res;
178
175 tipc_nametbl_publish(TIPC_CFG_SRV, tipc_own_addr, tipc_own_addr, 179 tipc_nametbl_publish(TIPC_CFG_SRV, tipc_own_addr, tipc_own_addr,
176 TIPC_ZONE_SCOPE, 0, tipc_own_addr); 180 TIPC_ZONE_SCOPE, 0, tipc_own_addr);
177 181
178 pr_info("Started in network mode\n"); 182 pr_info("Started in network mode\n");
179 pr_info("Own node address %s, network identity %u\n", 183 pr_info("Own node address %s, network identity %u\n",
180 tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id); 184 tipc_addr_string_fill(addr_string, tipc_own_addr), tipc_net_id);
185 return 0;
181} 186}
182 187
183void tipc_net_stop(void) 188void tipc_net_stop(void)
diff --git a/net/tipc/net.h b/net/tipc/net.h
index f781cae8df4b..c6c2b46f7c28 100644
--- a/net/tipc/net.h
+++ b/net/tipc/net.h
@@ -39,7 +39,7 @@
39 39
40void tipc_net_route_msg(struct sk_buff *buf); 40void tipc_net_route_msg(struct sk_buff *buf);
41 41
42void tipc_net_start(u32 addr); 42int tipc_net_start(u32 addr);
43void tipc_net_stop(void); 43void tipc_net_stop(void);
44 44
45#endif 45#endif
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6d6543e88c2c..74efebc1cb7a 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -108,7 +108,7 @@ struct tipc_node *tipc_node_create(u32 addr)
108 break; 108 break;
109 } 109 }
110 list_add_tail_rcu(&n_ptr->list, &temp_node->list); 110 list_add_tail_rcu(&n_ptr->list, &temp_node->list);
111 n_ptr->block_setup = WAIT_PEER_DOWN; 111 n_ptr->flags = TIPC_NODE_DOWN;
112 n_ptr->signature = INVALID_NODE_SIG; 112 n_ptr->signature = INVALID_NODE_SIG;
113 113
114 tipc_num_nodes++; 114 tipc_num_nodes++;
@@ -267,24 +267,12 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
267 267
268static void node_established_contact(struct tipc_node *n_ptr) 268static void node_established_contact(struct tipc_node *n_ptr)
269{ 269{
270 tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr); 270 n_ptr->flags |= TIPC_NODE_UP;
271 n_ptr->bclink.oos_state = 0; 271 n_ptr->bclink.oos_state = 0;
272 n_ptr->bclink.acked = tipc_bclink_get_last_sent(); 272 n_ptr->bclink.acked = tipc_bclink_get_last_sent();
273 tipc_bclink_add_node(n_ptr->addr); 273 tipc_bclink_add_node(n_ptr->addr);
274} 274}
275 275
276static void node_name_purge_complete(unsigned long node_addr)
277{
278 struct tipc_node *n_ptr;
279
280 n_ptr = tipc_node_find(node_addr);
281 if (n_ptr) {
282 tipc_node_lock(n_ptr);
283 n_ptr->block_setup &= ~WAIT_NAMES_GONE;
284 tipc_node_unlock(n_ptr);
285 }
286}
287
288static void node_lost_contact(struct tipc_node *n_ptr) 276static void node_lost_contact(struct tipc_node *n_ptr)
289{ 277{
290 char addr_string[16]; 278 char addr_string[16];
@@ -320,12 +308,10 @@ static void node_lost_contact(struct tipc_node *n_ptr)
320 tipc_link_reset_fragments(l_ptr); 308 tipc_link_reset_fragments(l_ptr);
321 } 309 }
322 310
323 /* Notify subscribers */ 311 /* Notify subscribers and prevent re-contact with node until
324 tipc_nodesub_notify(n_ptr); 312 * cleanup is done.
325 313 */
326 /* Prevent re-contact with node until cleanup is done */ 314 n_ptr->flags = TIPC_NODE_DOWN | TIPC_NODE_LOST;
327 n_ptr->block_setup = WAIT_PEER_DOWN | WAIT_NAMES_GONE;
328 tipc_k_signal((Handler)node_name_purge_complete, n_ptr->addr);
329} 315}
330 316
331struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space) 317struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space)
@@ -465,3 +451,36 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
465 tipc_node_unlock(node); 451 tipc_node_unlock(node);
466 return -EINVAL; 452 return -EINVAL;
467} 453}
454
455void tipc_node_unlock(struct tipc_node *node)
456{
457 LIST_HEAD(nsub_list);
458 struct tipc_link *link;
459 int pkt_sz = 0;
460 u32 addr = 0;
461
462 if (likely(!node->flags)) {
463 spin_unlock_bh(&node->lock);
464 return;
465 }
466
467 if (node->flags & TIPC_NODE_LOST) {
468 list_replace_init(&node->nsub, &nsub_list);
469 node->flags &= ~TIPC_NODE_LOST;
470 }
471 if (node->flags & TIPC_NODE_UP) {
472 link = node->active_links[0];
473 node->flags &= ~TIPC_NODE_UP;
474 if (link) {
475 pkt_sz = ((link->max_pkt - INT_H_SIZE) / ITEM_SIZE) *
476 ITEM_SIZE;
477 addr = node->addr;
478 }
479 }
480 spin_unlock_bh(&node->lock);
481
482 if (!list_empty(&nsub_list))
483 tipc_nodesub_notify(&nsub_list);
484 if (pkt_sz)
485 tipc_named_node_up(pkt_sz, addr);
486}
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 411b19114064..38f710fb75dc 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -47,62 +47,78 @@
47 */ 47 */
48#define INVALID_NODE_SIG 0x10000 48#define INVALID_NODE_SIG 0x10000
49 49
50/* Flags used to block (re)establishment of contact with a neighboring node */ 50/* Flags used to block (re)establishment of contact with a neighboring node
51#define WAIT_PEER_DOWN 0x0001 /* wait to see that peer's links are down */ 51 * TIPC_NODE_DOWN: indicate node is down and it's used to block the node's
52#define WAIT_NAMES_GONE 0x0002 /* wait for peer's publications to be purged */ 52 * links until RESET or ACTIVE message arrives
53#define WAIT_NODE_DOWN 0x0004 /* wait until peer node is declared down */ 53 * TIPC_NODE_RESET: indicate node is reset
54 * TIPC_NODE_LOST: indicate node is lost and it's used to notify subscriptions
55 * when node lock is released
56 * TIPC_NODE_UP: indicate node is up and it's used to deliver local name table
57 * when node lock is released
58 */
59enum {
60 TIPC_NODE_DOWN = (1 << 1),
61 TIPC_NODE_RESET = (1 << 2),
62 TIPC_NODE_LOST = (1 << 3),
63 TIPC_NODE_UP = (1 << 4)
64};
65
66/**
67 * struct tipc_node_bclink - TIPC node bclink structure
68 * @acked: sequence # of last outbound b'cast message acknowledged by node
69 * @last_in: sequence # of last in-sequence b'cast message received from node
70 * @last_sent: sequence # of last b'cast message sent by node
71 * @oos_state: state tracker for handling OOS b'cast messages
72 * @deferred_size: number of OOS b'cast messages in deferred queue
73 * @deferred_head: oldest OOS b'cast message received from node
74 * @deferred_tail: newest OOS b'cast message received from node
75 * @reasm_head: broadcast reassembly queue head from node
76 * @reasm_tail: last broadcast fragment received from node
77 * @recv_permitted: true if node is allowed to receive b'cast messages
78 */
79struct tipc_node_bclink {
80 u32 acked;
81 u32 last_in;
82 u32 last_sent;
83 u32 oos_state;
84 u32 deferred_size;
85 struct sk_buff *deferred_head;
86 struct sk_buff *deferred_tail;
87 struct sk_buff *reasm_head;
88 struct sk_buff *reasm_tail;
89 bool recv_permitted;
90};
54 91
55/** 92/**
56 * struct tipc_node - TIPC node structure 93 * struct tipc_node - TIPC node structure
57 * @addr: network address of node 94 * @addr: network address of node
58 * @lock: spinlock governing access to structure 95 * @lock: spinlock governing access to structure
59 * @hash: links to adjacent nodes in unsorted hash chain 96 * @hash: links to adjacent nodes in unsorted hash chain
60 * @list: links to adjacent nodes in sorted list of cluster's nodes
61 * @nsub: list of "node down" subscriptions monitoring node
62 * @active_links: pointers to active links to node 97 * @active_links: pointers to active links to node
63 * @links: pointers to all links to node 98 * @links: pointers to all links to node
99 * @flags: bit mask of conditions preventing link establishment to node
100 * @bclink: broadcast-related info
101 * @list: links to adjacent nodes in sorted list of cluster's nodes
64 * @working_links: number of working links to node (both active and standby) 102 * @working_links: number of working links to node (both active and standby)
65 * @block_setup: bit mask of conditions preventing link establishment to node
66 * @link_cnt: number of links to node 103 * @link_cnt: number of links to node
67 * @signature: node instance identifier 104 * @signature: node instance identifier
68 * @bclink: broadcast-related info 105 * @nsub: list of "node down" subscriptions monitoring node
69 * @rcu: rcu struct for tipc_node 106 * @rcu: rcu struct for tipc_node
70 * @acked: sequence # of last outbound b'cast message acknowledged by node
71 * @last_in: sequence # of last in-sequence b'cast message received from node
72 * @last_sent: sequence # of last b'cast message sent by node
73 * @oos_state: state tracker for handling OOS b'cast messages
74 * @deferred_size: number of OOS b'cast messages in deferred queue
75 * @deferred_head: oldest OOS b'cast message received from node
76 * @deferred_tail: newest OOS b'cast message received from node
77 * @reasm_head: broadcast reassembly queue head from node
78 * @reasm_tail: last broadcast fragment received from node
79 * @recv_permitted: true if node is allowed to receive b'cast messages
80 */ 107 */
81struct tipc_node { 108struct tipc_node {
82 u32 addr; 109 u32 addr;
83 spinlock_t lock; 110 spinlock_t lock;
84 struct hlist_node hash; 111 struct hlist_node hash;
85 struct list_head list;
86 struct list_head nsub;
87 struct tipc_link *active_links[2]; 112 struct tipc_link *active_links[2];
88 struct tipc_link *links[MAX_BEARERS]; 113 struct tipc_link *links[MAX_BEARERS];
114 unsigned int flags;
115 struct tipc_node_bclink bclink;
116 struct list_head list;
89 int link_cnt; 117 int link_cnt;
90 int working_links; 118 int working_links;
91 int block_setup;
92 u32 signature; 119 u32 signature;
120 struct list_head nsub;
93 struct rcu_head rcu; 121 struct rcu_head rcu;
94 struct {
95 u32 acked;
96 u32 last_in;
97 u32 last_sent;
98 u32 oos_state;
99 u32 deferred_size;
100 struct sk_buff *deferred_head;
101 struct sk_buff *deferred_tail;
102 struct sk_buff *reasm_head;
103 struct sk_buff *reasm_tail;
104 bool recv_permitted;
105 } bclink;
106}; 122};
107 123
108extern struct list_head tipc_node_list; 124extern struct list_head tipc_node_list;
@@ -119,15 +135,17 @@ int tipc_node_is_up(struct tipc_node *n_ptr);
119struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space); 135struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space);
120struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space); 136struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space);
121int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len); 137int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len);
138void tipc_node_unlock(struct tipc_node *node);
122 139
123static inline void tipc_node_lock(struct tipc_node *n_ptr) 140static inline void tipc_node_lock(struct tipc_node *node)
124{ 141{
125 spin_lock_bh(&n_ptr->lock); 142 spin_lock_bh(&node->lock);
126} 143}
127 144
128static inline void tipc_node_unlock(struct tipc_node *n_ptr) 145static inline bool tipc_node_blocked(struct tipc_node *node)
129{ 146{
130 spin_unlock_bh(&n_ptr->lock); 147 return (node->flags & (TIPC_NODE_DOWN | TIPC_NODE_LOST |
148 TIPC_NODE_RESET));
131} 149}
132 150
133#endif 151#endif
diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
index 8a7384c04add..7c59ab1d6ecb 100644
--- a/net/tipc/node_subscr.c
+++ b/net/tipc/node_subscr.c
@@ -81,14 +81,13 @@ void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
81 * 81 *
82 * Note: node is locked by caller 82 * Note: node is locked by caller
83 */ 83 */
84void tipc_nodesub_notify(struct tipc_node *node) 84void tipc_nodesub_notify(struct list_head *nsub_list)
85{ 85{
86 struct tipc_node_subscr *ns; 86 struct tipc_node_subscr *ns, *safe;
87 87
88 list_for_each_entry(ns, &node->nsub, nodesub_list) { 88 list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
89 if (ns->handle_node_down) { 89 if (ns->handle_node_down) {
90 tipc_k_signal((Handler)ns->handle_node_down, 90 ns->handle_node_down(ns->usr_handle);
91 (unsigned long)ns->usr_handle);
92 ns->handle_node_down = NULL; 91 ns->handle_node_down = NULL;
93 } 92 }
94 } 93 }
diff --git a/net/tipc/node_subscr.h b/net/tipc/node_subscr.h
index c95d20727ded..d91b8cc81e3d 100644
--- a/net/tipc/node_subscr.h
+++ b/net/tipc/node_subscr.h
@@ -58,6 +58,6 @@ struct tipc_node_subscr {
58void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr, 58void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
59 void *usr_handle, net_ev_handler handle_down); 59 void *usr_handle, net_ev_handler handle_down);
60void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub); 60void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub);
61void tipc_nodesub_notify(struct tipc_node *node); 61void tipc_nodesub_notify(struct list_head *nsub_list);
62 62
63#endif 63#endif