aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-11-19 14:30:44 -0500
committerDavid S. Miller <davem@davemloft.net>2015-11-20 14:06:10 -0500
commit5405ff6e15f40f2f53e37d2dcd7de521e2b7a96f (patch)
tree226f40f32f063d27a8d9a6abe6708d550721f1fd
parent2312bf61ae365fdd6b9bfb24558a417859759447 (diff)
tipc: convert node lock to rwlock
According to the node FSM a node in state SELF_UP_PEER_UP cannot change state inside a lock context, except when a TUNNEL_PROTOCOL (SYNCH or FAILOVER) packet arrives. However, the node's individual links may still change state. Since each link now is protected by its own spinlock, we finally have the conditions in place to convert the node spinlock to an rwlock_t. If the node state and arriving packet type are rigth, we can let the link directly receive the packet under protection of its own spinlock and the node lock in read mode. In all other cases we use the node lock in write mode. This enables full concurrent execution between parallel links during steady-state traffic situations, i.e., 99+ % of the time. This commit implements this change. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/link.c32
-rw-r--r--net/tipc/node.c227
-rw-r--r--net/tipc/node.h10
3 files changed, 136 insertions, 133 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index b5e895c6f1aa..1dda46e5dd83 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1547,7 +1547,7 @@ static struct tipc_node *tipc_link_find_owner(struct net *net,
1547 *bearer_id = 0; 1547 *bearer_id = 0;
1548 rcu_read_lock(); 1548 rcu_read_lock();
1549 list_for_each_entry_rcu(n_ptr, &tn->node_list, list) { 1549 list_for_each_entry_rcu(n_ptr, &tn->node_list, list) {
1550 tipc_node_lock(n_ptr); 1550 tipc_node_read_lock(n_ptr);
1551 for (i = 0; i < MAX_BEARERS; i++) { 1551 for (i = 0; i < MAX_BEARERS; i++) {
1552 l_ptr = n_ptr->links[i].link; 1552 l_ptr = n_ptr->links[i].link;
1553 if (l_ptr && !strcmp(l_ptr->name, link_name)) { 1553 if (l_ptr && !strcmp(l_ptr->name, link_name)) {
@@ -1556,7 +1556,7 @@ static struct tipc_node *tipc_link_find_owner(struct net *net,
1556 break; 1556 break;
1557 } 1557 }
1558 } 1558 }
1559 tipc_node_unlock(n_ptr); 1559 tipc_node_read_unlock(n_ptr);
1560 if (found_node) 1560 if (found_node)
1561 break; 1561 break;
1562 } 1562 }
@@ -1658,7 +1658,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
1658 if (!node) 1658 if (!node)
1659 return -EINVAL; 1659 return -EINVAL;
1660 1660
1661 tipc_node_lock(node); 1661 tipc_node_read_lock(node);
1662 1662
1663 link = node->links[bearer_id].link; 1663 link = node->links[bearer_id].link;
1664 if (!link) { 1664 if (!link) {
@@ -1699,7 +1699,7 @@ int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
1699 } 1699 }
1700 1700
1701out: 1701out:
1702 tipc_node_unlock(node); 1702 tipc_node_read_unlock(node);
1703 1703
1704 return res; 1704 return res;
1705} 1705}
@@ -1898,10 +1898,10 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
1898 1898
1899 list_for_each_entry_continue_rcu(node, &tn->node_list, 1899 list_for_each_entry_continue_rcu(node, &tn->node_list,
1900 list) { 1900 list) {
1901 tipc_node_lock(node); 1901 tipc_node_read_lock(node);
1902 err = __tipc_nl_add_node_links(net, &msg, node, 1902 err = __tipc_nl_add_node_links(net, &msg, node,
1903 &prev_link); 1903 &prev_link);
1904 tipc_node_unlock(node); 1904 tipc_node_read_unlock(node);
1905 if (err) 1905 if (err)
1906 goto out; 1906 goto out;
1907 1907
@@ -1913,10 +1913,10 @@ int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
1913 goto out; 1913 goto out;
1914 1914
1915 list_for_each_entry_rcu(node, &tn->node_list, list) { 1915 list_for_each_entry_rcu(node, &tn->node_list, list) {
1916 tipc_node_lock(node); 1916 tipc_node_read_lock(node);
1917 err = __tipc_nl_add_node_links(net, &msg, node, 1917 err = __tipc_nl_add_node_links(net, &msg, node,
1918 &prev_link); 1918 &prev_link);
1919 tipc_node_unlock(node); 1919 tipc_node_read_unlock(node);
1920 if (err) 1920 if (err)
1921 goto out; 1921 goto out;
1922 1922
@@ -1967,16 +1967,16 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
1967 if (!node) 1967 if (!node)
1968 return -EINVAL; 1968 return -EINVAL;
1969 1969
1970 tipc_node_lock(node); 1970 tipc_node_read_lock(node);
1971 link = node->links[bearer_id].link; 1971 link = node->links[bearer_id].link;
1972 if (!link) { 1972 if (!link) {
1973 tipc_node_unlock(node); 1973 tipc_node_read_unlock(node);
1974 nlmsg_free(msg.skb); 1974 nlmsg_free(msg.skb);
1975 return -EINVAL; 1975 return -EINVAL;
1976 } 1976 }
1977 1977
1978 err = __tipc_nl_add_link(net, &msg, link, 0); 1978 err = __tipc_nl_add_link(net, &msg, link, 0);
1979 tipc_node_unlock(node); 1979 tipc_node_read_unlock(node);
1980 if (err) { 1980 if (err) {
1981 nlmsg_free(msg.skb); 1981 nlmsg_free(msg.skb);
1982 return err; 1982 return err;
@@ -2021,18 +2021,18 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
2021 node = tipc_link_find_owner(net, link_name, &bearer_id); 2021 node = tipc_link_find_owner(net, link_name, &bearer_id);
2022 if (!node) 2022 if (!node)
2023 return -EINVAL; 2023 return -EINVAL;
2024
2024 le = &node->links[bearer_id]; 2025 le = &node->links[bearer_id];
2025 tipc_node_lock(node); 2026 tipc_node_read_lock(node);
2026 spin_lock_bh(&le->lock); 2027 spin_lock_bh(&le->lock);
2027 link = le->link; 2028 link = le->link;
2028 if (!link) { 2029 if (!link) {
2029 tipc_node_unlock(node); 2030 spin_unlock_bh(&le->lock);
2031 tipc_node_read_unlock(node);
2030 return -EINVAL; 2032 return -EINVAL;
2031 } 2033 }
2032
2033 link_reset_statistics(link); 2034 link_reset_statistics(link);
2034 spin_unlock_bh(&le->lock); 2035 spin_unlock_bh(&le->lock);
2035 tipc_node_unlock(node); 2036 tipc_node_read_unlock(node);
2036
2037 return 0; 2037 return 0;
2038} 2038}
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 572063a0190e..47d5f84c90c5 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -141,10 +141,63 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr)
141 return NULL; 141 return NULL;
142} 142}
143 143
144void tipc_node_read_lock(struct tipc_node *n)
145{
146 read_lock_bh(&n->lock);
147}
148
149void tipc_node_read_unlock(struct tipc_node *n)
150{
151 read_unlock_bh(&n->lock);
152}
153
154static void tipc_node_write_lock(struct tipc_node *n)
155{
156 write_lock_bh(&n->lock);
157}
158
159static void tipc_node_write_unlock(struct tipc_node *n)
160{
161 struct net *net = n->net;
162 u32 addr = 0;
163 u32 flags = n->action_flags;
164 u32 link_id = 0;
165 struct list_head *publ_list;
166
167 if (likely(!flags)) {
168 write_unlock_bh(&n->lock);
169 return;
170 }
171
172 addr = n->addr;
173 link_id = n->link_id;
174 publ_list = &n->publ_list;
175
176 n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
177 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
178
179 write_unlock_bh(&n->lock);
180
181 if (flags & TIPC_NOTIFY_NODE_DOWN)
182 tipc_publ_notify(net, publ_list, addr);
183
184 if (flags & TIPC_NOTIFY_NODE_UP)
185 tipc_named_node_up(net, addr);
186
187 if (flags & TIPC_NOTIFY_LINK_UP)
188 tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
189 TIPC_NODE_SCOPE, link_id, addr);
190
191 if (flags & TIPC_NOTIFY_LINK_DOWN)
192 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
193 link_id, addr);
194}
195
144struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities) 196struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
145{ 197{
146 struct tipc_net *tn = net_generic(net, tipc_net_id); 198 struct tipc_net *tn = net_generic(net, tipc_net_id);
147 struct tipc_node *n_ptr, *temp_node; 199 struct tipc_node *n_ptr, *temp_node;
200 int i;
148 201
149 spin_lock_bh(&tn->node_list_lock); 202 spin_lock_bh(&tn->node_list_lock);
150 n_ptr = tipc_node_find(net, addr); 203 n_ptr = tipc_node_find(net, addr);
@@ -159,7 +212,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
159 n_ptr->net = net; 212 n_ptr->net = net;
160 n_ptr->capabilities = capabilities; 213 n_ptr->capabilities = capabilities;
161 kref_init(&n_ptr->kref); 214 kref_init(&n_ptr->kref);
162 spin_lock_init(&n_ptr->lock); 215 rwlock_init(&n_ptr->lock);
163 INIT_HLIST_NODE(&n_ptr->hash); 216 INIT_HLIST_NODE(&n_ptr->hash);
164 INIT_LIST_HEAD(&n_ptr->list); 217 INIT_LIST_HEAD(&n_ptr->list);
165 INIT_LIST_HEAD(&n_ptr->publ_list); 218 INIT_LIST_HEAD(&n_ptr->publ_list);
@@ -168,6 +221,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
168 skb_queue_head_init(&n_ptr->bc_entry.inputq1); 221 skb_queue_head_init(&n_ptr->bc_entry.inputq1);
169 __skb_queue_head_init(&n_ptr->bc_entry.arrvq); 222 __skb_queue_head_init(&n_ptr->bc_entry.arrvq);
170 skb_queue_head_init(&n_ptr->bc_entry.inputq2); 223 skb_queue_head_init(&n_ptr->bc_entry.inputq2);
224 for (i = 0; i < MAX_BEARERS; i++)
225 spin_lock_init(&n_ptr->links[i].lock);
171 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 226 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
172 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 227 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
173 if (n_ptr->addr < temp_node->addr) 228 if (n_ptr->addr < temp_node->addr)
@@ -246,9 +301,9 @@ void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr)
246 pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr); 301 pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr);
247 return; 302 return;
248 } 303 }
249 tipc_node_lock(n); 304 tipc_node_write_lock(n);
250 list_add_tail(subscr, &n->publ_list); 305 list_add_tail(subscr, &n->publ_list);
251 tipc_node_unlock(n); 306 tipc_node_write_unlock(n);
252 tipc_node_put(n); 307 tipc_node_put(n);
253} 308}
254 309
@@ -264,9 +319,9 @@ void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr)
264 pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr); 319 pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr);
265 return; 320 return;
266 } 321 }
267 tipc_node_lock(n); 322 tipc_node_write_lock(n);
268 list_del_init(subscr); 323 list_del_init(subscr);
269 tipc_node_unlock(n); 324 tipc_node_write_unlock(n);
270 tipc_node_put(n); 325 tipc_node_put(n);
271} 326}
272 327
@@ -293,9 +348,9 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
293 conn->port = port; 348 conn->port = port;
294 conn->peer_port = peer_port; 349 conn->peer_port = peer_port;
295 350
296 tipc_node_lock(node); 351 tipc_node_write_lock(node);
297 list_add_tail(&conn->list, &node->conn_sks); 352 list_add_tail(&conn->list, &node->conn_sks);
298 tipc_node_unlock(node); 353 tipc_node_write_unlock(node);
299exit: 354exit:
300 tipc_node_put(node); 355 tipc_node_put(node);
301 return err; 356 return err;
@@ -313,14 +368,14 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
313 if (!node) 368 if (!node)
314 return; 369 return;
315 370
316 tipc_node_lock(node); 371 tipc_node_write_lock(node);
317 list_for_each_entry_safe(conn, safe, &node->conn_sks, list) { 372 list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
318 if (port != conn->port) 373 if (port != conn->port)
319 continue; 374 continue;
320 list_del(&conn->list); 375 list_del(&conn->list);
321 kfree(conn); 376 kfree(conn);
322 } 377 }
323 tipc_node_unlock(node); 378 tipc_node_write_unlock(node);
324 tipc_node_put(node); 379 tipc_node_put(node);
325} 380}
326 381
@@ -337,7 +392,7 @@ static void tipc_node_timeout(unsigned long data)
337 __skb_queue_head_init(&xmitq); 392 __skb_queue_head_init(&xmitq);
338 393
339 for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { 394 for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
340 tipc_node_lock(n); 395 tipc_node_read_lock(n);
341 le = &n->links[bearer_id]; 396 le = &n->links[bearer_id];
342 spin_lock_bh(&le->lock); 397 spin_lock_bh(&le->lock);
343 if (le->link) { 398 if (le->link) {
@@ -346,7 +401,7 @@ static void tipc_node_timeout(unsigned long data)
346 rc = tipc_link_timeout(le->link, &xmitq); 401 rc = tipc_link_timeout(le->link, &xmitq);
347 } 402 }
348 spin_unlock_bh(&le->lock); 403 spin_unlock_bh(&le->lock);
349 tipc_node_unlock(n); 404 tipc_node_read_unlock(n);
350 tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr); 405 tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
351 if (rc & TIPC_LINK_DOWN_EVT) 406 if (rc & TIPC_LINK_DOWN_EVT)
352 tipc_node_link_down(n, bearer_id, false); 407 tipc_node_link_down(n, bearer_id, false);
@@ -425,9 +480,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
425static void tipc_node_link_up(struct tipc_node *n, int bearer_id, 480static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
426 struct sk_buff_head *xmitq) 481 struct sk_buff_head *xmitq)
427{ 482{
428 tipc_node_lock(n); 483 tipc_node_write_lock(n);
429 __tipc_node_link_up(n, bearer_id, xmitq); 484 __tipc_node_link_up(n, bearer_id, xmitq);
430 tipc_node_unlock(n); 485 tipc_node_write_unlock(n);
431} 486}
432 487
433/** 488/**
@@ -516,7 +571,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
516 571
517 __skb_queue_head_init(&xmitq); 572 __skb_queue_head_init(&xmitq);
518 573
519 tipc_node_lock(n); 574 tipc_node_write_lock(n);
520 if (!tipc_link_is_establishing(l)) { 575 if (!tipc_link_is_establishing(l)) {
521 __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr); 576 __tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
522 if (delete) { 577 if (delete) {
@@ -528,7 +583,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
528 /* Defuse pending tipc_node_link_up() */ 583 /* Defuse pending tipc_node_link_up() */
529 tipc_link_fsm_evt(l, LINK_RESET_EVT); 584 tipc_link_fsm_evt(l, LINK_RESET_EVT);
530 } 585 }
531 tipc_node_unlock(n); 586 tipc_node_write_unlock(n);
532 tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr); 587 tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
533 tipc_sk_rcv(n->net, &le->inputq); 588 tipc_sk_rcv(n->net, &le->inputq);
534} 589}
@@ -561,7 +616,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
561 if (!n) 616 if (!n)
562 return; 617 return;
563 618
564 tipc_node_lock(n); 619 tipc_node_write_lock(n);
565 620
566 le = &n->links[b->identity]; 621 le = &n->links[b->identity];
567 622
@@ -656,7 +711,6 @@ void tipc_node_check_dest(struct net *net, u32 onode,
656 if (n->state == NODE_FAILINGOVER) 711 if (n->state == NODE_FAILINGOVER)
657 tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); 712 tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
658 le->link = l; 713 le->link = l;
659 spin_lock_init(&le->lock);
660 n->link_cnt++; 714 n->link_cnt++;
661 tipc_node_calculate_timer(n, l); 715 tipc_node_calculate_timer(n, l);
662 if (n->link_cnt == 1) 716 if (n->link_cnt == 1)
@@ -665,7 +719,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
665 } 719 }
666 memcpy(&le->maddr, maddr, sizeof(*maddr)); 720 memcpy(&le->maddr, maddr, sizeof(*maddr));
667exit: 721exit:
668 tipc_node_unlock(n); 722 tipc_node_write_unlock(n);
669 if (reset && !tipc_link_is_reset(l)) 723 if (reset && !tipc_link_is_reset(l))
670 tipc_node_link_down(n, b->identity, false); 724 tipc_node_link_down(n, b->identity, false);
671 tipc_node_put(n); 725 tipc_node_put(n);
@@ -873,24 +927,6 @@ illegal_evt:
873 pr_err("Illegal node fsm evt %x in state %x\n", evt, state); 927 pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
874} 928}
875 929
876bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
877{
878 int state = n->state;
879
880 if (likely(state == SELF_UP_PEER_UP))
881 return true;
882
883 if (state == SELF_LEAVING_PEER_DOWN)
884 return false;
885
886 if (state == SELF_DOWN_PEER_LEAVING) {
887 if (msg_peer_node_is_up(hdr))
888 return false;
889 }
890
891 return true;
892}
893
894static void node_lost_contact(struct tipc_node *n, 930static void node_lost_contact(struct tipc_node *n,
895 struct sk_buff_head *inputq) 931 struct sk_buff_head *inputq)
896{ 932{
@@ -952,56 +988,18 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
952 if (bearer_id >= MAX_BEARERS) 988 if (bearer_id >= MAX_BEARERS)
953 goto exit; 989 goto exit;
954 990
955 tipc_node_lock(node); 991 tipc_node_read_lock(node);
956 link = node->links[bearer_id].link; 992 link = node->links[bearer_id].link;
957 if (link) { 993 if (link) {
958 strncpy(linkname, link->name, len); 994 strncpy(linkname, link->name, len);
959 err = 0; 995 err = 0;
960 } 996 }
961exit: 997exit:
962 tipc_node_unlock(node); 998 tipc_node_read_unlock(node);
963 tipc_node_put(node); 999 tipc_node_put(node);
964 return err; 1000 return err;
965} 1001}
966 1002
967void tipc_node_unlock(struct tipc_node *node)
968{
969 struct net *net = node->net;
970 u32 addr = 0;
971 u32 flags = node->action_flags;
972 u32 link_id = 0;
973 struct list_head *publ_list;
974
975 if (likely(!flags)) {
976 spin_unlock_bh(&node->lock);
977 return;
978 }
979
980 addr = node->addr;
981 link_id = node->link_id;
982 publ_list = &node->publ_list;
983
984 node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
985 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
986
987 spin_unlock_bh(&node->lock);
988
989 if (flags & TIPC_NOTIFY_NODE_DOWN)
990 tipc_publ_notify(net, publ_list, addr);
991
992 if (flags & TIPC_NOTIFY_NODE_UP)
993 tipc_named_node_up(net, addr);
994
995 if (flags & TIPC_NOTIFY_LINK_UP)
996 tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
997 TIPC_NODE_SCOPE, link_id, addr);
998
999 if (flags & TIPC_NOTIFY_LINK_DOWN)
1000 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
1001 link_id, addr);
1002
1003}
1004
1005/* Caller should hold node lock for the passed node */ 1003/* Caller should hold node lock for the passed node */
1006static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node) 1004static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
1007{ 1005{
@@ -1048,40 +1046,38 @@ msg_full:
1048int tipc_node_xmit(struct net *net, struct sk_buff_head *list, 1046int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1049 u32 dnode, int selector) 1047 u32 dnode, int selector)
1050{ 1048{
1051 struct tipc_link_entry *le; 1049 struct tipc_link_entry *le = NULL;
1052 struct tipc_node *n; 1050 struct tipc_node *n;
1053 struct sk_buff_head xmitq; 1051 struct sk_buff_head xmitq;
1054 struct tipc_media_addr *maddr = NULL;
1055 int bearer_id = -1; 1052 int bearer_id = -1;
1056 int rc = -EHOSTUNREACH; 1053 int rc = -EHOSTUNREACH;
1057 1054
1058 __skb_queue_head_init(&xmitq); 1055 __skb_queue_head_init(&xmitq);
1059 n = tipc_node_find(net, dnode); 1056 n = tipc_node_find(net, dnode);
1060 if (likely(n)) { 1057 if (likely(n)) {
1061 tipc_node_lock(n); 1058 tipc_node_read_lock(n);
1062 bearer_id = n->active_links[selector & 1]; 1059 bearer_id = n->active_links[selector & 1];
1063 if (bearer_id >= 0) { 1060 if (bearer_id >= 0) {
1064 le = &n->links[bearer_id]; 1061 le = &n->links[bearer_id];
1065 maddr = &le->maddr;
1066 spin_lock_bh(&le->lock); 1062 spin_lock_bh(&le->lock);
1067 if (likely(le->link)) 1063 rc = tipc_link_xmit(le->link, list, &xmitq);
1068 rc = tipc_link_xmit(le->link, list, &xmitq);
1069 spin_unlock_bh(&le->lock); 1064 spin_unlock_bh(&le->lock);
1070 } 1065 }
1071 tipc_node_unlock(n); 1066 tipc_node_read_unlock(n);
1067 if (likely(!skb_queue_empty(&xmitq))) {
1068 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1069 return 0;
1070 }
1072 if (unlikely(rc == -ENOBUFS)) 1071 if (unlikely(rc == -ENOBUFS))
1073 tipc_node_link_down(n, bearer_id, false); 1072 tipc_node_link_down(n, bearer_id, false);
1074 tipc_node_put(n); 1073 tipc_node_put(n);
1074 return rc;
1075 } 1075 }
1076 if (likely(!skb_queue_empty(&xmitq))) { 1076
1077 tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); 1077 if (unlikely(!in_own_node(net, dnode)))
1078 return 0; 1078 return rc;
1079 } 1079 tipc_sk_rcv(net, list);
1080 if (likely(in_own_node(net, dnode))) { 1080 return 0;
1081 tipc_sk_rcv(net, list);
1082 return 0;
1083 }
1084 return rc;
1085} 1081}
1086 1082
1087/* tipc_node_xmit_skb(): send single buffer to destination 1083/* tipc_node_xmit_skb(): send single buffer to destination
@@ -1171,9 +1167,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
1171 1167
1172 /* Broadcast ACKs are sent on a unicast link */ 1168 /* Broadcast ACKs are sent on a unicast link */
1173 if (rc & TIPC_LINK_SND_BC_ACK) { 1169 if (rc & TIPC_LINK_SND_BC_ACK) {
1174 tipc_node_lock(n); 1170 tipc_node_read_lock(n);
1175 tipc_link_build_ack_msg(le->link, &xmitq); 1171 tipc_link_build_ack_msg(le->link, &xmitq);
1176 tipc_node_unlock(n); 1172 tipc_node_read_unlock(n);
1177 } 1173 }
1178 1174
1179 if (!skb_queue_empty(&xmitq)) 1175 if (!skb_queue_empty(&xmitq))
@@ -1229,7 +1225,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
1229 } 1225 }
1230 } 1226 }
1231 1227
1232 /* Update node accesibility if applicable */ 1228 /* Check and update node accesibility if applicable */
1233 if (state == SELF_UP_PEER_COMING) { 1229 if (state == SELF_UP_PEER_COMING) {
1234 if (!tipc_link_is_up(l)) 1230 if (!tipc_link_is_up(l))
1235 return true; 1231 return true;
@@ -1245,6 +1241,9 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
1245 return true; 1241 return true;
1246 } 1242 }
1247 1243
1244 if (state == SELF_LEAVING_PEER_DOWN)
1245 return false;
1246
1248 /* Ignore duplicate packets */ 1247 /* Ignore duplicate packets */
1249 if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt)) 1248 if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
1250 return true; 1249 return true;
@@ -1361,21 +1360,29 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1361 else if (unlikely(n->bc_entry.link->acked != bc_ack)) 1360 else if (unlikely(n->bc_entry.link->acked != bc_ack))
1362 tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack); 1361 tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
1363 1362
1364 tipc_node_lock(n); 1363 /* Receive packet directly if conditions permit */
1365 1364 tipc_node_read_lock(n);
1366 /* Is reception permitted at the moment ? */ 1365 if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
1367 if (!tipc_node_filter_pkt(n, hdr))
1368 goto unlock;
1369
1370 /* Check and if necessary update node state */
1371 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
1372 spin_lock_bh(&le->lock); 1366 spin_lock_bh(&le->lock);
1373 rc = tipc_link_rcv(le->link, skb, &xmitq); 1367 if (le->link) {
1368 rc = tipc_link_rcv(le->link, skb, &xmitq);
1369 skb = NULL;
1370 }
1374 spin_unlock_bh(&le->lock); 1371 spin_unlock_bh(&le->lock);
1375 skb = NULL;
1376 } 1372 }
1377unlock: 1373 tipc_node_read_unlock(n);
1378 tipc_node_unlock(n); 1374
1375 /* Check/update node state before receiving */
1376 if (unlikely(skb)) {
1377 tipc_node_write_lock(n);
1378 if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
1379 if (le->link) {
1380 rc = tipc_link_rcv(le->link, skb, &xmitq);
1381 skb = NULL;
1382 }
1383 }
1384 tipc_node_write_unlock(n);
1385 }
1379 1386
1380 if (unlikely(rc & TIPC_LINK_UP_EVT)) 1387 if (unlikely(rc & TIPC_LINK_UP_EVT))
1381 tipc_node_link_up(n, bearer_id, &xmitq); 1388 tipc_node_link_up(n, bearer_id, &xmitq);
@@ -1440,15 +1447,15 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
1440 continue; 1447 continue;
1441 } 1448 }
1442 1449
1443 tipc_node_lock(node); 1450 tipc_node_read_lock(node);
1444 err = __tipc_nl_add_node(&msg, node); 1451 err = __tipc_nl_add_node(&msg, node);
1445 if (err) { 1452 if (err) {
1446 last_addr = node->addr; 1453 last_addr = node->addr;
1447 tipc_node_unlock(node); 1454 tipc_node_read_unlock(node);
1448 goto out; 1455 goto out;
1449 } 1456 }
1450 1457
1451 tipc_node_unlock(node); 1458 tipc_node_read_unlock(node);
1452 } 1459 }
1453 done = 1; 1460 done = 1;
1454out: 1461out:
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 8784907486c0..651a1581a210 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -109,7 +109,7 @@ struct tipc_bclink_entry {
109struct tipc_node { 109struct tipc_node {
110 u32 addr; 110 u32 addr;
111 struct kref kref; 111 struct kref kref;
112 spinlock_t lock; 112 rwlock_t lock;
113 struct net *net; 113 struct net *net;
114 struct hlist_node hash; 114 struct hlist_node hash;
115 int active_links[2]; 115 int active_links[2];
@@ -145,7 +145,8 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr);
145bool tipc_node_is_up(struct tipc_node *n); 145bool tipc_node_is_up(struct tipc_node *n);
146int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node, 146int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 node,
147 char *linkname, size_t len); 147 char *linkname, size_t len);
148void tipc_node_unlock(struct tipc_node *node); 148void tipc_node_read_lock(struct tipc_node *n);
149void tipc_node_read_unlock(struct tipc_node *node);
149int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode, 150int tipc_node_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
150 int selector); 151 int selector);
151int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest, 152int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
@@ -157,11 +158,6 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
157void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port); 158void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
158int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb); 159int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
159 160
160static inline void tipc_node_lock(struct tipc_node *node)
161{
162 spin_lock_bh(&node->lock);
163}
164
165static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel) 161static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
166{ 162{
167 int bearer_id = n->active_links[sel & 1]; 163 int bearer_id = n->active_links[sel & 1];