aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c72
1 files changed, 44 insertions, 28 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 26aec8414ac1..66666805b53c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -776,44 +776,60 @@ new_mtu:
776 return rc; 776 return rc;
777} 777}
778 778
779/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets 779/**
780 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
781 * @arrvq: queue with arriving messages, to be cloned after destination lookup
782 * @inputq: queue with cloned messages, delivered to socket after dest lookup
783 *
784 * Multi-threaded: parallel calls with reference to same queues may occur
780 */ 785 */
781void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *skb) 786void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
787 struct sk_buff_head *inputq)
782{ 788{
783 struct tipc_msg *msg = buf_msg(skb); 789 struct tipc_msg *msg;
784 struct tipc_plist dports; 790 struct tipc_plist dports;
785 struct sk_buff *cskb;
786 u32 portid; 791 u32 portid;
787 u32 scope = TIPC_CLUSTER_SCOPE; 792 u32 scope = TIPC_CLUSTER_SCOPE;
788 struct sk_buff_head msgq; 793 struct sk_buff_head tmpq;
789 uint hsz = skb_headroom(skb) + msg_hdr_sz(msg); 794 uint hsz;
795 struct sk_buff *skb, *_skb;
790 796
791 skb_queue_head_init(&msgq); 797 __skb_queue_head_init(&tmpq);
792 tipc_plist_init(&dports); 798 tipc_plist_init(&dports);
793 799
794 if (in_own_node(net, msg_orignode(msg))) 800 skb = tipc_skb_peek(arrvq, &inputq->lock);
795 scope = TIPC_NODE_SCOPE; 801 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
796 802 msg = buf_msg(skb);
797 if (unlikely(!msg_mcast(msg))) { 803 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
798 pr_warn("Received non-multicast msg in multicast\n"); 804
799 goto exit; 805 if (in_own_node(net, msg_orignode(msg)))
800 } 806 scope = TIPC_NODE_SCOPE;
801 /* Create destination port list: */ 807
802 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), 808 /* Create destination port list and message clones: */
803 msg_nameupper(msg), scope, &dports); 809 tipc_nametbl_mc_translate(net,
804 portid = tipc_plist_pop(&dports); 810 msg_nametype(msg), msg_namelower(msg),
805 for (; portid; portid = tipc_plist_pop(&dports)) { 811 msg_nameupper(msg), scope, &dports);
806 cskb = __pskb_copy(skb, hsz, GFP_ATOMIC); 812 portid = tipc_plist_pop(&dports);
807 if (!cskb) { 813 for (; portid; portid = tipc_plist_pop(&dports)) {
808 pr_warn("Failed do clone mcast rcv buffer\n"); 814 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
809 continue; 815 if (_skb) {
816 msg_set_destport(buf_msg(_skb), portid);
817 __skb_queue_tail(&tmpq, _skb);
818 continue;
819 }
820 pr_warn("Failed to clone mcast rcv buffer\n");
810 } 821 }
811 msg_set_destport(buf_msg(cskb), portid); 822 /* Append to inputq if not already done by other thread */
812 skb_queue_tail(&msgq, cskb); 823 spin_lock_bh(&inputq->lock);
824 if (skb_peek(arrvq) == skb) {
825 skb_queue_splice_tail_init(&tmpq, inputq);
826 kfree_skb(__skb_dequeue(arrvq));
827 }
828 spin_unlock_bh(&inputq->lock);
829 __skb_queue_purge(&tmpq);
830 kfree_skb(skb);
813 } 831 }
814 tipc_sk_rcv(net, &msgq); 832 tipc_sk_rcv(net, inputq);
815exit:
816 kfree_skb(skb);
817} 833}
818 834
819/** 835/**