author     Jon Paul Maloy <jon.maloy@ericsson.com>  2015-11-19 14:30:44 -0500
committer  David S. Miller <davem@davemloft.net>    2015-11-20 14:06:10 -0500
commit     5405ff6e15f40f2f53e37d2dcd7de521e2b7a96f (patch)
tree       226f40f32f063d27a8d9a6abe6708d550721f1fd /net/tipc/node.c
parent     2312bf61ae365fdd6b9bfb24558a417859759447 (diff)
tipc: convert node lock to rwlock
According to the node FSM, a node in state SELF_UP_PEER_UP cannot change
state inside a lock context, except when a TUNNEL_PROTOCOL (SYNCH or
FAILOVER) packet arrives. However, the node's individual links may still
change state. Since each link is now protected by its own spinlock, we
finally have the conditions in place to convert the node spinlock to an
rwlock_t.

If the node state and arriving packet type are right, we can let the link
receive the packet directly, under protection of its own spinlock and the
node lock in read mode. In all other cases we use the node lock in write
mode. This enables fully concurrent execution between parallel links
during steady-state traffic, i.e., 99+ % of the time.

This commit implements this change.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/node.c')
-rw-r--r--  net/tipc/node.c  227
1 file changed, 117 insertions(+), 110 deletions(-)
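Before the diff itself, a minimal sketch of the two-tier locking scheme the
commit describes. The types and the node_rcv() helper below are simplified
stand-ins for illustration only (the real structures live in net/tipc/node.h);
it shows the read-mode fast path plus per-link spinlock, with a write-mode
slow path for packets that may change node state:

/* Sketch only: simplified stand-in types, not TIPC's real declarations */
#include <linux/skbuff.h>
#include <linux/spinlock.h>

#define MAX_BEARERS 3

struct tipc_link;			/* opaque here */

enum node_state { SELF_DOWN_PEER_DOWN, SELF_UP_PEER_UP /* ... */ };

struct link_entry {
	spinlock_t lock;		/* serializes this link's receive path */
	struct tipc_link *link;
};

struct node {
	rwlock_t lock;			/* protects node state and link table */
	enum node_state state;
	struct link_entry links[MAX_BEARERS];
};

static void node_rcv(struct node *n, struct sk_buff *skb, int bearer_id)
{
	struct link_entry *le = &n->links[bearer_id];

	/* Fast path: node state cannot change under us; readers suffice,
	 * so parallel links proceed concurrently under their own spinlocks.
	 */
	read_lock_bh(&n->lock);
	if (likely(n->state == SELF_UP_PEER_UP)) {
		spin_lock_bh(&le->lock);
		if (le->link) {
			/* deliver to the link layer, cf. tipc_link_rcv() */
			skb = NULL;
		}
		spin_unlock_bh(&le->lock);
	}
	read_unlock_bh(&n->lock);

	/* Slow path: the packet may change node state; exclude all readers */
	if (unlikely(skb)) {
		write_lock_bh(&n->lock);
		/* run the node FSM here; this stub just drops the skb */
		write_unlock_bh(&n->lock);
		kfree_skb(skb);
	}
}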
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 572063a0190e..47d5f84c90c5 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -141,10 +141,63 @@ struct tipc_node *tipc_node_find(struct net *net, u32 addr)
 	return NULL;
 }
 
+void tipc_node_read_lock(struct tipc_node *n)
+{
+	read_lock_bh(&n->lock);
+}
+
+void tipc_node_read_unlock(struct tipc_node *n)
+{
+	read_unlock_bh(&n->lock);
+}
+
+static void tipc_node_write_lock(struct tipc_node *n)
+{
+	write_lock_bh(&n->lock);
+}
+
+static void tipc_node_write_unlock(struct tipc_node *n)
+{
+	struct net *net = n->net;
+	u32 addr = 0;
+	u32 flags = n->action_flags;
+	u32 link_id = 0;
+	struct list_head *publ_list;
+
+	if (likely(!flags)) {
+		write_unlock_bh(&n->lock);
+		return;
+	}
+
+	addr = n->addr;
+	link_id = n->link_id;
+	publ_list = &n->publ_list;
+
+	n->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
+			     TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
+
+	write_unlock_bh(&n->lock);
+
+	if (flags & TIPC_NOTIFY_NODE_DOWN)
+		tipc_publ_notify(net, publ_list, addr);
+
+	if (flags & TIPC_NOTIFY_NODE_UP)
+		tipc_named_node_up(net, addr);
+
+	if (flags & TIPC_NOTIFY_LINK_UP)
+		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
+				     TIPC_NODE_SCOPE, link_id, addr);
+
+	if (flags & TIPC_NOTIFY_LINK_DOWN)
+		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
+				      link_id, addr);
+}
+
 struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_node *n_ptr, *temp_node;
+	int i;
 
 	spin_lock_bh(&tn->node_list_lock);
 	n_ptr = tipc_node_find(net, addr);
@@ -159,7 +212,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
 	n_ptr->net = net;
 	n_ptr->capabilities = capabilities;
 	kref_init(&n_ptr->kref);
-	spin_lock_init(&n_ptr->lock);
+	rwlock_init(&n_ptr->lock);
 	INIT_HLIST_NODE(&n_ptr->hash);
 	INIT_LIST_HEAD(&n_ptr->list);
 	INIT_LIST_HEAD(&n_ptr->publ_list);
@@ -168,6 +221,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
 	skb_queue_head_init(&n_ptr->bc_entry.inputq1);
 	__skb_queue_head_init(&n_ptr->bc_entry.arrvq);
 	skb_queue_head_init(&n_ptr->bc_entry.inputq2);
+	for (i = 0; i < MAX_BEARERS; i++)
+		spin_lock_init(&n_ptr->links[i].lock);
 	hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
 	list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
 		if (n_ptr->addr < temp_node->addr)
@@ -246,9 +301,9 @@ void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr)
 		pr_warn("Node subscribe rejected, unknown node 0x%x\n", addr);
 		return;
 	}
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	list_add_tail(subscr, &n->publ_list);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_node_put(n);
 }
 
@@ -264,9 +319,9 @@ void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr)
 		pr_warn("Node unsubscribe rejected, unknown node 0x%x\n", addr);
 		return;
 	}
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	list_del_init(subscr);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_node_put(n);
 }
 
@@ -293,9 +348,9 @@ int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port)
 	conn->port = port;
 	conn->peer_port = peer_port;
 
-	tipc_node_lock(node);
+	tipc_node_write_lock(node);
 	list_add_tail(&conn->list, &node->conn_sks);
-	tipc_node_unlock(node);
+	tipc_node_write_unlock(node);
 exit:
 	tipc_node_put(node);
 	return err;
@@ -313,14 +368,14 @@ void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port)
 	if (!node)
 		return;
 
-	tipc_node_lock(node);
+	tipc_node_write_lock(node);
 	list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
 		if (port != conn->port)
 			continue;
 		list_del(&conn->list);
 		kfree(conn);
 	}
-	tipc_node_unlock(node);
+	tipc_node_write_unlock(node);
 	tipc_node_put(node);
 }
 
@@ -337,7 +392,7 @@ static void tipc_node_timeout(unsigned long data)
 	__skb_queue_head_init(&xmitq);
 
 	for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		le = &n->links[bearer_id];
 		spin_lock_bh(&le->lock);
 		if (le->link) {
@@ -346,7 +401,7 @@ static void tipc_node_timeout(unsigned long data)
 			rc = tipc_link_timeout(le->link, &xmitq);
 		}
 		spin_unlock_bh(&le->lock);
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
 		tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
 		if (rc & TIPC_LINK_DOWN_EVT)
 			tipc_node_link_down(n, bearer_id, false);
@@ -425,9 +480,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
 static void tipc_node_link_up(struct tipc_node *n, int bearer_id,
 			      struct sk_buff_head *xmitq)
 {
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	__tipc_node_link_up(n, bearer_id, xmitq);
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 }
 
 /**
@@ -516,7 +571,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
 
 	__skb_queue_head_init(&xmitq);
 
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 	if (!tipc_link_is_establishing(l)) {
 		__tipc_node_link_down(n, &bearer_id, &xmitq, &maddr);
 		if (delete) {
@@ -528,7 +583,7 @@ static void tipc_node_link_down(struct tipc_node *n, int bearer_id, bool delete)
 		/* Defuse pending tipc_node_link_up() */
 		tipc_link_fsm_evt(l, LINK_RESET_EVT);
 	}
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	tipc_bearer_xmit(n->net, bearer_id, &xmitq, maddr);
 	tipc_sk_rcv(n->net, &le->inputq);
 }
@@ -561,7 +616,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
 	if (!n)
 		return;
 
-	tipc_node_lock(n);
+	tipc_node_write_lock(n);
 
 	le = &n->links[b->identity];
 
@@ -656,7 +711,6 @@ void tipc_node_check_dest(struct net *net, u32 onode,
 	if (n->state == NODE_FAILINGOVER)
 		tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
 	le->link = l;
-	spin_lock_init(&le->lock);
 	n->link_cnt++;
 	tipc_node_calculate_timer(n, l);
 	if (n->link_cnt == 1)
@@ -665,7 +719,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
 	}
 	memcpy(&le->maddr, maddr, sizeof(*maddr));
 exit:
-	tipc_node_unlock(n);
+	tipc_node_write_unlock(n);
 	if (reset && !tipc_link_is_reset(l))
 		tipc_node_link_down(n, b->identity, false);
 	tipc_node_put(n);
@@ -873,24 +927,6 @@ illegal_evt:
 	pr_err("Illegal node fsm evt %x in state %x\n", evt, state);
 }
 
-bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
-{
-	int state = n->state;
-
-	if (likely(state == SELF_UP_PEER_UP))
-		return true;
-
-	if (state == SELF_LEAVING_PEER_DOWN)
-		return false;
-
-	if (state == SELF_DOWN_PEER_LEAVING) {
-		if (msg_peer_node_is_up(hdr))
-			return false;
-	}
-
-	return true;
-}
-
 static void node_lost_contact(struct tipc_node *n,
 			      struct sk_buff_head *inputq)
 {
@@ -952,56 +988,18 @@ int tipc_node_get_linkname(struct net *net, u32 bearer_id, u32 addr,
 	if (bearer_id >= MAX_BEARERS)
 		goto exit;
 
-	tipc_node_lock(node);
+	tipc_node_read_lock(node);
 	link = node->links[bearer_id].link;
 	if (link) {
 		strncpy(linkname, link->name, len);
 		err = 0;
 	}
 exit:
-	tipc_node_unlock(node);
+	tipc_node_read_unlock(node);
 	tipc_node_put(node);
 	return err;
 }
 
-void tipc_node_unlock(struct tipc_node *node)
-{
-	struct net *net = node->net;
-	u32 addr = 0;
-	u32 flags = node->action_flags;
-	u32 link_id = 0;
-	struct list_head *publ_list;
-
-	if (likely(!flags)) {
-		spin_unlock_bh(&node->lock);
-		return;
-	}
-
-	addr = node->addr;
-	link_id = node->link_id;
-	publ_list = &node->publ_list;
-
-	node->action_flags &= ~(TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
-				TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP);
-
-	spin_unlock_bh(&node->lock);
-
-	if (flags & TIPC_NOTIFY_NODE_DOWN)
-		tipc_publ_notify(net, publ_list, addr);
-
-	if (flags & TIPC_NOTIFY_NODE_UP)
-		tipc_named_node_up(net, addr);
-
-	if (flags & TIPC_NOTIFY_LINK_UP)
-		tipc_nametbl_publish(net, TIPC_LINK_STATE, addr, addr,
-				     TIPC_NODE_SCOPE, link_id, addr);
-
-	if (flags & TIPC_NOTIFY_LINK_DOWN)
-		tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
-				      link_id, addr);
-
-}
-
 /* Caller should hold node lock for the passed node */
 static int __tipc_nl_add_node(struct tipc_nl_msg *msg, struct tipc_node *node)
 {
@@ -1048,40 +1046,38 @@ msg_full:
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 		   u32 dnode, int selector)
 {
-	struct tipc_link_entry *le;
+	struct tipc_link_entry *le = NULL;
 	struct tipc_node *n;
 	struct sk_buff_head xmitq;
-	struct tipc_media_addr *maddr = NULL;
 	int bearer_id = -1;
 	int rc = -EHOSTUNREACH;
 
 	__skb_queue_head_init(&xmitq);
 	n = tipc_node_find(net, dnode);
 	if (likely(n)) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		bearer_id = n->active_links[selector & 1];
 		if (bearer_id >= 0) {
 			le = &n->links[bearer_id];
-			maddr = &le->maddr;
 			spin_lock_bh(&le->lock);
-			if (likely(le->link))
-				rc = tipc_link_xmit(le->link, list, &xmitq);
+			rc = tipc_link_xmit(le->link, list, &xmitq);
 			spin_unlock_bh(&le->lock);
 		}
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
+		if (likely(!skb_queue_empty(&xmitq))) {
+			tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
+			return 0;
+		}
 		if (unlikely(rc == -ENOBUFS))
 			tipc_node_link_down(n, bearer_id, false);
 		tipc_node_put(n);
+		return rc;
 	}
-	if (likely(!skb_queue_empty(&xmitq))) {
-		tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
-		return 0;
-	}
-	if (likely(in_own_node(net, dnode))) {
-		tipc_sk_rcv(net, list);
-		return 0;
-	}
-	return rc;
+
+	if (unlikely(!in_own_node(net, dnode)))
+		return rc;
+	tipc_sk_rcv(net, list);
+	return 0;
 }
 
 /* tipc_node_xmit_skb(): send single buffer to destination
@@ -1171,9 +1167,9 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
 
 	/* Broadcast ACKs are sent on a unicast link */
 	if (rc & TIPC_LINK_SND_BC_ACK) {
-		tipc_node_lock(n);
+		tipc_node_read_lock(n);
 		tipc_link_build_ack_msg(le->link, &xmitq);
-		tipc_node_unlock(n);
+		tipc_node_read_unlock(n);
 	}
 
 	if (!skb_queue_empty(&xmitq))
@@ -1229,7 +1225,7 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
 		}
 	}
 
-	/* Update node accesibility if applicable */
+	/* Check and update node accesibility if applicable */
 	if (state == SELF_UP_PEER_COMING) {
 		if (!tipc_link_is_up(l))
 			return true;
@@ -1245,6 +1241,9 @@ static bool tipc_node_check_state(struct tipc_node *n, struct sk_buff *skb,
 		return true;
 	}
 
+	if (state == SELF_LEAVING_PEER_DOWN)
+		return false;
+
 	/* Ignore duplicate packets */
 	if ((usr != LINK_PROTOCOL) && less(oseqno, rcv_nxt))
 		return true;
@@ -1361,21 +1360,29 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 	else if (unlikely(n->bc_entry.link->acked != bc_ack))
 		tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
 
-	tipc_node_lock(n);
-
-	/* Is reception permitted at the moment ? */
-	if (!tipc_node_filter_pkt(n, hdr))
-		goto unlock;
-
-	/* Check and if necessary update node state */
-	if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
+	/* Receive packet directly if conditions permit */
+	tipc_node_read_lock(n);
+	if (likely((n->state == SELF_UP_PEER_UP) && (usr != TUNNEL_PROTOCOL))) {
 		spin_lock_bh(&le->lock);
-		rc = tipc_link_rcv(le->link, skb, &xmitq);
+		if (le->link) {
+			rc = tipc_link_rcv(le->link, skb, &xmitq);
+			skb = NULL;
+		}
 		spin_unlock_bh(&le->lock);
-		skb = NULL;
 	}
-unlock:
-	tipc_node_unlock(n);
+	tipc_node_read_unlock(n);
+
+	/* Check/update node state before receiving */
+	if (unlikely(skb)) {
+		tipc_node_write_lock(n);
+		if (tipc_node_check_state(n, skb, bearer_id, &xmitq)) {
+			if (le->link) {
+				rc = tipc_link_rcv(le->link, skb, &xmitq);
+				skb = NULL;
+			}
+		}
+		tipc_node_write_unlock(n);
+	}
 
 	if (unlikely(rc & TIPC_LINK_UP_EVT))
 		tipc_node_link_up(n, bearer_id, &xmitq);
@@ -1440,15 +1447,15 @@ int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 			continue;
 		}
 
-		tipc_node_lock(node);
+		tipc_node_read_lock(node);
 		err = __tipc_nl_add_node(&msg, node);
 		if (err) {
 			last_addr = node->addr;
-			tipc_node_unlock(node);
+			tipc_node_read_unlock(node);
 			goto out;
 		}
 
-		tipc_node_unlock(node);
+		tipc_node_read_unlock(node);
 	}
 	done = 1;
 out:
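
One detail worth noting in the diff above: tipc_node_write_unlock() snapshots
n->action_flags and clears the notify bits while still holding the write lock,
then drops the lock before calling into the name table and publication code,
so those callouts never run under the node lock. A minimal sketch of that
deferred-callout idiom, with hypothetical NOTIFY_* flags and stub callouts
standing in for the TIPC_NOTIFY_* flags and their handlers:

/* Sketch only: NOTIFY_UP/NOTIFY_DOWN and the callouts are stand-ins */
#include <linux/bits.h>
#include <linux/spinlock.h>
#include <linux/types.h>

#define NOTIFY_UP   BIT(0)
#define NOTIFY_DOWN BIT(1)

struct node {
	rwlock_t lock;
	u32 action_flags;	/* set by the FSM while the write lock is held */
};

static void notify_up(struct node *n)   { /* e.g. tipc_named_node_up() */ }
static void notify_down(struct node *n) { /* e.g. tipc_publ_notify()   */ }

static void node_write_unlock(struct node *n)
{
	/* Snapshot the flags while still serialized against the FSM... */
	u32 flags = n->action_flags;

	if (likely(!flags)) {
		write_unlock_bh(&n->lock);
		return;
	}
	n->action_flags &= ~(NOTIFY_UP | NOTIFY_DOWN);
	write_unlock_bh(&n->lock);

	/* ...then run the callouts lock-free, so they can take other
	 * locks without nesting them inside the node lock.
	 */
	if (flags & NOTIFY_UP)
		notify_up(n);
	if (flags & NOTIFY_DOWN)
		notify_down(n);
}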