aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/socket.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-02-05 08:36:44 -0500
committerDavid S. Miller <davem@davemloft.net>2015-02-05 19:00:03 -0500
commitcb1b728096f54e7408d60fb571944bed00c5b771 (patch)
tree1b1e50e705f4ddd3b36a43ac9067538db3e8233f /net/tipc/socket.c
parent3c724acdd5049907555a831f814bfd5927c3350c (diff)
tipc: eliminate race condition at multicast reception
In a previous commit in this series we resolved a race problem during unicast message reception. Here, we resolve the same problem at multicast reception. We apply the same technique: an input queue serializing the delivery of arriving buffers. The main difference is that here we do it in two steps. First, the broadcast link feeds arriving buffers into the tail of an arrival queue, which head is consumed at the socket level, and where destination lookup is performed. Second, if the lookup is successful, the resulting buffer clones are fed into a second queue, the input queue. This queue is consumed at reception in the socket just like in the unicast case. Both queues are protected by the same lock, -the one of the input queue. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/socket.c')
-rw-r--r--net/tipc/socket.c72
1 files changed, 44 insertions, 28 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 26aec8414ac1..66666805b53c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -776,44 +776,60 @@ new_mtu:
776 return rc; 776 return rc;
777} 777}
778 778
779/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets 779/**
780 * tipc_sk_mcast_rcv - Deliver multicast messages to all destination sockets
781 * @arrvq: queue with arriving messages, to be cloned after destination lookup
782 * @inputq: queue with cloned messages, delivered to socket after dest lookup
783 *
784 * Multi-threaded: parallel calls with reference to same queues may occur
780 */ 785 */
781void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *skb) 786void tipc_sk_mcast_rcv(struct net *net, struct sk_buff_head *arrvq,
787 struct sk_buff_head *inputq)
782{ 788{
783 struct tipc_msg *msg = buf_msg(skb); 789 struct tipc_msg *msg;
784 struct tipc_plist dports; 790 struct tipc_plist dports;
785 struct sk_buff *cskb;
786 u32 portid; 791 u32 portid;
787 u32 scope = TIPC_CLUSTER_SCOPE; 792 u32 scope = TIPC_CLUSTER_SCOPE;
788 struct sk_buff_head msgq; 793 struct sk_buff_head tmpq;
789 uint hsz = skb_headroom(skb) + msg_hdr_sz(msg); 794 uint hsz;
795 struct sk_buff *skb, *_skb;
790 796
791 skb_queue_head_init(&msgq); 797 __skb_queue_head_init(&tmpq);
792 tipc_plist_init(&dports); 798 tipc_plist_init(&dports);
793 799
794 if (in_own_node(net, msg_orignode(msg))) 800 skb = tipc_skb_peek(arrvq, &inputq->lock);
795 scope = TIPC_NODE_SCOPE; 801 for (; skb; skb = tipc_skb_peek(arrvq, &inputq->lock)) {
796 802 msg = buf_msg(skb);
797 if (unlikely(!msg_mcast(msg))) { 803 hsz = skb_headroom(skb) + msg_hdr_sz(msg);
798 pr_warn("Received non-multicast msg in multicast\n"); 804
799 goto exit; 805 if (in_own_node(net, msg_orignode(msg)))
800 } 806 scope = TIPC_NODE_SCOPE;
801 /* Create destination port list: */ 807
802 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), 808 /* Create destination port list and message clones: */
803 msg_nameupper(msg), scope, &dports); 809 tipc_nametbl_mc_translate(net,
804 portid = tipc_plist_pop(&dports); 810 msg_nametype(msg), msg_namelower(msg),
805 for (; portid; portid = tipc_plist_pop(&dports)) { 811 msg_nameupper(msg), scope, &dports);
806 cskb = __pskb_copy(skb, hsz, GFP_ATOMIC); 812 portid = tipc_plist_pop(&dports);
807 if (!cskb) { 813 for (; portid; portid = tipc_plist_pop(&dports)) {
808 pr_warn("Failed do clone mcast rcv buffer\n"); 814 _skb = __pskb_copy(skb, hsz, GFP_ATOMIC);
809 continue; 815 if (_skb) {
816 msg_set_destport(buf_msg(_skb), portid);
817 __skb_queue_tail(&tmpq, _skb);
818 continue;
819 }
820 pr_warn("Failed to clone mcast rcv buffer\n");
810 } 821 }
811 msg_set_destport(buf_msg(cskb), portid); 822 /* Append to inputq if not already done by other thread */
812 skb_queue_tail(&msgq, cskb); 823 spin_lock_bh(&inputq->lock);
824 if (skb_peek(arrvq) == skb) {
825 skb_queue_splice_tail_init(&tmpq, inputq);
826 kfree_skb(__skb_dequeue(arrvq));
827 }
828 spin_unlock_bh(&inputq->lock);
829 __skb_queue_purge(&tmpq);
830 kfree_skb(skb);
813 } 831 }
814 tipc_sk_rcv(net, &msgq); 832 tipc_sk_rcv(net, inputq);
815exit:
816 kfree_skb(skb);
817} 833}
818 834
819/** 835/**