author		David S. Miller <davem@davemloft.net>	2015-03-14 14:38:41 -0400
committer	David S. Miller <davem@davemloft.net>	2015-03-14 14:38:41 -0400
commit		a3795208b9a9801a66b305395e9ebaae850eee03
tree		8d940842c102ed18354bc51ef64598374deb893c
parent		5f1764ddfeb038decfe2b2fda030d0bed43fa36a
parent		e3eea1eb47ac616ee09cf0ae5d1e7790ef8461ea
Merge branch 'tipc-next'
Jon Maloy says:

====================
tipc: some optimizations and improvements

The commits in this series contain some relatively simple changes that
lead to better throughput across TIPC connections. We also make changes
to the implementation of link transmission queueing and priority
handling, in order to make the code more comprehensible and
maintainable.

v2: Commit #2: Redesigned tipc_msg_validate() to use pskb_may_pull(),
    as per feedback from David Miller.
    Commit #3: Some cosmetic changes to tipc_msg_extract(). I tried to
    replace the unconditional skb_linearize() with calls to
    pskb_may_pull() at selected locations, but I gave up. First,
    skb_trim() requires a fully linearized buffer. Second, it doesn't
    make much sense; the whole buffer will end up linearized, one way
    or another.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
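The central change in the queueing rework below is the split of the link's
single outqueue into a transmq of sent-but-unacknowledged packets, bounded
by the link send window, and a backlogq of packets waiting to be sent. For
orientation only, here is a minimal user-space sketch of that send/ack flow;
the toy queue and all *_toy names are illustrative stand-ins for the kernel's
struct sk_buff_head and tipc_bearer_send(), not code from this series:

/* Hedged sketch of the transmq/backlogq model, assuming simplified types. */
#include <stdio.h>

#define WINDOW 3          /* link send window (transmq capacity) */
#define NPKTS  8

struct queue_toy { int buf[NPKTS]; int len; };

static void push(struct queue_toy *q, int seqno) { q->buf[q->len++] = seqno; }
static int pop(struct queue_toy *q)
{
	int s = q->buf[0];
	for (int i = 1; i < q->len; i++)
		q->buf[i - 1] = q->buf[i];
	q->len--;
	return s;
}

int main(void)
{
	struct queue_toy transmq = { .len = 0 };  /* sent, not yet acked */
	struct queue_toy backlogq = { .len = 0 }; /* waiting to be sent */

	/* xmit: send while the window is open, otherwise queue to backlog */
	for (int seqno = 1; seqno <= NPKTS; seqno++) {
		if (transmq.len < WINDOW) {
			push(&transmq, seqno);
			printf("sent %d (in flight: %d)\n", seqno, transmq.len);
		} else {
			push(&backlogq, seqno);
			printf("backlogged %d\n", seqno);
		}
	}

	/* ack arrives: release acked packets, then refill the window from
	 * the backlog, mirroring what tipc_link_push_packets() does */
	int acked = 2;
	while (transmq.len && transmq.buf[0] <= acked)
		printf("acked %d\n", pop(&transmq));
	while (transmq.len < WINDOW && backlogq.len) {
		int s = pop(&backlogq);
		push(&transmq, s);
		printf("pushed %d from backlog\n", s);
	}
	return 0;
}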
-rw-r--r--  net/tipc/bcast.c     |  53
-rw-r--r--  net/tipc/discover.c  |   3
-rw-r--r--  net/tipc/link.c      | 352
-rw-r--r--  net/tipc/link.h      |  17
-rw-r--r--  net/tipc/msg.c       | 119
-rw-r--r--  net/tipc/msg.h       |  87
-rw-r--r--  net/tipc/node.c      |   4
-rw-r--r--  net/tipc/node.h      |   6
8 files changed, 300 insertions(+), 341 deletions(-)
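The v2 note about tipc_msg_validate() above refers to the header-first check
order that pskb_may_pull() enables: make sure enough bytes are actually
present before reading each header field, and verify the header's claimed
sizes against the real buffer length before trusting them. A hedged
user-space analogue of that check order follows; the toy_* names and the
fixed header layout are assumptions for illustration, not the TIPC wire
format:

#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define MIN_H 24   /* smallest legal header, like MIN_H_SIZE */
#define MAX_H 60   /* largest legal header, like MAX_H_SIZE */

struct toy_buf { const uint8_t *data; size_t len; };

/* assume header size and total message size live in the first bytes */
static size_t hdr_sz(const uint8_t *d) { return d[0]; }
static size_t msg_sz(const uint8_t *d) { return d[1] | (d[2] << 8); }

static bool toy_validate(const struct toy_buf *b)
{
	size_t hsz, msz;

	if (b->len < MIN_H)             /* enough bytes to read the header? */
		return false;
	hsz = hdr_sz(b->data);
	if (hsz < MIN_H || hsz > MAX_H) /* header size within legal bounds? */
		return false;
	if (b->len < hsz)               /* whole header actually present? */
		return false;
	msz = msg_sz(b->data);
	if (msz < hsz || b->len < msz)  /* claimed size consistent with data? */
		return false;
	return true;
}

int main(void)
{
	uint8_t pkt[64] = { 24, 40, 0 };  /* hdr = 24 bytes, msg = 40 bytes */
	struct toy_buf b = { pkt, sizeof(pkt) };
	return toy_validate(&b) ? 0 : 1;
}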
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 3e41704832de..5aff0844d4d3 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -135,9 +135,10 @@ static void bclink_set_last_sent(struct net *net)
 {
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 	struct tipc_link *bcl = tn->bcl;
+	struct sk_buff *skb = skb_peek(&bcl->backlogq);
 
-	if (bcl->next_out)
-		bcl->fsm_msg_cnt = mod(buf_seqno(bcl->next_out) - 1);
+	if (skb)
+		bcl->fsm_msg_cnt = mod(buf_seqno(skb) - 1);
 	else
 		bcl->fsm_msg_cnt = mod(bcl->next_out_no - 1);
 }
@@ -180,7 +181,7 @@ static void bclink_retransmit_pkt(struct tipc_net *tn, u32 after, u32 to)
 	struct sk_buff *skb;
 	struct tipc_link *bcl = tn->bcl;
 
-	skb_queue_walk(&bcl->outqueue, skb) {
+	skb_queue_walk(&bcl->transmq, skb) {
 		if (more(buf_seqno(skb), after)) {
 			tipc_link_retransmit(bcl, skb, mod(to - after));
 			break;
@@ -210,14 +211,17 @@ void tipc_bclink_wakeup_users(struct net *net)
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 {
 	struct sk_buff *skb, *tmp;
-	struct sk_buff *next;
 	unsigned int released = 0;
 	struct net *net = n_ptr->net;
 	struct tipc_net *tn = net_generic(net, tipc_net_id);
 
+	if (unlikely(!n_ptr->bclink.recv_permitted))
+		return;
+
 	tipc_bclink_lock(net);
+
 	/* Bail out if tx queue is empty (no clean up is required) */
-	skb = skb_peek(&tn->bcl->outqueue);
+	skb = skb_peek(&tn->bcl->transmq);
 	if (!skb)
 		goto exit;
 
@@ -244,27 +248,19 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	}
 
 	/* Skip over packets that node has previously acknowledged */
-	skb_queue_walk(&tn->bcl->outqueue, skb) {
+	skb_queue_walk(&tn->bcl->transmq, skb) {
 		if (more(buf_seqno(skb), n_ptr->bclink.acked))
 			break;
 	}
 
 	/* Update packets that node is now acknowledging */
-	skb_queue_walk_from_safe(&tn->bcl->outqueue, skb, tmp) {
+	skb_queue_walk_from_safe(&tn->bcl->transmq, skb, tmp) {
 		if (more(buf_seqno(skb), acked))
 			break;
-
-		next = tipc_skb_queue_next(&tn->bcl->outqueue, skb);
-		if (skb != tn->bcl->next_out) {
-			bcbuf_decr_acks(skb);
-		} else {
-			bcbuf_set_acks(skb, 0);
-			tn->bcl->next_out = next;
-			bclink_set_last_sent(net);
-		}
-
+		bcbuf_decr_acks(skb);
+		bclink_set_last_sent(net);
 		if (bcbuf_acks(skb) == 0) {
-			__skb_unlink(skb, &tn->bcl->outqueue);
+			__skb_unlink(skb, &tn->bcl->transmq);
 			kfree_skb(skb);
 			released = 1;
 		}
@@ -272,7 +268,7 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 	n_ptr->bclink.acked = acked;
 
 	/* Try resolving broadcast link congestion, if necessary */
-	if (unlikely(tn->bcl->next_out)) {
+	if (unlikely(skb_peek(&tn->bcl->backlogq))) {
 		tipc_link_push_packets(tn->bcl);
 		bclink_set_last_sent(net);
 	}
@@ -319,7 +315,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
 		struct tipc_msg *msg = buf_msg(buf);
-		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
+		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferdq);
 		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
 		tipc_msg_init(tn->own_addr, msg, BCAST_PROTOCOL, STATE_MSG,
@@ -387,14 +383,13 @@ int tipc_bclink_xmit(struct net *net, struct sk_buff_head *list)
 		__skb_queue_purge(list);
 		return -EHOSTUNREACH;
 	}
-
 	/* Broadcast to all nodes */
 	if (likely(bclink)) {
 		tipc_bclink_lock(net);
 		if (likely(bclink->bcast_nodes.count)) {
 			rc = __tipc_link_xmit(net, bcl, list);
 			if (likely(!rc)) {
-				u32 len = skb_queue_len(&bcl->outqueue);
+				u32 len = skb_queue_len(&bcl->transmq);
 
 				bclink_set_last_sent(net);
 				bcl->stats.queue_sz_counts++;
@@ -559,25 +554,25 @@ receive:
 		if (node->bclink.last_in == node->bclink.last_sent)
 			goto unlock;
 
-		if (skb_queue_empty(&node->bclink.deferred_queue)) {
+		if (skb_queue_empty(&node->bclink.deferdq)) {
 			node->bclink.oos_state = 1;
 			goto unlock;
 		}
 
-		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
+		msg = buf_msg(skb_peek(&node->bclink.deferdq));
 		seqno = msg_seqno(msg);
 		next_in = mod(next_in + 1);
 		if (seqno != next_in)
 			goto unlock;
 
 		/* Take in-sequence message from deferred queue & deliver it */
-		buf = __skb_dequeue(&node->bclink.deferred_queue);
+		buf = __skb_dequeue(&node->bclink.deferdq);
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
 	if (less(next_in, seqno)) {
-		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
+		deferred = tipc_link_defer_pkt(&node->bclink.deferdq,
 					       buf);
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
@@ -634,7 +629,6 @@ static int tipc_bcbearer_send(struct net *net, struct sk_buff *buf,
 	msg_set_non_seq(msg, 1);
 	msg_set_mc_netid(msg, tn->net_id);
 	tn->bcl->stats.sent_info++;
-
 	if (WARN_ON(!bclink->bcast_nodes.count)) {
 		dump_stack();
 		return 0;
@@ -913,8 +907,9 @@ int tipc_bclink_init(struct net *net)
 	sprintf(bcbearer->media.name, "tipc-broadcast");
 
 	spin_lock_init(&bclink->lock);
-	__skb_queue_head_init(&bcl->outqueue);
-	__skb_queue_head_init(&bcl->deferred_queue);
+	__skb_queue_head_init(&bcl->transmq);
+	__skb_queue_head_init(&bcl->backlogq);
+	__skb_queue_head_init(&bcl->deferdq);
 	skb_queue_head_init(&bcl->wakeupq);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 5967506833ce..169f3dd038b9 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -89,6 +89,7 @@ static void tipc_disc_init_msg(struct net *net, struct sk_buff *buf, u32 type,
 		      MAX_H_SIZE, dest_domain);
 	msg_set_non_seq(msg, 1);
 	msg_set_node_sig(msg, tn->random);
+	msg_set_node_capabilities(msg, 0);
 	msg_set_dest_domain(msg, dest_domain);
 	msg_set_bc_netid(msg, tn->net_id);
 	b_ptr->media->addr2msg(msg_media_addr(msg), &b_ptr->addr);
@@ -133,6 +134,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
 	u32 net_id = msg_bc_netid(msg);
 	u32 mtyp = msg_type(msg);
 	u32 signature = msg_node_sig(msg);
+	u16 caps = msg_node_capabilities(msg);
 	bool addr_match = false;
 	bool sign_match = false;
 	bool link_up = false;
@@ -167,6 +169,7 @@ void tipc_disc_rcv(struct net *net, struct sk_buff *buf,
 	if (!node)
 		return;
 	tipc_node_lock(node);
+	node->capabilities = caps;
 	link = node->links[bearer->identity];
 
 	/* Prepare to validate requesting node's signature and media address */
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 98609fdfb06a..bc49120bfb44 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/link.c: TIPC link code
  *
- * Copyright (c) 1996-2007, 2012-2014, Ericsson AB
+ * Copyright (c) 1996-2007, 2012-2015, Ericsson AB
  * Copyright (c) 2004-2007, 2010-2013, Wind River Systems
  * All rights reserved.
  *
@@ -35,6 +35,7 @@
  */
 
 #include "core.h"
+#include "subscr.h"
 #include "link.h"
 #include "bcast.h"
 #include "socket.h"
@@ -194,10 +195,10 @@ static void link_timeout(unsigned long data)
 	tipc_node_lock(l_ptr->owner);
 
 	/* update counters used in statistical profiling of send traffic */
-	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
+	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->transmq);
 	l_ptr->stats.queue_sz_counts++;
 
-	skb = skb_peek(&l_ptr->outqueue);
+	skb = skb_peek(&l_ptr->transmq);
 	if (skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
@@ -229,7 +230,7 @@ static void link_timeout(unsigned long data)
 	/* do all other link processing performed on a periodic basis */
 	link_state_event(l_ptr, TIMEOUT_EVT);
 
-	if (l_ptr->next_out)
+	if (skb_queue_len(&l_ptr->backlogq))
 		tipc_link_push_packets(l_ptr);
 
 	tipc_node_unlock(l_ptr->owner);
@@ -305,16 +306,15 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	msg_set_session(msg, (tn->random & 0xffff));
 	msg_set_bearer_id(msg, b_ptr->identity);
 	strcpy((char *)msg_data(msg), if_name);
-
-	l_ptr->priority = b_ptr->priority;
-	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
-
 	l_ptr->net_plane = b_ptr->net_plane;
 	link_init_max_pkt(l_ptr);
+	l_ptr->priority = b_ptr->priority;
+	tipc_link_set_queue_limits(l_ptr, b_ptr->window);
 
 	l_ptr->next_out_no = 1;
-	__skb_queue_head_init(&l_ptr->outqueue);
-	__skb_queue_head_init(&l_ptr->deferred_queue);
+	__skb_queue_head_init(&l_ptr->transmq);
+	__skb_queue_head_init(&l_ptr->backlogq);
+	__skb_queue_head_init(&l_ptr->deferdq);
 	skb_queue_head_init(&l_ptr->wakeupq);
 	skb_queue_head_init(&l_ptr->inputq);
 	skb_queue_head_init(&l_ptr->namedq);
@@ -400,7 +400,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  */
 void link_prepare_wakeup(struct tipc_link *link)
 {
-	uint pend_qsz = skb_queue_len(&link->outqueue);
+	uint pend_qsz = skb_queue_len(&link->backlogq);
 	struct sk_buff *skb, *tmp;
 
 	skb_queue_walk_safe(&link->wakeupq, skb, tmp) {
@@ -430,8 +430,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-	__skb_queue_purge(&l_ptr->deferred_queue);
-	__skb_queue_purge(&l_ptr->outqueue);
+	__skb_queue_purge(&l_ptr->deferdq);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
 	tipc_link_reset_fragments(l_ptr);
 }
 
@@ -464,15 +465,15 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	}
 
 	/* Clean up all queues, except inputq: */
-	__skb_queue_purge(&l_ptr->outqueue);
-	__skb_queue_purge(&l_ptr->deferred_queue);
+	__skb_queue_purge(&l_ptr->transmq);
+	__skb_queue_purge(&l_ptr->backlogq);
+	__skb_queue_purge(&l_ptr->deferdq);
 	if (!owner->inputq)
 		owner->inputq = &l_ptr->inputq;
 	skb_queue_splice_init(&l_ptr->wakeupq, owner->inputq);
 	if (!skb_queue_empty(owner->inputq))
 		owner->action_flags |= TIPC_MSG_EVT;
-	l_ptr->next_out = NULL;
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
 	l_ptr->fsm_msg_cnt = 0;
@@ -706,7 +707,7 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
 {
 	struct sk_buff *skb = skb_peek(list);
 	struct tipc_msg *msg = buf_msg(skb);
-	uint imp = tipc_msg_tot_importance(msg);
+	int imp = msg_importance(msg);
 	u32 oport = msg_tot_origport(msg);
 
 	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
@@ -742,54 +743,51 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 			  struct sk_buff_head *list)
 {
 	struct tipc_msg *msg = buf_msg(skb_peek(list));
-	uint psz = msg_size(msg);
-	uint sndlim = link->queue_limit[0];
-	uint imp = tipc_msg_tot_importance(msg);
+	unsigned int maxwin = link->window;
+	unsigned int imp = msg_importance(msg);
 	uint mtu = link->max_pkt;
 	uint ack = mod(link->next_in_no - 1);
 	uint seqno = link->next_out_no;
 	uint bc_last_in = link->owner->bclink.last_in;
 	struct tipc_media_addr *addr = &link->media_addr;
-	struct sk_buff_head *outqueue = &link->outqueue;
+	struct sk_buff_head *transmq = &link->transmq;
+	struct sk_buff_head *backlogq = &link->backlogq;
 	struct sk_buff *skb, *tmp;
 
-	/* Match queue limits against msg importance: */
-	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+	/* Match queue limit against msg importance: */
+	if (unlikely(skb_queue_len(backlogq) >= link->queue_limit[imp]))
 		return tipc_link_cong(link, list);
 
 	/* Has valid packet limit been used ? */
-	if (unlikely(psz > mtu)) {
+	if (unlikely(msg_size(msg) > mtu)) {
 		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
-	/* Prepare each packet for sending, and add to outqueue: */
+	/* Prepare each packet for sending, and add to relevant queue: */
 	skb_queue_walk_safe(list, skb, tmp) {
 		__skb_unlink(skb, list);
 		msg = buf_msg(skb);
-		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
+		msg_set_seqno(msg, seqno);
+		msg_set_ack(msg, ack);
 		msg_set_bcast_ack(msg, bc_last_in);
 
-		if (skb_queue_len(outqueue) < sndlim) {
-			__skb_queue_tail(outqueue, skb);
-			tipc_bearer_send(net, link->bearer_id,
-					 skb, addr);
-			link->next_out = NULL;
-			link->unacked_window = 0;
-		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
+		if (likely(skb_queue_len(transmq) < maxwin)) {
+			__skb_queue_tail(transmq, skb);
+			tipc_bearer_send(net, link->bearer_id, skb, addr);
+			link->rcv_unacked = 0;
+			seqno++;
+			continue;
+		}
+		if (tipc_msg_bundle(skb_peek_tail(backlogq), skb, mtu)) {
 			link->stats.sent_bundled++;
 			continue;
-		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
-						link->addr)) {
+		}
+		if (tipc_msg_make_bundle(&skb, mtu, link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
-			if (!link->next_out)
-				link->next_out = skb_peek_tail(outqueue);
-		} else {
-			__skb_queue_tail(outqueue, skb);
-			if (!link->next_out)
-				link->next_out = skb;
 		}
+		__skb_queue_tail(backlogq, skb);
 		seqno++;
 	}
 	link->next_out_no = seqno;
@@ -895,14 +893,6 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 	kfree_skb(buf);
 }
 
-struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
-				    const struct sk_buff *skb)
-{
-	if (skb_queue_is_last(list, skb))
-		return NULL;
-	return skb->next;
-}
-
 /*
  * tipc_link_push_packets - push unsent packets to bearer
  *
@@ -911,30 +901,23 @@ struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
  *
  * Called with node locked
  */
-void tipc_link_push_packets(struct tipc_link *l_ptr)
+void tipc_link_push_packets(struct tipc_link *link)
 {
-	struct sk_buff_head *outqueue = &l_ptr->outqueue;
-	struct sk_buff *skb = l_ptr->next_out;
+	struct sk_buff *skb;
 	struct tipc_msg *msg;
-	u32 next, first;
+	unsigned int ack = mod(link->next_in_no - 1);
 
-	skb_queue_walk_from(outqueue, skb) {
-		msg = buf_msg(skb);
-		next = msg_seqno(msg);
-		first = buf_seqno(skb_peek(outqueue));
-
-		if (mod(next - first) < l_ptr->queue_limit[0]) {
-			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
-			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-			if (msg_user(msg) == MSG_BUNDLER)
-				TIPC_SKB_CB(skb)->bundling = false;
-			tipc_bearer_send(l_ptr->owner->net,
-					 l_ptr->bearer_id, skb,
-					 &l_ptr->media_addr);
-			l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
-		} else {
+	while (skb_queue_len(&link->transmq) < link->window) {
+		skb = __skb_dequeue(&link->backlogq);
+		if (!skb)
 			break;
-		}
+		msg = buf_msg(skb);
+		msg_set_ack(msg, ack);
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		link->rcv_unacked = 0;
+		__skb_queue_tail(&link->transmq, skb);
+		tipc_bearer_send(link->owner->net, link->bearer_id,
+				 skb, &link->media_addr);
 	}
 }
 
@@ -1021,8 +1004,8 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 		l_ptr->stale_count = 1;
 	}
 
-	skb_queue_walk_from(&l_ptr->outqueue, skb) {
-		if (!retransmits || skb == l_ptr->next_out)
+	skb_queue_walk_from(&l_ptr->transmq, skb) {
+		if (!retransmits)
 			break;
 		msg = buf_msg(skb);
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
@@ -1039,67 +1022,12 @@ static void link_retrieve_defq(struct tipc_link *link,
 {
 	u32 seq_no;
 
-	if (skb_queue_empty(&link->deferred_queue))
+	if (skb_queue_empty(&link->deferdq))
 		return;
 
-	seq_no = buf_seqno(skb_peek(&link->deferred_queue));
+	seq_no = buf_seqno(skb_peek(&link->deferdq));
 	if (seq_no == mod(link->next_in_no))
-		skb_queue_splice_tail_init(&link->deferred_queue, list);
-}
-
-/**
- * link_recv_buf_validate - validate basic format of received message
- *
- * This routine ensures a TIPC message has an acceptable header, and at least
- * as much data as the header indicates it should. The routine also ensures
- * that the entire message header is stored in the main fragment of the message
- * buffer, to simplify future access to message header fields.
- *
- * Note: Having extra info present in the message header or data areas is OK.
- * TIPC will ignore the excess, under the assumption that it is optional info
- * introduced by a later release of the protocol.
- */
-static int link_recv_buf_validate(struct sk_buff *buf)
-{
-	static u32 min_data_hdr_size[8] = {
-		SHORT_H_SIZE, MCAST_H_SIZE, NAMED_H_SIZE, BASIC_H_SIZE,
-		MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE, MAX_H_SIZE
-		};
-
-	struct tipc_msg *msg;
-	u32 tipc_hdr[2];
-	u32 size;
-	u32 hdr_size;
-	u32 min_hdr_size;
-
-	/* If this packet comes from the defer queue, the skb has already
-	 * been validated
-	 */
-	if (unlikely(TIPC_SKB_CB(buf)->deferred))
-		return 1;
-
-	if (unlikely(buf->len < MIN_H_SIZE))
-		return 0;
-
-	msg = skb_header_pointer(buf, 0, sizeof(tipc_hdr), tipc_hdr);
-	if (msg == NULL)
-		return 0;
-
-	if (unlikely(msg_version(msg) != TIPC_VERSION))
-		return 0;
-
-	size = msg_size(msg);
-	hdr_size = msg_hdr_sz(msg);
-	min_hdr_size = msg_isdata(msg) ?
-		min_data_hdr_size[msg_type(msg)] : INT_H_SIZE;
-
-	if (unlikely((hdr_size < min_hdr_size) ||
-		     (size < hdr_size) ||
-		     (buf->len < size) ||
-		     (size - hdr_size > TIPC_MAX_USER_MSG_SIZE)))
-		return 0;
-
-	return pskb_may_pull(buf, hdr_size);
+		skb_queue_splice_tail_init(&link->deferdq, list);
 }
 
 /**
@@ -1127,16 +1055,11 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 
 	while ((skb = __skb_dequeue(&head))) {
 		/* Ensure message is well-formed */
-		if (unlikely(!link_recv_buf_validate(skb)))
-			goto discard;
-
-		/* Ensure message data is a single contiguous unit */
-		if (unlikely(skb_linearize(skb)))
+		if (unlikely(!tipc_msg_validate(skb)))
 			goto discard;
 
 		/* Handle arrival of a non-unicast link message */
 		msg = buf_msg(skb);
-
 		if (unlikely(msg_non_seq(msg))) {
 			if (msg_user(msg) == LINK_CONFIG)
 				tipc_disc_rcv(net, skb, b_ptr);
@@ -1177,21 +1100,20 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 		ackd = msg_ack(msg);
 
 		/* Release acked messages */
-		if (n_ptr->bclink.recv_permitted)
+		if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
 		released = 0;
-		skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
-			if (skb1 == l_ptr->next_out ||
-			    more(buf_seqno(skb1), ackd))
+		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
+			if (more(buf_seqno(skb1), ackd))
 				break;
-			__skb_unlink(skb1, &l_ptr->outqueue);
+			__skb_unlink(skb1, &l_ptr->transmq);
 			kfree_skb(skb1);
 			released = 1;
 		}
 
 		/* Try sending any messages link endpoint has pending */
-		if (unlikely(l_ptr->next_out))
+		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
 			tipc_link_push_packets(l_ptr);
 
 		if (released && !skb_queue_empty(&l_ptr->wakeupq))
@@ -1226,10 +1148,9 @@ void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
 			goto unlock;
 		}
 		l_ptr->next_in_no++;
-		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
+		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
 			link_retrieve_defq(l_ptr, &head);
-
-		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
+		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
 			l_ptr->stats.sent_acks++;
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
@@ -1396,10 +1317,9 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 		return;
 	}
 
-	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
+	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
 		l_ptr->stats.deferred_recv++;
-		TIPC_SKB_CB(buf)->deferred = true;
-		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
+		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 	} else {
 		l_ptr->stats.duplicates++;
@@ -1436,11 +1356,11 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
 		if (!tipc_link_is_up(l_ptr))
 			return;
-		if (l_ptr->next_out)
-			next_sent = buf_seqno(l_ptr->next_out);
+		if (skb_queue_len(&l_ptr->backlogq))
+			next_sent = buf_seqno(skb_peek(&l_ptr->backlogq));
 		msg_set_next_sent(msg, next_sent);
-		if (!skb_queue_empty(&l_ptr->deferred_queue)) {
-			u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
+		if (!skb_queue_empty(&l_ptr->deferdq)) {
+			u32 rec = buf_seqno(skb_peek(&l_ptr->deferdq));
 			gap = mod(rec - mod(l_ptr->next_in_no));
 		}
 		msg_set_seq_gap(msg, gap);
@@ -1492,10 +1412,9 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 
 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
 	buf->priority = TC_PRIO_CONTROL;
-
 	tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, buf,
 			 &l_ptr->media_addr);
-	l_ptr->unacked_window = 0;
+	l_ptr->rcv_unacked = 0;
 	kfree_skb(buf);
 }
 
@@ -1630,7 +1549,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
 		}
 		if (msg_seq_gap(msg)) {
 			l_ptr->stats.recv_nacks++;
-			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
+			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
 					     msg_seq_gap(msg));
 		}
 		break;
@@ -1677,7 +1596,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
  */
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
-	u32 msgcount = skb_queue_len(&l_ptr->outqueue);
+	int msgcount;
 	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
 	struct tipc_msg tunnel_hdr;
 	struct sk_buff *skb;
@@ -1688,10 +1607,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 
 	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      ORIGINAL_MSG, INT_H_SIZE, l_ptr->addr);
+	skb_queue_splice_tail_init(&l_ptr->backlogq, &l_ptr->transmq);
+	msgcount = skb_queue_len(&l_ptr->transmq);
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
 
-	if (skb_queue_empty(&l_ptr->outqueue)) {
+	if (skb_queue_empty(&l_ptr->transmq)) {
 		skb = tipc_buf_acquire(INT_H_SIZE);
 		if (skb) {
 			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
@@ -1707,7 +1628,7 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	split_bundles = (l_ptr->owner->active_links[0] !=
 			 l_ptr->owner->active_links[1]);
 
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+	skb_queue_walk(&l_ptr->transmq, skb) {
 		struct tipc_msg *msg = buf_msg(skb);
 
 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
@@ -1738,80 +1659,66 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
  * and sequence order is preserved per sender/receiver socket pair.
  * Owner node is locked.
  */
-void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
-			      struct tipc_link *tunnel)
+void tipc_link_dup_queue_xmit(struct tipc_link *link,
+			      struct tipc_link *tnl)
 {
 	struct sk_buff *skb;
-	struct tipc_msg tunnel_hdr;
-
-	tipc_msg_init(link_own_addr(l_ptr), &tunnel_hdr, CHANGEOVER_PROTOCOL,
-		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
-	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
-	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
-	skb_queue_walk(&l_ptr->outqueue, skb) {
+	struct tipc_msg tnl_hdr;
+	struct sk_buff_head *queue = &link->transmq;
+	int mcnt;
+
+	tipc_msg_init(link_own_addr(link), &tnl_hdr, CHANGEOVER_PROTOCOL,
+		      DUPLICATE_MSG, INT_H_SIZE, link->addr);
+	mcnt = skb_queue_len(&link->transmq) + skb_queue_len(&link->backlogq);
+	msg_set_msgcnt(&tnl_hdr, mcnt);
+	msg_set_bearer_id(&tnl_hdr, link->peer_bearer_id);
+
+tunnel_queue:
+	skb_queue_walk(queue, skb) {
 		struct sk_buff *outskb;
 		struct tipc_msg *msg = buf_msg(skb);
-		u32 length = msg_size(msg);
+		u32 len = msg_size(msg);
 
-		if (msg_user(msg) == MSG_BUNDLER)
-			msg_set_type(msg, CLOSED_MSG);
-		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
-		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
-		outskb = tipc_buf_acquire(length + INT_H_SIZE);
+		msg_set_ack(msg, mod(link->next_in_no - 1));
+		msg_set_bcast_ack(msg, link->owner->bclink.last_in);
+		msg_set_size(&tnl_hdr, len + INT_H_SIZE);
+		outskb = tipc_buf_acquire(len + INT_H_SIZE);
 		if (outskb == NULL) {
 			pr_warn("%sunable to send duplicate msg\n",
 				link_co_err);
 			return;
 		}
-		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
-					       length);
-		__tipc_link_xmit_skb(tunnel, outskb);
-		if (!tipc_link_is_up(l_ptr))
+		skb_copy_to_linear_data(outskb, &tnl_hdr, INT_H_SIZE);
+		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE,
+					       skb->data, len);
+		__tipc_link_xmit_skb(tnl, outskb);
+		if (!tipc_link_is_up(link))
 			return;
 	}
-}
-
-/**
- * buf_extract - extracts embedded TIPC message from another message
- * @skb: encapsulating message buffer
- * @from_pos: offset to extract from
- *
- * Returns a new message buffer containing an embedded message. The
- * encapsulating buffer is left unchanged.
- */
-static struct sk_buff *buf_extract(struct sk_buff *skb, u32 from_pos)
-{
-	struct tipc_msg *msg = (struct tipc_msg *)(skb->data + from_pos);
-	u32 size = msg_size(msg);
-	struct sk_buff *eb;
-
-	eb = tipc_buf_acquire(size);
-	if (eb)
-		skb_copy_to_linear_data(eb, msg, size);
-	return eb;
+	if (queue == &link->backlogq)
+		return;
+	queue = &link->backlogq;
+	goto tunnel_queue;
 }
 
 /* tipc_link_dup_rcv(): Receive a tunnelled DUPLICATE_MSG packet.
  * Owner node is locked.
  */
-static void tipc_link_dup_rcv(struct tipc_link *l_ptr,
-			      struct sk_buff *t_buf)
+static void tipc_link_dup_rcv(struct tipc_link *link,
+			      struct sk_buff *skb)
 {
-	struct sk_buff *buf;
+	struct sk_buff *iskb;
+	int pos = 0;
 
-	if (!tipc_link_is_up(l_ptr))
+	if (!tipc_link_is_up(link))
 		return;
 
-	buf = buf_extract(t_buf, INT_H_SIZE);
-	if (buf == NULL) {
+	if (!tipc_msg_extract(skb, &iskb, &pos)) {
 		pr_warn("%sfailed to extract inner dup pkt\n", link_co_err);
 		return;
 	}
-
-	/* Add buffer to deferred queue, if applicable: */
-	link_handle_out_of_seq_msg(l_ptr, buf);
+	/* Append buffer to deferred queue, if applicable: */
+	link_handle_out_of_seq_msg(link, iskb);
 }
 
 /* tipc_link_failover_rcv(): Receive a tunnelled ORIGINAL_MSG packet
@@ -1823,6 +1730,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
 	struct tipc_msg *t_msg = buf_msg(t_buf);
 	struct sk_buff *buf = NULL;
 	struct tipc_msg *msg;
+	int pos = 0;
 
 	if (tipc_link_is_up(l_ptr))
 		tipc_link_reset(l_ptr);
@@ -1834,8 +1742,7 @@ static struct sk_buff *tipc_link_failover_rcv(struct tipc_link *l_ptr,
 	/* Should there be an inner packet? */
 	if (l_ptr->exp_msg_count) {
 		l_ptr->exp_msg_count--;
-		buf = buf_extract(t_buf, INT_H_SIZE);
-		if (buf == NULL) {
+		if (!tipc_msg_extract(t_buf, &buf, &pos)) {
 			pr_warn("%sno inner failover pkt\n", link_co_err);
 			goto exit;
 		}
@@ -1903,23 +1810,16 @@ static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tol)
 	l_ptr->abort_limit = tol / (jiffies_to_msecs(l_ptr->cont_intv) / 4);
 }
 
-void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window)
+void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
 {
-	/* Data messages from this node, inclusive FIRST_FRAGM */
-	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE] = window;
-	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE] = (window / 3) * 4;
-	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE] = (window / 3) * 5;
-	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE] = (window / 3) * 6;
-	/* Transiting data messages,inclusive FIRST_FRAGM */
-	l_ptr->queue_limit[TIPC_LOW_IMPORTANCE + 4] = 300;
-	l_ptr->queue_limit[TIPC_MEDIUM_IMPORTANCE + 4] = 600;
-	l_ptr->queue_limit[TIPC_HIGH_IMPORTANCE + 4] = 900;
-	l_ptr->queue_limit[TIPC_CRITICAL_IMPORTANCE + 4] = 1200;
-	l_ptr->queue_limit[CONN_MANAGER] = 1200;
-	l_ptr->queue_limit[CHANGEOVER_PROTOCOL] = 2500;
-	l_ptr->queue_limit[NAME_DISTRIBUTOR] = 3000;
-	/* FRAGMENT and LAST_FRAGMENT packets */
-	l_ptr->queue_limit[MSG_FRAGMENTER] = 4000;
+	int max_bulk = TIPC_MAX_PUBLICATIONS / (l->max_pkt / ITEM_SIZE);
+
+	l->window = win;
+	l->queue_limit[TIPC_LOW_IMPORTANCE]      = win / 2;
+	l->queue_limit[TIPC_MEDIUM_IMPORTANCE]   = win;
+	l->queue_limit[TIPC_HIGH_IMPORTANCE]     = win / 2 * 3;
+	l->queue_limit[TIPC_CRITICAL_IMPORTANCE] = win * 2;
+	l->queue_limit[TIPC_SYSTEM_IMPORTANCE]   = max_bulk;
 }
 
 /* tipc_link_find_owner - locate owner node of link by link's name
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 7aeb52092bf3..eec3ecf2d450 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -124,7 +124,8 @@ struct tipc_stats {
  * @max_pkt: current maximum packet size for this link
  * @max_pkt_target: desired maximum packet size for this link
  * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target)
- * @outqueue: outbound message queue
+ * @transmitq: queue for sent, non-acked messages
+ * @backlogq: queue for messages waiting to be sent
  * @next_out_no: next sequence number to use for outbound messages
  * @last_retransmitted: sequence number of most recently retransmitted message
  * @stale_count: # of identical retransmit requests made by peer
@@ -177,20 +178,21 @@ struct tipc_link {
 	u32 max_pkt_probes;
 
 	/* Sending */
-	struct sk_buff_head outqueue;
+	struct sk_buff_head transmq;
+	struct sk_buff_head backlogq;
 	u32 next_out_no;
+	u32 window;
 	u32 last_retransmitted;
 	u32 stale_count;
 
 	/* Reception */
 	u32 next_in_no;
-	struct sk_buff_head deferred_queue;
-	u32 unacked_window;
+	u32 rcv_unacked;
+	struct sk_buff_head deferdq;
 	struct sk_buff_head inputq;
 	struct sk_buff_head namedq;
 
 	/* Congestion handling */
-	struct sk_buff *next_out;
 	struct sk_buff_head wakeupq;
 
 	/* Fragmentation/reassembly */
@@ -302,9 +304,4 @@ static inline int link_reset_reset(struct tipc_link *l_ptr)
 	return l_ptr->state == RESET_RESET;
 }
 
-static inline int link_congested(struct tipc_link *l_ptr)
-{
-	return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0];
-}
-
 #endif
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index b6eb90cd3ef7..0c6dad8180a0 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/msg.c: TIPC message header routines
  *
- * Copyright (c) 2000-2006, 2014, Ericsson AB
+ * Copyright (c) 2000-2006, 2014-2015, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -165,6 +165,9 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 	}
 
 	if (fragid == LAST_FRAGMENT) {
+		TIPC_SKB_CB(head)->validated = false;
+		if (unlikely(!tipc_msg_validate(head)))
+			goto err;
 		*buf = head;
 		TIPC_SKB_CB(head)->tail = NULL;
 		*headbuf = NULL;
@@ -172,7 +175,6 @@ int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf)
 	}
 	*buf = NULL;
 	return 0;
-
 err:
 	pr_warn_ratelimited("Unable to build fragment list\n");
 	kfree_skb(*buf);
@@ -181,6 +183,48 @@ err:
 	return 0;
 }
 
+/* tipc_msg_validate - validate basic format of received message
+ *
+ * This routine ensures a TIPC message has an acceptable header, and at least
+ * as much data as the header indicates it should. The routine also ensures
+ * that the entire message header is stored in the main fragment of the message
+ * buffer, to simplify future access to message header fields.
+ *
+ * Note: Having extra info present in the message header or data areas is OK.
+ * TIPC will ignore the excess, under the assumption that it is optional info
+ * introduced by a later release of the protocol.
+ */
+bool tipc_msg_validate(struct sk_buff *skb)
+{
+	struct tipc_msg *msg;
+	int msz, hsz;
+
+	if (unlikely(TIPC_SKB_CB(skb)->validated))
+		return true;
+	if (unlikely(!pskb_may_pull(skb, MIN_H_SIZE)))
+		return false;
+
+	hsz = msg_hdr_sz(buf_msg(skb));
+	if (unlikely(hsz < MIN_H_SIZE) || (hsz > MAX_H_SIZE))
+		return false;
+	if (unlikely(!pskb_may_pull(skb, hsz)))
+		return false;
+
+	msg = buf_msg(skb);
+	if (unlikely(msg_version(msg) != TIPC_VERSION))
+		return false;
+
+	msz = msg_size(msg);
+	if (unlikely(msz < hsz))
+		return false;
+	if (unlikely((msz - hsz) > TIPC_MAX_USER_MSG_SIZE))
+		return false;
+	if (unlikely(skb->len < msz))
+		return false;
+
+	TIPC_SKB_CB(skb)->validated = true;
+	return true;
+}
 
 /**
  * tipc_msg_build - create buffer chain containing specified header and data
@@ -228,6 +272,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
 		      FIRST_FRAGMENT, INT_H_SIZE, msg_destnode(mhdr));
 	msg_set_size(&pkthdr, pktmax);
 	msg_set_fragm_no(&pkthdr, pktno);
+	msg_set_importance(&pkthdr, msg_importance(mhdr));
 
 	/* Prepare first fragment */
 	skb = tipc_buf_acquire(pktmax);
@@ -286,33 +331,36 @@ error:
 
 /**
  * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
- * @list: the buffer chain of the existing buffer ("bundle")
+ * @bskb: the buffer to append to ("bundle")
  * @skb: buffer to be appended
  * @mtu: max allowable size for the bundle buffer
  * Consumes buffer if successful
  * Returns true if bundling could be performed, otherwise false
  */
-bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
+bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu)
 {
-	struct sk_buff *bskb = skb_peek_tail(list);
-	struct tipc_msg *bmsg = buf_msg(bskb);
+	struct tipc_msg *bmsg;
 	struct tipc_msg *msg = buf_msg(skb);
-	unsigned int bsz = msg_size(bmsg);
+	unsigned int bsz;
 	unsigned int msz = msg_size(msg);
-	u32 start = align(bsz);
+	u32 start, pad;
 	u32 max = mtu - INT_H_SIZE;
-	u32 pad = start - bsz;
 
 	if (likely(msg_user(msg) == MSG_FRAGMENTER))
 		return false;
+	if (!bskb)
+		return false;
+	bmsg = buf_msg(bskb);
+	bsz = msg_size(bmsg);
+	start = align(bsz);
+	pad = start - bsz;
+
 	if (unlikely(msg_user(msg) == CHANGEOVER_PROTOCOL))
 		return false;
 	if (unlikely(msg_user(msg) == BCAST_PROTOCOL))
 		return false;
 	if (likely(msg_user(bmsg) != MSG_BUNDLER))
 		return false;
-	if (likely(!TIPC_SKB_CB(bskb)->bundling))
-		return false;
 	if (unlikely(skb_tailroom(bskb) < (pad + msz)))
 		return false;
 	if (unlikely(max < (start + msz)))
@@ -328,34 +376,40 @@ bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
 
 /**
  * tipc_msg_extract(): extract bundled inner packet from buffer
- * @skb: linear outer buffer, to be extracted from.
+ * @skb: buffer to be extracted from.
  * @iskb: extracted inner buffer, to be returned
- * @pos: position of msg to be extracted. Returns with pointer of next msg
+ * @pos: position in outer message of msg to be extracted.
+ *       Returns position of next msg
  * Consumes outer buffer when last packet extracted
  * Returns true when when there is an extracted buffer, otherwise false
  */
 bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos)
 {
-	struct tipc_msg *msg = buf_msg(skb);
-	int imsz;
-	struct tipc_msg *imsg = (struct tipc_msg *)(msg_data(msg) + *pos);
+	struct tipc_msg *msg;
+	int imsz, offset;
+
+	*iskb = NULL;
+	if (unlikely(skb_linearize(skb)))
+		goto none;
 
-	/* Is there space left for shortest possible message? */
-	if (*pos > (msg_data_sz(msg) - SHORT_H_SIZE))
+	msg = buf_msg(skb);
+	offset = msg_hdr_sz(msg) + *pos;
+	if (unlikely(offset > (msg_size(msg) - MIN_H_SIZE)))
 		goto none;
-	imsz = msg_size(imsg);
 
-	/* Is there space left for current message ? */
-	if ((*pos + imsz) > msg_data_sz(msg))
-		goto none;
-	*iskb = tipc_buf_acquire(imsz);
-	if (!*iskb)
+	*iskb = skb_clone(skb, GFP_ATOMIC);
+	if (unlikely(!*iskb))
 		goto none;
-	skb_copy_to_linear_data(*iskb, imsg, imsz);
+	skb_pull(*iskb, offset);
+	imsz = msg_size(buf_msg(*iskb));
+	skb_trim(*iskb, imsz);
+	if (unlikely(!tipc_msg_validate(*iskb)))
+		goto none;
 	*pos += align(imsz);
 	return true;
 none:
 	kfree_skb(skb);
+	kfree_skb(*iskb);
 	*iskb = NULL;
 	return false;
 }
@@ -369,12 +423,11 @@ none:
  * Replaces buffer if successful
  * Returns true if success, otherwise false
  */
-bool tipc_msg_make_bundle(struct sk_buff_head *list,
-			  struct sk_buff *skb, u32 mtu, u32 dnode)
+bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode)
 {
 	struct sk_buff *bskb;
 	struct tipc_msg *bmsg;
-	struct tipc_msg *msg = buf_msg(skb);
+	struct tipc_msg *msg = buf_msg(*skb);
 	u32 msz = msg_size(msg);
 	u32 max = mtu - INT_H_SIZE;
 
@@ -398,9 +451,9 @@ bool tipc_msg_make_bundle(struct sk_buff_head *list,
 	msg_set_seqno(bmsg, msg_seqno(msg));
 	msg_set_ack(bmsg, msg_ack(msg));
 	msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
-	TIPC_SKB_CB(bskb)->bundling = true;
-	__skb_queue_tail(list, bskb);
-	return tipc_msg_bundle(list, skb, mtu);
+	tipc_msg_bundle(bskb, *skb, mtu);
+	*skb = bskb;
+	return true;
 }
 
 /**
@@ -415,21 +468,17 @@ bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
 		      int err)
 {
 	struct tipc_msg *msg = buf_msg(buf);
-	uint imp = msg_importance(msg);
 	struct tipc_msg ohdr;
 	uint rdsz = min_t(uint, msg_data_sz(msg), MAX_FORWARD_SIZE);
 
 	if (skb_linearize(buf))
 		goto exit;
+	msg = buf_msg(buf);
 	if (msg_dest_droppable(msg))
 		goto exit;
 	if (msg_errcode(msg))
 		goto exit;
-
 	memcpy(&ohdr, msg, msg_hdr_sz(msg));
-	imp = min_t(uint, imp + 1, TIPC_CRITICAL_IMPORTANCE);
-	if (msg_isdata(msg))
-		msg_set_importance(msg, imp);
 	msg_set_errcode(msg, err);
 	msg_set_origport(msg, msg_destport(&ohdr));
 	msg_set_destport(msg, msg_origport(&ohdr));
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index fa167846d1ab..bd3969a80dd4 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -1,7 +1,7 @@
 /*
  * net/tipc/msg.h: Include file for TIPC message header routines
  *
- * Copyright (c) 2000-2007, 2014, Ericsson AB
+ * Copyright (c) 2000-2007, 2014-2015 Ericsson AB
  * Copyright (c) 2005-2008, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -54,6 +54,8 @@ struct plist;
  * - TIPC_HIGH_IMPORTANCE
  * - TIPC_CRITICAL_IMPORTANCE
  */
+#define TIPC_SYSTEM_IMPORTANCE	4
+
 
 /*
  * Payload message types
@@ -64,6 +66,19 @@ struct plist;
 #define  TIPC_DIRECT_MSG       3
 
 /*
+ * Internal message users
+ */
+#define  BCAST_PROTOCOL       5
+#define  MSG_BUNDLER          6
+#define  LINK_PROTOCOL        7
+#define  CONN_MANAGER         8
+#define  CHANGEOVER_PROTOCOL  10
+#define  NAME_DISTRIBUTOR     11
+#define  MSG_FRAGMENTER       12
+#define  LINK_CONFIG          13
+#define  SOCK_WAKEUP          14       /* pseudo user */
+
+/*
  * Message header sizes
  */
 #define SHORT_H_SIZE              24	/* In-cluster basic payload message */
@@ -92,7 +107,7 @@ struct plist;
 struct tipc_skb_cb {
 	void *handle;
 	struct sk_buff *tail;
-	bool deferred;
+	bool validated;
 	bool wakeup_pending;
 	bool bundling;
 	u16 chain_sz;
@@ -170,16 +185,6 @@ static inline void msg_set_user(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 0, 25, 0xf, n);
 }
 
-static inline u32 msg_importance(struct tipc_msg *m)
-{
-	return msg_bits(m, 0, 25, 0xf);
-}
-
-static inline void msg_set_importance(struct tipc_msg *m, u32 i)
-{
-	msg_set_user(m, i);
-}
-
 static inline u32 msg_hdr_sz(struct tipc_msg *m)
 {
 	return msg_bits(m, 0, 21, 0xf) << 2;
@@ -336,6 +341,25 @@ static inline void msg_set_seqno(struct tipc_msg *m, u32 n)
336/* 341/*
337 * Words 3-10 342 * Words 3-10
338 */ 343 */
344static inline u32 msg_importance(struct tipc_msg *m)
345{
346 if (unlikely(msg_user(m) == MSG_FRAGMENTER))
347 return msg_bits(m, 5, 13, 0x7);
348 if (likely(msg_isdata(m) && !msg_errcode(m)))
349 return msg_user(m);
350 return TIPC_SYSTEM_IMPORTANCE;
351}
352
353static inline void msg_set_importance(struct tipc_msg *m, u32 i)
354{
355 if (unlikely(msg_user(m) == MSG_FRAGMENTER))
356 msg_set_bits(m, 5, 13, 0x7, i);
357 else if (likely(i < TIPC_SYSTEM_IMPORTANCE))
358 msg_set_user(m, i);
359 else
360 pr_warn("Trying to set illegal importance in message\n");
361}
362
339static inline u32 msg_prevnode(struct tipc_msg *m) 363static inline u32 msg_prevnode(struct tipc_msg *m)
340{ 364{
341 return msg_word(m, 3); 365 return msg_word(m, 3);
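The relocated accessors change how importance is encoded: data messages (user field 0..3) carry their importance directly in the user field of word 0; every internal user (values 5..14 above) reads back as TIPC_SYSTEM_IMPORTANCE; and fragments park the wrapped message's importance in word 5, bits 13..15, so it survives fragmentation, which is what makes tipc_msg_tot_importance() (removed below) redundant. A self-contained user-space model of the word-0 encoding follows; get_bits()/set_bits() stand in for the kernel's msg_bits()/msg_set_bits(), and the isdata/errcode checks of the real msg_importance() are simplified away:

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

#define TIPC_SYSTEM_IMPORTANCE	4	/* as defined in this patch */

/* stand-ins for the kernel's msg_bits()/msg_set_bits() word accessors */
static uint32_t get_bits(uint32_t w, int pos, uint32_t mask)
{
	return (w >> pos) & mask;
}

static uint32_t set_bits(uint32_t w, int pos, uint32_t mask, uint32_t val)
{
	return (w & ~(mask << pos)) | ((val & mask) << pos);
}

int main(void)
{
	uint32_t word0 = 0;

	/* data message: importance lives in the user field, bits 25..28 */
	word0 = set_bits(word0, 25, 0xf, 2);	/* TIPC_HIGH_IMPORTANCE */
	assert(get_bits(word0, 25, 0xf) == 2);

	/* internal user, e.g. LINK_PROTOCOL (7): clamps to system importance */
	word0 = set_bits(word0, 25, 0xf, 7);
	uint32_t user = get_bits(word0, 25, 0xf);
	uint32_t imp = user < TIPC_SYSTEM_IMPORTANCE ? user
						     : TIPC_SYSTEM_IMPORTANCE;
	printf("user %u -> importance %u\n", user, imp);	/* 7 -> 4 */
	return 0;
}

Running it prints "user 7 -> importance 4": a LINK_PROTOCOL header reports system importance without storing it anywhere extra.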
@@ -458,20 +482,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
458 */ 482 */
459 483
460/* 484/*
461 * Internal message users
462 */
463#define BCAST_PROTOCOL 5
464#define MSG_BUNDLER 6
465#define LINK_PROTOCOL 7
466#define CONN_MANAGER 8
467#define ROUTE_DISTRIBUTOR 9 /* obsoleted */
468#define CHANGEOVER_PROTOCOL 10
469#define NAME_DISTRIBUTOR 11
470#define MSG_FRAGMENTER 12
471#define LINK_CONFIG 13
472#define SOCK_WAKEUP 14 /* pseudo user */
473
474/*
475 * Connection management protocol message types 485 * Connection management protocol message types
476 */ 486 */
477#define CONN_PROBE 0 487#define CONN_PROBE 0
@@ -510,7 +520,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
510#define DSC_REQ_MSG 0 520#define DSC_REQ_MSG 0
511#define DSC_RESP_MSG 1 521#define DSC_RESP_MSG 1
512 522
513
514/* 523/*
515 * Word 1 524 * Word 1
516 */ 525 */
@@ -534,6 +543,16 @@ static inline void msg_set_node_sig(struct tipc_msg *m, u32 n)
534 msg_set_bits(m, 1, 0, 0xffff, n); 543 msg_set_bits(m, 1, 0, 0xffff, n);
535} 544}
536 545
546static inline u32 msg_node_capabilities(struct tipc_msg *m)
547{
548 return msg_bits(m, 1, 15, 0x1fff);
549}
550
551static inline void msg_set_node_capabilities(struct tipc_msg *m, u32 n)
552{
553 msg_set_bits(m, 1, 15, 0x1fff, n);
554}
555
537 556
538/* 557/*
539 * Word 2 558 * Word 2
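These accessors expose a 13-bit capability bitmap in word 1 (bits 15..27, mask 0x1fff) and pair with the u16 capabilities field added to struct tipc_node below. A hypothetical use; TIPC_CAP_EXAMPLE and node_sync_capabilities() are invented for illustration and are not part of the series:

#define TIPC_CAP_EXAMPLE	(1 << 0)	/* made-up capability bit */

static void node_sync_capabilities(struct tipc_node *n, struct tipc_msg *hdr)
{
	/* the sender would have called msg_set_node_capabilities(hdr, caps) */
	n->capabilities = msg_node_capabilities(hdr);
	if (n->capabilities & TIPC_CAP_EXAMPLE)
		pr_debug("peer advertises example capability\n");
}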
@@ -734,13 +753,6 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
734 msg_set_bits(m, 9, 0, 0xffff, n); 753 msg_set_bits(m, 9, 0, 0xffff, n);
735} 754}
736 755
737static inline u32 tipc_msg_tot_importance(struct tipc_msg *m)
738{
739 if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
740 return msg_importance(msg_get_wrapped(m));
741 return msg_importance(m);
742}
743
744static inline u32 msg_tot_origport(struct tipc_msg *m) 756static inline u32 msg_tot_origport(struct tipc_msg *m)
745{ 757{
746 if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT)) 758 if ((msg_user(m) == MSG_FRAGMENTER) && (msg_type(m) == FIRST_FRAGMENT))
@@ -749,6 +761,7 @@ static inline u32 msg_tot_origport(struct tipc_msg *m)
749} 761}
750 762
751struct sk_buff *tipc_buf_acquire(u32 size); 763struct sk_buff *tipc_buf_acquire(u32 size);
764bool tipc_msg_validate(struct sk_buff *skb);
752bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode, 765bool tipc_msg_reverse(u32 own_addr, struct sk_buff *buf, u32 *dnode,
753 int err); 766 int err);
754void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type, 767void tipc_msg_init(u32 own_addr, struct tipc_msg *m, u32 user, u32 type,
@@ -757,9 +770,9 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
757 uint data_sz, u32 dnode, u32 onode, 770 uint data_sz, u32 dnode, u32 onode,
758 u32 dport, u32 oport, int errcode); 771 u32 dport, u32 oport, int errcode);
759int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); 772int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
760bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu); 773bool tipc_msg_bundle(struct sk_buff *bskb, struct sk_buff *skb, u32 mtu);
761bool tipc_msg_make_bundle(struct sk_buff_head *list, 774
762 struct sk_buff *skb, u32 mtu, u32 dnode); 775bool tipc_msg_make_bundle(struct sk_buff **skb, u32 mtu, u32 dnode);
763bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos); 776bool tipc_msg_extract(struct sk_buff *skb, struct sk_buff **iskb, int *pos);
764int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 777int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
765 int offset, int dsz, int mtu, struct sk_buff_head *list); 778 int offset, int dsz, int mtu, struct sk_buff_head *list);
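The bundling prototypes change shape: tipc_msg_bundle() now takes the bundle buffer itself (bskb) instead of a queue head, and tipc_msg_make_bundle() returns a replacement buffer through a double pointer rather than appending to a list. A hedged sketch of the caller pattern these signatures imply; the function name and the choice of backlogq are assumptions, not from the patch:

static void link_backlog_add(struct tipc_link *l, struct sk_buff *skb,
			     u32 mtu, u32 dnode)
{
	struct sk_buff *tail = skb_peek_tail(&l->backlogq);

	/* try to piggy-back onto the bundle at the backlog tail */
	if (tail && tipc_msg_bundle(tail, skb, mtu))
		return;			/* skb absorbed into the bundle */
	/* otherwise try to turn skb itself into a fresh bundle ... */
	tipc_msg_make_bundle(&skb, mtu, dnode);
	/* ... and queue it either way */
	__skb_queue_tail(&l->backlogq, skb);
}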
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 86152de8248d..26d1de1bf34d 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -111,7 +111,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
111 INIT_LIST_HEAD(&n_ptr->list); 111 INIT_LIST_HEAD(&n_ptr->list);
112 INIT_LIST_HEAD(&n_ptr->publ_list); 112 INIT_LIST_HEAD(&n_ptr->publ_list);
113 INIT_LIST_HEAD(&n_ptr->conn_sks); 113 INIT_LIST_HEAD(&n_ptr->conn_sks);
114 __skb_queue_head_init(&n_ptr->bclink.deferred_queue); 114 __skb_queue_head_init(&n_ptr->bclink.deferdq);
115 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]); 115 hlist_add_head_rcu(&n_ptr->hash, &tn->node_htable[tipc_hashfn(addr)]);
116 list_for_each_entry_rcu(temp_node, &tn->node_list, list) { 116 list_for_each_entry_rcu(temp_node, &tn->node_list, list) {
117 if (n_ptr->addr < temp_node->addr) 117 if (n_ptr->addr < temp_node->addr)
@@ -354,7 +354,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
354 354
355 /* Flush broadcast link info associated with lost node */ 355 /* Flush broadcast link info associated with lost node */
356 if (n_ptr->bclink.recv_permitted) { 356 if (n_ptr->bclink.recv_permitted) {
357 __skb_queue_purge(&n_ptr->bclink.deferred_queue); 357 __skb_queue_purge(&n_ptr->bclink.deferdq);
358 358
359 if (n_ptr->bclink.reasm_buf) { 359 if (n_ptr->bclink.reasm_buf) {
360 kfree_skb(n_ptr->bclink.reasm_buf); 360 kfree_skb(n_ptr->bclink.reasm_buf);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 3d18c66b7f78..e89ac04ec2c3 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -84,7 +84,7 @@ struct tipc_node_bclink {
84 u32 last_sent; 84 u32 last_sent;
85 u32 oos_state; 85 u32 oos_state;
86 u32 deferred_size; 86 u32 deferred_size;
87 struct sk_buff_head deferred_queue; 87 struct sk_buff_head deferdq;
88 struct sk_buff *reasm_buf; 88 struct sk_buff *reasm_buf;
89 int inputq_map; 89 int inputq_map;
90 bool recv_permitted; 90 bool recv_permitted;
@@ -106,6 +106,7 @@ struct tipc_node_bclink {
106 * @list: links to adjacent nodes in sorted list of cluster's nodes 106 * @list: links to adjacent nodes in sorted list of cluster's nodes
107 * @working_links: number of working links to node (both active and standby) 107 * @working_links: number of working links to node (both active and standby)
108 * @link_cnt: number of links to node 108 * @link_cnt: number of links to node
109 * @capabilities: bitmap, indicating peer node's functional capabilities
109 * @signature: node instance identifier 110 * @signature: node instance identifier
110 * @link_id: local and remote bearer ids of changing link, if any 111 * @link_id: local and remote bearer ids of changing link, if any
111 * @publ_list: list of publications 112 * @publ_list: list of publications
@@ -125,7 +126,8 @@ struct tipc_node {
125 struct tipc_node_bclink bclink; 126 struct tipc_node_bclink bclink;
126 struct list_head list; 127 struct list_head list;
127 int link_cnt; 128 int link_cnt;
128 int working_links; 129 u16 working_links;
130 u16 capabilities;
129 u32 signature; 131 u32 signature;
130 u32 link_id; 132 u32 link_id;
131 struct list_head publ_list; 133 struct list_head publ_list;
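Narrowing working_links from int to u16 plausibly lets the new capability bitmap slot in without growing struct tipc_node on typical ABIs; 13 of its 16 bits are addressable through the 0x1fff mask of msg_node_capabilities() above. A minimal user-space model of just the two changed fields:

#include <assert.h>
#include <stdint.h>

struct node_fields_model {	/* models only the changed fields */
	uint16_t working_links;	/* link counts are small; u16 suffices */
	uint16_t capabilities;	/* 13-bit peer capability bitmap */
};

int main(void)
{
	/* the pair occupies the 4 bytes the old int did */
	assert(sizeof(struct node_fields_model) == 4);
	return 0;
}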