aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-02-05 08:36:41 -0500
committerDavid S. Miller <davem@davemloft.net>2015-02-05 19:00:02 -0500
commitc637c1035534867b85b78b453c38c495b58e2c5a (patch)
tree77cd2a48a5b04e43b014da64168a6c1e209a1d40 /net/tipc
parent94153e36e709e78fc4e1f93dc4e4da785690c7d1 (diff)
tipc: resolve race problem at unicast message reception
TIPC handles message cardinality and sequencing at the link layer, before passing messages upwards to the destination sockets. During the upcall from link to socket no locks are held. It is therefore possible, and we see it happen occasionally, that messages arriving in different threads and delivered in sequence still bypass each other before they reach the destination socket. This must not happen, since it violates the sequentiality guarantee. We solve this by adding a new input buffer queue to the link structure. Arriving messages are added safely to the tail of that queue by the link, while the head of the queue is consumed, also safely, by the receiving socket. Sequentiality is secured per socket by only allowing buffers to be dequeued inside the socket lock. Since there may be multiple simultaneous readers of the queue, we use a 'filter' parameter to reduce the risk that they peek the same buffer from the queue, hence also reducing the risk of contention on the receiving socket locks. This solves the sequentiality problem, and seems to cause no measurable performance degradation. A nice side effect of this change is that lock handling in the functions tipc_rcv() and tipc_bcast_rcv() now becomes uniform, something that will enable future simplifications of those functions. Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c20
-rw-r--r--net/tipc/link.c247
-rw-r--r--net/tipc/link.h10
-rw-r--r--net/tipc/msg.c34
-rw-r--r--net/tipc/msg.h73
-rw-r--r--net/tipc/name_distr.c33
-rw-r--r--net/tipc/name_distr.h2
-rw-r--r--net/tipc/node.c43
-rw-r--r--net/tipc/node.h17
-rw-r--r--net/tipc/socket.c132
-rw-r--r--net/tipc/socket.h2
11 files changed, 372 insertions, 241 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3b886eb35c87..2dfaf272928a 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -189,10 +189,8 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
189void tipc_bclink_wakeup_users(struct net *net) 189void tipc_bclink_wakeup_users(struct net *net)
190{ 190{
191 struct tipc_net *tn = net_generic(net, tipc_net_id); 191 struct tipc_net *tn = net_generic(net, tipc_net_id);
192 struct sk_buff *skb;
193 192
194 while ((skb = skb_dequeue(&tn->bclink->link.waiting_sks))) 193 tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
195 tipc_sk_rcv(net, skb);
196} 194}
197 195
198/** 196/**
@@ -271,9 +269,8 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
271 tipc_link_push_packets(tn->bcl); 269 tipc_link_push_packets(tn->bcl);
272 bclink_set_last_sent(net); 270 bclink_set_last_sent(net);
273 } 271 }
274 if (unlikely(released && !skb_queue_empty(&tn->bcl->waiting_sks))) 272 if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
275 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS; 273 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
276
277exit: 274exit:
278 tipc_bclink_unlock(net); 275 tipc_bclink_unlock(net);
279} 276}
@@ -450,6 +447,9 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
450 u32 next_in; 447 u32 next_in;
451 u32 seqno; 448 u32 seqno;
452 int deferred = 0; 449 int deferred = 0;
450 int pos = 0;
451 struct sk_buff *iskb;
452 struct sk_buff_head msgs;
453 453
454 /* Screen out unwanted broadcast messages */ 454 /* Screen out unwanted broadcast messages */
455 if (msg_mc_netid(msg) != tn->net_id) 455 if (msg_mc_netid(msg) != tn->net_id)
@@ -506,7 +506,8 @@ receive:
506 bcl->stats.recv_bundled += msg_msgcnt(msg); 506 bcl->stats.recv_bundled += msg_msgcnt(msg);
507 tipc_bclink_unlock(net); 507 tipc_bclink_unlock(net);
508 tipc_node_unlock(node); 508 tipc_node_unlock(node);
509 tipc_link_bundle_rcv(net, buf); 509 while (tipc_msg_extract(buf, &iskb, &pos))
510 tipc_sk_mcast_rcv(net, iskb);
510 } else if (msg_user(msg) == MSG_FRAGMENTER) { 511 } else if (msg_user(msg) == MSG_FRAGMENTER) {
511 tipc_buf_append(&node->bclink.reasm_buf, &buf); 512 tipc_buf_append(&node->bclink.reasm_buf, &buf);
512 if (unlikely(!buf && !node->bclink.reasm_buf)) 513 if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -527,7 +528,9 @@ receive:
527 bclink_accept_pkt(node, seqno); 528 bclink_accept_pkt(node, seqno);
528 tipc_bclink_unlock(net); 529 tipc_bclink_unlock(net);
529 tipc_node_unlock(node); 530 tipc_node_unlock(node);
530 tipc_named_rcv(net, buf); 531 skb_queue_head_init(&msgs);
532 skb_queue_tail(&msgs, buf);
533 tipc_named_rcv(net, &msgs);
531 } else { 534 } else {
532 tipc_bclink_lock(net); 535 tipc_bclink_lock(net);
533 bclink_accept_pkt(node, seqno); 536 bclink_accept_pkt(node, seqno);
@@ -944,10 +947,9 @@ int tipc_bclink_init(struct net *net)
944 spin_lock_init(&bclink->lock); 947 spin_lock_init(&bclink->lock);
945 __skb_queue_head_init(&bcl->outqueue); 948 __skb_queue_head_init(&bcl->outqueue);
946 __skb_queue_head_init(&bcl->deferred_queue); 949 __skb_queue_head_init(&bcl->deferred_queue);
947 skb_queue_head_init(&bcl->waiting_sks); 950 skb_queue_head_init(&bcl->wakeupq);
948 bcl->next_out_no = 1; 951 bcl->next_out_no = 1;
949 spin_lock_init(&bclink->node.lock); 952 spin_lock_init(&bclink->node.lock);
950 __skb_queue_head_init(&bclink->node.waiting_sks);
951 bcl->owner = &bclink->node; 953 bcl->owner = &bclink->node;
952 bcl->owner->net = net; 954 bcl->owner->net = net;
953 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 955 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 41cb09aa41de..942491234099 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -113,10 +113,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr);
113static void link_print(struct tipc_link *l_ptr, const char *str); 113static void link_print(struct tipc_link *l_ptr, const char *str);
114static void tipc_link_sync_xmit(struct tipc_link *l); 114static void tipc_link_sync_xmit(struct tipc_link *l);
115static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 115static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
116static int tipc_link_input(struct net *net, struct tipc_link *l, 116static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
117 struct sk_buff *buf); 117static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
118static int tipc_link_prepare_input(struct net *net, struct tipc_link *l,
119 struct sk_buff **buf);
120 118
121/* 119/*
122 * Simple link routines 120 * Simple link routines
@@ -318,8 +316,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
318 l_ptr->next_out_no = 1; 316 l_ptr->next_out_no = 1;
319 __skb_queue_head_init(&l_ptr->outqueue); 317 __skb_queue_head_init(&l_ptr->outqueue);
320 __skb_queue_head_init(&l_ptr->deferred_queue); 318 __skb_queue_head_init(&l_ptr->deferred_queue);
321 skb_queue_head_init(&l_ptr->waiting_sks); 319 skb_queue_head_init(&l_ptr->wakeupq);
322 320 skb_queue_head_init(&l_ptr->inputq);
321 skb_queue_head_init(&l_ptr->namedq);
323 link_reset_statistics(l_ptr); 322 link_reset_statistics(l_ptr);
324 tipc_node_attach_link(n_ptr, l_ptr); 323 tipc_node_attach_link(n_ptr, l_ptr);
325 setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr); 324 setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
@@ -387,7 +386,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
387 return false; 386 return false;
388 TIPC_SKB_CB(buf)->chain_sz = chain_sz; 387 TIPC_SKB_CB(buf)->chain_sz = chain_sz;
389 TIPC_SKB_CB(buf)->chain_imp = imp; 388 TIPC_SKB_CB(buf)->chain_imp = imp;
390 skb_queue_tail(&link->waiting_sks, buf); 389 skb_queue_tail(&link->wakeupq, buf);
391 link->stats.link_congs++; 390 link->stats.link_congs++;
392 return true; 391 return true;
393} 392}
@@ -398,17 +397,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
398 * Move a number of waiting users, as permitted by available space in 397 * Move a number of waiting users, as permitted by available space in
399 * the send queue, from link wait queue to node wait queue for wakeup 398 * the send queue, from link wait queue to node wait queue for wakeup
400 */ 399 */
401static void link_prepare_wakeup(struct tipc_link *link) 400void link_prepare_wakeup(struct tipc_link *link)
402{ 401{
403 uint pend_qsz = skb_queue_len(&link->outqueue); 402 uint pend_qsz = skb_queue_len(&link->outqueue);
404 struct sk_buff *skb, *tmp; 403 struct sk_buff *skb, *tmp;
405 404
406 skb_queue_walk_safe(&link->waiting_sks, skb, tmp) { 405 skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
407 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp]) 406 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
408 break; 407 break;
409 pend_qsz += TIPC_SKB_CB(skb)->chain_sz; 408 pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
410 skb_unlink(skb, &link->waiting_sks); 409 skb_unlink(skb, &link->wakeupq);
411 skb_queue_tail(&link->owner->waiting_sks, skb); 410 skb_queue_tail(&link->inputq, skb);
411 link->owner->inputq = &link->inputq;
412 link->owner->action_flags |= TIPC_MSG_EVT;
412 } 413 }
413} 414}
414 415
@@ -461,13 +462,13 @@ void tipc_link_reset(struct tipc_link *l_ptr)
461 l_ptr->exp_msg_count = START_CHANGEOVER; 462 l_ptr->exp_msg_count = START_CHANGEOVER;
462 } 463 }
463 464
464 /* Clean up all queues: */ 465 /* Clean up all queues, except inputq: */
465 __skb_queue_purge(&l_ptr->outqueue); 466 __skb_queue_purge(&l_ptr->outqueue);
466 __skb_queue_purge(&l_ptr->deferred_queue); 467 __skb_queue_purge(&l_ptr->deferred_queue);
467 if (!skb_queue_empty(&l_ptr->waiting_sks)) { 468 skb_queue_splice_init(&l_ptr->wakeupq, &l_ptr->inputq);
468 skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks); 469 if (!skb_queue_empty(&l_ptr->inputq))
469 owner->action_flags |= TIPC_WAKEUP_USERS; 470 owner->action_flags |= TIPC_MSG_EVT;
470 } 471 owner->inputq = &l_ptr->inputq;
471 l_ptr->next_out = NULL; 472 l_ptr->next_out = NULL;
472 l_ptr->unacked_window = 0; 473 l_ptr->unacked_window = 0;
473 l_ptr->checkpoint = 1; 474 l_ptr->checkpoint = 1;
@@ -795,7 +796,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
795 796
796static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) 797static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
797{ 798{
798 __skb_queue_head_init(list); 799 skb_queue_head_init(list);
799 __skb_queue_tail(list, skb); 800 __skb_queue_tail(list, skb);
800} 801}
801 802
@@ -841,19 +842,13 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
841 rc = __tipc_link_xmit(net, link, list); 842 rc = __tipc_link_xmit(net, link, list);
842 tipc_node_unlock(node); 843 tipc_node_unlock(node);
843 } 844 }
844
845 if (link) 845 if (link)
846 return rc; 846 return rc;
847 847
848 if (likely(in_own_node(net, dnode))) { 848 if (likely(in_own_node(net, dnode)))
849 /* As a node local message chain never contains more than one 849 return tipc_sk_rcv(net, list);
850 * buffer, we just need to dequeue one SKB buffer from the
851 * head list.
852 */
853 return tipc_sk_rcv(net, __skb_dequeue(list));
854 }
855 __skb_queue_purge(list);
856 850
851 __skb_queue_purge(list);
857 return rc; 852 return rc;
858} 853}
859 854
@@ -1162,7 +1157,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1162 /* Locate unicast link endpoint that should handle message */ 1157 /* Locate unicast link endpoint that should handle message */
1163 l_ptr = n_ptr->links[b_ptr->identity]; 1158 l_ptr = n_ptr->links[b_ptr->identity];
1164 if (unlikely(!l_ptr)) 1159 if (unlikely(!l_ptr))
1165 goto unlock_discard; 1160 goto unlock;
1166 1161
1167 /* Verify that communication with node is currently allowed */ 1162 /* Verify that communication with node is currently allowed */
1168 if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) && 1163 if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
@@ -1173,7 +1168,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1173 n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN; 1168 n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
1174 1169
1175 if (tipc_node_blocked(n_ptr)) 1170 if (tipc_node_blocked(n_ptr))
1176 goto unlock_discard; 1171 goto unlock;
1177 1172
1178 /* Validate message sequence number info */ 1173 /* Validate message sequence number info */
1179 seq_no = msg_seqno(msg); 1174 seq_no = msg_seqno(msg);
@@ -1197,18 +1192,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1197 if (unlikely(l_ptr->next_out)) 1192 if (unlikely(l_ptr->next_out))
1198 tipc_link_push_packets(l_ptr); 1193 tipc_link_push_packets(l_ptr);
1199 1194
1200 if (released && !skb_queue_empty(&l_ptr->waiting_sks)) { 1195 if (released && !skb_queue_empty(&l_ptr->wakeupq))
1201 link_prepare_wakeup(l_ptr); 1196 link_prepare_wakeup(l_ptr);
1202 l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
1203 }
1204 1197
1205 /* Process the incoming packet */ 1198 /* Process the incoming packet */
1206 if (unlikely(!link_working_working(l_ptr))) { 1199 if (unlikely(!link_working_working(l_ptr))) {
1207 if (msg_user(msg) == LINK_PROTOCOL) { 1200 if (msg_user(msg) == LINK_PROTOCOL) {
1208 tipc_link_proto_rcv(l_ptr, skb); 1201 tipc_link_proto_rcv(l_ptr, skb);
1209 link_retrieve_defq(l_ptr, &head); 1202 link_retrieve_defq(l_ptr, &head);
1210 tipc_node_unlock(n_ptr); 1203 skb = NULL;
1211 continue; 1204 goto unlock;
1212 } 1205 }
1213 1206
1214 /* Traffic message. Conditionally activate link */ 1207 /* Traffic message. Conditionally activate link */
@@ -1217,18 +1210,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1217 if (link_working_working(l_ptr)) { 1210 if (link_working_working(l_ptr)) {
1218 /* Re-insert buffer in front of queue */ 1211 /* Re-insert buffer in front of queue */
1219 __skb_queue_head(&head, skb); 1212 __skb_queue_head(&head, skb);
1220 tipc_node_unlock(n_ptr); 1213 skb = NULL;
1221 continue; 1214 goto unlock;
1222 } 1215 }
1223 goto unlock_discard; 1216 goto unlock;
1224 } 1217 }
1225 1218
1226 /* Link is now in state WORKING_WORKING */ 1219 /* Link is now in state WORKING_WORKING */
1227 if (unlikely(seq_no != mod(l_ptr->next_in_no))) { 1220 if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1228 link_handle_out_of_seq_msg(l_ptr, skb); 1221 link_handle_out_of_seq_msg(l_ptr, skb);
1229 link_retrieve_defq(l_ptr, &head); 1222 link_retrieve_defq(l_ptr, &head);
1230 tipc_node_unlock(n_ptr); 1223 skb = NULL;
1231 continue; 1224 goto unlock;
1232 } 1225 }
1233 l_ptr->next_in_no++; 1226 l_ptr->next_in_no++;
1234 if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue))) 1227 if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
@@ -1238,97 +1231,102 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1238 l_ptr->stats.sent_acks++; 1231 l_ptr->stats.sent_acks++;
1239 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1232 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1240 } 1233 }
1241 1234 tipc_link_input(l_ptr, skb);
1242 if (tipc_link_prepare_input(net, l_ptr, &skb)) { 1235 skb = NULL;
1243 tipc_node_unlock(n_ptr); 1236unlock:
1244 continue;
1245 }
1246 tipc_node_unlock(n_ptr);
1247
1248 if (tipc_link_input(net, l_ptr, skb) != 0)
1249 goto discard;
1250 continue;
1251unlock_discard:
1252 tipc_node_unlock(n_ptr); 1237 tipc_node_unlock(n_ptr);
1253discard: 1238discard:
1254 kfree_skb(skb); 1239 if (unlikely(skb))
1240 kfree_skb(skb);
1255 } 1241 }
1256} 1242}
1257 1243
1258/** 1244/* tipc_data_input - deliver data and name distr msgs to upper layer
1259 * tipc_link_prepare_input - process TIPC link messages
1260 *
1261 * returns nonzero if the message was consumed
1262 * 1245 *
1246 * Consumes buffer if message is of right type
1263 * Node lock must be held 1247 * Node lock must be held
1264 */ 1248 */
1265static int tipc_link_prepare_input(struct net *net, struct tipc_link *l, 1249static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
1266 struct sk_buff **buf)
1267{ 1250{
1268 struct tipc_node *n; 1251 struct tipc_node *node = link->owner;
1269 struct tipc_msg *msg; 1252 struct tipc_msg *msg = buf_msg(skb);
1270 int res = -EINVAL; 1253 u32 dport = msg_destport(msg);
1271 1254
1272 n = l->owner;
1273 msg = buf_msg(*buf);
1274 switch (msg_user(msg)) { 1255 switch (msg_user(msg)) {
1275 case CHANGEOVER_PROTOCOL: 1256 case TIPC_LOW_IMPORTANCE:
1276 if (tipc_link_tunnel_rcv(n, buf)) 1257 case TIPC_MEDIUM_IMPORTANCE:
1277 res = 0; 1258 case TIPC_HIGH_IMPORTANCE:
1278 break; 1259 case TIPC_CRITICAL_IMPORTANCE:
1279 case MSG_FRAGMENTER: 1260 case CONN_MANAGER:
1280 l->stats.recv_fragments++; 1261 if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
1281 if (tipc_buf_append(&l->reasm_buf, buf)) { 1262 node->inputq = &link->inputq;
1282 l->stats.recv_fragmented++; 1263 node->action_flags |= TIPC_MSG_EVT;
1283 res = 0;
1284 } else if (!l->reasm_buf) {
1285 tipc_link_reset(l);
1286 } 1264 }
1287 break; 1265 return true;
1288 case MSG_BUNDLER:
1289 l->stats.recv_bundles++;
1290 l->stats.recv_bundled += msg_msgcnt(msg);
1291 res = 0;
1292 break;
1293 case NAME_DISTRIBUTOR: 1266 case NAME_DISTRIBUTOR:
1294 n->bclink.recv_permitted = true; 1267 node->bclink.recv_permitted = true;
1295 res = 0; 1268 node->namedq = &link->namedq;
1296 break; 1269 skb_queue_tail(&link->namedq, skb);
1270 if (skb_queue_len(&link->namedq) == 1)
1271 node->action_flags |= TIPC_NAMED_MSG_EVT;
1272 return true;
1273 case MSG_BUNDLER:
1274 case CHANGEOVER_PROTOCOL:
1275 case MSG_FRAGMENTER:
1297 case BCAST_PROTOCOL: 1276 case BCAST_PROTOCOL:
1298 tipc_link_sync_rcv(n, *buf); 1277 return false;
1299 break;
1300 default: 1278 default:
1301 res = 0; 1279 pr_warn("Dropping received illegal msg type\n");
1302 } 1280 kfree_skb(skb);
1303 return res; 1281 return false;
1282 };
1304} 1283}
1305/** 1284
1306 * tipc_link_input - Deliver message too higher layers 1285/* tipc_link_input - process packet that has passed link protocol check
1286 *
1287 * Consumes buffer
1288 * Node lock must be held
1307 */ 1289 */
1308static int tipc_link_input(struct net *net, struct tipc_link *l, 1290static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1309 struct sk_buff *buf)
1310{ 1291{
1311 struct tipc_msg *msg = buf_msg(buf); 1292 struct tipc_node *node = link->owner;
1312 int res = 0; 1293 struct tipc_msg *msg = buf_msg(skb);
1294 struct sk_buff *iskb;
1295 int pos = 0;
1296
1297 if (likely(tipc_data_input(link, skb)))
1298 return;
1313 1299
1314 switch (msg_user(msg)) { 1300 switch (msg_user(msg)) {
1315 case TIPC_LOW_IMPORTANCE: 1301 case CHANGEOVER_PROTOCOL:
1316 case TIPC_MEDIUM_IMPORTANCE: 1302 if (!tipc_link_tunnel_rcv(node, &skb))
1317 case TIPC_HIGH_IMPORTANCE: 1303 break;
1318 case TIPC_CRITICAL_IMPORTANCE: 1304 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
1319 case CONN_MANAGER: 1305 tipc_data_input(link, skb);
1320 tipc_sk_rcv(net, buf); 1306 break;
1307 }
1308 case MSG_BUNDLER:
1309 link->stats.recv_bundles++;
1310 link->stats.recv_bundled += msg_msgcnt(msg);
1311
1312 while (tipc_msg_extract(skb, &iskb, &pos))
1313 tipc_data_input(link, iskb);
1321 break; 1314 break;
1322 case NAME_DISTRIBUTOR: 1315 case MSG_FRAGMENTER:
1323 tipc_named_rcv(net, buf); 1316 link->stats.recv_fragments++;
1317 if (tipc_buf_append(&link->reasm_buf, &skb)) {
1318 link->stats.recv_fragmented++;
1319 tipc_data_input(link, skb);
1320 } else if (!link->reasm_buf) {
1321 tipc_link_reset(link);
1322 }
1324 break; 1323 break;
1325 case MSG_BUNDLER: 1324 case BCAST_PROTOCOL:
1326 tipc_link_bundle_rcv(net, buf); 1325 tipc_link_sync_rcv(node, skb);
1327 break; 1326 break;
1328 default: 1327 default:
1329 res = -EINVAL; 1328 break;
1330 } 1329 };
1331 return res;
1332} 1330}
1333 1331
1334/** 1332/**
@@ -1779,7 +1777,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1779 * @from_pos: offset to extract from 1777 * @from_pos: offset to extract from
1780 * 1778 *
1781 * Returns a new message buffer containing an embedded message. The 1779 * Returns a new message buffer containing an embedded message. The
1782 * encapsulating message itself is left unchanged. 1780 * encapsulating buffer is left unchanged.
1783 */ 1781 */
1784static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) 1782static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1785{ 1783{
@@ -1793,8 +1791,6 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1793 return eb; 1791 return eb;
1794} 1792}
1795 1793
1796
1797
1798/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet. 1794/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
1799 * Owner node is locked. 1795 * Owner node is locked.
1800 */ 1796 */
@@ -1893,41 +1889,6 @@ exit:
1893 return *buf != NULL; 1889 return *buf != NULL;
1894} 1890}
1895 1891
1896/*
1897 * Bundler functionality:
1898 */
1899void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf)
1900{
1901 u32 msgcount = msg_msgcnt(buf_msg(buf));
1902 u32 pos = INT_H_SIZE;
1903 struct sk_buff *obuf;
1904 struct tipc_msg *omsg;
1905
1906 while (msgcount--) {
1907 obuf = buf_extract(buf, pos);
1908 if (obuf == NULL) {
1909 pr_warn("Link unable to unbundle message(s)\n");
1910 break;
1911 }
1912 omsg = buf_msg(obuf);
1913 pos += align(msg_size(omsg));
1914 if (msg_isdata(omsg)) {
1915 if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG))
1916 tipc_sk_mcast_rcv(net, obuf);
1917 else
1918 tipc_sk_rcv(net, obuf);
1919 } else if (msg_user(omsg) == CONN_MANAGER) {
1920 tipc_sk_rcv(net, obuf);
1921 } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
1922 tipc_named_rcv(net, obuf);
1923 } else {
1924 pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
1925 kfree_skb(obuf);
1926 }
1927 }
1928 kfree_skb(buf);
1929}
1930
1931static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) 1892static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
1932{ 1893{
1933 unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4; 1894 unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 5b9a17f26280..34d3f55c4cea 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -131,8 +131,10 @@ struct tipc_stats {
131 * @next_in_no: next sequence number to expect for inbound messages 131 * @next_in_no: next sequence number to expect for inbound messages
132 * @deferred_queue: deferred queue saved OOS b'cast message received from node 132 * @deferred_queue: deferred queue saved OOS b'cast message received from node
133 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer 133 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
134 * @inputq: buffer queue for messages to be delivered upwards
135 * @namedq: buffer queue for name table messages to be delivered upwards
134 * @next_out: ptr to first unsent outbound message in queue 136 * @next_out: ptr to first unsent outbound message in queue
135 * @waiting_sks: linked list of sockets waiting for link congestion to abate 137 * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
136 * @long_msg_seq_no: next identifier to use for outbound fragmented messages 138 * @long_msg_seq_no: next identifier to use for outbound fragmented messages
137 * @reasm_buf: head of partially reassembled inbound message fragments 139 * @reasm_buf: head of partially reassembled inbound message fragments
138 * @stats: collects statistics regarding link activity 140 * @stats: collects statistics regarding link activity
@@ -184,10 +186,12 @@ struct tipc_link {
184 u32 next_in_no; 186 u32 next_in_no;
185 struct sk_buff_head deferred_queue; 187 struct sk_buff_head deferred_queue;
186 u32 unacked_window; 188 u32 unacked_window;
189 struct sk_buff_head inputq;
190 struct sk_buff_head namedq;
187 191
188 /* Congestion handling */ 192 /* Congestion handling */
189 struct sk_buff *next_out; 193 struct sk_buff *next_out;
190 struct sk_buff_head waiting_sks; 194 struct sk_buff_head wakeupq;
191 195
192 /* Fragmentation/reassembly */ 196 /* Fragmentation/reassembly */
193 u32 long_msg_seq_no; 197 u32 long_msg_seq_no;
@@ -228,7 +232,6 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
228 u32 selector); 232 u32 selector);
229int __tipc_link_xmit(struct net *net, struct tipc_link *link, 233int __tipc_link_xmit(struct net *net, struct tipc_link *link,
230 struct sk_buff_head *list); 234 struct sk_buff_head *list);
231void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf);
232void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, 235void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
233 u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); 236 u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
234void tipc_link_push_packets(struct tipc_link *l_ptr); 237void tipc_link_push_packets(struct tipc_link *l_ptr);
@@ -244,6 +247,7 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
244int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info); 247int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);
245int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info); 248int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
246int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]); 249int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
250void link_prepare_wakeup(struct tipc_link *l);
247 251
248/* 252/*
249 * Link sequence number manipulation routines (uses modulo 2**16 arithmetic) 253 * Link sequence number manipulation routines (uses modulo 2**16 arithmetic)
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 697223a21240..b6eb90cd3ef7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -327,6 +327,40 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
327} 327}
328 328
329/** 329/**
330 * tipc_msg_extract(): extract bundled inner packet from buffer
331 * @skb: linear outer buffer, to be extracted from.
332 * @iskb: extracted inner buffer, to be returned
333 * @pos: position of msg to be extracted. Returns with pointer of next msg
334 * Consumes outer buffer when last packet extracted
335 * Returns true when when there is an extracted buffer, otherwise false
336 */
337bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
338{
339 struct tipc_msg *msg = buf_msg(skb);
340 int imsz;
341 struct tipc_msg *imsg = (struct tipc_msg *)(msg_data(msg) + *pos);
342
343 /* Is there space left for shortest possible message? */
344 if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE))
345 goto none;
346 imsz = msg_size(imsg);
347
348 /* Is there space left for current message ? */
349 if ((*pos + imsz) > msg_data_sz(msg))
350 goto none;
351 *iskb = tipc_buf_acquire(imsz);
352 if (!*iskb)
353 goto none;
354 skb_copy_to_linear_data(*iskb, imsg, imsz);
355 *pos += align(imsz);
356 return true;
357none:
358 kfree_skb(skb);
359 *iskb = NULL;
360 return false;
361}
362
363/**
330 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail 364 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
331 * @list: the buffer chain 365 * @list: the buffer chain
332 * @skb: buffer to be appended and replaced 366 * @skb: buffer to be appended and replaced
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 60702992933d..ab467261bd9d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -45,6 +45,7 @@
45 * Note: Some items are also used with TIPC internal message headers 45 * Note: Some items are also used with TIPC internal message headers
46 */ 46 */
47#define TIPC_VERSION 2 47#define TIPC_VERSION 2
48struct plist;
48 49
49/* 50/*
50 * Payload message users are defined in TIPC's public API: 51 * Payload message users are defined in TIPC's public API:
@@ -759,10 +760,82 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
759bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu); 760bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
760bool tipc_msg_make_bundle(struct sk_buff_head *list, 761bool tipc_msg_make_bundle(struct sk_buff_head *list,
761 struct sk_buff *skb, u32 mtu, u32 dnode); 762 struct sk_buff *skb, u32 mtu, u32 dnode);
763bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
762int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 764int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
763 int offset, int dsz, int mtu, struct sk_buff_head *list); 765 int offset, int dsz, int mtu, struct sk_buff_head *list);
764bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode, 766bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
765 int *err); 767 int *err);
766struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 768struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
767 769
770/* tipc_skb_peek_port(): find a destination port, ignoring all destinations
771 * up to and including 'filter'.
772 * Note: ignoring previously tried destinations minimizes the risk of
773 * contention on the socket lock
774 * @list: list to be peeked in
775 * @filter: last destination to be ignored from search
 776 * Returns a destination port number, if applicable.
777 */
778static inline u32 tipc_skb_peek_port(struct sk_buff_head *list, u32 filter)
779{
780 struct sk_buff *skb;
781 u32 dport = 0;
782 bool ignore = true;
783
784 spin_lock_bh(&list->lock);
785 skb_queue_walk(list, skb) {
786 dport = msg_destport(buf_msg(skb));
787 if (!filter || skb_queue_is_last(list, skb))
788 break;
789 if (dport == filter)
790 ignore = false;
791 else if (!ignore)
792 break;
793 }
794 spin_unlock_bh(&list->lock);
795 return dport;
796}
797
798/* tipc_skb_dequeue(): unlink first buffer with dest 'dport' from list
799 * @list: list to be unlinked from
800 * @dport: selection criteria for buffer to unlink
801 */
802static inline struct sk_buff *tipc_skb_dequeue(struct sk_buff_head *list,
803 u32 dport)
804{
805 struct sk_buff *_skb, *tmp, *skb = NULL;
806
807 spin_lock_bh(&list->lock);
808 skb_queue_walk_safe(list, _skb, tmp) {
809 if (msg_destport(buf_msg(_skb)) == dport) {
810 __skb_unlink(_skb, list);
811 skb = _skb;
812 break;
813 }
814 }
815 spin_unlock_bh(&list->lock);
816 return skb;
817}
818
819/* tipc_skb_queue_tail(): add buffer to tail of list;
820 * @list: list to be appended to
821 * @skb: buffer to append. Always appended
822 * @dport: the destination port of the buffer
823 * returns true if dport differs from previous destination
824 */
825static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
826 struct sk_buff *skb, u32 dport)
827{
828 struct sk_buff *_skb = NULL;
829 bool rv = false;
830
831 spin_lock_bh(&list->lock);
832 _skb = skb_peek_tail(list);
833 if (!_skb || (msg_destport(buf_msg(_skb)) != dport) ||
834 (skb_queue_len(list) > 32))
835 rv = true;
836 __skb_queue_tail(list, skb);
837 spin_unlock_bh(&list->lock);
838 return rv;
839}
840
768#endif 841#endif
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index dd8564cd9dbb..fcb07915aaac 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -381,25 +381,34 @@ void tipc_named_process_backlog(struct net *net)
381} 381}
382 382
383/** 383/**
384 * tipc_named_rcv - process name table update message sent by another node 384 * tipc_named_rcv - process name table update messages sent by another node
385 */ 385 */
386void tipc_named_rcv(struct net *net, struct sk_buff *buf) 386void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
387{ 387{
388 struct tipc_net *tn = net_generic(net, tipc_net_id); 388 struct tipc_net *tn = net_generic(net, tipc_net_id);
389 struct tipc_msg *msg = buf_msg(buf); 389 struct tipc_msg *msg;
390 struct distr_item *item = (struct distr_item *)msg_data(msg); 390 struct distr_item *item;
391 u32 count = msg_data_sz(msg) / ITEM_SIZE; 391 uint count;
392 u32 node = msg_orignode(msg); 392 u32 node;
393 struct sk_buff *skb;
394 int mtype;
393 395
394 spin_lock_bh(&tn->nametbl_lock); 396 spin_lock_bh(&tn->nametbl_lock);
395 while (count--) { 397 for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
396 if (!tipc_update_nametbl(net, item, node, msg_type(msg))) 398 msg = buf_msg(skb);
397 tipc_named_add_backlog(item, msg_type(msg), node); 399 mtype = msg_type(msg);
398 item++; 400 item = (struct distr_item *)msg_data(msg);
401 count = msg_data_sz(msg) / ITEM_SIZE;
402 node = msg_orignode(msg);
403 while (count--) {
404 if (!tipc_update_nametbl(net, item, node, mtype))
405 tipc_named_add_backlog(item, mtype, node);
406 item++;
407 }
408 kfree_skb(skb);
409 tipc_named_process_backlog(net);
399 } 410 }
400 tipc_named_process_backlog(net);
401 spin_unlock_bh(&tn->nametbl_lock); 411 spin_unlock_bh(&tn->nametbl_lock);
402 kfree_skb(buf);
403} 412}
404 413
405/** 414/**
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index 5ec10b59527b..dd2d9fd80da2 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -71,7 +71,7 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
71struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ); 71struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
72void named_cluster_distribute(struct net *net, struct sk_buff *buf); 72void named_cluster_distribute(struct net *net, struct sk_buff *buf);
73void tipc_named_node_up(struct net *net, u32 dnode); 73void tipc_named_node_up(struct net *net, u32 dnode);
74void tipc_named_rcv(struct net *net, struct sk_buff *buf); 74void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
75void tipc_named_reinit(struct net *net); 75void tipc_named_reinit(struct net *net);
76void tipc_named_process_backlog(struct net *net); 76void tipc_named_process_backlog(struct net *net);
77void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr); 77void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 1c409c45f0fe..dcb83d9b2193 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -111,11 +111,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
111 INIT_LIST_HEAD(&n_ptr->list); 111 INIT_LIST_HEAD(&n_ptr->list);
112 INIT_LIST_HEAD(&n_ptr->publ_list); 112 INIT_LIST_HEAD(&n_ptr->publ_list);
113 INIT_LIST_HEAD(&n_ptr->conn_sks); 113 INIT_LIST_HEAD(&n_ptr->conn_sks);
114 skb_queue_head_init(&n_ptr->waiting_sks);
115 __skb_queue_head_init(&n_ptr->bclink.deferred_queue); 114 __skb_queue_head_init(&n_ptr->bclink.deferred_queue);
116
117 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 115 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
118
119 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 116 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
120 if (n_ptr->addr < temp_node->addr) 117 if (n_ptr->addr < temp_node->addr)
121 break; 118 break;
@@ -201,19 +198,22 @@ void tipc_node_abort_sock_conns(struct net *net, struct list_head *conns)
201{ 198{
202 struct tipc_net *tn = net_generic(net, tipc_net_id); 199 struct tipc_net *tn = net_generic(net, tipc_net_id);
203 struct tipc_sock_conn *conn, *safe; 200 struct tipc_sock_conn *conn, *safe;
204 struct sk_buff *buf; 201 struct sk_buff *skb;
202 struct sk_buff_head skbs;
205 203
204 skb_queue_head_init(&skbs);
206 list_for_each_entry_safe(conn, safe, conns, list) { 205 list_for_each_entry_safe(conn, safe, conns, list) {
207 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, 206 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
208 TIPC_CONN_MSG, SHORT_H_SIZE, 0, 207 TIPC_CONN_MSG, SHORT_H_SIZE, 0,
209 tn->own_addr, conn->peer_node, 208 tn->own_addr, conn->peer_node,
210 conn->port, conn->peer_port, 209 conn->port, conn->peer_port,
211 TIPC_ERR_NO_NODE); 210 TIPC_ERR_NO_NODE);
212 if (likely(buf)) 211 if (likely(skb))
213 tipc_sk_rcv(net, buf); 212 skb_queue_tail(&skbs, skb);
214 list_del(&conn->list); 213 list_del(&conn->list);
215 kfree(conn); 214 kfree(conn);
216 } 215 }
216 tipc_sk_rcv(net, &skbs);
217} 217}
218 218
219/** 219/**
@@ -568,37 +568,36 @@ void tipc_node_unlock(struct tipc_node *node)
568 struct net *net = node->net; 568 struct net *net = node->net;
569 LIST_HEAD(nsub_list); 569 LIST_HEAD(nsub_list);
570 LIST_HEAD(conn_sks); 570 LIST_HEAD(conn_sks);
571 struct sk_buff_head waiting_sks;
572 u32 addr = 0; 571 u32 addr = 0;
573 int flags = node->action_flags; 572 u32 flags = node->action_flags;
574 u32 link_id = 0; 573 u32 link_id = 0;
574 struct sk_buff_head *inputq = node->inputq;
575 struct sk_buff_head *namedq = node->inputq;
575 576
576 if (likely(!flags)) { 577 if (likely(!flags || (flags == TIPC_MSG_EVT))) {
578 node->action_flags = 0;
577 spin_unlock_bh(&node->lock); 579 spin_unlock_bh(&node->lock);
580 if (flags == TIPC_MSG_EVT)
581 tipc_sk_rcv(net, inputq);
578 return; 582 return;
579 } 583 }
580 584
581 addr = node->addr; 585 addr = node->addr;
582 link_id = node->link_id; 586 link_id = node->link_id;
583 __skb_queue_head_init(&waiting_sks); 587 namedq = node->namedq;
584
585 if (flags & TIPC_WAKEUP_USERS)
586 skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
587 588
588 if (flags & TIPC_NOTIFY_NODE_DOWN) { 589 if (flags & TIPC_NOTIFY_NODE_DOWN) {
589 list_replace_init(&node->publ_list, &nsub_list); 590 list_replace_init(&node->publ_list, &nsub_list);
590 list_replace_init(&node->conn_sks, &conn_sks); 591 list_replace_init(&node->conn_sks, &conn_sks);
591 } 592 }
592 node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN | 593 node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN |
593 TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP | 594 TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP |
594 TIPC_NOTIFY_LINK_DOWN | 595 TIPC_NOTIFY_LINK_DOWN |
595 TIPC_WAKEUP_BCAST_USERS); 596 TIPC_WAKEUP_BCAST_USERS |
597 TIPC_NAMED_MSG_EVT);
596 598
597 spin_unlock_bh(&node->lock); 599 spin_unlock_bh(&node->lock);
598 600
599 while (!skb_queue_empty(&waiting_sks))
600 tipc_sk_rcv(net, __skb_dequeue(&waiting_sks));
601
602 if (!list_empty(&conn_sks)) 601 if (!list_empty(&conn_sks))
603 tipc_node_abort_sock_conns(net, &conn_sks); 602 tipc_node_abort_sock_conns(net, &conn_sks);
604 603
@@ -618,6 +617,12 @@ void tipc_node_unlock(struct tipc_node *node)
618 if (flags & TIPC_NOTIFY_LINK_DOWN) 617 if (flags & TIPC_NOTIFY_LINK_DOWN)
619 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, 618 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
620 link_id, addr); 619 link_id, addr);
620
621 if (flags & TIPC_MSG_EVT)
622 tipc_sk_rcv(net, inputq);
623
624 if (flags & TIPC_NAMED_MSG_EVT)
625 tipc_named_rcv(net, namedq);
621} 626}
622 627
623/* Caller should hold node lock for the passed node */ 628/* Caller should hold node lock for the passed node */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 43ef88ef3035..c2b0fcf4042b 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -55,14 +55,15 @@
55 * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type 55 * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
56 */ 56 */
57enum { 57enum {
58 TIPC_MSG_EVT = 1,
58 TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1), 59 TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
59 TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2), 60 TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
60 TIPC_NOTIFY_NODE_DOWN = (1 << 3), 61 TIPC_NOTIFY_NODE_DOWN = (1 << 3),
61 TIPC_NOTIFY_NODE_UP = (1 << 4), 62 TIPC_NOTIFY_NODE_UP = (1 << 4),
62 TIPC_WAKEUP_USERS = (1 << 5), 63 TIPC_WAKEUP_BCAST_USERS = (1 << 5),
63 TIPC_WAKEUP_BCAST_USERS = (1 << 6), 64 TIPC_NOTIFY_LINK_UP = (1 << 6),
64 TIPC_NOTIFY_LINK_UP = (1 << 7), 65 TIPC_NOTIFY_LINK_DOWN = (1 << 7),
65 TIPC_NOTIFY_LINK_DOWN = (1 << 8) 66 TIPC_NAMED_MSG_EVT = (1 << 8)
66}; 67};
67 68
68/** 69/**
@@ -92,6 +93,9 @@ struct tipc_node_bclink {
92 * @lock: spinlock governing access to structure 93 * @lock: spinlock governing access to structure
93 * @net: the applicable net namespace 94 * @net: the applicable net namespace
94 * @hash: links to adjacent nodes in unsorted hash chain 95 * @hash: links to adjacent nodes in unsorted hash chain
96 * @inputq: pointer to input queue containing messages for msg event
97 * @namedq: pointer to name table input queue with name table messages
98 * @curr_link: the link holding the node lock, if any
95 * @active_links: pointers to active links to node 99 * @active_links: pointers to active links to node
96 * @links: pointers to all links to node 100 * @links: pointers to all links to node
97 * @action_flags: bit mask of different types of node actions 101 * @action_flags: bit mask of different types of node actions
@@ -109,10 +113,12 @@ struct tipc_node {
109 spinlock_t lock; 113 spinlock_t lock;
110 struct net *net; 114 struct net *net;
111 struct hlist_node hash; 115 struct hlist_node hash;
116 struct sk_buff_head *inputq;
117 struct sk_buff_head *namedq;
112 struct tipc_link *active_links[2]; 118 struct tipc_link *active_links[2];
113 u32 act_mtus[2]; 119 u32 act_mtus[2];
114 struct tipc_link *links[MAX_BEARERS]; 120 struct tipc_link *links[MAX_BEARERS];
115 unsigned int action_flags; 121 int action_flags;
116 struct tipc_node_bclink bclink; 122 struct tipc_node_bclink bclink;
117 struct list_head list; 123 struct list_head list;
118 int link_cnt; 124 int link_cnt;
@@ -120,7 +126,6 @@ struct tipc_node {
120 u32 signature; 126 u32 signature;
121 u32 link_id; 127 u32 link_id;
122 struct list_head publ_list; 128 struct list_head publ_list;
123 struct sk_buff_head waiting_sks;
124 struct list_head conn_sks; 129 struct list_head conn_sks;
125 struct rcu_head rcu; 130 struct rcu_head rcu;
126}; 131};
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 611a04fb0ddc..c1a4611649ab 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -41,6 +41,7 @@
41#include "node.h" 41#include "node.h"
42#include "link.h" 42#include "link.h"
43#include "config.h" 43#include "config.h"
44#include "name_distr.h"
44#include "socket.h" 45#include "socket.h"
45 46
46#define SS_LISTENING -1 /* socket is listening */ 47#define SS_LISTENING -1 /* socket is listening */
@@ -785,10 +786,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
785 struct sk_buff *b; 786 struct sk_buff *b;
786 uint i, last, dst = 0; 787 uint i, last, dst = 0;
787 u32 scope = TIPC_CLUSTER_SCOPE; 788 u32 scope = TIPC_CLUSTER_SCOPE;
789 struct sk_buff_head msgs;
788 790
789 if (in_own_node(net, msg_orignode(msg))) 791 if (in_own_node(net, msg_orignode(msg)))
790 scope = TIPC_NODE_SCOPE; 792 scope = TIPC_NODE_SCOPE;
791 793
794 if (unlikely(!msg_mcast(msg))) {
795 pr_warn("Received non-multicast msg in multicast\n");
796 kfree_skb(buf);
797 goto exit;
798 }
792 /* Create destination port list: */ 799 /* Create destination port list: */
793 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), 800 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
794 msg_nameupper(msg), scope, &dports); 801 msg_nameupper(msg), scope, &dports);
@@ -806,9 +813,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
806 continue; 813 continue;
807 } 814 }
808 msg_set_destport(msg, item->ports[i]); 815 msg_set_destport(msg, item->ports[i]);
809 tipc_sk_rcv(net, b); 816 skb_queue_head_init(&msgs);
817 skb_queue_tail(&msgs, b);
818 tipc_sk_rcv(net, &msgs);
810 } 819 }
811 } 820 }
821exit:
812 tipc_port_list_free(&dports); 822 tipc_port_list_free(&dports);
813} 823}
814 824
@@ -1760,71 +1770,99 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1760} 1770}
1761 1771
1762/** 1772/**
1763 * tipc_sk_enqueue_skb - enqueue buffer to socket or backlog queue 1773 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1764 * @sk: socket 1774 * inputq and try adding them to socket or backlog queue
1765 * @skb: pointer to message. Set to NULL if buffer is consumed. 1775 * @inputq: list of incoming buffers with potentially different destinations
1766 * @dnode: if buffer should be forwarded/returned, send to this node 1776 * @sk: socket where the buffers should be enqueued
1777 * @dport: port number for the socket
1778 * @_skb: returned buffer to be forwarded or rejected, if applicable
1767 * 1779 *
1768 * Caller must hold socket lock 1780 * Caller must hold socket lock
1769 * 1781 *
1770 * Returns TIPC_OK (0) or -tipc error code 1782 * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
1783 * or -TIPC_ERR_NO_PORT
1771 */ 1784 */
1772static int tipc_sk_enqueue_skb(struct sock *sk, struct sk_buff **skb) 1785static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1786 u32 dport, struct sk_buff **_skb)
1773{ 1787{
1774 unsigned int lim; 1788 unsigned int lim;
1775 atomic_t *dcnt; 1789 atomic_t *dcnt;
1776 1790 int err;
1777 if (unlikely(!*skb)) 1791 struct sk_buff *skb;
1778 return TIPC_OK; 1792 unsigned long time_limit = jiffies + 2;
1779 if (!sock_owned_by_user(sk)) 1793
1780 return filter_rcv(sk, skb); 1794 while (skb_queue_len(inputq)) {
1781 dcnt = &tipc_sk(sk)->dupl_rcvcnt; 1795 skb = tipc_skb_dequeue(inputq, dport);
1782 if (sk->sk_backlog.len) 1796 if (unlikely(!skb))
1783 atomic_set(dcnt, 0); 1797 return TIPC_OK;
1784 lim = rcvbuf_limit(sk, *skb) + atomic_read(dcnt); 1798 /* Return if softirq window exhausted */
1785 if (unlikely(sk_add_backlog(sk, *skb, lim))) 1799 if (unlikely(time_after_eq(jiffies, time_limit)))
1800 return TIPC_OK;
1801 if (!sock_owned_by_user(sk)) {
1802 err = filter_rcv(sk, &skb);
1803 if (likely(!skb))
1804 continue;
1805 *_skb = skb;
1806 return err;
1807 }
1808 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1809 if (sk->sk_backlog.len)
1810 atomic_set(dcnt, 0);
1811 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1812 if (likely(!sk_add_backlog(sk, skb, lim)))
1813 continue;
1814 *_skb = skb;
1786 return -TIPC_ERR_OVERLOAD; 1815 return -TIPC_ERR_OVERLOAD;
1787 *skb = NULL; 1816 }
1788 return TIPC_OK; 1817 return TIPC_OK;
1789} 1818}
1790 1819
1791/** 1820/**
1792 * tipc_sk_rcv - handle incoming message 1821 * tipc_sk_rcv - handle a chain of incoming buffers
1793 * @skb: buffer containing arriving message 1822 * @inputq: buffer list containing the buffers
1794 * Consumes buffer 1823 * Consumes all buffers in list until inputq is empty
1795 * Returns 0 if success, or errno: -EHOSTUNREACH 1824 * Note: may be called in multiple threads referring to the same queue
1825 * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
1826 * Only node local calls check the return value, sending single-buffer queues
1796 */ 1827 */
1797int tipc_sk_rcv(struct net *net, struct sk_buff *skb) 1828int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1798{ 1829{
1830 u32 dnode, dport = 0;
1831 int err = -TIPC_ERR_NO_PORT;
1832 struct sk_buff *skb;
1799 struct tipc_sock *tsk; 1833 struct tipc_sock *tsk;
1800 struct tipc_net *tn; 1834 struct tipc_net *tn;
1801 struct sock *sk; 1835 struct sock *sk;
1802 u32 dport = msg_destport(buf_msg(skb));
1803 int err = -TIPC_ERR_NO_PORT;
1804 u32 dnode;
1805 1836
1806 /* Find destination */ 1837 while (skb_queue_len(inputq)) {
1807 tsk = tipc_sk_lookup(net, dport); 1838 skb = NULL;
1808 if (likely(tsk)) { 1839 dport = tipc_skb_peek_port(inputq, dport);
1809 sk = &tsk->sk; 1840 tsk = tipc_sk_lookup(net, dport);
1810 spin_lock_bh(&sk->sk_lock.slock); 1841 if (likely(tsk)) {
1811 err = tipc_sk_enqueue_skb(sk, &skb); 1842 sk = &tsk->sk;
1812 spin_unlock_bh(&sk->sk_lock.slock); 1843 if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1813 sock_put(sk); 1844 err = tipc_sk_enqueue(inputq, sk, dport, &skb);
1814 } 1845 spin_unlock_bh(&sk->sk_lock.slock);
1815 if (likely(!skb)) 1846 dport = 0;
1816 return 0; 1847 }
1817 if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) 1848 sock_put(sk);
1818 goto xmit; 1849 } else {
1819 if (!err) { 1850 skb = tipc_skb_dequeue(inputq, dport);
1820 dnode = msg_destnode(buf_msg(skb)); 1851 }
1821 goto xmit; 1852 if (likely(!skb))
1822 } 1853 continue;
1823 tn = net_generic(net, tipc_net_id); 1854 if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
1824 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) 1855 goto xmit;
1825 return -EHOSTUNREACH; 1856 if (!err) {
1857 dnode = msg_destnode(buf_msg(skb));
1858 goto xmit;
1859 }
1860 tn = net_generic(net, tipc_net_id);
1861 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
1862 continue;
1826xmit: 1863xmit:
1827 tipc_link_xmit_skb(net, skb, dnode, dport); 1864 tipc_link_xmit_skb(net, skb, dnode, dport);
1865 }
1828 return err ? -EHOSTUNREACH : 0; 1866 return err ? -EHOSTUNREACH : 0;
1829} 1867}
1830 1868
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index f56c3fded51f..e3dbdc0e1be7 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -49,7 +49,7 @@ int tipc_sock_create_local(struct net *net, int type, struct socket **res);
49void tipc_sock_release_local(struct socket *sock); 49void tipc_sock_release_local(struct socket *sock);
50int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, 50int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
51 int flags); 51 int flags);
52int tipc_sk_rcv(struct net *net, struct sk_buff *buf); 52int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
53struct sk_buff *tipc_sk_socks_show(struct net *net); 53struct sk_buff *tipc_sk_socks_show(struct net *net);
54void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf); 54void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf);
55void tipc_sk_reinit(struct net *net); 55void tipc_sk_reinit(struct net *net);