aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-11-19 14:30:43 -0500
committerDavid S. Miller <davem@davemloft.net>2015-11-20 14:06:10 -0500
commit2312bf61ae365fdd6b9bfb24558a417859759447 (patch)
tree182ac3a58a25951a475c3bc88a32b0e82662201e
parent1d7e1c2595bd20c5274a8e49d89cf0cf483759de (diff)
tipc: introduce per-link spinlock
As a preparation to allow parallel links to work more independently from each other we introduce a per-link spinlock, to be stored in the struct nodes's link entry area. Since the node lock still is a regular spinlock there is no increase in parallellism at this stage. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/link.c9
-rw-r--r--net/tipc/node.c39
-rw-r--r--net/tipc/node.h3
3 files changed, 25 insertions, 26 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index fa452fb5f34e..b5e895c6f1aa 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1995,6 +1995,7 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
1995 struct tipc_node *node; 1995 struct tipc_node *node;
1996 struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1]; 1996 struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
1997 struct net *net = sock_net(skb->sk); 1997 struct net *net = sock_net(skb->sk);
1998 struct tipc_link_entry *le;
1998 1999
1999 if (!info->attrs[TIPC_NLA_LINK]) 2000 if (!info->attrs[TIPC_NLA_LINK])
2000 return -EINVAL; 2001 return -EINVAL;
@@ -2020,17 +2021,17 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
2020 node = tipc_link_find_owner(net, link_name, &bearer_id); 2021 node = tipc_link_find_owner(net, link_name, &bearer_id);
2021 if (!node) 2022 if (!node)
2022 return -EINVAL; 2023 return -EINVAL;
2023 2024 le = &node->links[bearer_id];
2024 tipc_node_lock(node); 2025 tipc_node_lock(node);
2025 2026 spin_lock_bh(&le->lock);
2026 link = node->links[bearer_id].link; 2027 link = le->link;
2027 if (!link) { 2028 if (!link) {
2028 tipc_node_unlock(node); 2029 tipc_node_unlock(node);
2029 return -EINVAL; 2030 return -EINVAL;
2030 } 2031 }
2031 2032
2032 link_reset_statistics(link); 2033 link_reset_statistics(link);
2033 2034 spin_unlock_bh(&le->lock);
2034 tipc_node_unlock(node); 2035 tipc_node_unlock(node);
2035 2036
2036 return 0; 2037 return 0;
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 932195258551..572063a0190e 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -339,11 +339,13 @@ static void tipc_node_timeout(unsigned long data)
339 for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) { 339 for (bearer_id = 0; bearer_id < MAX_BEARERS; bearer_id++) {
340 tipc_node_lock(n); 340 tipc_node_lock(n);
341 le = &n->links[bearer_id]; 341 le = &n->links[bearer_id];
342 spin_lock_bh(&le->lock);
342 if (le->link) { 343 if (le->link) {
343 /* Link tolerance may change asynchronously: */ 344 /* Link tolerance may change asynchronously: */
344 tipc_node_calculate_timer(n, le->link); 345 tipc_node_calculate_timer(n, le->link);
345 rc = tipc_link_timeout(le->link, &xmitq); 346 rc = tipc_link_timeout(le->link, &xmitq);
346 } 347 }
348 spin_unlock_bh(&le->lock);
347 tipc_node_unlock(n); 349 tipc_node_unlock(n);
348 tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr); 350 tipc_bearer_xmit(n->net, bearer_id, &xmitq, &le->maddr);
349 if (rc & TIPC_LINK_DOWN_EVT) 351 if (rc & TIPC_LINK_DOWN_EVT)
@@ -654,6 +656,7 @@ void tipc_node_check_dest(struct net *net, u32 onode,
654 if (n->state == NODE_FAILINGOVER) 656 if (n->state == NODE_FAILINGOVER)
655 tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT); 657 tipc_link_fsm_evt(l, LINK_FAILOVER_BEGIN_EVT);
656 le->link = l; 658 le->link = l;
659 spin_lock_init(&le->lock);
657 n->link_cnt++; 660 n->link_cnt++;
658 tipc_node_calculate_timer(n, l); 661 tipc_node_calculate_timer(n, l);
659 if (n->link_cnt == 1) 662 if (n->link_cnt == 1)
@@ -1033,20 +1036,6 @@ msg_full:
1033 return -EMSGSIZE; 1036 return -EMSGSIZE;
1034} 1037}
1035 1038
1036static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
1037 int *bearer_id,
1038 struct tipc_media_addr **maddr)
1039{
1040 int id = n->active_links[sel & 1];
1041
1042 if (unlikely(id < 0))
1043 return NULL;
1044
1045 *bearer_id = id;
1046 *maddr = &n->links[id].maddr;
1047 return n->links[id].link;
1048}
1049
1050/** 1039/**
1051 * tipc_node_xmit() is the general link level function for message sending 1040 * tipc_node_xmit() is the general link level function for message sending
1052 * @net: the applicable net namespace 1041 * @net: the applicable net namespace
@@ -1059,26 +1048,32 @@ static struct tipc_link *tipc_node_select_link(struct tipc_node *n, int sel,
1059int tipc_node_xmit(struct net *net, struct sk_buff_head *list, 1048int tipc_node_xmit(struct net *net, struct sk_buff_head *list,
1060 u32 dnode, int selector) 1049 u32 dnode, int selector)
1061{ 1050{
1062 struct tipc_link *l = NULL; 1051 struct tipc_link_entry *le;
1063 struct tipc_node *n; 1052 struct tipc_node *n;
1064 struct sk_buff_head xmitq; 1053 struct sk_buff_head xmitq;
1065 struct tipc_media_addr *maddr; 1054 struct tipc_media_addr *maddr = NULL;
1066 int bearer_id; 1055 int bearer_id = -1;
1067 int rc = -EHOSTUNREACH; 1056 int rc = -EHOSTUNREACH;
1068 1057
1069 __skb_queue_head_init(&xmitq); 1058 __skb_queue_head_init(&xmitq);
1070 n = tipc_node_find(net, dnode); 1059 n = tipc_node_find(net, dnode);
1071 if (likely(n)) { 1060 if (likely(n)) {
1072 tipc_node_lock(n); 1061 tipc_node_lock(n);
1073 l = tipc_node_select_link(n, selector, &bearer_id, &maddr); 1062 bearer_id = n->active_links[selector & 1];
1074 if (likely(l)) 1063 if (bearer_id >= 0) {
1075 rc = tipc_link_xmit(l, list, &xmitq); 1064 le = &n->links[bearer_id];
1065 maddr = &le->maddr;
1066 spin_lock_bh(&le->lock);
1067 if (likely(le->link))
1068 rc = tipc_link_xmit(le->link, list, &xmitq);
1069 spin_unlock_bh(&le->lock);
1070 }
1076 tipc_node_unlock(n); 1071 tipc_node_unlock(n);
1077 if (unlikely(rc == -ENOBUFS)) 1072 if (unlikely(rc == -ENOBUFS))
1078 tipc_node_link_down(n, bearer_id, false); 1073 tipc_node_link_down(n, bearer_id, false);
1079 tipc_node_put(n); 1074 tipc_node_put(n);
1080 } 1075 }
1081 if (likely(!rc)) { 1076 if (likely(!skb_queue_empty(&xmitq))) {
1082 tipc_bearer_xmit(net, bearer_id, &xmitq, maddr); 1077 tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
1083 return 0; 1078 return 0;
1084 } 1079 }
@@ -1374,7 +1369,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1374 1369
1375 /* Check and if necessary update node state */ 1370 /* Check and if necessary update node state */
1376 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { 1371 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
1372 spin_lock_bh(&le->lock);
1377 rc = tipc_link_rcv(le->link, skb, &xmitq); 1373 rc = tipc_link_rcv(le->link, skb, &xmitq);
1374 spin_unlock_bh(&le->lock);
1378 skb = NULL; 1375 skb = NULL;
1379 } 1376 }
1380unlock: 1377unlock:
diff --git a/net/tipc/node.h b/net/tipc/node.h
index dd79e9742bd6..8784907486c0 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -69,6 +69,7 @@ enum {
69 69
70struct tipc_link_entry { 70struct tipc_link_entry {
71 struct tipc_link *link; 71 struct tipc_link *link;
72 spinlock_t lock; /* per-link */
72 u32 mtu; 73 u32 mtu;
73 struct sk_buff_head inputq; 74 struct sk_buff_head inputq;
74 struct tipc_media_addr maddr; 75 struct tipc_media_addr maddr;
@@ -86,7 +87,7 @@ struct tipc_bclink_entry {
86 * struct tipc_node - TIPC node structure 87 * struct tipc_node - TIPC node structure
87 * @addr: network address of node 88 * @addr: network address of node
88 * @ref: reference counter to node object 89 * @ref: reference counter to node object
89 * @lock: spinlock governing access to structure 90 * @lock: rwlock governing access to structure
90 * @net: the applicable net namespace 91 * @net: the applicable net namespace
91 * @hash: links to adjacent nodes in unsorted hash chain 92 * @hash: links to adjacent nodes in unsorted hash chain
92 * @inputq: pointer to input queue containing messages for msg event 93 * @inputq: pointer to input queue containing messages for msg event