aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2014-08-22 18:09:08 -0400
committerDavid S. Miller <davem@davemloft.net>2014-08-23 14:18:33 -0400
commit02be61a981fb5ca5f1526323336198ee92cadf95 (patch)
tree2cd5f3dc25025a071ab2806b2e10981851b21bc5
parent50100a5e39461b2a61d6040e73c384766c29975d (diff)
tipc: use message to abort connections when losing contact to node
In the current implementation, each 'struct tipc_node' instance keeps a linked list of those ports/sockets that are connected to the node represented by that struct. The purpose of this is to let the node object know which sockets to alert when it loses contact with its peer node, i.e., which sockets need to have their connections aborted. This entails an unwanted direct reference from the node structure back to the port/socket structure, and a need to grab port_lock when we have to make an upcall to the port. We want to get rid of this unecessary BH entry point into the socket, and also eliminate its use of port_lock. In this commit, we instead let the node struct keep list of "connected socket" structs, which each represents a connected socket, but is allocated independently by the node at the moment of connection. If the node loses contact with its peer node, the list is traversed, and a "connection abort" message is created for each entry in the list. The message is sent to it respective connected socket using the ordinary data path, and the receiving socket aborts its connections upon reception of the message. This enables us to get rid of the direct reference from 'struct node' to ´struct port', and another unwanted BH access point to the latter. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Erik Hugne <erik.hugne@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/node.c78
-rw-r--r--net/tipc/node.h3
-rw-r--r--net/tipc/port.c29
3 files changed, 85 insertions, 25 deletions
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6ea2c15cfc88..17e6378c4dfe 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -51,6 +51,13 @@ static u32 tipc_num_nodes;
51static u32 tipc_num_links; 51static u32 tipc_num_links;
52static DEFINE_SPINLOCK(node_list_lock); 52static DEFINE_SPINLOCK(node_list_lock);
53 53
54struct tipc_sock_conn {
55 u32 port;
56 u32 peer_port;
57 u32 peer_node;
58 struct list_head list;
59};
60
54/* 61/*
55 * A trivial power-of-two bitmask technique is used for speed, since this 62 * A trivial power-of-two bitmask technique is used for speed, since this
56 * operation is done for every incoming TIPC packet. The number of hash table 63 * operation is done for every incoming TIPC packet. The number of hash table
@@ -101,6 +108,7 @@ struct tipc_node *tipc_node_create(u32 addr)
101 INIT_HLIST_NODE(&n_ptr->hash); 108 INIT_HLIST_NODE(&n_ptr->hash);
102 INIT_LIST_HEAD(&n_ptr->list); 109 INIT_LIST_HEAD(&n_ptr->list);
103 INIT_LIST_HEAD(&n_ptr->nsub); 110 INIT_LIST_HEAD(&n_ptr->nsub);
111 INIT_LIST_HEAD(&n_ptr->conn_sks);
104 __skb_queue_head_init(&n_ptr->waiting_sks); 112 __skb_queue_head_init(&n_ptr->waiting_sks);
105 113
106 hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); 114 hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
@@ -138,6 +146,71 @@ void tipc_node_stop(void)
138 spin_unlock_bh(&node_list_lock); 146 spin_unlock_bh(&node_list_lock);
139} 147}
140 148
149int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port)
150{
151 struct tipc_node *node;
152 struct tipc_sock_conn *conn;
153
154 if (in_own_node(dnode))
155 return 0;
156
157 node = tipc_node_find(dnode);
158 if (!node) {
159 pr_warn("Connecting sock to node 0x%x failed\n", dnode);
160 return -EHOSTUNREACH;
161 }
162 conn = kmalloc(sizeof(*conn), GFP_ATOMIC);
163 if (!conn)
164 return -EHOSTUNREACH;
165 conn->peer_node = dnode;
166 conn->port = port;
167 conn->peer_port = peer_port;
168
169 tipc_node_lock(node);
170 list_add_tail(&conn->list, &node->conn_sks);
171 tipc_node_unlock(node);
172 return 0;
173}
174
175void tipc_node_remove_conn(u32 dnode, u32 port)
176{
177 struct tipc_node *node;
178 struct tipc_sock_conn *conn, *safe;
179
180 if (in_own_node(dnode))
181 return;
182
183 node = tipc_node_find(dnode);
184 if (!node)
185 return;
186
187 tipc_node_lock(node);
188 list_for_each_entry_safe(conn, safe, &node->conn_sks, list) {
189 if (port != conn->port)
190 continue;
191 list_del(&conn->list);
192 kfree(conn);
193 }
194 tipc_node_unlock(node);
195}
196
197void tipc_node_abort_sock_conns(struct list_head *conns)
198{
199 struct tipc_sock_conn *conn, *safe;
200 struct sk_buff *buf;
201
202 list_for_each_entry_safe(conn, safe, conns, list) {
203 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
204 SHORT_H_SIZE, 0, tipc_own_addr,
205 conn->peer_node, conn->port,
206 conn->peer_port, TIPC_ERR_NO_NODE);
207 if (likely(buf))
208 tipc_sk_rcv(buf);
209 list_del(&conn->list);
210 kfree(conn);
211 }
212}
213
141/** 214/**
142 * tipc_node_link_up - handle addition of link 215 * tipc_node_link_up - handle addition of link
143 * 216 *
@@ -476,6 +549,7 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
476void tipc_node_unlock(struct tipc_node *node) 549void tipc_node_unlock(struct tipc_node *node)
477{ 550{
478 LIST_HEAD(nsub_list); 551 LIST_HEAD(nsub_list);
552 LIST_HEAD(conn_sks);
479 struct sk_buff_head waiting_sks; 553 struct sk_buff_head waiting_sks;
480 u32 addr = 0; 554 u32 addr = 0;
481 555
@@ -491,6 +565,7 @@ void tipc_node_unlock(struct tipc_node *node)
491 } 565 }
492 if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) { 566 if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) {
493 list_replace_init(&node->nsub, &nsub_list); 567 list_replace_init(&node->nsub, &nsub_list);
568 list_replace_init(&node->conn_sks, &conn_sks);
494 node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN; 569 node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
495 } 570 }
496 if (node->action_flags & TIPC_NOTIFY_NODE_UP) { 571 if (node->action_flags & TIPC_NOTIFY_NODE_UP) {
@@ -502,6 +577,9 @@ void tipc_node_unlock(struct tipc_node *node)
502 while (!skb_queue_empty(&waiting_sks)) 577 while (!skb_queue_empty(&waiting_sks))
503 tipc_sk_rcv(__skb_dequeue(&waiting_sks)); 578 tipc_sk_rcv(__skb_dequeue(&waiting_sks));
504 579
580 if (!list_empty(&conn_sks))
581 tipc_node_abort_sock_conns(&conn_sks);
582
505 if (!list_empty(&nsub_list)) 583 if (!list_empty(&nsub_list))
506 tipc_nodesub_notify(&nsub_list); 584 tipc_nodesub_notify(&nsub_list);
507 585
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 2ebf9e8b50fd..522d6f3157b3 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -117,6 +117,7 @@ struct tipc_node {
117 u32 signature; 117 u32 signature;
118 struct list_head nsub; 118 struct list_head nsub;
119 struct sk_buff_head waiting_sks; 119 struct sk_buff_head waiting_sks;
120 struct list_head conn_sks;
120 struct rcu_head rcu; 121 struct rcu_head rcu;
121}; 122};
122 123
@@ -135,6 +136,8 @@ struct sk_buff *tipc_node_get_links(const void *req_tlv_area, int req_tlv_space)
135struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space); 136struct sk_buff *tipc_node_get_nodes(const void *req_tlv_area, int req_tlv_space);
136int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len); 137int tipc_node_get_linkname(u32 bearer_id, u32 node, char *linkname, size_t len);
137void tipc_node_unlock(struct tipc_node *node); 138void tipc_node_unlock(struct tipc_node *node);
139int tipc_node_add_conn(u32 dnode, u32 port, u32 peer_port);
140void tipc_node_remove_conn(u32 dnode, u32 port);
138 141
139static inline void tipc_node_lock(struct tipc_node *node) 142static inline void tipc_node_lock(struct tipc_node *node)
140{ 143{
diff --git a/net/tipc/port.c b/net/tipc/port.c
index b58a777a4399..edbd83d223c5 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -48,7 +48,6 @@
48DEFINE_SPINLOCK(tipc_port_list_lock); 48DEFINE_SPINLOCK(tipc_port_list_lock);
49 49
50static LIST_HEAD(ports); 50static LIST_HEAD(ports);
51static void port_handle_node_down(unsigned long ref);
52static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err); 51static struct sk_buff *port_build_self_abort_msg(struct tipc_port *, u32 err);
53static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err); 52static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *, u32 err);
54static void port_timeout(unsigned long ref); 53static void port_timeout(unsigned long ref);
@@ -126,10 +125,10 @@ void tipc_port_destroy(struct tipc_port *p_ptr)
126 k_cancel_timer(&p_ptr->timer); 125 k_cancel_timer(&p_ptr->timer);
127 if (p_ptr->connected) { 126 if (p_ptr->connected) {
128 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT); 127 buf = port_build_peer_abort_msg(p_ptr, TIPC_ERR_NO_PORT);
129 tipc_nodesub_unsubscribe(&p_ptr->subscription);
130 msg = buf_msg(buf); 128 msg = buf_msg(buf);
131 peer = msg_destnode(msg); 129 peer = msg_destnode(msg);
132 tipc_link_xmit(buf, peer, msg_link_selector(msg)); 130 tipc_link_xmit(buf, peer, msg_link_selector(msg));
131 tipc_node_remove_conn(peer, p_ptr->ref);
133 } 132 }
134 spin_lock_bh(&tipc_port_list_lock); 133 spin_lock_bh(&tipc_port_list_lock);
135 list_del(&p_ptr->port_list); 134 list_del(&p_ptr->port_list);
@@ -188,22 +187,6 @@ static void port_timeout(unsigned long ref)
188 tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg)); 187 tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
189} 188}
190 189
191
192static void port_handle_node_down(unsigned long ref)
193{
194 struct tipc_port *p_ptr = tipc_port_lock(ref);
195 struct sk_buff *buf = NULL;
196 struct tipc_msg *msg = NULL;
197
198 if (!p_ptr)
199 return;
200 buf = port_build_self_abort_msg(p_ptr, TIPC_ERR_NO_NODE);
201 tipc_port_unlock(p_ptr);
202 msg = buf_msg(buf);
203 tipc_link_xmit(buf, msg_destnode(msg), msg_link_selector(msg));
204}
205
206
207static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err) 190static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 err)
208{ 191{
209 struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err); 192 struct sk_buff *buf = port_build_peer_abort_msg(p_ptr, err);
@@ -217,7 +200,6 @@ static struct sk_buff *port_build_self_abort_msg(struct tipc_port *p_ptr, u32 er
217 return buf; 200 return buf;
218} 201}
219 202
220
221static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err) 203static struct sk_buff *port_build_peer_abort_msg(struct tipc_port *p_ptr, u32 err)
222{ 204{
223 struct sk_buff *buf; 205 struct sk_buff *buf;
@@ -447,11 +429,8 @@ int __tipc_port_connect(u32 ref, struct tipc_port *p_ptr,
447 p_ptr->probing_state = TIPC_CONN_OK; 429 p_ptr->probing_state = TIPC_CONN_OK;
448 p_ptr->connected = 1; 430 p_ptr->connected = 1;
449 k_start_timer(&p_ptr->timer, p_ptr->probing_interval); 431 k_start_timer(&p_ptr->timer, p_ptr->probing_interval);
450 432 res = tipc_node_add_conn(tipc_port_peernode(p_ptr), p_ptr->ref,
451 tipc_nodesub_subscribe(&p_ptr->subscription, peer->node, 433 tipc_port_peerport(p_ptr));
452 (void *)(unsigned long)ref,
453 (net_ev_handler)port_handle_node_down);
454 res = 0;
455exit: 434exit:
456 p_ptr->max_pkt = tipc_node_get_mtu(peer->node, ref); 435 p_ptr->max_pkt = tipc_node_get_mtu(peer->node, ref);
457 return res; 436 return res;
@@ -467,7 +446,7 @@ int __tipc_port_disconnect(struct tipc_port *tp_ptr)
467 if (tp_ptr->connected) { 446 if (tp_ptr->connected) {
468 tp_ptr->connected = 0; 447 tp_ptr->connected = 0;
469 /* let timer expire on it's own to avoid deadlock! */ 448 /* let timer expire on it's own to avoid deadlock! */
470 tipc_nodesub_unsubscribe(&tp_ptr->subscription); 449 tipc_node_remove_conn(tipc_port_peernode(tp_ptr), tp_ptr->ref);
471 return 0; 450 return 0;
472 } 451 }
473 452