author	Jon Paul Maloy <jon.maloy@ericsson.com>	2015-02-05 08:36:41 -0500
committer	David S. Miller <davem@davemloft.net>	2015-02-05 19:00:02 -0500
commit	c637c1035534867b85b78b453c38c495b58e2c5a (patch)
tree	77cd2a48a5b04e43b014da64168a6c1e209a1d40 /net/tipc/bcast.c
parent	94153e36e709e78fc4e1f93dc4e4da785690c7d1 (diff)
tipc: resolve race problem at unicast message reception
TIPC handles message cardinality and sequencing at the link layer, before passing messages upwards to the destination sockets. During the upcall from link to socket no locks are held. It is therefore possible, and we see it happen occasionally, that messages arriving in different threads and delivered in sequence still bypass each other before they reach the destination socket. This must not happen, since it violates the sequentiality guarantee.

We solve this by adding a new input buffer queue to the link structure. Arriving messages are added safely to the tail of that queue by the link, while the head of the queue is consumed, also safely, by the receiving socket. Sequentiality is secured per socket by only allowing buffers to be dequeued inside the socket lock.

Since there may be multiple simultaneous readers of the queue, we use a 'filter' parameter to reduce the risk that they peek the same buffer from the queue, hence also reducing the risk of contention on the receiving socket locks. This solves the sequentiality problem, and seems to cause no measurable performance degradation.

A nice side effect of this change is that lock handling in the functions tipc_rcv() and tipc_bcast_rcv() now becomes uniform, something that will enable future simplifications of those functions.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
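The locking pattern described above is easiest to see in isolation. Below is a minimal sketch, not the verbatim TIPC code: link_input_add() and sock_input_drain() are hypothetical names invented for illustration, the 'filter' optimization is omitted, and sk_backlog_rcv() stands in for the real socket receive path.

	/*
	 * Sketch of the producer/consumer pattern this commit introduces.
	 * The function names are illustrative, not from this patch.
	 */
	#include <linux/skbuff.h>
	#include <net/sock.h>

	/* Producer: any link-layer thread may append concurrently.
	 * skb_queue_tail() takes the queue's internal spinlock, so the
	 * tail-append is safe without additional locking.
	 */
	static void link_input_add(struct sk_buff_head *inputq,
				   struct sk_buff *skb)
	{
		skb_queue_tail(inputq, skb);
	}

	/* Consumer: buffers leave the queue only while the socket lock is
	 * held, which is what restores the per-socket sequentiality
	 * guarantee even when several threads perform the link-to-socket
	 * upcall in parallel.
	 */
	static void sock_input_drain(struct sock *sk,
				     struct sk_buff_head *inputq)
	{
		struct sk_buff *skb;

		lock_sock(sk);
		while ((skb = skb_dequeue(inputq)) != NULL)
			sk_backlog_rcv(sk, skb); /* deliver in arrival order */
		release_sock(sk);
	}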
Diffstat (limited to 'net/tipc/bcast.c')
-rw-r--r--	net/tipc/bcast.c	20
1 file changed, 11 insertions(+), 9 deletions(-)
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3b886eb35c87..2dfaf272928a 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -189,10 +189,8 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
 void tipc_bclink_wakeup_users(struct net *net)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct sk_buff *skb;
 
-	while ((skb = skb_dequeue(&tn->bclink->link.waiting_sks)))
-		tipc_sk_rcv(net, skb);
+	tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
 }
 
 /**
@@ -271,9 +269,8 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 		tipc_link_push_packets(tn->bcl);
 		bclink_set_last_sent(net);
 	}
-	if (unlikely(released && !skb_queue_empty(&tn->bcl->waiting_sks)))
+	if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
 		n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
-
 exit:
 	tipc_bclink_unlock(net);
 }
@@ -450,6 +447,9 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
 	u32 next_in;
 	u32 seqno;
 	int deferred = 0;
+	int pos = 0;
+	struct sk_buff *iskb;
+	struct sk_buff_head msgs;
 
 	/* Screen out unwanted broadcast messages */
 	if (msg_mc_netid(msg) != tn->net_id)
@@ -506,7 +506,8 @@ receive:
 			bcl->stats.recv_bundled += msg_msgcnt(msg);
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-			tipc_link_bundle_rcv(net, buf);
+			while (tipc_msg_extract(buf, &iskb, &pos))
+				tipc_sk_mcast_rcv(net, iskb);
 		} else if (msg_user(msg) == MSG_FRAGMENTER) {
 			tipc_buf_append(&node->bclink.reasm_buf, &buf);
 			if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -527,7 +528,9 @@ receive:
 			bclink_accept_pkt(node, seqno);
 			tipc_bclink_unlock(net);
 			tipc_node_unlock(node);
-			tipc_named_rcv(net, buf);
+			skb_queue_head_init(&msgs);
+			skb_queue_tail(&msgs, buf);
+			tipc_named_rcv(net, &msgs);
 		} else {
 			tipc_bclink_lock(net);
 			bclink_accept_pkt(node, seqno);
@@ -944,10 +947,9 @@ int tipc_bclink_init(struct net *net)
 	spin_lock_init(&bclink->lock);
 	__skb_queue_head_init(&bcl->outqueue);
 	__skb_queue_head_init(&bcl->deferred_queue);
-	skb_queue_head_init(&bcl->waiting_sks);
+	skb_queue_head_init(&bcl->wakeupq);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
-	__skb_queue_head_init(&bclink->node.waiting_sks);
 	bcl->owner = &bclink->node;
 	bcl->owner->net = net;
 	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;