Diffstat (limited to 'net/tipc/node.c')
-rw-r--r--	net/tipc/node.c	63
1 file changed, 37 insertions(+), 26 deletions(-)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 9d2f4c2b08ab..4512e83652b1 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -263,6 +263,11 @@ static void tipc_node_write_lock(struct tipc_node *n)
 	write_lock_bh(&n->lock);
 }
 
+static void tipc_node_write_unlock_fast(struct tipc_node *n)
+{
+	write_unlock_bh(&n->lock);
+}
+
 static void tipc_node_write_unlock(struct tipc_node *n)
 {
 	struct net *net = n->net;
@@ -417,7 +422,7 @@ void tipc_node_subscribe(struct net *net, struct list_head *subscr, u32 addr)
 	}
 	tipc_node_write_lock(n);
 	list_add_tail(subscr, &n->publ_list);
-	tipc_node_write_unlock(n);
+	tipc_node_write_unlock_fast(n);
 	tipc_node_put(n);
 }
 
@@ -435,7 +440,7 @@ void tipc_node_unsubscribe(struct net *net, struct list_head *subscr, u32 addr)
 	}
 	tipc_node_write_lock(n);
 	list_del_init(subscr);
-	tipc_node_write_unlock(n);
+	tipc_node_write_unlock_fast(n);
 	tipc_node_put(n);
 }
 
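Annotation (reviewer sketch, not part of the patch): the new _fast variant only drops the write lock, whereas tipc_node_write_unlock() appears to perform follow-up work based on node state captured while the lock was held. The subscribe/unsubscribe paths changed above only touch n->publ_list, so the lighter unlock is sufficient there. A minimal sketch of the intended pattern, using a hypothetical helper name:

/* Hypothetical illustration, not in the patch */
static void node_publ_list_add(struct tipc_node *n, struct list_head *subscr)
{
	tipc_node_write_lock(n);
	list_add_tail(subscr, &n->publ_list);	/* pure list update, no node flags set */
	tipc_node_write_unlock_fast(n);		/* nothing left to post-process */
}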
@@ -1167,7 +1172,7 @@ msg_full:
  * @list: chain of buffers containing message
  * @dnode: address of destination node
  * @selector: a number used for deterministic link selection
- * Consumes the buffer chain, except when returning -ELINKCONG
+ * Consumes the buffer chain.
  * Returns 0 if success, otherwise: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE,-ENOBUF
  */
 int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
@@ -1206,10 +1211,10 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
 	spin_unlock_bh(&le->lock);
 	tipc_node_read_unlock(n);
 
-	if (likely(rc == 0))
-		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
-	else if (rc == -ENOBUFS)
+	if (unlikely(rc == -ENOBUFS))
 		tipc_node_link_down(n, bearer_id, false);
+	else
+		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
 
 	tipc_node_put(n);
 
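Annotation (reviewer sketch, not part of the patch): per the updated kerneldoc, tipc_node_xmit() now owns the buffer chain on every return path, and -ENOBUFS is the only code that additionally takes the link down. A hypothetical caller would therefore react to the return value without freeing the buffers itself:

/* Hypothetical illustration, not in the patch */
static int example_send(struct net *net, struct sk_buff_head *pkts,
			u32 dnode, u32 selector)
{
	int rc = tipc_node_xmit(net, pkts, dnode, selector);

	if (rc == -ELINKCONG)
		return -EAGAIN;	/* congested: back off, chain already consumed */
	return rc;		/* 0, -EHOSTUNREACH, -EMSGSIZE or -ENOBUFS */
}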
@@ -1221,20 +1226,15 @@ int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
  * messages, which will not be rejected
  * The only exception is datagram messages rerouted after secondary
  * lookup, which are rare and safe to dispose of anyway.
- * TODO: Return real return value, and let callers use
- * tipc_wait_for_sendpkt() where applicable
  */
 int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 		       u32 selector)
 {
 	struct sk_buff_head head;
-	int rc;
 
 	skb_queue_head_init(&head);
 	__skb_queue_tail(&head, skb);
-	rc = tipc_node_xmit(net, &head, dnode, selector);
-	if (rc == -ELINKCONG)
-		kfree_skb(skb);
+	tipc_node_xmit(net, &head, dnode, selector);
 	return 0;
 }
 
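Annotation (reviewer sketch, not part of the patch): the -ELINKCONG special case is dropped here because, under the revised ownership rule, the chain is consumed unconditionally; freeing the skb after tipc_node_xmit() would risk a double free. Caller-side sketch of that invariant:

/* Hypothetical illustration, not in the patch */
static void example_send_one(struct net *net, struct sk_buff *skb,
			     u32 dnode, u32 selector)
{
	struct sk_buff_head head;

	skb_queue_head_init(&head);
	__skb_queue_tail(&head, skb);
	tipc_node_xmit(net, &head, dnode, selector);
	/* do not free or reuse skb here: ownership was transferred above */
}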
@@ -1262,6 +1262,19 @@ void tipc_node_broadcast(struct net *net, struct sk_buff *skb)
 	kfree_skb(skb);
 }
 
+static void tipc_node_mcast_rcv(struct tipc_node *n)
+{
+	struct tipc_bclink_entry *be = &n->bc_entry;
+
+	/* 'arrvq' is under inputq2's lock protection */
+	spin_lock_bh(&be->inputq2.lock);
+	spin_lock_bh(&be->inputq1.lock);
+	skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
+	spin_unlock_bh(&be->inputq1.lock);
+	spin_unlock_bh(&be->inputq2.lock);
+	tipc_sk_mcast_rcv(n->net, &be->arrvq, &be->inputq2);
+}
+
 static void tipc_node_bc_sync_rcv(struct tipc_node *n, struct tipc_msg *hdr,
 				  int bearer_id, struct sk_buff_head *xmitq)
 {
@@ -1335,15 +1348,8 @@ static void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id
 	if (!skb_queue_empty(&xmitq))
 		tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
 
-	/* Deliver. 'arrvq' is under inputq2's lock protection */
-	if (!skb_queue_empty(&be->inputq1)) {
-		spin_lock_bh(&be->inputq2.lock);
-		spin_lock_bh(&be->inputq1.lock);
-		skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
-		spin_unlock_bh(&be->inputq1.lock);
-		spin_unlock_bh(&be->inputq2.lock);
-		tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
-	}
+	if (!skb_queue_empty(&be->inputq1))
+		tipc_node_mcast_rcv(n);
 
 	if (rc & TIPC_LINK_DOWN_EVT) {
 		/* Reception reassembly failure => reset all links to peer */
@@ -1499,19 +1505,21 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 {
 	struct sk_buff_head xmitq;
 	struct tipc_node *n;
-	struct tipc_msg *hdr = buf_msg(skb);
-	int usr = msg_user(hdr);
+	struct tipc_msg *hdr;
 	int bearer_id = b->identity;
 	struct tipc_link_entry *le;
-	u16 bc_ack = msg_bcast_ack(hdr);
 	u32 self = tipc_own_addr(net);
-	int rc = 0;
+	int usr, rc = 0;
+	u16 bc_ack;
 
 	__skb_queue_head_init(&xmitq);
 
-	/* Ensure message is well-formed */
+	/* Ensure message is well-formed before touching the header */
 	if (unlikely(!tipc_msg_validate(skb)))
 		goto discard;
+	hdr = buf_msg(skb);
+	usr = msg_user(hdr);
+	bc_ack = msg_bcast_ack(hdr);
 
 	/* Handle arrival of discovery or broadcast packet */
 	if (unlikely(msg_non_seq(hdr))) {
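Annotation (reviewer sketch, not part of the patch): the reordering makes sure msg_user() and msg_bcast_ack() are only evaluated after tipc_msg_validate() has accepted the header, rather than at declaration time before any validation. The same validate-before-read pattern, as a hypothetical helper:

/* Hypothetical illustration, not in the patch */
static bool example_peek_user(struct sk_buff *skb, int *usr)
{
	if (unlikely(!tipc_msg_validate(skb)))
		return false;			/* truncated or malformed header */
	*usr = msg_user(buf_msg(skb));		/* safe: header has been validated */
	return true;
}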
@@ -1570,6 +1578,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
 	if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
 		tipc_named_rcv(net, &n->bc_entry.namedq);
 
+	if (unlikely(!skb_queue_empty(&n->bc_entry.inputq1)))
+		tipc_node_mcast_rcv(n);
+
 	if (!skb_queue_empty(&le->inputq))
 		tipc_sk_rcv(net, &le->inputq);
 
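Annotation (reviewer sketch, not part of the patch): after this hunk, both receive paths deliver multicast through the same helper: tipc_node_bc_rcv() for traffic arriving on the broadcast link, and tipc_rcv() for buffers that, presumably, can now land on bc_entry.inputq1 through the regular receive path as well. The check guarding the call is essentially the same at both sites (tipc_rcv() additionally wraps it in unlikely()):

/* Hypothetical illustration of the shared call pattern, not in the patch */
static void example_deliver_mcast(struct tipc_node *n)
{
	if (!skb_queue_empty(&n->bc_entry.inputq1))
		tipc_node_mcast_rcv(n);
}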