aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--net/tipc/bcast.c57
-rw-r--r--net/tipc/bcast.h3
-rw-r--r--net/tipc/msg.h17
-rw-r--r--net/tipc/name_table.h2
-rw-r--r--net/tipc/node.c11
-rw-r--r--net/tipc/node.h7
-rw-r--r--net/tipc/socket.c72
-rw-r--r--net/tipc/socket.h4
8 files changed, 114 insertions, 59 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3eaa931e2e8c..81b1fef1f5e0 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -79,6 +79,13 @@ static void tipc_bclink_unlock(struct net *net)
79 tipc_link_reset_all(node); 79 tipc_link_reset_all(node);
80} 80}
81 81
82void tipc_bclink_input(struct net *net)
83{
84 struct tipc_net *tn = net_generic(net, tipc_net_id);
85
86 tipc_sk_mcast_rcv(net, &tn->bclink->arrvq, &tn->bclink->inputq);
87}
88
82uint tipc_bclink_get_mtu(void) 89uint tipc_bclink_get_mtu(void)
83{ 90{
84 return MAX_PKT_DEFAULT_MCAST; 91 return MAX_PKT_DEFAULT_MCAST;
@@ -356,7 +363,7 @@ static void bclink_peek_nack(struct net *net, struct tipc_msg *msg)
356 tipc_node_unlock(n_ptr); 363 tipc_node_unlock(n_ptr);
357} 364}
358 365
359/* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster 366/* tipc_bclink_xmit - deliver buffer chain to all nodes in cluster
360 * and to identified node local sockets 367 * and to identified node local sockets
361 * @net: the applicable net namespace 368 * @net: the applicable net namespace
362 * @list: chain of buffers containing message 369 * @list: chain of buffers containing message
@@ -371,6 +378,8 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
371 int rc = 0; 378 int rc = 0;
372 int bc = 0; 379 int bc = 0;
373 struct sk_buff *skb; 380 struct sk_buff *skb;
381 struct sk_buff_head arrvq;
382 struct sk_buff_head inputq;
374 383
375 /* Prepare clone of message for local node */ 384 /* Prepare clone of message for local node */
376 skb = tipc_msg_reassemble(list); 385 skb = tipc_msg_reassemble(list);
@@ -379,7 +388,7 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
379 return -EHOSTUNREACH; 388 return -EHOSTUNREACH;
380 } 389 }
381 390
382 /* Broadcast to all other nodes */ 391 /* Broadcast to all nodes */
383 if (likely(bclink)) { 392 if (likely(bclink)) {
384 tipc_bclink_lock(net); 393 tipc_bclink_lock(net);
385 if (likely(bclink->bcast_nodes.count)) { 394 if (likely(bclink->bcast_nodes.count)) {
@@ -399,12 +408,15 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
399 if (unlikely(!bc)) 408 if (unlikely(!bc))
400 __skb_queue_purge(list); 409 __skb_queue_purge(list);
401 410
402 /* Deliver message clone */ 411 if (unlikely(rc)) {
403 if (likely(!rc))
404 tipc_sk_mcast_rcv(net, skb);
405 else
406 kfree_skb(skb); 412 kfree_skb(skb);
407 413 return rc;
414 }
415 /* Deliver message clone */
416 __skb_queue_head_init(&arrvq);
417 skb_queue_head_init(&inputq);
418 __skb_queue_tail(&arrvq, skb);
419 tipc_sk_mcast_rcv(net, &arrvq, &inputq);
408 return rc; 420 return rc;
409} 421}
410 422
@@ -449,7 +461,7 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
449 int deferred = 0; 461 int deferred = 0;
450 int pos = 0; 462 int pos = 0;
451 struct sk_buff *iskb; 463 struct sk_buff *iskb;
452 struct sk_buff_head msgs; 464 struct sk_buff_head *arrvq, *inputq;
453 465
454 /* Screen out unwanted broadcast messages */ 466 /* Screen out unwanted broadcast messages */
455 if (msg_mc_netid(msg) != tn->net_id) 467 if (msg_mc_netid(msg) != tn->net_id)
@@ -486,6 +498,8 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
486 /* Handle in-sequence broadcast message */ 498 /* Handle in-sequence broadcast message */
487 seqno = msg_seqno(msg); 499 seqno = msg_seqno(msg);
488 next_in = mod(node->bclink.last_in + 1); 500 next_in = mod(node->bclink.last_in + 1);
501 arrvq = &tn->bclink->arrvq;
502 inputq = &tn->bclink->inputq;
489 503
490 if (likely(seqno == next_in)) { 504 if (likely(seqno == next_in)) {
491receive: 505receive:
@@ -493,21 +507,26 @@ receive:
493 if (likely(msg_isdata(msg))) { 507 if (likely(msg_isdata(msg))) {
494 tipc_bclink_lock(net); 508 tipc_bclink_lock(net);
495 bclink_accept_pkt(node, seqno); 509 bclink_accept_pkt(node, seqno);
510 spin_lock_bh(&inputq->lock);
511 __skb_queue_tail(arrvq, buf);
512 spin_unlock_bh(&inputq->lock);
513 node->action_flags |= TIPC_BCAST_MSG_EVT;
496 tipc_bclink_unlock(net); 514 tipc_bclink_unlock(net);
497 tipc_node_unlock(node); 515 tipc_node_unlock(node);
498 if (likely(msg_mcast(msg)))
499 tipc_sk_mcast_rcv(net, buf);
500 else
501 kfree_skb(buf);
502 } else if (msg_user(msg) == MSG_BUNDLER) { 516 } else if (msg_user(msg) == MSG_BUNDLER) {
503 tipc_bclink_lock(net); 517 tipc_bclink_lock(net);
504 bclink_accept_pkt(node, seqno); 518 bclink_accept_pkt(node, seqno);
505 bcl->stats.recv_bundles++; 519 bcl->stats.recv_bundles++;
506 bcl->stats.recv_bundled += msg_msgcnt(msg); 520 bcl->stats.recv_bundled += msg_msgcnt(msg);
521 pos = 0;
522 while (tipc_msg_extract(buf, &iskb, &pos)) {
523 spin_lock_bh(&inputq->lock);
524 __skb_queue_tail(arrvq, iskb);
525 spin_unlock_bh(&inputq->lock);
526 }
527 node->action_flags |= TIPC_BCAST_MSG_EVT;
507 tipc_bclink_unlock(net); 528 tipc_bclink_unlock(net);
508 tipc_node_unlock(node); 529 tipc_node_unlock(node);
509 while (tipc_msg_extract(buf, &iskb, &pos))
510 tipc_sk_mcast_rcv(net, iskb);
511 } else if (msg_user(msg) == MSG_FRAGMENTER) { 530 } else if (msg_user(msg) == MSG_FRAGMENTER) {
512 tipc_buf_append(&node->bclink.reasm_buf, &buf); 531 tipc_buf_append(&node->bclink.reasm_buf, &buf);
513 if (unlikely(!buf && !node->bclink.reasm_buf)) 532 if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -523,14 +542,6 @@ receive:
523 } 542 }
524 tipc_bclink_unlock(net); 543 tipc_bclink_unlock(net);
525 tipc_node_unlock(node); 544 tipc_node_unlock(node);
526 } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
527 tipc_bclink_lock(net);
528 bclink_accept_pkt(node, seqno);
529 tipc_bclink_unlock(net);
530 tipc_node_unlock(node);
531 skb_queue_head_init(&msgs);
532 skb_queue_tail(&msgs, buf);
533 tipc_named_rcv(net, &msgs);
534 } else { 545 } else {
535 tipc_bclink_lock(net); 546 tipc_bclink_lock(net);
536 bclink_accept_pkt(node, seqno); 547 bclink_accept_pkt(node, seqno);
@@ -950,6 +961,8 @@ int tipc_bclink_init(struct net *net)
950 skb_queue_head_init(&bcl->wakeupq); 961 skb_queue_head_init(&bcl->wakeupq);
951 bcl->next_out_no = 1; 962 bcl->next_out_no = 1;
952 spin_lock_init(&bclink->node.lock); 963 spin_lock_init(&bclink->node.lock);
964 __skb_queue_head_init(&bclink->arrvq);
965 skb_queue_head_init(&bclink->inputq);
953 bcl->owner = &bclink->node; 966 bcl->owner = &bclink->node;
954 bcl->owner->net = net; 967 bcl->owner->net = net;
955 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 968 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 8f4d4dc38e11..a910c0b9f249 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -97,6 +97,8 @@ struct tipc_bclink {
97 struct tipc_link link; 97 struct tipc_link link;
98 struct tipc_node node; 98 struct tipc_node node;
99 unsigned int flags; 99 unsigned int flags;
100 struct sk_buff_head arrvq;
101 struct sk_buff_head inputq;
100 struct tipc_node_map bcast_nodes; 102 struct tipc_node_map bcast_nodes;
101 struct tipc_node *retransmit_to; 103 struct tipc_node *retransmit_to;
102}; 104};
@@ -134,5 +136,6 @@ uint tipc_bclink_get_mtu(void);
134int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list); 136int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list);
135void tipc_bclink_wakeup_users(struct net *net); 137void tipc_bclink_wakeup_users(struct net *net);
136int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg); 138int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
139void tipc_bclink_input(struct net *net);
137 140
138#endif 141#endif
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index ab467261bd9d..9ace47f44a69 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -767,6 +767,23 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
767 int *err); 767 int *err);
768struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 768struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
769 769
770/* tipc_skb_peek(): peek and reserve first buffer in list
771 * @list: list to be peeked in
772 * Returns pointer to first buffer in list, if any
773 */
774static inline struct sk_buff *tipc_skb_peek(struct sk_buff_head *list,
775 spinlock_t *lock)
776{
777 struct sk_buff *skb;
778
779 spin_lock_bh(lock);
780 skb = skb_peek(list);
781 if (skb)
782 skb_get(skb);
783 spin_unlock_bh(lock);
784 return skb;
785}
786
770/* tipc_skb_peek_port(): find a destination port, ignoring all destinations 787/* tipc_skb_peek_port(): find a destination port, ignoring all destinations
771 * up to and including 'filter'. 788 * up to and including 'filter'.
772 * Note: ignoring previously tried destinations minimizes the risk of 789 * Note: ignoring previously tried destinations minimizes the risk of
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index 52501fdaafa5..0304ddc6b101 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/name_table.h: Include file for TIPC name table code 2 * net/tipc/name_table.h: Include file for TIPC name table code
3 * 3 *
4 * Copyright (c) 2000-2006, 2014, Ericsson AB 4 * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
5 * Copyright (c) 2004-2005, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
diff --git a/net/tipc/node.c b/net/tipc/node.c
index c7fdf3dec92c..52308498f208 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -582,10 +582,10 @@ void tipc_node_unlock(struct tipc_node *node)
582 namedq = node->namedq; 582 namedq = node->namedq;
583 publ_list = &node->publ_list; 583 publ_list = &node->publ_list;
584 584
585 node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN | 585 node->action_flags &= ~(TIPC_MSG_EVT |
586 TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP | 586 TIPC_NOTIFY_NODE_DOWN | TIPC_NOTIFY_NODE_UP |
587 TIPC_NOTIFY_LINK_DOWN | 587 TIPC_NOTIFY_LINK_DOWN | TIPC_NOTIFY_LINK_UP |
588 TIPC_WAKEUP_BCAST_USERS | 588 TIPC_WAKEUP_BCAST_USERS | TIPC_BCAST_MSG_EVT |
589 TIPC_NAMED_MSG_EVT); 589 TIPC_NAMED_MSG_EVT);
590 590
591 spin_unlock_bh(&node->lock); 591 spin_unlock_bh(&node->lock);
@@ -612,6 +612,9 @@ void tipc_node_unlock(struct tipc_node *node)
612 612
613 if (flags & TIPC_NAMED_MSG_EVT) 613 if (flags & TIPC_NAMED_MSG_EVT)
614 tipc_named_rcv(net, namedq); 614 tipc_named_rcv(net, namedq);
615
616 if (flags & TIPC_BCAST_MSG_EVT)
617 tipc_bclink_input(net);
615} 618}
616 619
617/* Caller should hold node lock for the passed node */ 620/* Caller should hold node lock for the passed node */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index c2b0fcf4042b..20ec13f9bede 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/node.h: Include file for TIPC node management routines 2 * net/tipc/node.h: Include file for TIPC node management routines
3 * 3 *
4 * Copyright (c) 2000-2006, 2014, Ericsson AB 4 * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
5 * Copyright (c) 2005, 2010-2014, Wind River Systems 5 * Copyright (c) 2005, 2010-2014, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -63,7 +63,8 @@ enum {
63 TIPC_WAKEUP_BCAST_USERS = (1 << 5), 63 TIPC_WAKEUP_BCAST_USERS = (1 << 5),
64 TIPC_NOTIFY_LINK_UP = (1 << 6), 64 TIPC_NOTIFY_LINK_UP = (1 << 6),
65 TIPC_NOTIFY_LINK_DOWN = (1 << 7), 65 TIPC_NOTIFY_LINK_DOWN = (1 << 7),
66 TIPC_NAMED_MSG_EVT = (1 << 8) 66 TIPC_NAMED_MSG_EVT = (1 << 8),
67 TIPC_BCAST_MSG_EVT = (1 << 9)
67}; 68};
68 69
69/** 70/**
@@ -74,6 +75,7 @@ enum {
74 * @oos_state: state tracker for handling OOS b'cast messages 75 * @oos_state: state tracker for handling OOS b'cast messages
75 * @deferred_queue: deferred queue saved OOS b'cast message received from node 76 * @deferred_queue: deferred queue saved OOS b'cast message received from node
76 * @reasm_buf: broadcast reassembly queue head from node 77 * @reasm_buf: broadcast reassembly queue head from node
78 * @inputq_map: bitmap indicating which inqueues should be kicked
77 * @recv_permitted: true if node is allowed to receive b'cast messages 79 * @recv_permitted: true if node is allowed to receive b'cast messages
78 */ 80 */
79struct tipc_node_bclink { 81struct tipc_node_bclink {
@@ -84,6 +86,7 @@ struct tipc_node_bclink {
84 u32 deferred_size; 86 u32 deferred_size;
85 struct sk_buff_head deferred_queue; 87 struct sk_buff_head deferred_queue;
86 struct sk_buff *reasm_buf; 88 struct sk_buff *reasm_buf;
89 int inputq_map;
87 bool recv_permitted; 90 bool recv_permitted;
88}; 91};
89 92
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 26aec8414ac1..66666805b53c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -776,44 +776,60 @@ new_mtu:
776 return rc; 776 return rc;
777} 777}
778 778
779/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets 779/**
780 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
781 * @arrvq: queue with arriving messages, to be cloned after destination lookup
782 * @inputq: queue with cloned messages, delivered to socket after dest lookup
783 *
784 * Multi-threaded: parallel calls with reference to same queues may occur
780 */ 785 */
781void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *skb) 786void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
787 struct sk_buff_head *inputq)
782{ 788{
783 struct tipc_msg *msg = buf_msg(skb); 789 struct tipc_msg *msg;
784 struct tipc_plist dports; 790 struct tipc_plist dports;
785 struct sk_buff *cskb;
786 u32 portid; 791 u32 portid;
787 u32 scope = TIPC_CLUSTER_SCOPE; 792 u32 scope = TIPC_CLUSTER_SCOPE;
788 struct sk_buff_head msgq; 793 struct sk_buff_head tmpq;
789 uint hsz = skb_headroom(skb) + msg_hdr_sz(msg); 794 uint hsz;
795 struct sk_buff *skb, *_skb;
790 796
791 skb_queue_head_init(&msgq); 797 __skb_queue_head_init(&tmpq);
792 tipc_plist_init(&dports); 798 tipc_plist_init(&dports);
793 799
794 if (in_own_node(net, msg_orignode(msg))) 800 skb = tipc_skb_peek(arrvq, &inputq->lock);
795 scope = TIPC_NODE_SCOPE; 801 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
796 802 msg = buf_msg(skb);
797 if (unlikely(!msg_mcast(msg))) { 803 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
798 pr_warn("Received non-multicast msg in multicast\n"); 804
799 goto exit; 805 if (in_own_node(net, msg_orignode(msg)))
800 } 806 scope = TIPC_NODE_SCOPE;
801 /* Create destination port list: */ 807
802 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), 808 /* Create destination port list and message clones: */
803 msg_nameupper(msg), scope, &dports); 809 tipc_nametbl_mc_translate(net,
804 portid = tipc_plist_pop(&dports); 810 msg_nametype(msg), msg_namelower(msg),
805 for (; portid; portid = tipc_plist_pop(&dports)) { 811 msg_nameupper(msg), scope, &dports);
806 cskb = __pskb_copy(skb, hsz, GFP_ATOMIC); 812 portid = tipc_plist_pop(&dports);
807 if (!cskb) { 813 for (; portid; portid = tipc_plist_pop(&dports)) {
808 pr_warn("Failed do clone mcast rcv buffer\n"); 814 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
809 continue; 815 if (_skb) {
816 msg_set_destport(buf_msg(_skb), portid);
817 __skb_queue_tail(&tmpq, _skb);
818 continue;
819 }
820 pr_warn("Failed to clone mcast rcv buffer\n");
810 } 821 }
811 msg_set_destport(buf_msg(cskb), portid); 822 /* Append to inputq if not already done by other thread */
812 skb_queue_tail(&msgq, cskb); 823 spin_lock_bh(&inputq->lock);
824 if (skb_peek(arrvq) == skb) {
825 skb_queue_splice_tail_init(&tmpq, inputq);
826 kfree_skb(__skb_dequeue(arrvq));
827 }
828 spin_unlock_bh(&inputq->lock);
829 __skb_queue_purge(&tmpq);
830 kfree_skb(skb);
813 } 831 }
814 tipc_sk_rcv(net, &msgq); 832 tipc_sk_rcv(net, inputq);
815exit:
816 kfree_skb(skb);
817} 833}
818 834
819/** 835/**
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 95b015909ac1..8be0da7df8fc 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -42,7 +42,6 @@
42#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2) 42#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
43#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \ 43#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
44 SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE)) 44 SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
45
46int tipc_socket_init(void); 45int tipc_socket_init(void);
47void tipc_socket_stop(void); 46void tipc_socket_stop(void);
48int tipc_sock_create_local(struct net *net, int type, struct socket **res); 47int tipc_sock_create_local(struct net *net, int type, struct socket **res);
@@ -51,7 +50,8 @@ int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
51 int flags); 50 int flags);
52int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq); 51int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
53struct sk_buff *tipc_sk_socks_show(struct net *net); 52struct sk_buff *tipc_sk_socks_show(struct net *net);
54void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf); 53void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
54 struct sk_buff_head *inputq);
55void tipc_sk_reinit(struct net *net); 55void tipc_sk_reinit(struct net *net);
56int tipc_sk_rht_init(struct net *net); 56int tipc_sk_rht_init(struct net *net);
57void tipc_sk_rht_destroy(struct net *net); 57void tipc_sk_rht_destroy(struct net *net);