summaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c20
-rw-r--r--net/tipc/link.c247
-rw-r--r--net/tipc/link.h10
-rw-r--r--net/tipc/msg.c34
-rw-r--r--net/tipc/msg.h73
-rw-r--r--net/tipc/name_distr.c33
-rw-r--r--net/tipc/name_distr.h2
-rw-r--r--net/tipc/node.c43
-rw-r--r--net/tipc/node.h17
-rw-r--r--net/tipc/socket.c132
-rw-r--r--net/tipc/socket.h2
11 files changed, 372 insertions, 241 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3b886eb35c87..2dfaf272928a 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -189,10 +189,8 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
189void tipc_bclink_wakeup_users(struct net *net) 189void tipc_bclink_wakeup_users(struct net *net)
190{ 190{
191 struct tipc_net *tn = net_generic(net, tipc_net_id); 191 struct tipc_net *tn = net_generic(net, tipc_net_id);
192 struct sk_buff *skb;
193 192
194 while ((skb = skb_dequeue(&tn->bclink->link.waiting_sks))) 193 tipc_sk_rcv(net, &tn->bclink->link.wakeupq);
195 tipc_sk_rcv(net, skb);
196} 194}
197 195
198/** 196/**
@@ -271,9 +269,8 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
271 tipc_link_push_packets(tn->bcl); 269 tipc_link_push_packets(tn->bcl);
272 bclink_set_last_sent(net); 270 bclink_set_last_sent(net);
273 } 271 }
274 if (unlikely(released && !skb_queue_empty(&tn->bcl->waiting_sks))) 272 if (unlikely(released && !skb_queue_empty(&tn->bcl->wakeupq)))
275 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS; 273 n_ptr->action_flags |= TIPC_WAKEUP_BCAST_USERS;
276
277exit: 274exit:
278 tipc_bclink_unlock(net); 275 tipc_bclink_unlock(net);
279} 276}
@@ -450,6 +447,9 @@ void tipc_bclink_rcv(struct net *net, struct sk_buff *buf)
450 u32 next_in; 447 u32 next_in;
451 u32 seqno; 448 u32 seqno;
452 int deferred = 0; 449 int deferred = 0;
450 int pos = 0;
451 struct sk_buff *iskb;
452 struct sk_buff_head msgs;
453 453
454 /* Screen out unwanted broadcast messages */ 454 /* Screen out unwanted broadcast messages */
455 if (msg_mc_netid(msg) != tn->net_id) 455 if (msg_mc_netid(msg) != tn->net_id)
@@ -506,7 +506,8 @@ receive:
506 bcl->stats.recv_bundled += msg_msgcnt(msg); 506 bcl->stats.recv_bundled += msg_msgcnt(msg);
507 tipc_bclink_unlock(net); 507 tipc_bclink_unlock(net);
508 tipc_node_unlock(node); 508 tipc_node_unlock(node);
509 tipc_link_bundle_rcv(net, buf); 509 while (tipc_msg_extract(buf, &iskb, &pos))
510 tipc_sk_mcast_rcv(net, iskb);
510 } else if (msg_user(msg) == MSG_FRAGMENTER) { 511 } else if (msg_user(msg) == MSG_FRAGMENTER) {
511 tipc_buf_append(&node->bclink.reasm_buf, &buf); 512 tipc_buf_append(&node->bclink.reasm_buf, &buf);
512 if (unlikely(!buf && !node->bclink.reasm_buf)) 513 if (unlikely(!buf && !node->bclink.reasm_buf))
@@ -527,7 +528,9 @@ receive:
527 bclink_accept_pkt(node, seqno); 528 bclink_accept_pkt(node, seqno);
528 tipc_bclink_unlock(net); 529 tipc_bclink_unlock(net);
529 tipc_node_unlock(node); 530 tipc_node_unlock(node);
530 tipc_named_rcv(net, buf); 531 skb_queue_head_init(&msgs);
532 skb_queue_tail(&msgs, buf);
533 tipc_named_rcv(net, &msgs);
531 } else { 534 } else {
532 tipc_bclink_lock(net); 535 tipc_bclink_lock(net);
533 bclink_accept_pkt(node, seqno); 536 bclink_accept_pkt(node, seqno);
@@ -944,10 +947,9 @@ int tipc_bclink_init(struct net *net)
944 spin_lock_init(&bclink->lock); 947 spin_lock_init(&bclink->lock);
945 __skb_queue_head_init(&bcl->outqueue); 948 __skb_queue_head_init(&bcl->outqueue);
946 __skb_queue_head_init(&bcl->deferred_queue); 949 __skb_queue_head_init(&bcl->deferred_queue);
947 skb_queue_head_init(&bcl->waiting_sks); 950 skb_queue_head_init(&bcl->wakeupq);
948 bcl->next_out_no = 1; 951 bcl->next_out_no = 1;
949 spin_lock_init(&bclink->node.lock); 952 spin_lock_init(&bclink->node.lock);
950 __skb_queue_head_init(&bclink->node.waiting_sks);
951 bcl->owner = &bclink->node; 953 bcl->owner = &bclink->node;
952 bcl->owner->net = net; 954 bcl->owner->net = net;
953 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 955 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 41cb09aa41de..942491234099 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -113,10 +113,8 @@ static void link_reset_statistics(struct tipc_link *l_ptr);
113static void link_print(struct tipc_link *l_ptr, const char *str); 113static void link_print(struct tipc_link *l_ptr, const char *str);
114static void tipc_link_sync_xmit(struct tipc_link *l); 114static void tipc_link_sync_xmit(struct tipc_link *l);
115static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 115static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
116static int tipc_link_input(struct net *net, struct tipc_link *l, 116static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
117 struct sk_buff *buf); 117static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
118static int tipc_link_prepare_input(struct net *net, struct tipc_link *l,
119 struct sk_buff **buf);
120 118
121/* 119/*
122 * Simple link routines 120 * Simple link routines
@@ -318,8 +316,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
318 l_ptr->next_out_no = 1; 316 l_ptr->next_out_no = 1;
319 __skb_queue_head_init(&l_ptr->outqueue); 317 __skb_queue_head_init(&l_ptr->outqueue);
320 __skb_queue_head_init(&l_ptr->deferred_queue); 318 __skb_queue_head_init(&l_ptr->deferred_queue);
321 skb_queue_head_init(&l_ptr->waiting_sks); 319 skb_queue_head_init(&l_ptr->wakeupq);
322 320 skb_queue_head_init(&l_ptr->inputq);
321 skb_queue_head_init(&l_ptr->namedq);
323 link_reset_statistics(l_ptr); 322 link_reset_statistics(l_ptr);
324 tipc_node_attach_link(n_ptr, l_ptr); 323 tipc_node_attach_link(n_ptr, l_ptr);
325 setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr); 324 setup_timer(&l_ptr->timer, link_timeout, (unsigned long)l_ptr);
@@ -387,7 +386,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
387 return false; 386 return false;
388 TIPC_SKB_CB(buf)->chain_sz = chain_sz; 387 TIPC_SKB_CB(buf)->chain_sz = chain_sz;
389 TIPC_SKB_CB(buf)->chain_imp = imp; 388 TIPC_SKB_CB(buf)->chain_imp = imp;
390 skb_queue_tail(&link->waiting_sks, buf); 389 skb_queue_tail(&link->wakeupq, buf);
391 link->stats.link_congs++; 390 link->stats.link_congs++;
392 return true; 391 return true;
393} 392}
@@ -398,17 +397,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
398 * Move a number of waiting users, as permitted by available space in 397 * Move a number of waiting users, as permitted by available space in
399 * the send queue, from link wait queue to node wait queue for wakeup 398 * the send queue, from link wait queue to node wait queue for wakeup
400 */ 399 */
401static void link_prepare_wakeup(struct tipc_link *link) 400void link_prepare_wakeup(struct tipc_link *link)
402{ 401{
403 uint pend_qsz = skb_queue_len(&link->outqueue); 402 uint pend_qsz = skb_queue_len(&link->outqueue);
404 struct sk_buff *skb, *tmp; 403 struct sk_buff *skb, *tmp;
405 404
406 skb_queue_walk_safe(&link->waiting_sks, skb, tmp) { 405 skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
407 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp]) 406 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
408 break; 407 break;
409 pend_qsz += TIPC_SKB_CB(skb)->chain_sz; 408 pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
410 skb_unlink(skb, &link->waiting_sks); 409 skb_unlink(skb, &link->wakeupq);
411 skb_queue_tail(&link->owner->waiting_sks, skb); 410 skb_queue_tail(&link->inputq, skb);
411 link->owner->inputq = &link->inputq;
412 link->owner->action_flags |= TIPC_MSG_EVT;
412 } 413 }
413} 414}
414 415
@@ -461,13 +462,13 @@ void tipc_link_reset(struct tipc_link *l_ptr)
461 l_ptr->exp_msg_count = START_CHANGEOVER; 462 l_ptr->exp_msg_count = START_CHANGEOVER;
462 } 463 }
463 464
464 /* Clean up all queues: */ 465 /* Clean up all queues, except inputq: */
465 __skb_queue_purge(&l_ptr->outqueue); 466 __skb_queue_purge(&l_ptr->outqueue);
466 __skb_queue_purge(&l_ptr->deferred_queue); 467 __skb_queue_purge(&l_ptr->deferred_queue);
467 if (!skb_queue_empty(&l_ptr->waiting_sks)) { 468 skb_queue_splice_init(&l_ptr->wakeupq, &l_ptr->inputq);
468 skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks); 469 if (!skb_queue_empty(&l_ptr->inputq))
469 owner->action_flags |= TIPC_WAKEUP_USERS; 470 owner->action_flags |= TIPC_MSG_EVT;
470 } 471 owner->inputq = &l_ptr->inputq;
471 l_ptr->next_out = NULL; 472 l_ptr->next_out = NULL;
472 l_ptr->unacked_window = 0; 473 l_ptr->unacked_window = 0;
473 l_ptr->checkpoint = 1; 474 l_ptr->checkpoint = 1;
@@ -795,7 +796,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
795 796
796static void skb2list(struct sk_buff *skb, struct sk_buff_head *list) 797static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
797{ 798{
798 __skb_queue_head_init(list); 799 skb_queue_head_init(list);
799 __skb_queue_tail(list, skb); 800 __skb_queue_tail(list, skb);
800} 801}
801 802
@@ -841,19 +842,13 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dnode,
841 rc = __tipc_link_xmit(net, link, list); 842 rc = __tipc_link_xmit(net, link, list);
842 tipc_node_unlock(node); 843 tipc_node_unlock(node);
843 } 844 }
844
845 if (link) 845 if (link)
846 return rc; 846 return rc;
847 847
848 if (likely(in_own_node(net, dnode))) { 848 if (likely(in_own_node(net, dnode)))
849 /* As a node local message chain never contains more than one 849 return tipc_sk_rcv(net, list);
850 * buffer, we just need to dequeue one SKB buffer from the
851 * head list.
852 */
853 return tipc_sk_rcv(net, __skb_dequeue(list));
854 }
855 __skb_queue_purge(list);
856 850
851 __skb_queue_purge(list);
857 return rc; 852 return rc;
858} 853}
859 854
@@ -1162,7 +1157,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1162 /* Locate unicast link endpoint that should handle message */ 1157 /* Locate unicast link endpoint that should handle message */
1163 l_ptr = n_ptr->links[b_ptr->identity]; 1158 l_ptr = n_ptr->links[b_ptr->identity];
1164 if (unlikely(!l_ptr)) 1159 if (unlikely(!l_ptr))
1165 goto unlock_discard; 1160 goto unlock;
1166 1161
1167 /* Verify that communication with node is currently allowed */ 1162 /* Verify that communication with node is currently allowed */
1168 if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) && 1163 if ((n_ptr->action_flags & TIPC_WAIT_PEER_LINKS_DOWN) &&
@@ -1173,7 +1168,7 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1173 n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN; 1168 n_ptr->action_flags &= ~TIPC_WAIT_PEER_LINKS_DOWN;
1174 1169
1175 if (tipc_node_blocked(n_ptr)) 1170 if (tipc_node_blocked(n_ptr))
1176 goto unlock_discard; 1171 goto unlock;
1177 1172
1178 /* Validate message sequence number info */ 1173 /* Validate message sequence number info */
1179 seq_no = msg_seqno(msg); 1174 seq_no = msg_seqno(msg);
@@ -1197,18 +1192,16 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1197 if (unlikely(l_ptr->next_out)) 1192 if (unlikely(l_ptr->next_out))
1198 tipc_link_push_packets(l_ptr); 1193 tipc_link_push_packets(l_ptr);
1199 1194
1200 if (released && !skb_queue_empty(&l_ptr->waiting_sks)) { 1195 if (released && !skb_queue_empty(&l_ptr->wakeupq))
1201 link_prepare_wakeup(l_ptr); 1196 link_prepare_wakeup(l_ptr);
1202 l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
1203 }
1204 1197
1205 /* Process the incoming packet */ 1198 /* Process the incoming packet */
1206 if (unlikely(!link_working_working(l_ptr))) { 1199 if (unlikely(!link_working_working(l_ptr))) {
1207 if (msg_user(msg) == LINK_PROTOCOL) { 1200 if (msg_user(msg) == LINK_PROTOCOL) {
1208 tipc_link_proto_rcv(l_ptr, skb); 1201 tipc_link_proto_rcv(l_ptr, skb);
1209 link_retrieve_defq(l_ptr, &head); 1202 link_retrieve_defq(l_ptr, &head);
1210 tipc_node_unlock(n_ptr); 1203 skb = NULL;
1211 continue; 1204 goto unlock;
1212 } 1205 }
1213 1206
1214 /* Traffic message. Conditionally activate link */ 1207 /* Traffic message. Conditionally activate link */
@@ -1217,18 +1210,18 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1217 if (link_working_working(l_ptr)) { 1210 if (link_working_working(l_ptr)) {
1218 /* Re-insert buffer in front of queue */ 1211 /* Re-insert buffer in front of queue */
1219 __skb_queue_head(&head, skb); 1212 __skb_queue_head(&head, skb);
1220 tipc_node_unlock(n_ptr); 1213 skb = NULL;
1221 continue; 1214 goto unlock;
1222 } 1215 }
1223 goto unlock_discard; 1216 goto unlock;
1224 } 1217 }
1225 1218
1226 /* Link is now in state WORKING_WORKING */ 1219 /* Link is now in state WORKING_WORKING */
1227 if (unlikely(seq_no != mod(l_ptr->next_in_no))) { 1220 if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
1228 link_handle_out_of_seq_msg(l_ptr, skb); 1221 link_handle_out_of_seq_msg(l_ptr, skb);
1229 link_retrieve_defq(l_ptr, &head); 1222 link_retrieve_defq(l_ptr, &head);
1230 tipc_node_unlock(n_ptr); 1223 skb = NULL;
1231 continue; 1224 goto unlock;
1232 } 1225 }
1233 l_ptr->next_in_no++; 1226 l_ptr->next_in_no++;
1234 if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue))) 1227 if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
@@ -1238,97 +1231,102 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
1238 l_ptr->stats.sent_acks++; 1231 l_ptr->stats.sent_acks++;
1239 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0); 1232 tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
1240 } 1233 }
1241 1234 tipc_link_input(l_ptr, skb);
1242 if (tipc_link_prepare_input(net, l_ptr, &skb)) { 1235 skb = NULL;
1243 tipc_node_unlock(n_ptr); 1236unlock:
1244 continue;
1245 }
1246 tipc_node_unlock(n_ptr);
1247
1248 if (tipc_link_input(net, l_ptr, skb) != 0)
1249 goto discard;
1250 continue;
1251unlock_discard:
1252 tipc_node_unlock(n_ptr); 1237 tipc_node_unlock(n_ptr);
1253discard: 1238discard:
1254 kfree_skb(skb); 1239 if (unlikely(skb))
1240 kfree_skb(skb);
1255 } 1241 }
1256} 1242}
1257 1243
1258/** 1244/* tipc_data_input - deliver data and name distr msgs to upper layer
1259 * tipc_link_prepare_input - process TIPC link messages
1260 *
1261 * returns nonzero if the message was consumed
1262 * 1245 *
1246 * Consumes buffer if message is of right type
1263 * Node lock must be held 1247 * Node lock must be held
1264 */ 1248 */
1265static int tipc_link_prepare_input(struct net *net, struct tipc_link *l, 1249static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb)
1266 struct sk_buff **buf)
1267{ 1250{
1268 struct tipc_node *n; 1251 struct tipc_node *node = link->owner;
1269 struct tipc_msg *msg; 1252 struct tipc_msg *msg = buf_msg(skb);
1270 int res = -EINVAL; 1253 u32 dport = msg_destport(msg);
1271 1254
1272 n = l->owner;
1273 msg = buf_msg(*buf);
1274 switch (msg_user(msg)) { 1255 switch (msg_user(msg)) {
1275 case CHANGEOVER_PROTOCOL: 1256 case TIPC_LOW_IMPORTANCE:
1276 if (tipc_link_tunnel_rcv(n, buf)) 1257 case TIPC_MEDIUM_IMPORTANCE:
1277 res = 0; 1258 case TIPC_HIGH_IMPORTANCE:
1278 break; 1259 case TIPC_CRITICAL_IMPORTANCE:
1279 case MSG_FRAGMENTER: 1260 case CONN_MANAGER:
1280 l->stats.recv_fragments++; 1261 if (tipc_skb_queue_tail(&link->inputq, skb, dport)) {
1281 if (tipc_buf_append(&l->reasm_buf, buf)) { 1262 node->inputq = &link->inputq;
1282 l->stats.recv_fragmented++; 1263 node->action_flags |= TIPC_MSG_EVT;
1283 res = 0;
1284 } else if (!l->reasm_buf) {
1285 tipc_link_reset(l);
1286 } 1264 }
1287 break; 1265 return true;
1288 case MSG_BUNDLER:
1289 l->stats.recv_bundles++;
1290 l->stats.recv_bundled += msg_msgcnt(msg);
1291 res = 0;
1292 break;
1293 case NAME_DISTRIBUTOR: 1266 case NAME_DISTRIBUTOR:
1294 n->bclink.recv_permitted = true; 1267 node->bclink.recv_permitted = true;
1295 res = 0; 1268 node->namedq = &link->namedq;
1296 break; 1269 skb_queue_tail(&link->namedq, skb);
1270 if (skb_queue_len(&link->namedq) == 1)
1271 node->action_flags |= TIPC_NAMED_MSG_EVT;
1272 return true;
1273 case MSG_BUNDLER:
1274 case CHANGEOVER_PROTOCOL:
1275 case MSG_FRAGMENTER:
1297 case BCAST_PROTOCOL: 1276 case BCAST_PROTOCOL:
1298 tipc_link_sync_rcv(n, *buf); 1277 return false;
1299 break;
1300 default: 1278 default:
1301 res = 0; 1279 pr_warn("Dropping received illegal msg type\n");
1302 } 1280 kfree_skb(skb);
1303 return res; 1281 return false;
1282 };
1304} 1283}
1305/** 1284
1306 * tipc_link_input - Deliver message too higher layers 1285/* tipc_link_input - process packet that has passed link protocol check
1286 *
1287 * Consumes buffer
1288 * Node lock must be held
1307 */ 1289 */
1308static int tipc_link_input(struct net *net, struct tipc_link *l, 1290static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
1309 struct sk_buff *buf)
1310{ 1291{
1311 struct tipc_msg *msg = buf_msg(buf); 1292 struct tipc_node *node = link->owner;
1312 int res = 0; 1293 struct tipc_msg *msg = buf_msg(skb);
1294 struct sk_buff *iskb;
1295 int pos = 0;
1296
1297 if (likely(tipc_data_input(link, skb)))
1298 return;
1313 1299
1314 switch (msg_user(msg)) { 1300 switch (msg_user(msg)) {
1315 case TIPC_LOW_IMPORTANCE: 1301 case CHANGEOVER_PROTOCOL:
1316 case TIPC_MEDIUM_IMPORTANCE: 1302 if (!tipc_link_tunnel_rcv(node, &skb))
1317 case TIPC_HIGH_IMPORTANCE: 1303 break;
1318 case TIPC_CRITICAL_IMPORTANCE: 1304 if (msg_user(buf_msg(skb)) != MSG_BUNDLER) {
1319 case CONN_MANAGER: 1305 tipc_data_input(link, skb);
1320 tipc_sk_rcv(net, buf); 1306 break;
1307 }
1308 case MSG_BUNDLER:
1309 link->stats.recv_bundles++;
1310 link->stats.recv_bundled += msg_msgcnt(msg);
1311
1312 while (tipc_msg_extract(skb, &iskb, &pos))
1313 tipc_data_input(link, iskb);
1321 break; 1314 break;
1322 case NAME_DISTRIBUTOR: 1315 case MSG_FRAGMENTER:
1323 tipc_named_rcv(net, buf); 1316 link->stats.recv_fragments++;
1317 if (tipc_buf_append(&link->reasm_buf, &skb)) {
1318 link->stats.recv_fragmented++;
1319 tipc_data_input(link, skb);
1320 } else if (!link->reasm_buf) {
1321 tipc_link_reset(link);
1322 }
1324 break; 1323 break;
1325 case MSG_BUNDLER: 1324 case BCAST_PROTOCOL:
1326 tipc_link_bundle_rcv(net, buf); 1325 tipc_link_sync_rcv(node, skb);
1327 break; 1326 break;
1328 default: 1327 default:
1329 res = -EINVAL; 1328 break;
1330 } 1329 };
1331 return res;
1332} 1330}
1333 1331
1334/** 1332/**
@@ -1779,7 +1777,7 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1779 * @from_pos: offset to extract from 1777 * @from_pos: offset to extract from
1780 * 1778 *
1781 * Returns a new message buffer containing an embedded message. The 1779 * Returns a new message buffer containing an embedded message. The
1782 * encapsulating message itself is left unchanged. 1780 * encapsulating buffer is left unchanged.
1783 */ 1781 */
1784static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos) 1782static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1785{ 1783{
@@ -1793,8 +1791,6 @@ static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
1793 return eb; 1791 return eb;
1794} 1792}
1795 1793
1796
1797
1798/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet. 1794/* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
1799 * Owner node is locked. 1795 * Owner node is locked.
1800 */ 1796 */
@@ -1893,41 +1889,6 @@ exit:
1893 return *buf != NULL; 1889 return *buf != NULL;
1894} 1890}
1895 1891
1896/*
1897 * Bundler functionality:
1898 */
1899void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf)
1900{
1901 u32 msgcount = msg_msgcnt(buf_msg(buf));
1902 u32 pos = INT_H_SIZE;
1903 struct sk_buff *obuf;
1904 struct tipc_msg *omsg;
1905
1906 while (msgcount--) {
1907 obuf = buf_extract(buf, pos);
1908 if (obuf == NULL) {
1909 pr_warn("Link unable to unbundle message(s)\n");
1910 break;
1911 }
1912 omsg = buf_msg(obuf);
1913 pos += align(msg_size(omsg));
1914 if (msg_isdata(omsg)) {
1915 if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG))
1916 tipc_sk_mcast_rcv(net, obuf);
1917 else
1918 tipc_sk_rcv(net, obuf);
1919 } else if (msg_user(omsg) == CONN_MANAGER) {
1920 tipc_sk_rcv(net, obuf);
1921 } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
1922 tipc_named_rcv(net, obuf);
1923 } else {
1924 pr_warn("Illegal bundled msg: %u\n", msg_user(omsg));
1925 kfree_skb(obuf);
1926 }
1927 }
1928 kfree_skb(buf);
1929}
1930
1931static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol) 1892static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
1932{ 1893{
1933 unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4; 1894 unsigned long intv = ((tol / 4) > 500) ? 500 : tol / 4;
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 5b9a17f26280..34d3f55c4cea 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -131,8 +131,10 @@ struct tipc_stats {
131 * @next_in_no: next sequence number to expect for inbound messages 131 * @next_in_no: next sequence number to expect for inbound messages
132 * @deferred_queue: deferred queue saved OOS b'cast message received from node 132 * @deferred_queue: deferred queue saved OOS b'cast message received from node
133 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer 133 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
134 * @inputq: buffer queue for messages to be delivered upwards
135 * @namedq: buffer queue for name table messages to be delivered upwards
134 * @next_out: ptr to first unsent outbound message in queue 136 * @next_out: ptr to first unsent outbound message in queue
135 * @waiting_sks: linked list of sockets waiting for link congestion to abate 137 * @wakeupq: linked list of wakeup msgs waiting for link congestion to abate
136 * @long_msg_seq_no: next identifier to use for outbound fragmented messages 138 * @long_msg_seq_no: next identifier to use for outbound fragmented messages
137 * @reasm_buf: head of partially reassembled inbound message fragments 139 * @reasm_buf: head of partially reassembled inbound message fragments
138 * @stats: collects statistics regarding link activity 140 * @stats: collects statistics regarding link activity
@@ -184,10 +186,12 @@ struct tipc_link {
184 u32 next_in_no; 186 u32 next_in_no;
185 struct sk_buff_head deferred_queue; 187 struct sk_buff_head deferred_queue;
186 u32 unacked_window; 188 u32 unacked_window;
189 struct sk_buff_head inputq;
190 struct sk_buff_head namedq;
187 191
188 /* Congestion handling */ 192 /* Congestion handling */
189 struct sk_buff *next_out; 193 struct sk_buff *next_out;
190 struct sk_buff_head waiting_sks; 194 struct sk_buff_head wakeupq;
191 195
192 /* Fragmentation/reassembly */ 196 /* Fragmentation/reassembly */
193 u32 long_msg_seq_no; 197 u32 long_msg_seq_no;
@@ -228,7 +232,6 @@ int tipc_link_xmit(struct net *net, struct sk_buff_head *list, u32 dest,
228 u32 selector); 232 u32 selector);
229int __tipc_link_xmit(struct net *net, struct tipc_link *link, 233int __tipc_link_xmit(struct net *net, struct tipc_link *link,
230 struct sk_buff_head *list); 234 struct sk_buff_head *list);
231void tipc_link_bundle_rcv(struct net *net, struct sk_buff *buf);
232void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, 235void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
233 u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); 236 u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
234void tipc_link_push_packets(struct tipc_link *l_ptr); 237void tipc_link_push_packets(struct tipc_link *l_ptr);
@@ -244,6 +247,7 @@ int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
244int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info); 247int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info);
245int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info); 248int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
246int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]); 249int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
250void link_prepare_wakeup(struct tipc_link *l);
247 251
248/* 252/*
249 * Link sequence number manipulation routines (uses modulo 2**16 arithmetic) 253 * Link sequence number manipulation routines (uses modulo 2**16 arithmetic)
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 697223a21240..b6eb90cd3ef7 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -327,6 +327,40 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
327} 327}
328 328
329/** 329/**
330 * tipc_msg_extract(): extract bundled inner packet from buffer
331 * @skb: linear outer buffer, to be extracted from.
332 * @iskb: extracted inner buffer, to be returned
333 * @pos: position of msg to be extracted. Returns with pointer of next msg
334 * Consumes outer buffer when last packet extracted
335 * Returns true when when there is an extracted buffer, otherwise false
336 */
337bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
338{
339 struct tipc_msg *msg = buf_msg(skb);
340 int imsz;
341 struct tipc_msg *imsg = (struct tipc_msg *)(msg_data(msg) + *pos);
342
343 /* Is there space left for shortest possible message? */
344 if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE))
345 goto none;
346 imsz = msg_size(imsg);
347
348 /* Is there space left for current message ? */
349 if ((*pos + imsz) > msg_data_sz(msg))
350 goto none;
351 *iskb = tipc_buf_acquire(imsz);
352 if (!*iskb)
353 goto none;
354 skb_copy_to_linear_data(*iskb, imsg, imsz);
355 *pos += align(imsz);
356 return true;
357none:
358 kfree_skb(skb);
359 *iskb = NULL;
360 return false;
361}
362
363/**
330 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail 364 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
331 * @list: the buffer chain 365 * @list: the buffer chain
332 * @skb: buffer to be appended and replaced 366 * @skb: buffer to be appended and replaced
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 60702992933d..ab467261bd9d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -45,6 +45,7 @@
45 * Note: Some items are also used with TIPC internal message headers 45 * Note: Some items are also used with TIPC internal message headers
46 */ 46 */
47#define TIPC_VERSION 2 47#define TIPC_VERSION 2
48struct plist;
48 49
49/* 50/*
50 * Payload message users are defined in TIPC's public API: 51 * Payload message users are defined in TIPC's public API:
@@ -759,10 +760,82 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
759bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu); 760bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
760bool tipc_msg_make_bundle(struct sk_buff_head *list, 761bool tipc_msg_make_bundle(struct sk_buff_head *list,
761 struct sk_buff *skb, u32 mtu, u32 dnode); 762 struct sk_buff *skb, u32 mtu, u32 dnode);
763bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
762int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 764int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
763 int offset, int dsz, int mtu, struct sk_buff_head *list); 765 int offset, int dsz, int mtu, struct sk_buff_head *list);
764bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode, 766bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, u32 *dnode,
765 int *err); 767 int *err);
766struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list); 768struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
767 769
770/* tipc_skb_peek_port(): find a destination port, ignoring all destinations
771 * up to and including 'filter'.
772 * Note: ignoring previously tried destinations minimizes the risk of
773 * contention on the socket lock
774 * @list: list to be peeked in
775 * @filter: last destination to be ignored from search
776 * Returns a destination port number, of applicable.
777 */
778static inline u32 tipc_skb_peek_port(struct sk_buff_head *list, u32 filter)
779{
780 struct sk_buff *skb;
781 u32 dport = 0;
782 bool ignore = true;
783
784 spin_lock_bh(&list->lock);
785 skb_queue_walk(list, skb) {
786 dport = msg_destport(buf_msg(skb));
787 if (!filter || skb_queue_is_last(list, skb))
788 break;
789 if (dport == filter)
790 ignore = false;
791 else if (!ignore)
792 break;
793 }
794 spin_unlock_bh(&list->lock);
795 return dport;
796}
797
798/* tipc_skb_dequeue(): unlink first buffer with dest 'dport' from list
799 * @list: list to be unlinked from
800 * @dport: selection criteria for buffer to unlink
801 */
802static inline struct sk_buff *tipc_skb_dequeue(struct sk_buff_head *list,
803 u32 dport)
804{
805 struct sk_buff *_skb, *tmp, *skb = NULL;
806
807 spin_lock_bh(&list->lock);
808 skb_queue_walk_safe(list, _skb, tmp) {
809 if (msg_destport(buf_msg(_skb)) == dport) {
810 __skb_unlink(_skb, list);
811 skb = _skb;
812 break;
813 }
814 }
815 spin_unlock_bh(&list->lock);
816 return skb;
817}
818
819/* tipc_skb_queue_tail(): add buffer to tail of list;
820 * @list: list to be appended to
821 * @skb: buffer to append. Always appended
822 * @dport: the destination port of the buffer
823 * returns true if dport differs from previous destination
824 */
825static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
826 struct sk_buff *skb, u32 dport)
827{
828 struct sk_buff *_skb = NULL;
829 bool rv = false;
830
831 spin_lock_bh(&list->lock);
832 _skb = skb_peek_tail(list);
833 if (!_skb || (msg_destport(buf_msg(_skb)) != dport) ||
834 (skb_queue_len(list) > 32))
835 rv = true;
836 __skb_queue_tail(list, skb);
837 spin_unlock_bh(&list->lock);
838 return rv;
839}
840
768#endif 841#endif
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index dd8564cd9dbb..fcb07915aaac 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -381,25 +381,34 @@ void tipc_named_process_backlog(struct net *net)
381} 381}
382 382
383/** 383/**
384 * tipc_named_rcv - process name table update message sent by another node 384 * tipc_named_rcv - process name table update messages sent by another node
385 */ 385 */
386void tipc_named_rcv(struct net *net, struct sk_buff *buf) 386void tipc_named_rcv(struct net *net, struct sk_buff_head *inputq)
387{ 387{
388 struct tipc_net *tn = net_generic(net, tipc_net_id); 388 struct tipc_net *tn = net_generic(net, tipc_net_id);
389 struct tipc_msg *msg = buf_msg(buf); 389 struct tipc_msg *msg;
390 struct distr_item *item = (struct distr_item *)msg_data(msg); 390 struct distr_item *item;
391 u32 count = msg_data_sz(msg) / ITEM_SIZE; 391 uint count;
392 u32 node = msg_orignode(msg); 392 u32 node;
393 struct sk_buff *skb;
394 int mtype;
393 395
394 spin_lock_bh(&tn->nametbl_lock); 396 spin_lock_bh(&tn->nametbl_lock);
395 while (count--) { 397 for (skb = skb_dequeue(inputq); skb; skb = skb_dequeue(inputq)) {
396 if (!tipc_update_nametbl(net, item, node, msg_type(msg))) 398 msg = buf_msg(skb);
397 tipc_named_add_backlog(item, msg_type(msg), node); 399 mtype = msg_type(msg);
398 item++; 400 item = (struct distr_item *)msg_data(msg);
401 count = msg_data_sz(msg) / ITEM_SIZE;
402 node = msg_orignode(msg);
403 while (count--) {
404 if (!tipc_update_nametbl(net, item, node, mtype))
405 tipc_named_add_backlog(item, mtype, node);
406 item++;
407 }
408 kfree_skb(skb);
409 tipc_named_process_backlog(net);
399 } 410 }
400 tipc_named_process_backlog(net);
401 spin_unlock_bh(&tn->nametbl_lock); 411 spin_unlock_bh(&tn->nametbl_lock);
402 kfree_skb(buf);
403} 412}
404 413
405/** 414/**
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index 5ec10b59527b..dd2d9fd80da2 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -71,7 +71,7 @@ struct sk_buff *tipc_named_publish(struct net *net, struct publication *publ);
71struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ); 71struct sk_buff *tipc_named_withdraw(struct net *net, struct publication *publ);
72void named_cluster_distribute(struct net *net, struct sk_buff *buf); 72void named_cluster_distribute(struct net *net, struct sk_buff *buf);
73void tipc_named_node_up(struct net *net, u32 dnode); 73void tipc_named_node_up(struct net *net, u32 dnode);
74void tipc_named_rcv(struct net *net, struct sk_buff *buf); 74void tipc_named_rcv(struct net *net, struct sk_buff_head *msg_queue);
75void tipc_named_reinit(struct net *net); 75void tipc_named_reinit(struct net *net);
76void tipc_named_process_backlog(struct net *net); 76void tipc_named_process_backlog(struct net *net);
77void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr); 77void tipc_publ_notify(struct net *net, struct list_head *nsub_list, u32 addr);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 1c409c45f0fe..dcb83d9b2193 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -111,11 +111,8 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
111 INIT_LIST_HEAD(&n_ptr->list); 111 INIT_LIST_HEAD(&n_ptr->list);
112 INIT_LIST_HEAD(&n_ptr->publ_list); 112 INIT_LIST_HEAD(&n_ptr->publ_list);
113 INIT_LIST_HEAD(&n_ptr->conn_sks); 113 INIT_LIST_HEAD(&n_ptr->conn_sks);
114 skb_queue_head_init(&n_ptr->waiting_sks);
115 __skb_queue_head_init(&n_ptr->bclink.deferred_queue); 114 __skb_queue_head_init(&n_ptr->bclink.deferred_queue);
116
117 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 115 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
118
119 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 116 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
120 if (n_ptr->addr < temp_node->addr) 117 if (n_ptr->addr < temp_node->addr)
121 break; 118 break;
@@ -201,19 +198,22 @@ void tipc_node_abort_sock_conns(struct net *net, struct list_head *conns)
201{ 198{
202 struct tipc_net *tn = net_generic(net, tipc_net_id); 199 struct tipc_net *tn = net_generic(net, tipc_net_id);
203 struct tipc_sock_conn *conn, *safe; 200 struct tipc_sock_conn *conn, *safe;
204 struct sk_buff *buf; 201 struct sk_buff *skb;
202 struct sk_buff_head skbs;
205 203
204 skb_queue_head_init(&skbs);
206 list_for_each_entry_safe(conn, safe, conns, list) { 205 list_for_each_entry_safe(conn, safe, conns, list) {
207 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, 206 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
208 TIPC_CONN_MSG, SHORT_H_SIZE, 0, 207 TIPC_CONN_MSG, SHORT_H_SIZE, 0,
209 tn->own_addr, conn->peer_node, 208 tn->own_addr, conn->peer_node,
210 conn->port, conn->peer_port, 209 conn->port, conn->peer_port,
211 TIPC_ERR_NO_NODE); 210 TIPC_ERR_NO_NODE);
212 if (likely(buf)) 211 if (likely(skb))
213 tipc_sk_rcv(net, buf); 212 skb_queue_tail(&skbs, skb);
214 list_del(&conn->list); 213 list_del(&conn->list);
215 kfree(conn); 214 kfree(conn);
216 } 215 }
216 tipc_sk_rcv(net, &skbs);
217} 217}
218 218
219/** 219/**
@@ -568,37 +568,36 @@ void tipc_node_unlock(struct tipc_node *node)
568 struct net *net = node->net; 568 struct net *net = node->net;
569 LIST_HEAD(nsub_list); 569 LIST_HEAD(nsub_list);
570 LIST_HEAD(conn_sks); 570 LIST_HEAD(conn_sks);
571 struct sk_buff_head waiting_sks;
572 u32 addr = 0; 571 u32 addr = 0;
573 int flags = node->action_flags; 572 u32 flags = node->action_flags;
574 u32 link_id = 0; 573 u32 link_id = 0;
574 struct sk_buff_head *inputq = node->inputq;
575 struct sk_buff_head *namedq = node->inputq;
575 576
576 if (likely(!flags)) { 577 if (likely(!flags || (flags == TIPC_MSG_EVT))) {
578 node->action_flags = 0;
577 spin_unlock_bh(&node->lock); 579 spin_unlock_bh(&node->lock);
580 if (flags == TIPC_MSG_EVT)
581 tipc_sk_rcv(net, inputq);
578 return; 582 return;
579 } 583 }
580 584
581 addr = node->addr; 585 addr = node->addr;
582 link_id = node->link_id; 586 link_id = node->link_id;
583 __skb_queue_head_init(&waiting_sks); 587 namedq = node->namedq;
584
585 if (flags & TIPC_WAKEUP_USERS)
586 skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
587 588
588 if (flags & TIPC_NOTIFY_NODE_DOWN) { 589 if (flags & TIPC_NOTIFY_NODE_DOWN) {
589 list_replace_init(&node->publ_list, &nsub_list); 590 list_replace_init(&node->publ_list, &nsub_list);
590 list_replace_init(&node->conn_sks, &conn_sks); 591 list_replace_init(&node->conn_sks, &conn_sks);
591 } 592 }
592 node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN | 593 node->action_flags &= ~(TIPC_MSG_EVT | TIPC_NOTIFY_NODE_DOWN |
593 TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP | 594 TIPC_NOTIFY_NODE_UP | TIPC_NOTIFY_LINK_UP |
594 TIPC_NOTIFY_LINK_DOWN | 595 TIPC_NOTIFY_LINK_DOWN |
595 TIPC_WAKEUP_BCAST_USERS); 596 TIPC_WAKEUP_BCAST_USERS |
597 TIPC_NAMED_MSG_EVT);
596 598
597 spin_unlock_bh(&node->lock); 599 spin_unlock_bh(&node->lock);
598 600
599 while (!skb_queue_empty(&waiting_sks))
600 tipc_sk_rcv(net, __skb_dequeue(&waiting_sks));
601
602 if (!list_empty(&conn_sks)) 601 if (!list_empty(&conn_sks))
603 tipc_node_abort_sock_conns(net, &conn_sks); 602 tipc_node_abort_sock_conns(net, &conn_sks);
604 603
@@ -618,6 +617,12 @@ void tipc_node_unlock(struct tipc_node *node)
618 if (flags & TIPC_NOTIFY_LINK_DOWN) 617 if (flags & TIPC_NOTIFY_LINK_DOWN)
619 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr, 618 tipc_nametbl_withdraw(net, TIPC_LINK_STATE, addr,
620 link_id, addr); 619 link_id, addr);
620
621 if (flags & TIPC_MSG_EVT)
622 tipc_sk_rcv(net, inputq);
623
624 if (flags & TIPC_NAMED_MSG_EVT)
625 tipc_named_rcv(net, namedq);
621} 626}
622 627
623/* Caller should hold node lock for the passed node */ 628/* Caller should hold node lock for the passed node */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 43ef88ef3035..c2b0fcf4042b 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -55,14 +55,15 @@
55 * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type 55 * TIPC_DISTRIBUTE_NAME: publish or withdraw link state name type
56 */ 56 */
57enum { 57enum {
58 TIPC_MSG_EVT = 1,
58 TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1), 59 TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
59 TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2), 60 TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
60 TIPC_NOTIFY_NODE_DOWN = (1 << 3), 61 TIPC_NOTIFY_NODE_DOWN = (1 << 3),
61 TIPC_NOTIFY_NODE_UP = (1 << 4), 62 TIPC_NOTIFY_NODE_UP = (1 << 4),
62 TIPC_WAKEUP_USERS = (1 << 5), 63 TIPC_WAKEUP_BCAST_USERS = (1 << 5),
63 TIPC_WAKEUP_BCAST_USERS = (1 << 6), 64 TIPC_NOTIFY_LINK_UP = (1 << 6),
64 TIPC_NOTIFY_LINK_UP = (1 << 7), 65 TIPC_NOTIFY_LINK_DOWN = (1 << 7),
65 TIPC_NOTIFY_LINK_DOWN = (1 << 8) 66 TIPC_NAMED_MSG_EVT = (1 << 8)
66}; 67};
67 68
68/** 69/**
@@ -92,6 +93,9 @@ struct tipc_node_bclink {
92 * @lock: spinlock governing access to structure 93 * @lock: spinlock governing access to structure
93 * @net: the applicable net namespace 94 * @net: the applicable net namespace
94 * @hash: links to adjacent nodes in unsorted hash chain 95 * @hash: links to adjacent nodes in unsorted hash chain
96 * @inputq: pointer to input queue containing messages for msg event
97 * @namedq: pointer to name table input queue with name table messages
98 * @curr_link: the link holding the node lock, if any
95 * @active_links: pointers to active links to node 99 * @active_links: pointers to active links to node
96 * @links: pointers to all links to node 100 * @links: pointers to all links to node
97 * @action_flags: bit mask of different types of node actions 101 * @action_flags: bit mask of different types of node actions
@@ -109,10 +113,12 @@ struct tipc_node {
109 spinlock_t lock; 113 spinlock_t lock;
110 struct net *net; 114 struct net *net;
111 struct hlist_node hash; 115 struct hlist_node hash;
116 struct sk_buff_head *inputq;
117 struct sk_buff_head *namedq;
112 struct tipc_link *active_links[2]; 118 struct tipc_link *active_links[2];
113 u32 act_mtus[2]; 119 u32 act_mtus[2];
114 struct tipc_link *links[MAX_BEARERS]; 120 struct tipc_link *links[MAX_BEARERS];
115 unsigned int action_flags; 121 int action_flags;
116 struct tipc_node_bclink bclink; 122 struct tipc_node_bclink bclink;
117 struct list_head list; 123 struct list_head list;
118 int link_cnt; 124 int link_cnt;
@@ -120,7 +126,6 @@ struct tipc_node {
120 u32 signature; 126 u32 signature;
121 u32 link_id; 127 u32 link_id;
122 struct list_head publ_list; 128 struct list_head publ_list;
123 struct sk_buff_head waiting_sks;
124 struct list_head conn_sks; 129 struct list_head conn_sks;
125 struct rcu_head rcu; 130 struct rcu_head rcu;
126}; 131};
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 611a04fb0ddc..c1a4611649ab 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -41,6 +41,7 @@
41#include "node.h" 41#include "node.h"
42#include "link.h" 42#include "link.h"
43#include "config.h" 43#include "config.h"
44#include "name_distr.h"
44#include "socket.h" 45#include "socket.h"
45 46
46#define SS_LISTENING -1 /* socket is listening */ 47#define SS_LISTENING -1 /* socket is listening */
@@ -785,10 +786,16 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
785 struct sk_buff *b; 786 struct sk_buff *b;
786 uint i, last, dst = 0; 787 uint i, last, dst = 0;
787 u32 scope = TIPC_CLUSTER_SCOPE; 788 u32 scope = TIPC_CLUSTER_SCOPE;
789 struct sk_buff_head msgs;
788 790
789 if (in_own_node(net, msg_orignode(msg))) 791 if (in_own_node(net, msg_orignode(msg)))
790 scope = TIPC_NODE_SCOPE; 792 scope = TIPC_NODE_SCOPE;
791 793
794 if (unlikely(!msg_mcast(msg))) {
795 pr_warn("Received non-multicast msg in multicast\n");
796 kfree_skb(buf);
797 goto exit;
798 }
792 /* Create destination port list: */ 799 /* Create destination port list: */
793 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg), 800 tipc_nametbl_mc_translate(net, msg_nametype(msg), msg_namelower(msg),
794 msg_nameupper(msg), scope, &dports); 801 msg_nameupper(msg), scope, &dports);
@@ -806,9 +813,12 @@ void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf)
806 continue; 813 continue;
807 } 814 }
808 msg_set_destport(msg, item->ports[i]); 815 msg_set_destport(msg, item->ports[i]);
809 tipc_sk_rcv(net, b); 816 skb_queue_head_init(&msgs);
817 skb_queue_tail(&msgs, b);
818 tipc_sk_rcv(net, &msgs);
810 } 819 }
811 } 820 }
821exit:
812 tipc_port_list_free(&dports); 822 tipc_port_list_free(&dports);
813} 823}
814 824
@@ -1760,71 +1770,99 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1760} 1770}
1761 1771
1762/** 1772/**
1763 * tipc_sk_enqueue_skb - enqueue buffer to socket or backlog queue 1773 * tipc_sk_enqueue - extract all buffers with destination 'dport' from
1764 * @sk: socket 1774 * inputq and try adding them to socket or backlog queue
1765 * @skb: pointer to message. Set to NULL if buffer is consumed. 1775 * @inputq: list of incoming buffers with potentially different destinations
1766 * @dnode: if buffer should be forwarded/returned, send to this node 1776 * @sk: socket where the buffers should be enqueued
1777 * @dport: port number for the socket
1778 * @_skb: returned buffer to be forwarded or rejected, if applicable
1767 * 1779 *
1768 * Caller must hold socket lock 1780 * Caller must hold socket lock
1769 * 1781 *
1770 * Returns TIPC_OK (0) or -tipc error code 1782 * Returns TIPC_OK if all buffers enqueued, otherwise -TIPC_ERR_OVERLOAD
1783 * or -TIPC_ERR_NO_PORT
1771 */ 1784 */
1772static int tipc_sk_enqueue_skb(struct sock *sk, struct sk_buff **skb) 1785static int tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
1786 u32 dport, struct sk_buff **_skb)
1773{ 1787{
1774 unsigned int lim; 1788 unsigned int lim;
1775 atomic_t *dcnt; 1789 atomic_t *dcnt;
1776 1790 int err;
1777 if (unlikely(!*skb)) 1791 struct sk_buff *skb;
1778 return TIPC_OK; 1792 unsigned long time_limit = jiffies + 2;
1779 if (!sock_owned_by_user(sk)) 1793
1780 return filter_rcv(sk, skb); 1794 while (skb_queue_len(inputq)) {
1781 dcnt = &tipc_sk(sk)->dupl_rcvcnt; 1795 skb = tipc_skb_dequeue(inputq, dport);
1782 if (sk->sk_backlog.len) 1796 if (unlikely(!skb))
1783 atomic_set(dcnt, 0); 1797 return TIPC_OK;
1784 lim = rcvbuf_limit(sk, *skb) + atomic_read(dcnt); 1798 /* Return if softirq window exhausted */
1785 if (unlikely(sk_add_backlog(sk, *skb, lim))) 1799 if (unlikely(time_after_eq(jiffies, time_limit)))
1800 return TIPC_OK;
1801 if (!sock_owned_by_user(sk)) {
1802 err = filter_rcv(sk, &skb);
1803 if (likely(!skb))
1804 continue;
1805 *_skb = skb;
1806 return err;
1807 }
1808 dcnt = &tipc_sk(sk)->dupl_rcvcnt;
1809 if (sk->sk_backlog.len)
1810 atomic_set(dcnt, 0);
1811 lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
1812 if (likely(!sk_add_backlog(sk, skb, lim)))
1813 continue;
1814 *_skb = skb;
1786 return -TIPC_ERR_OVERLOAD; 1815 return -TIPC_ERR_OVERLOAD;
1787 *skb = NULL; 1816 }
1788 return TIPC_OK; 1817 return TIPC_OK;
1789} 1818}
1790 1819
1791/** 1820/**
1792 * tipc_sk_rcv - handle incoming message 1821 * tipc_sk_rcv - handle a chain of incoming buffers
1793 * @skb: buffer containing arriving message 1822 * @inputq: buffer list containing the buffers
1794 * Consumes buffer 1823 * Consumes all buffers in list until inputq is empty
1795 * Returns 0 if success, or errno: -EHOSTUNREACH 1824 * Note: may be called in multiple threads referring to the same queue
1825 * Returns 0 if last buffer was accepted, otherwise -EHOSTUNREACH
1826 * Only node local calls check the return value, sending single-buffer queues
1796 */ 1827 */
1797int tipc_sk_rcv(struct net *net, struct sk_buff *skb) 1828int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
1798{ 1829{
1830 u32 dnode, dport = 0;
1831 int err = -TIPC_ERR_NO_PORT;
1832 struct sk_buff *skb;
1799 struct tipc_sock *tsk; 1833 struct tipc_sock *tsk;
1800 struct tipc_net *tn; 1834 struct tipc_net *tn;
1801 struct sock *sk; 1835 struct sock *sk;
1802 u32 dport = msg_destport(buf_msg(skb));
1803 int err = -TIPC_ERR_NO_PORT;
1804 u32 dnode;
1805 1836
1806 /* Find destination */ 1837 while (skb_queue_len(inputq)) {
1807 tsk = tipc_sk_lookup(net, dport); 1838 skb = NULL;
1808 if (likely(tsk)) { 1839 dport = tipc_skb_peek_port(inputq, dport);
1809 sk = &tsk->sk; 1840 tsk = tipc_sk_lookup(net, dport);
1810 spin_lock_bh(&sk->sk_lock.slock); 1841 if (likely(tsk)) {
1811 err = tipc_sk_enqueue_skb(sk, &skb); 1842 sk = &tsk->sk;
1812 spin_unlock_bh(&sk->sk_lock.slock); 1843 if (likely(spin_trylock_bh(&sk->sk_lock.slock))) {
1813 sock_put(sk); 1844 err = tipc_sk_enqueue(inputq, sk, dport, &skb);
1814 } 1845 spin_unlock_bh(&sk->sk_lock.slock);
1815 if (likely(!skb)) 1846 dport = 0;
1816 return 0; 1847 }
1817 if (tipc_msg_lookup_dest(net, skb, &dnode, &err)) 1848 sock_put(sk);
1818 goto xmit; 1849 } else {
1819 if (!err) { 1850 skb = tipc_skb_dequeue(inputq, dport);
1820 dnode = msg_destnode(buf_msg(skb)); 1851 }
1821 goto xmit; 1852 if (likely(!skb))
1822 } 1853 continue;
1823 tn = net_generic(net, tipc_net_id); 1854 if (tipc_msg_lookup_dest(net, skb, &dnode, &err))
1824 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err)) 1855 goto xmit;
1825 return -EHOSTUNREACH; 1856 if (!err) {
1857 dnode = msg_destnode(buf_msg(skb));
1858 goto xmit;
1859 }
1860 tn = net_generic(net, tipc_net_id);
1861 if (!tipc_msg_reverse(tn->own_addr, skb, &dnode, -err))
1862 continue;
1826xmit: 1863xmit:
1827 tipc_link_xmit_skb(net, skb, dnode, dport); 1864 tipc_link_xmit_skb(net, skb, dnode, dport);
1865 }
1828 return err ? -EHOSTUNREACH : 0; 1866 return err ? -EHOSTUNREACH : 0;
1829} 1867}
1830 1868
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index f56c3fded51f..e3dbdc0e1be7 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -49,7 +49,7 @@ int tipc_sock_create_local(struct net *net, int type, struct socket **res);
49void tipc_sock_release_local(struct socket *sock); 49void tipc_sock_release_local(struct socket *sock);
50int tipc_sock_accept_local(struct socket *sock, struct socket **newsock, 50int tipc_sock_accept_local(struct socket *sock, struct socket **newsock,
51 int flags); 51 int flags);
52int tipc_sk_rcv(struct net *net, struct sk_buff *buf); 52int tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
53struct sk_buff *tipc_sk_socks_show(struct net *net); 53struct sk_buff *tipc_sk_socks_show(struct net *net);
54void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf); 54void tipc_sk_mcast_rcv(struct net *net, struct sk_buff *buf);
55void tipc_sk_reinit(struct net *net); 55void tipc_sk_reinit(struct net *net);