aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/bcast.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--net/tipc/bcast.c57
1 files changed, 35 insertions, 22 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3eaa931e2e8c..81b1fef1f5e0 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
79 tipc_link_reset_all(node); 79 tipc_link_reset_all(node);
80} 80}
81 81
82void tipc_bclink_input(struct net *net)
83{
84 struct tipc_net *tn = net_generic(net, tipc_net_id);
85
86 tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
87}
88
82uint tipc_bclink_get_mtu(void) 89uint tipc_bclink_get_mtu(void)
83{ 90{
84 return MAX_PKT_DEFAULT_MCAST; 91 return MAX_PKT_DEFAULT_MCAST;
@@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
356 tipc_node_unlock(n_ptr); 363 tipc_node_unlock(n_ptr);
357} 364}
358 365
359/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster 366/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
360 * and to identified node local sockets 367 * and to identified node local sockets
361 * @net: the applicable net namespace 368 * @net: the applicable net namespace
362 * @list: chain of buffers containing message 369 * @list: chain of buffers containing message
@@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
371 int rc = 0; 378 int rc = 0;
372 int bc = 0; 379 int bc = 0;
373 struct sk_buff *skb; 380 struct sk_buff *skb;
381 struct sk_buff_head arrvq;
382 struct sk_buff_head inputq;
374 383
375 /* Prepare clone of message for local node */ 384 /* Prepare clone of message for local node */
376 skb = tipc_msg_reassemble(list); 385 skb = tipc_msg_reassemble(list);
@@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
379 return -EHOSTUNREACH; 388 return -EHOSTUNREACH;
380 } 389 }
381 390
382 /* Broadcast to all other nodes */ 391 /* Broadcast to all nodes */
383 if (likely(bclink)) { 392 if (likely(bclink)) {
384 tipc_bclink_lock(net); 393 tipc_bclink_lock(net);
385 if (likely(bclink->bcast_nodes.count)) { 394 if (likely(bclink->bcast_nodes.count)) {
@@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
399 if (unlikely(!bc)) 408 if (unlikely(!bc))
400 __skb_queue_purge(list); 409 __skb_queue_purge(list);
401 410
402 /* Deliver message clone */ 411 if (unlikely(rc)) {
403 if (likely(!rc))
404 tipc_sk_mcast_rcv(net, skb);
405 else
406 kfree_skb(skb); 412 kfree_skb(skb);
407 413 return rc;
414 }
415 /* Deliver message clone */
416 __skb_queue_head_init(&arrvq);
417 skb_queue_head_init(&inputq);
418 __skb_queue_tail(&arrvq, skb);
419 tipc_sk_mcast_rcv(net, &arrvq, &inputq);
408 return rc; 420 return rc;
409} 421}
410 422
@@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
449 int deferred = 0; 461 int deferred = 0;
450 int pos = 0; 462 int pos = 0;
451 struct sk_buff *iskb; 463 struct sk_buff *iskb;
452 struct sk_buff_head msgs; 464 struct sk_buff_head *arrvq, *inputq;
453 465
454 /* Screen out unwanted broadcast messages */ 466 /* Screen out unwanted broadcast messages */
455 if (msg_mc_netid(msg) != tn->net_id) 467 if (msg_mc_netid(msg) != tn->net_id)
@@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
486 /* Handle in-sequence broadcast message */ 498 /* Handle in-sequence broadcast message */
487 seqno = msg_seqno(msg); 499 seqno = msg_seqno(msg);
488 next_in = mod(node->bclink.last_in + 1); 500 next_in = mod(node->bclink.last_in + 1);
501 arrvq = &tn->bclink->arrvq;
502 inputq = &tn->bclink->inputq;
489 503
490 if (likely(seqno == next_in)) { 504 if (likely(seqno == next_in)) {
491receive: 505receive:
@@ -493,21 +507,26 @@ receive:
493 if (likely(msg_isdata(msg))) { 507 if (likely(msg_isdata(msg))) {
494 tipc_bclink_lock(net); 508 tipc_bclink_lock(net);
495 bclink_accept_pkt(node, seqno); 509 bclink_accept_pkt(node, seqno);
510 spin_lock_bh(&inputq->lock);
511 __skb_queue_tail(arrvq, buf);
512 spin_unlock_bh(&inputq->lock);
513 node->action_flags |= TIPC_BCAST_MSG_EVT;
496 tipc_bclink_unlock(net); 514 tipc_bclink_unlock(net);
497 tipc_node_unlock(node); 515 tipc_node_unlock(node);
498 if (likely(msg_mcast(msg)))
499 tipc_sk_mcast_rcv(net, buf);
500 else
501 kfree_skb(buf);
502 } else if (msg_user(msg) == MSG_BUNDLER) { 516 } else if (msg_user(msg) == MSG_BUNDLER) {
503 tipc_bclink_lock(net); 517 tipc_bclink_lock(net);
504 bclink_accept_pkt(node, seqno); 518 bclink_accept_pkt(node, seqno);
505 bcl->stats.recv_bundles++; 519 bcl->stats.recv_bundles++;
506 bcl->stats.recv_bundled += msg_msgcnt(msg); 520 bcl->stats.recv_bundled += msg_msgcnt(msg);
521 pos = 0;
522 while (tipc_msg_extract(buf, &iskb, &pos)) {
523 spin_lock_bh(&inputq->lock);
524 __skb_queue_tail(arrvq, iskb);
525 spin_unlock_bh(&inputq->lock);
526 }
527 node->action_flags |= TIPC_BCAST_MSG_EVT;
507 tipc_bclink_unlock(net); 528 tipc_bclink_unlock(net);
508 tipc_node_unlock(node); 529 tipc_node_unlock(node);
509 while (tipc_msg_extract(buf, &iskb, &pos))
510 tipc_sk_mcast_rcv(net, iskb);
511 } else if (msg_user(msg) == MSG_FRAGMENTER) { 530 } else if (msg_user(msg) == MSG_FRAGMENTER) {
512 tipc_buf_append(&node->bclink.reasm_buf, &buf); 531 tipc_buf_append(&node->bclink.reasm_buf, &buf);
513 if (unlikely(!buf && !node->bclink.reasm_buf)) 532 if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -523,14 +542,6 @@ receive:
523 } 542 }
524 tipc_bclink_unlock(net); 543 tipc_bclink_unlock(net);
525 tipc_node_unlock(node); 544 tipc_node_unlock(node);
526 } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
527 tipc_bclink_lock(net);
528 bclink_accept_pkt(node, seqno);
529 tipc_bclink_unlock(net);
530 tipc_node_unlock(node);
531 skb_queue_head_init(&msgs);
532 skb_queue_tail(&msgs, buf);
533 tipc_named_rcv(net, &msgs);
534 } else { 545 } else {
535 tipc_bclink_lock(net); 546 tipc_bclink_lock(net);
536 bclink_accept_pkt(node, seqno); 547 bclink_accept_pkt(node, seqno);
@@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net)
950 skb_queue_head_init(&bcl->wakeupq); 961 skb_queue_head_init(&bcl->wakeupq);
951 bcl->next_out_no = 1; 962 bcl->next_out_no = 1;
952 spin_lock_init(&bclink->node.lock); 963 spin_lock_init(&bclink->node.lock);
964 __skb_queue_head_init(&bclink->arrvq);
965 skb_queue_head_init(&bclink->inputq);
953 bcl->owner = &bclink->node; 966 bcl->owner = &bclink->node;
954 bcl->owner->net = net; 967 bcl->owner->net = net;
955 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 968 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;