author	Jon Paul Maloy <jon.maloy@ericsson.com>	2015-02-05 08:36:44 -0500
committer	David S. Miller <davem@davemloft.net>	2015-02-05 19:00:03 -0500
commit	cb1b728096f54e7408d60fb571944bed00c5b771 (patch)
tree	1b1e50e705f4ddd3b36a43ac9067538db3e8233f /net/tipc/bcast.c
parent	3c724acdd5049907555a831f814bfd5927c3350c (diff)
tipc: eliminate race condition at multicast reception
In a previous commit in this series we resolved a race problem during
unicast message reception. Here, we resolve the same problem at multicast
reception. We apply the same technique: an input queue serializing the
delivery of arriving buffers. The main difference is that here we do it
in two steps. First, the broadcast link feeds arriving buffers into the
tail of an arrival queue, whose head is consumed at the socket level,
where destination lookup is performed. Second, if the lookup is
successful, the resulting buffer clones are fed into a second queue, the
input queue. This queue is consumed at reception in the socket, just as
in the unicast case. Both queues are protected by the same lock, that of
the input queue.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
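[Editor's illustration] The two-step scheme is easiest to see outside the
kernel. Below is a minimal user-space sketch of the pattern, not the TIPC
code itself: a producer feeds arriving buffers into the tail of an arrival
queue, and a consumer drains its head, performs a destination lookup (faked
here as a fixed hit count), and feeds one clone per destination into the
input queue, with both queues serialized by the input queue's single lock.
All names (buf, bufq, bclink_feed, socket_consume) are illustrative
stand-ins, not TIPC APIs.

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Toy stand-in for a socket buffer. */
struct buf {
	struct buf *next;
	int payload;
};

/* Simple FIFO. Only inputq's lock is ever taken; it covers both queues. */
struct bufq {
	struct buf *head, *tail;
	pthread_mutex_t lock;
};

static struct bufq arrvq  = { .lock = PTHREAD_MUTEX_INITIALIZER };
static struct bufq inputq = { .lock = PTHREAD_MUTEX_INITIALIZER };

/* Append to tail; caller must hold the serializing lock. */
static void q_tail(struct bufq *q, struct buf *b)
{
	b->next = NULL;
	if (q->tail)
		q->tail->next = b;
	else
		q->head = b;
	q->tail = b;
}

/* Pop from head; caller must hold the serializing lock. */
static struct buf *q_pop(struct bufq *q)
{
	struct buf *b = q->head;

	if (b && !(q->head = b->next))
		q->tail = NULL;
	return b;
}

/* Step 1 ("broadcast link"): feed an arriving buffer into the tail of
 * the arrival queue, under the input queue's lock. */
static void bclink_feed(struct buf *b)
{
	pthread_mutex_lock(&inputq.lock);
	q_tail(&arrvq, b);
	pthread_mutex_unlock(&inputq.lock);
}

/* Step 2 ("socket level"): consume the head of the arrival queue, do a
 * destination lookup (faked as ndest hits), and feed one clone per
 * destination into the input queue, all under the same lock. */
static void socket_consume(int ndest)
{
	struct buf *b, *clone;

	pthread_mutex_lock(&inputq.lock);
	while ((b = q_pop(&arrvq))) {
		for (int i = 0; i < ndest; i++) {
			clone = malloc(sizeof(*clone));
			memcpy(clone, b, sizeof(*b));
			q_tail(&inputq, clone);
		}
		free(b);
	}
	pthread_mutex_unlock(&inputq.lock);
}

int main(void)
{
	struct buf *b;

	for (int i = 0; i < 3; i++) {
		b = malloc(sizeof(*b));
		b->payload = i;
		bclink_feed(b);
	}
	socket_consume(2);	/* pretend the lookup matched two sockets */

	while ((b = q_pop(&inputq))) {	/* no contention left in this demo */
		printf("delivered payload %d\n", b->payload);
		free(b);
	}
	return 0;
}

Sharing one lock mirrors the patch: buffers move from the arrival queue to
the input queue inside a single critical section, so a reader holding the
input queue's lock always sees a consistent picture of both queues.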
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--	net/tipc/bcast.c	57
1 file changed, 35 insertions(+), 22 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3eaa931e2e8c..81b1fef1f5e0 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
 		tipc_link_reset_all(node);
 }
 
+void tipc_bclink_input(struct net *net)
+{
+	struct tipc_net *tn = net_generic(net, tipc_net_id);
+
+	tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
+}
+
 uint tipc_bclink_get_mtu(void)
 {
 	return MAX_PKT_DEFAULT_MCAST;
@@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
 	tipc_node_unlock(n_ptr);
 }
 
-/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
+/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
  *                    and to identified node local sockets
  * @net: the applicable net namespace
  * @list: chain of buffers containing message
@@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 	int rc = 0;
 	int bc = 0;
 	struct sk_buff *skb;
+	struct sk_buff_head arrvq;
+	struct sk_buff_head inputq;
 
 	/* Prepare clone of message for local node */
 	skb = tipc_msg_reassemble(list);
@@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 		return -EHOSTUNREACH;
 	}
 
-	/* Broadcast to all other nodes */
+	/* Broadcast to all nodes */
 	if (likely(bclink)) {
 		tipc_bclink_lock(net);
 		if (likely(bclink->bcast_nodes.count)) {
@@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 	if (unlikely(!bc))
 		__skb_queue_purge(list);
 
-	/* Deliver message clone */
-	if (likely(!rc))
-		tipc_sk_mcast_rcv(net, skb);
-	else
+	if (unlikely(rc)) {
 		kfree_skb(skb);
-
+		return rc;
+	}
+	/* Deliver message clone */
+	__skb_queue_head_init(&arrvq);
+	skb_queue_head_init(&inputq);
+	__skb_queue_tail(&arrvq, skb);
+	tipc_sk_mcast_rcv(net, &arrvq, &inputq);
 	return rc;
 }
 
@@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 	int deferred = 0;
 	int pos = 0;
 	struct sk_buff *iskb;
-	struct sk_buff_head msgs;
+	struct sk_buff_head *arrvq, *inputq;
 
 	/* Screen out unwanted broadcast messages */
 	if (msg_mc_netid(msg) != tn->net_id)
@@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 	/* Handle in-sequence broadcast message */
 	seqno = msg_seqno(msg);
 	next_in = mod(node->bclink.last_in + 1);
+	arrvq = &tn->bclink->arrvq;
+	inputq = &tn->bclink->inputq;
 
 	if (likely(seqno == next_in)) {
 receive:
@@ -493,21 +507,26 @@ receive:
 		if (likely(msg_isdata(msg))) {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
+			spin_lock_bh(&inputq->lock);
+			__skb_queue_tail(arrvq, buf);
+			spin_unlock_bh(&inputq->lock);
+			node->action_flags |= TIPC_BCAST_MSG_EVT;
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-			if (likely(msg_mcast(msg)))
-				tipc_sk_mcast_rcv(net, buf);
-			else
-				kfree_skb(buf);
 		} else if (msg_user(msg) == MSG_BUNDLER) {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
 			bcl->stats.recv_bundles++;
 			bcl->stats.recv_bundled += msg_msgcnt(msg);
+			pos = 0;
+			while (tipc_msg_extract(buf, &iskb, &pos)) {
+				spin_lock_bh(&inputq->lock);
+				__skb_queue_tail(arrvq, iskb);
+				spin_unlock_bh(&inputq->lock);
+			}
+			node->action_flags |= TIPC_BCAST_MSG_EVT;
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-			while (tipc_msg_extract(buf, &iskb, &pos))
-				tipc_sk_mcast_rcv(net, iskb);
 		} else if (msg_user(msg) == MSG_FRAGMENTER) {
 			tipc_buf_append(&node->bclink.reasm_buf, &buf);
 			if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -523,14 +542,6 @@ receive:
 			}
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-		} else if (msg_user(msg) == NAME_DISTRIBUTOR) {
-			tipc_bclink_lock(net);
-			bclink_accept_pkt(node, seqno);
-			tipc_bclink_unlock(net);
-			tipc_node_unlock(node);
-			skb_queue_head_init(&msgs);
-			skb_queue_tail(&msgs, buf);
-			tipc_named_rcv(net, &msgs);
 		} else {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
@@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net)
 	skb_queue_head_init(&bcl->wakeupq);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
+	__skb_queue_head_init(&bclink->arrvq);
+	skb_queue_head_init(&bclink->inputq);
 	bcl->owner = &bclink->node;
 	bcl->owner->net = net;
 	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;