summaryrefslogtreecommitdiffstats
path: root/net/tipc/node.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-10-22 08:51:41 -0400
committerDavid S. Miller <davem@davemloft.net>2015-10-24 09:56:37 -0400
commit5266698661401afc5e4a1a521cf9ba10724d10dd (patch)
treecf3d466a2d9982f403a689e8a0c819c7e3693bde /net/tipc/node.c
parentfd556f209af53b9cdc45df8c467feb235376c4df (diff)
tipc: let broadcast packet reception use new link receive function
The code path for receiving broadcast packets is currently distinct from the unicast path. This leads to unnecessary code and data duplication, something that can be avoided with some effort. We now introduce separate per-peer tipc_link instances for handling broadcast packet reception. Each receive link keeps a pointer to the common, single, broadcast link instance, and can hence handle release and retransmission of send buffers as if they belonged to the own instance. Furthermore, we let each unicast link instance keep a reference to both the pertaining broadcast receive link, and to the common send link. This makes it possible for the unicast links to easily access data for broadcast link synchronization, as well as for carrying acknowledges for received broadcast packets. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/node.c')
-rw-r--r--net/tipc/node.c158
1 files changed, 106 insertions, 52 deletions
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 28bcd7be23c6..cd924552244b 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -72,7 +72,6 @@ static void __tipc_node_link_down(struct tipc_node *n, int *bearer_id,
72static void tipc_node_link_down(struct tipc_node *n, int bearer_id, 72static void tipc_node_link_down(struct tipc_node *n, int bearer_id,
73 bool delete); 73 bool delete);
74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq); 74static void node_lost_contact(struct tipc_node *n, struct sk_buff_head *inputq);
75static void node_established_contact(struct tipc_node *n_ptr);
76static void tipc_node_delete(struct tipc_node *node); 75static void tipc_node_delete(struct tipc_node *node);
77static void tipc_node_timeout(unsigned long data); 76static void tipc_node_timeout(unsigned long data);
78static void tipc_node_fsm_evt(struct tipc_node *n, int evt); 77static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
@@ -165,8 +164,10 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
165 INIT_LIST_HEAD(&n_ptr->list); 164 INIT_LIST_HEAD(&n_ptr->list);
166 INIT_LIST_HEAD(&n_ptr->publ_list); 165 INIT_LIST_HEAD(&n_ptr->publ_list);
167 INIT_LIST_HEAD(&n_ptr->conn_sks); 166 INIT_LIST_HEAD(&n_ptr->conn_sks);
168 skb_queue_head_init(&n_ptr->bclink.namedq); 167 skb_queue_head_init(&n_ptr->bc_entry.namedq);
169 __skb_queue_head_init(&n_ptr->bclink.deferdq); 168 skb_queue_head_init(&n_ptr->bc_entry.inputq1);
169 __skb_queue_head_init(&n_ptr->bc_entry.arrvq);
170 skb_queue_head_init(&n_ptr->bc_entry.inputq2);
170 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 171 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
171 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 172 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
172 if (n_ptr->addr < temp_node->addr) 173 if (n_ptr->addr < temp_node->addr)
@@ -177,6 +178,18 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr, u16 capabilities)
177 n_ptr->signature = INVALID_NODE_SIG; 178 n_ptr->signature = INVALID_NODE_SIG;
178 n_ptr->active_links[0] = INVALID_BEARER_ID; 179 n_ptr->active_links[0] = INVALID_BEARER_ID;
179 n_ptr->active_links[1] = INVALID_BEARER_ID; 180 n_ptr->active_links[1] = INVALID_BEARER_ID;
181 if (!tipc_link_bc_create(n_ptr, tipc_own_addr(net), n_ptr->addr,
182 U16_MAX, tipc_bc_sndlink(net)->window,
183 n_ptr->capabilities,
184 &n_ptr->bc_entry.inputq1,
185 &n_ptr->bc_entry.namedq,
186 tipc_bc_sndlink(net),
187 &n_ptr->bc_entry.link)) {
188 pr_warn("Broadcast rcv link creation failed, no memory\n");
189 kfree(n_ptr);
190 n_ptr = NULL;
191 goto exit;
192 }
180 tipc_node_get(n_ptr); 193 tipc_node_get(n_ptr);
181 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr); 194 setup_timer(&n_ptr->timer, tipc_node_timeout, (unsigned long)n_ptr);
182 n_ptr->keepalive_intv = U32_MAX; 195 n_ptr->keepalive_intv = U32_MAX;
@@ -203,6 +216,7 @@ static void tipc_node_delete(struct tipc_node *node)
203{ 216{
204 list_del_rcu(&node->list); 217 list_del_rcu(&node->list);
205 hlist_del_rcu(&node->hash); 218 hlist_del_rcu(&node->hash);
219 kfree(node->bc_entry.link);
206 kfree_rcu(node, rcu); 220 kfree_rcu(node, rcu);
207} 221}
208 222
@@ -340,8 +354,9 @@ static void __tipc_node_link_up(struct tipc_node *n, int bearer_id,
340 if (!ol) { 354 if (!ol) {
341 *slot0 = bearer_id; 355 *slot0 = bearer_id;
342 *slot1 = bearer_id; 356 *slot1 = bearer_id;
343 tipc_link_build_bcast_sync_msg(nl, xmitq); 357 tipc_node_fsm_evt(n, SELF_ESTABL_CONTACT_EVT);
344 node_established_contact(n); 358 n->action_flags |= TIPC_NOTIFY_NODE_UP;
359 tipc_bcast_add_peer(n->net, n->addr, nl, xmitq);
345 return; 360 return;
346 } 361 }
347 362
@@ -585,9 +600,10 @@ void tipc_node_check_dest(struct net *net, u32 onode,
585 b->net_plane, b->mtu, b->priority, 600 b->net_plane, b->mtu, b->priority,
586 b->window, mod(tipc_net(net)->random), 601 b->window, mod(tipc_net(net)->random),
587 tipc_own_addr(net), onode, 602 tipc_own_addr(net), onode,
588 n->capabilities, 603 n->capabilities, &le->maddr,
589 &le->maddr, &le->inputq, 604 tipc_bc_sndlink(n->net), n->bc_entry.link,
590 &n->bclink.namedq, &l)) { 605 &le->inputq,
606 &n->bc_entry.namedq, &l)) {
591 *respond = false; 607 *respond = false;
592 goto exit; 608 goto exit;
593 } 609 }
@@ -830,58 +846,36 @@ bool tipc_node_filter_pkt(struct tipc_node *n, struct tipc_msg *hdr)
830 return true; 846 return true;
831} 847}
832 848
833static void node_established_contact(struct tipc_node *n_ptr) 849static void node_lost_contact(struct tipc_node *n,
834{
835 tipc_node_fsm_evt(n_ptr, SELF_ESTABL_CONTACT_EVT);
836 n_ptr->action_flags |= TIPC_NOTIFY_NODE_UP;
837 n_ptr->bclink.oos_state = 0;
838 n_ptr->bclink.acked = tipc_bclink_get_last_sent(n_ptr->net);
839 tipc_bclink_add_node(n_ptr->net, n_ptr->addr);
840}
841
842static void node_lost_contact(struct tipc_node *n_ptr,
843 struct sk_buff_head *inputq) 850 struct sk_buff_head *inputq)
844{ 851{
845 char addr_string[16]; 852 char addr_string[16];
846 struct tipc_sock_conn *conn, *safe; 853 struct tipc_sock_conn *conn, *safe;
847 struct tipc_link *l; 854 struct tipc_link *l;
848 struct list_head *conns = &n_ptr->conn_sks; 855 struct list_head *conns = &n->conn_sks;
849 struct sk_buff *skb; 856 struct sk_buff *skb;
850 struct tipc_net *tn = net_generic(n_ptr->net, tipc_net_id);
851 uint i; 857 uint i;
852 858
853 pr_debug("Lost contact with %s\n", 859 pr_debug("Lost contact with %s\n",
854 tipc_addr_string_fill(addr_string, n_ptr->addr)); 860 tipc_addr_string_fill(addr_string, n->addr));
855
856 /* Flush broadcast link info associated with lost node */
857 if (n_ptr->bclink.recv_permitted) {
858 __skb_queue_purge(&n_ptr->bclink.deferdq);
859 861
860 if (n_ptr->bclink.reasm_buf) { 862 /* Clean up broadcast state */
861 kfree_skb(n_ptr->bclink.reasm_buf); 863 tipc_bcast_remove_peer(n->net, n->addr, n->bc_entry.link);
862 n_ptr->bclink.reasm_buf = NULL;
863 }
864
865 tipc_bclink_remove_node(n_ptr->net, n_ptr->addr);
866 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
867
868 n_ptr->bclink.recv_permitted = false;
869 }
870 864
871 /* Abort any ongoing link failover */ 865 /* Abort any ongoing link failover */
872 for (i = 0; i < MAX_BEARERS; i++) { 866 for (i = 0; i < MAX_BEARERS; i++) {
873 l = n_ptr->links[i].link; 867 l = n->links[i].link;
874 if (l) 868 if (l)
875 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT); 869 tipc_link_fsm_evt(l, LINK_FAILOVER_END_EVT);
876 } 870 }
877 871
878 /* Notify publications from this node */ 872 /* Notify publications from this node */
879 n_ptr->action_flags |= TIPC_NOTIFY_NODE_DOWN; 873 n->action_flags |= TIPC_NOTIFY_NODE_DOWN;
880 874
881 /* Notify sockets connected to node */ 875 /* Notify sockets connected to node */
882 list_for_each_entry_safe(conn, safe, conns, list) { 876 list_for_each_entry_safe(conn, safe, conns, list) {
883 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 877 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
884 SHORT_H_SIZE, 0, tn->own_addr, 878 SHORT_H_SIZE, 0, tipc_own_addr(n->net),
885 conn->peer_node, conn->port, 879 conn->peer_node, conn->port,
886 conn->peer_port, TIPC_ERR_NO_NODE); 880 conn->peer_port, TIPC_ERR_NO_NODE);
887 if (likely(skb)) 881 if (likely(skb))
@@ -1086,6 +1080,67 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
1086} 1080}
1087 1081
1088/** 1082/**
1083 * tipc_node_bc_rcv - process TIPC broadcast packet arriving from off-node
1084 * @net: the applicable net namespace
1085 * @skb: TIPC packet
1086 * @bearer_id: id of bearer message arrived on
1087 *
1088 * Invoked with no locks held.
1089 */
1090void tipc_node_bc_rcv(struct net *net, struct sk_buff *skb, int bearer_id)
1091{
1092 int rc;
1093 struct sk_buff_head xmitq;
1094 struct tipc_bclink_entry *be;
1095 struct tipc_link_entry *le;
1096 struct tipc_msg *hdr = buf_msg(skb);
1097 int usr = msg_user(hdr);
1098 u32 dnode = msg_destnode(hdr);
1099 struct tipc_node *n;
1100
1101 __skb_queue_head_init(&xmitq);
1102
1103 /* If NACK for other node, let rcv link for that node peek into it */
1104 if ((usr == BCAST_PROTOCOL) && (dnode != tipc_own_addr(net)))
1105 n = tipc_node_find(net, dnode);
1106 else
1107 n = tipc_node_find(net, msg_prevnode(hdr));
1108 if (!n) {
1109 kfree_skb(skb);
1110 return;
1111 }
1112 be = &n->bc_entry;
1113 le = &n->links[bearer_id];
1114
1115 rc = tipc_bcast_rcv(net, be->link, skb);
1116
1117 /* Broadcast link reset may happen at reassembly failure */
1118 if (rc & TIPC_LINK_DOWN_EVT)
1119 tipc_node_reset_links(n);
1120
1121 /* Broadcast ACKs are sent on a unicast link */
1122 if (rc & TIPC_LINK_SND_BC_ACK) {
1123 tipc_node_lock(n);
1124 tipc_link_build_ack_msg(le->link, &xmitq);
1125 tipc_node_unlock(n);
1126 }
1127
1128 if (!skb_queue_empty(&xmitq))
1129 tipc_bearer_xmit(net, bearer_id, &xmitq, &le->maddr);
1130
1131 /* Deliver. 'arrvq' is under inputq2's lock protection */
1132 if (!skb_queue_empty(&be->inputq1)) {
1133 spin_lock_bh(&be->inputq2.lock);
1134 spin_lock_bh(&be->inputq1.lock);
1135 skb_queue_splice_tail_init(&be->inputq1, &be->arrvq);
1136 spin_unlock_bh(&be->inputq1.lock);
1137 spin_unlock_bh(&be->inputq2.lock);
1138 tipc_sk_mcast_rcv(net, &be->arrvq, &be->inputq2);
1139 }
1140 tipc_node_put(n);
1141}
1142
1143/**
1089 * tipc_node_check_state - check and if necessary update node state 1144 * tipc_node_check_state - check and if necessary update node state
1090 * @skb: TIPC packet 1145 * @skb: TIPC packet
1091 * @bearer_id: identity of bearer delivering the packet 1146 * @bearer_id: identity of bearer delivering the packet
@@ -1227,6 +1282,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1227 int usr = msg_user(hdr); 1282 int usr = msg_user(hdr);
1228 int bearer_id = b->identity; 1283 int bearer_id = b->identity;
1229 struct tipc_link_entry *le; 1284 struct tipc_link_entry *le;
1285 u16 bc_ack = msg_bcast_ack(hdr);
1230 int rc = 0; 1286 int rc = 0;
1231 1287
1232 __skb_queue_head_init(&xmitq); 1288 __skb_queue_head_init(&xmitq);
@@ -1235,13 +1291,12 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1235 if (unlikely(!tipc_msg_validate(skb))) 1291 if (unlikely(!tipc_msg_validate(skb)))
1236 goto discard; 1292 goto discard;
1237 1293
1238 /* Handle arrival of a non-unicast link packet */ 1294 /* Handle arrival of discovery or broadcast packet */
1239 if (unlikely(msg_non_seq(hdr))) { 1295 if (unlikely(msg_non_seq(hdr))) {
1240 if (usr == LINK_CONFIG) 1296 if (unlikely(usr == LINK_CONFIG))
1241 tipc_disc_rcv(net, skb, b); 1297 return tipc_disc_rcv(net, skb, b);
1242 else 1298 else
1243 tipc_bclink_rcv(net, skb); 1299 return tipc_node_bc_rcv(net, skb, bearer_id);
1244 return;
1245 } 1300 }
1246 1301
1247 /* Locate neighboring node that sent packet */ 1302 /* Locate neighboring node that sent packet */
@@ -1250,19 +1305,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
1250 goto discard; 1305 goto discard;
1251 le = &n->links[bearer_id]; 1306 le = &n->links[bearer_id];
1252 1307
1308 /* Ensure broadcast reception is in synch with peer's send state */
1309 if (unlikely(usr == LINK_PROTOCOL))
1310 tipc_bcast_sync_rcv(net, n->bc_entry.link, hdr);
1311 else if (unlikely(n->bc_entry.link->acked != bc_ack))
1312 tipc_bcast_ack_rcv(net, n->bc_entry.link, bc_ack);
1313
1253 tipc_node_lock(n); 1314 tipc_node_lock(n);
1254 1315
1255 /* Is reception permitted at the moment ? */ 1316 /* Is reception permitted at the moment ? */
1256 if (!tipc_node_filter_pkt(n, hdr)) 1317 if (!tipc_node_filter_pkt(n, hdr))
1257 goto unlock; 1318 goto unlock;
1258 1319
1259 if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
1260 tipc_bclink_sync_state(n, hdr);
1261
1262 /* Release acked broadcast packets */
1263 if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
1264 tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
1265
1266 /* Check and if necessary update node state */ 1320 /* Check and if necessary update node state */
1267 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) { 1321 if (likely(tipc_node_check_state(n, skb, bearer_id, &xmitq))) {
1268 rc = tipc_link_rcv(le->link, skb, &xmitq); 1322 rc = tipc_link_rcv(le->link, skb, &xmitq);
@@ -1277,8 +1331,8 @@ unlock:
1277 if (unlikely(rc & TIPC_LINK_DOWN_EVT)) 1331 if (unlikely(rc & TIPC_LINK_DOWN_EVT))
1278 tipc_node_link_down(n, bearer_id, false); 1332 tipc_node_link_down(n, bearer_id, false);
1279 1333
1280 if (unlikely(!skb_queue_empty(&n->bclink.namedq))) 1334 if (unlikely(!skb_queue_empty(&n->bc_entry.namedq)))
1281 tipc_named_rcv(net, &n->bclink.namedq); 1335 tipc_named_rcv(net, &n->bc_entry.namedq);
1282 1336
1283 if (!skb_queue_empty(&le->inputq)) 1337 if (!skb_queue_empty(&le->inputq))
1284 tipc_sk_rcv(net, &le->inputq); 1338 tipc_sk_rcv(net, &le->inputq);