about summary refs log tree commit diff stats
path: root/net/tipc/node.c
diff options
context:
space:
mode:
author: Jon Paul Maloy <jon.maloy@ericsson.com> 2015-02-05 08:36:41 -0500
committer: David S. Miller <davem@davemloft.net> 2015-02-05 19:00:02 -0500
commit: c637c1035534867b85b78b453c38c495b58e2c5a (patch)
tree: 77cd2a48a5b04e43b014da64168a6c1e209a1d40 /net/tipc/node.c
parent: 94153e36e709e78fc4e1f93dc4e4da785690c7d1 (diff)
tipc: resolve race problem at unicast message reception
TIPC handles message cardinality and sequencing at the link layer, before passing messages upwards to the destination sockets. During the upcall from link to socket no locks are held. It is therefore possible, and we see it happen occasionally, that messages arriving in different threads and delivered in sequence still bypass each other before they reach the destination socket. This must not happen, since it violates the sequentiality guarantee. We solve this by adding a new input buffer queue to the link structure. Arriving messages are added safely to the tail of that queue by the link, while the head of the queue is consumed, also safely, by the receiving socket. Sequentiality is secured per socket by only allowing buffers to be dequeued inside the socket lock. Since there may be multiple simultaneous readers of the queue, we use a 'filter' parameter to reduce the risk that they peek the same buffer from the queue, hence also reducing the risk of contention on the receiving socket locks. This solves the sequentiality problem, and seems to cause no measurable performance degradation. A nice side effect of this change is that lock handling in the functions tipc_rcv() and tipc_bcast_rcv() now becomes uniform, something that will enable future simplifications of those functions. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/node.c')
-rw-r--r--  net/tipc/node.c  43
1 file changed, 24 insertions(+), 19 deletions(-)
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 1c409c45f0fe..dcb83d9b2193 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -111,11 +111,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
111 INIT_LIST_HEAD(&n_ptr->list); 111 INIT_LIST_HEAD(&n_ptr->list);
112 INIT_LIST_HEAD(&n_ptr->publ_list); 112 INIT_LIST_HEAD(&n_ptr->publ_list);
113 INIT_LIST_HEAD(&n_ptr->conn_sks); 113 INIT_LIST_HEAD(&n_ptr->conn_sks);
114 skb_queue_head_init(&n_ptr->waiting_sks);
115 __skb_queue_head_init(&n_ptr->bclink.deferred_queue); 114 __skb_queue_head_init(&n_ptr->bclink.deferred_queue);
116
117 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 115 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
118
119 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 116 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
120 if (n_ptr->addr < temp_node->addr) 117 if (n_ptr->addr < temp_node->addr)
121 break; 118 break;
@@ -201,19 +198,22 @@ void tipc_node_abort_sock_conns(struct net *net, struct list_head *conns)
201{ 198{
202 struct tipc_net *tn = net_generic(net, tipc_net_id); 199 struct tipc_net *tn = net_generic(net, tipc_net_id);
203 struct tipc_sock_conn *conn, *safe; 200 struct tipc_sock_conn *conn, *safe;
204 struct sk_buff *buf; 201 struct sk_buff *skb;
202 struct sk_buff_head skbs;
205 203
204 skb_queue_head_init(&skbs);
206 list_for_each_entry_safe(conn, safe, conns, list) { 205 list_for_each_entry_safe(conn, safe, conns, list) {
207 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, 206 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
208 TIPC_CONN_MSG, SHORT_H_SIZE, 0, 207 TIPC_CONN_MSG, SHORT_H_SIZE, 0,
209 tn->own_addr, conn->peer_node, 208 tn->own_addr, conn->peer_node,
210 conn->port, conn->peer_port, 209 conn->port, conn->peer_port,
211 TIPC_ERR_NO_NODE); 210 TIPC_ERR_NO_NODE);
212 if (likely(buf)) 211 if (likely(skb))
213 tipc_sk_rcv(net, buf); 212 skb_queue_tail(&skbs, skb);
214 list_del(&conn->list); 213 list_del(&conn->list);
215 kfree(conn); 214 kfree(conn);
216 } 215 }
216 tipc_sk_rcv(net, &skbs);
217} 217}
218 218
219/** 219/**
@@ -568,37 +568,36 @@ void tipc_node_unlock(struct tipc_node *node)
568 struct net *net = node->net; 568 struct net *net = node->net;
569 LIST_HEAD(nsub_list); 569 LIST_HEAD(nsub_list);
570 LIST_HEAD(conn_sks); 570 LIST_HEAD(conn_sks);
571 struct sk_buff_head waiting_sks;
572 u32 addr = 0; 571 u32 addr = 0;
573 int flags = node->action_flags; 572 u32 flags = node->action_flags;
574 u32 link_id = 0; 573 u32 link_id = 0;
574 struct sk_buff_head *inputq = node->inputq;
575 struct sk_buff_head *namedq = node->inputq;
575 576
576 if (likely(!flags)) { 577 if (likely(!flags || (flags == TIPC_MSG_EVT))) {
578 node->action_flags = 0;
577 spin_unlock_bh(&node->lock); 579 spin_unlock_bh(&node->lock);
580 if (flags == TIPC_MSG_EVT)
581 tipc_sk_rcv(net, inputq);
578 return; 582 return;
579 } 583 }
580 584
581 addr = node->addr; 585 addr = node->addr;
582 link_id = node->link_id; 586 link_id = node->link_id;
583 __skb_queue_head_init(&waiting_sks); 587 namedq = node->namedq;
584
585 if (flags & TIPC_WAKEUP_USERS)
586 skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
587 588
588 if (flags & TIPC_NOTIFY_NODE_DOWN) { 589 if (flags & TIPC_NOTIFY_NODE_DOWN) {
589 list_replace_init(&node->publ_list, &nsub_list); 590 list_replace_init(&node->publ_list, &nsub_list);
590 list_replace_init(&node->conn_sks, &conn_sks); 591 list_replace_init(&node->conn_sks, &conn_sks);
591 } 592 }
592 node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN | 593 node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN |
593 TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP | 594 TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP |
594 TIPC_NOTIFY_LINK_DOWN | 595 TIPC_NOTIFY_LINK_DOWN |
595 TIPC_WAKEUP_BCAST_USERS); 596 TIPC_WAKEUP_BCAST_USERS |
597 TIPC_NAMED_MSG_EVT);
596 598
597 spin_unlock_bh(&node->lock); 599 spin_unlock_bh(&node->lock);
598 600
599 while (!skb_queue_empty(&waiting_sks))
600 tipc_sk_rcv(net, __skb_dequeue(&waiting_sks));
601
602 if (!list_empty(&conn_sks)) 601 if (!list_empty(&conn_sks))
603 tipc_node_abort_sock_conns(net, &conn_sks); 602 tipc_node_abort_sock_conns(net, &conn_sks);
604 603
@@ -618,6 +617,12 @@ void tipc_node_unlock(struct tipc_node *node)
618 if (flags & TIPC_NOTIFY_LINK_DOWN) 617 if (flags & TIPC_NOTIFY_LINK_DOWN)
619 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, 618 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
620 link_id, addr); 619 link_id, addr);
620
621 if (flags & TIPC_MSG_EVT)
622 tipc_sk_rcv(net, inputq);
623
624 if (flags & TIPC_NAMED_MSG_EVT)
625 tipc_named_rcv(net, namedq);
621} 626}
622 627
623/* Caller should hold node lock for the passed node */ 628/* Caller should hold node lock for the passed node */