author    David S. Miller <davem@davemloft.net>  2014-11-26 12:30:23 -0500
committer David S. Miller <davem@davemloft.net>  2014-11-26 12:30:23 -0500
commit    8b7f8a99906fc21c287ad63ad3a89cf662b0293e (patch)
tree      82658ff3ecd103abdad794b9b0833e45160b235e
parent    73cf0e923d685a6a1b7754c7d29cc14944f271d9 (diff)
parent    a6ca109443842e7251c68451f8137ae68ae6d8a6 (diff)
Merge branch 'tipc-next'
Ying Xue says:

====================
standardize TIPC SKB queue operations

The following SKB queues are currently created and maintained within the
internal TIPC stack:

- link transmission queue
- link deferred queue
- link receive queue
- socket outgoing packet chain
- name table outgoing packet chain

In order to manage these queues, the TIPC stack declares a sk_buff
pointer for each queue to record its head, and directly modifies the
"prev" and "next" SKB pointers of the sk_buff structure when inserting
or deleting an SKB. As these operations are fairly complex, they easily
invite fatal mistakes. If these sk_buff pointers are replaced with
sk_buff_head instances as queue heads, and the corresponding generic SKB
list APIs are used to manage them, the entire TIPC code becomes much
cleaner and more readable. But before making that change, we need to
clean up the following redundant functionality:

- remove node subscribe infrastructure
- remove protocol message queue
- remove retransmission queue
- clean up process of pushing packets in link layer
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
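For readers unfamiliar with the generic SKB list API the series converges on, here is a minimal sketch of the idiom, using only helpers from <linux/skbuff.h>. This is illustrative only, not TIPC code from the patch; the is_wanted() predicate is hypothetical.

/* Sketch of the generic SKB list idiom that replaces open-coded
 * "prev"/"next" pointer surgery throughout this series.
 */
#include <linux/skbuff.h>

static bool is_wanted(struct sk_buff *skb)      /* hypothetical predicate */
{
        return skb->len > 0;
}

static void skb_list_idiom(struct sk_buff *skb)
{
        struct sk_buff_head q;
        struct sk_buff *pos, *tmp;

        __skb_queue_head_init(&q);      /* lockless init; caller serializes */
        __skb_queue_tail(&q, skb);      /* enqueue; no manual tail pointer */

        skb_queue_walk(&q, pos) {       /* read-only traversal */
                if (!is_wanted(pos))
                        break;
        }

        skb_queue_walk_safe(&q, pos, tmp) {     /* safe while unlinking */
                __skb_unlink(pos, &q);
                kfree_skb(pos);
        }

        __skb_queue_purge(&q);          /* frees anything still queued */
}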
-rw-r--r--  net/tipc/Makefile      |   4
-rw-r--r--  net/tipc/bcast.c       | 109
-rw-r--r--  net/tipc/bcast.h       |   2
-rw-r--r--  net/tipc/bearer.h      |   2
-rw-r--r--  net/tipc/core.h        |   1
-rw-r--r--  net/tipc/link.c        | 514
-rw-r--r--  net/tipc/link.h        |  48
-rw-r--r--  net/tipc/msg.c         | 125
-rw-r--r--  net/tipc/msg.h         |  16
-rw-r--r--  net/tipc/name_distr.c  |  98
-rw-r--r--  net/tipc/name_distr.h  |   1
-rw-r--r--  net/tipc/name_table.c  |   2
-rw-r--r--  net/tipc/name_table.h  |   6
-rw-r--r--  net/tipc/node.c        |  10
-rw-r--r--  net/tipc/node.h        |  12
-rw-r--r--  net/tipc/node_subscr.c |  96
-rw-r--r--  net/tipc/node_subscr.h |  63
-rw-r--r--  net/tipc/socket.c      | 127
18 files changed, 496 insertions, 740 deletions
diff --git a/net/tipc/Makefile b/net/tipc/Makefile
index b8a13caad59a..333e4592772c 100644
--- a/net/tipc/Makefile
+++ b/net/tipc/Makefile
@@ -7,8 +7,8 @@ obj-$(CONFIG_TIPC) := tipc.o
 tipc-y += addr.o bcast.o bearer.o config.o \
            core.o link.o discover.o msg.o \
            name_distr.o subscr.o name_table.o net.o \
-           netlink.o node.o node_subscr.o \
-           socket.o log.o eth_media.o server.o
+           netlink.o node.o socket.o log.o eth_media.o \
+           server.o
 
 tipc-$(CONFIG_TIPC_MEDIA_IB) += ib_media.o
 tipc-$(CONFIG_SYSCTL) += sysctl.o
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 556b26ad4b1e..f0761c771734 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -217,12 +217,13 @@ struct tipc_node *tipc_bclink_retransmit_to(void)
  */
 static void bclink_retransmit_pkt(u32 after, u32 to)
 {
-    struct sk_buff *buf;
+    struct sk_buff *skb;
 
-    buf = bcl->first_out;
-    while (buf && less_eq(buf_seqno(buf), after))
-        buf = buf->next;
-    tipc_link_retransmit(bcl, buf, mod(to - after));
+    skb_queue_walk(&bcl->outqueue, skb) {
+        if (more(buf_seqno(skb), after))
+            break;
+    }
+    tipc_link_retransmit(bcl, skb, mod(to - after));
 }
 
 /**
@@ -245,14 +246,14 @@ void tipc_bclink_wakeup_users(void)
  */
 void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
 {
-    struct sk_buff *crs;
+    struct sk_buff *skb, *tmp;
     struct sk_buff *next;
     unsigned int released = 0;
 
     tipc_bclink_lock();
     /* Bail out if tx queue is empty (no clean up is required) */
-    crs = bcl->first_out;
-    if (!crs)
+    skb = skb_peek(&bcl->outqueue);
+    if (!skb)
         goto exit;
 
     /* Determine which messages need to be acknowledged */
@@ -271,43 +272,43 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
      * Bail out if specified sequence number does not correspond
      * to a message that has been sent and not yet acknowledged
      */
-        if (less(acked, buf_seqno(crs)) ||
+        if (less(acked, buf_seqno(skb)) ||
             less(bcl->fsm_msg_cnt, acked) ||
             less_eq(acked, n_ptr->bclink.acked))
             goto exit;
     }
 
     /* Skip over packets that node has previously acknowledged */
-    while (crs && less_eq(buf_seqno(crs), n_ptr->bclink.acked))
-        crs = crs->next;
+    skb_queue_walk(&bcl->outqueue, skb) {
+        if (more(buf_seqno(skb), n_ptr->bclink.acked))
+            break;
+    }
 
     /* Update packets that node is now acknowledging */
+    skb_queue_walk_from_safe(&bcl->outqueue, skb, tmp) {
+        if (more(buf_seqno(skb), acked))
+            break;
 
-    while (crs && less_eq(buf_seqno(crs), acked)) {
-        next = crs->next;
-
-        if (crs != bcl->next_out)
-            bcbuf_decr_acks(crs);
-        else {
-            bcbuf_set_acks(crs, 0);
+        next = tipc_skb_queue_next(&bcl->outqueue, skb);
+        if (skb != bcl->next_out) {
+            bcbuf_decr_acks(skb);
+        } else {
+            bcbuf_set_acks(skb, 0);
             bcl->next_out = next;
             bclink_set_last_sent();
         }
 
-        if (bcbuf_acks(crs) == 0) {
-            bcl->first_out = next;
-            bcl->out_queue_size--;
-            kfree_skb(crs);
+        if (bcbuf_acks(skb) == 0) {
+            __skb_unlink(skb, &bcl->outqueue);
+            kfree_skb(skb);
             released = 1;
         }
-        crs = next;
     }
     n_ptr->bclink.acked = acked;
 
     /* Try resolving broadcast link congestion, if necessary */
-
     if (unlikely(bcl->next_out)) {
-        tipc_link_push_queue(bcl);
+        tipc_link_push_packets(bcl);
         bclink_set_last_sent();
     }
     if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks)))
@@ -327,19 +328,16 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
     struct sk_buff *buf;
 
     /* Ignore "stale" link state info */
-
     if (less_eq(last_sent, n_ptr->bclink.last_in))
         return;
 
     /* Update link synchronization state; quit if in sync */
-
     bclink_update_last_sent(n_ptr, last_sent);
 
     if (n_ptr->bclink.last_sent == n_ptr->bclink.last_in)
         return;
 
     /* Update out-of-sync state; quit if loss is still unconfirmed */
-
     if ((++n_ptr->bclink.oos_state) == 1) {
         if (n_ptr->bclink.deferred_size < (TIPC_MIN_LINK_WIN / 2))
             return;
@@ -347,15 +345,15 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
     }
 
     /* Don't NACK if one has been recently sent (or seen) */
-
     if (n_ptr->bclink.oos_state & 0x1)
         return;
 
     /* Send NACK */
-
     buf = tipc_buf_acquire(INT_H_SIZE);
     if (buf) {
         struct tipc_msg *msg = buf_msg(buf);
+        struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
+        u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
         tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
                       INT_H_SIZE, n_ptr->addr);
@@ -363,9 +361,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
         msg_set_mc_netid(msg, tipc_net_id);
         msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
         msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
-        msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
-                         ? buf_seqno(n_ptr->bclink.deferred_head) - 1
-                         : n_ptr->bclink.last_sent);
+        msg_set_bcgap_to(msg, to);
 
         tipc_bclink_lock();
         tipc_bearer_send(MAX_BEARERS, buf, NULL);
@@ -402,20 +398,20 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 
 /* tipc_bclink_xmit - broadcast buffer chain to all nodes in cluster
  * and to identified node local sockets
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_bclink_xmit(struct sk_buff *buf)
+int tipc_bclink_xmit(struct sk_buff_head *list)
 {
     int rc = 0;
     int bc = 0;
-    struct sk_buff *clbuf;
+    struct sk_buff *skb;
 
     /* Prepare clone of message for local node */
-    clbuf = tipc_msg_reassemble(buf);
-    if (unlikely(!clbuf)) {
-        kfree_skb_list(buf);
+    skb = tipc_msg_reassemble(list);
+    if (unlikely(!skb)) {
+        __skb_queue_purge(list);
         return -EHOSTUNREACH;
     }
 
@@ -423,11 +419,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
     if (likely(bclink)) {
         tipc_bclink_lock();
         if (likely(bclink->bcast_nodes.count)) {
-            rc = __tipc_link_xmit(bcl, buf);
+            rc = __tipc_link_xmit(bcl, list);
             if (likely(!rc)) {
+                u32 len = skb_queue_len(&bcl->outqueue);
+
                 bclink_set_last_sent();
                 bcl->stats.queue_sz_counts++;
-                bcl->stats.accu_queue_sz += bcl->out_queue_size;
+                bcl->stats.accu_queue_sz += len;
             }
             bc = 1;
         }
@@ -435,13 +433,13 @@ int tipc_bclink_xmit(struct sk_buff *buf)
     }
 
     if (unlikely(!bc))
-        kfree_skb_list(buf);
+        __skb_queue_purge(list);
 
     /* Deliver message clone */
     if (likely(!rc))
-        tipc_sk_mcast_rcv(clbuf);
+        tipc_sk_mcast_rcv(skb);
     else
-        kfree_skb(clbuf);
+        kfree_skb(skb);
 
     return rc;
 }
@@ -462,7 +460,6 @@ static void bclink_accept_pkt(struct tipc_node *node, u32 seqno)
      * Unicast an ACK periodically, ensuring that
      * all nodes in the cluster don't ACK at the same time
      */
-
     if (((seqno - tipc_own_addr) % TIPC_MIN_LINK_WIN) == 0) {
         tipc_link_proto_xmit(node->active_links[node->addr & 1],
                              STATE_MSG, 0, 0, 0, 0, 0);
@@ -484,7 +481,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
     int deferred = 0;
 
     /* Screen out unwanted broadcast messages */
-
     if (msg_mc_netid(msg) != tipc_net_id)
         goto exit;
 
@@ -497,7 +493,6 @@ void tipc_bclink_rcv(struct sk_buff *buf)
         goto unlock;
 
     /* Handle broadcast protocol message */
-
     if (unlikely(msg_user(msg) == BCAST_PROTOCOL)) {
         if (msg_type(msg) != STATE_MSG)
             goto unlock;
@@ -518,14 +513,12 @@ void tipc_bclink_rcv(struct sk_buff *buf)
     }
 
     /* Handle in-sequence broadcast message */
-
     seqno = msg_seqno(msg);
     next_in = mod(node->bclink.last_in + 1);
 
     if (likely(seqno == next_in)) {
 receive:
         /* Deliver message to destination */
-
         if (likely(msg_isdata(msg))) {
             tipc_bclink_lock();
             bclink_accept_pkt(node, seqno);
@@ -574,7 +567,6 @@ receive:
         buf = NULL;
 
         /* Determine new synchronization state */
-
         tipc_node_lock(node);
         if (unlikely(!tipc_node_is_up(node)))
             goto unlock;
@@ -582,33 +574,26 @@ receive:
         if (node->bclink.last_in == node->bclink.last_sent)
             goto unlock;
 
-        if (!node->bclink.deferred_head) {
+        if (skb_queue_empty(&node->bclink.deferred_queue)) {
             node->bclink.oos_state = 1;
             goto unlock;
         }
 
-        msg = buf_msg(node->bclink.deferred_head);
+        msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
         seqno = msg_seqno(msg);
         next_in = mod(next_in + 1);
         if (seqno != next_in)
             goto unlock;
 
         /* Take in-sequence message from deferred queue & deliver it */
-
-        buf = node->bclink.deferred_head;
-        node->bclink.deferred_head = buf->next;
-        buf->next = NULL;
-        node->bclink.deferred_size--;
+        buf = __skb_dequeue(&node->bclink.deferred_queue);
         goto receive;
     }
 
     /* Handle out-of-sequence broadcast message */
-
     if (less(next_in, seqno)) {
-        deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
-                                       &node->bclink.deferred_tail,
+        deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
                                        buf);
-        node->bclink.deferred_size += deferred;
         bclink_update_last_sent(node, seqno);
         buf = NULL;
     }
@@ -963,6 +948,8 @@ int tipc_bclink_init(void)
     sprintf(bcbearer->media.name, "tipc-broadcast");
 
     spin_lock_init(&bclink->lock);
+    __skb_queue_head_init(&bcl->outqueue);
+    __skb_queue_head_init(&bcl->deferred_queue);
     __skb_queue_head_init(&bcl->waiting_sks);
     bcl->next_out_no = 1;
     spin_lock_init(&bclink->node.lock);
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 443de084d3e8..644d79129fba 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -100,7 +100,7 @@ int tipc_bclink_reset_stats(void);
 int tipc_bclink_set_queue_limits(u32 limit);
 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
 uint tipc_bclink_get_mtu(void);
-int tipc_bclink_xmit(struct sk_buff *buf);
+int tipc_bclink_xmit(struct sk_buff_head *list);
 void tipc_bclink_wakeup_users(void);
 int tipc_nl_add_bc_link(struct tipc_nl_msg *msg);
 
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index b1d905209e83..2c1230ac5dfe 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -165,7 +165,7 @@ extern struct tipc_bearer __rcu *bearer_list[];
  * TIPC routines available to supported media types
  */
 
-void tipc_rcv(struct sk_buff *buf, struct tipc_bearer *tb_ptr);
+void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *tb_ptr);
 int tipc_enable_bearer(const char *bearer_name, u32 disc_domain, u32 priority);
 int tipc_disable_bearer(const char *name);
 
diff --git a/net/tipc/core.h b/net/tipc/core.h
index b578b10feefa..84602137ce20 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -192,6 +192,7 @@ struct tipc_skb_cb {
     struct sk_buff *tail;
     bool deferred;
     bool wakeup_pending;
+    bool bundling;
     u16 chain_sz;
     u16 chain_imp;
 };
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 4738cb1bf7c0..34bf15c90c78 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -149,18 +149,6 @@ static void link_init_max_pkt(struct tipc_link *l_ptr)
     l_ptr->max_pkt_probes = 0;
 }
 
-static u32 link_next_sent(struct tipc_link *l_ptr)
-{
-    if (l_ptr->next_out)
-        return buf_seqno(l_ptr->next_out);
-    return mod(l_ptr->next_out_no);
-}
-
-static u32 link_last_sent(struct tipc_link *l_ptr)
-{
-    return mod(link_next_sent(l_ptr) - 1);
-}
-
 /*
  * Simple non-static link routines (i.e. referenced outside this file)
  */
@@ -183,14 +171,17 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
  */
 static void link_timeout(struct tipc_link *l_ptr)
 {
+    struct sk_buff *skb;
+
     tipc_node_lock(l_ptr->owner);
 
     /* update counters used in statistical profiling of send traffic */
-    l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
+    l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
     l_ptr->stats.queue_sz_counts++;
 
-    if (l_ptr->first_out) {
-        struct tipc_msg *msg = buf_msg(l_ptr->first_out);
+    skb = skb_peek(&l_ptr->outqueue);
+    if (skb) {
+        struct tipc_msg *msg = buf_msg(skb);
         u32 length = msg_size(msg);
 
         if ((msg_user(msg) == MSG_FRAGMENTER) &&
@@ -218,11 +209,10 @@ static void link_timeout(struct tipc_link *l_ptr)
     }
 
     /* do all other link processing performed on a periodic basis */
-
     link_state_event(l_ptr, TIMEOUT_EVT);
 
     if (l_ptr->next_out)
-        tipc_link_push_queue(l_ptr);
+        tipc_link_push_packets(l_ptr);
 
     tipc_node_unlock(l_ptr->owner);
 }
@@ -301,6 +291,8 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
     link_init_max_pkt(l_ptr);
 
     l_ptr->next_out_no = 1;
+    __skb_queue_head_init(&l_ptr->outqueue);
+    __skb_queue_head_init(&l_ptr->deferred_queue);
     __skb_queue_head_init(&l_ptr->waiting_sks);
 
     link_reset_statistics(l_ptr);
@@ -379,30 +371,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  */
 static void link_prepare_wakeup(struct tipc_link *link)
 {
-    struct sk_buff_head *wq = &link->waiting_sks;
-    struct sk_buff *buf;
-    uint pend_qsz = link->out_queue_size;
+    uint pend_qsz = skb_queue_len(&link->outqueue);
+    struct sk_buff *skb, *tmp;
 
-    for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
-        if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
+    skb_queue_walk_safe(&link->waiting_sks, skb, tmp) {
+        if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
             break;
-        pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
-        __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
+        pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
+        __skb_unlink(skb, &link->waiting_sks);
+        __skb_queue_tail(&link->owner->waiting_sks, skb);
     }
 }
 
 /**
- * link_release_outqueue - purge link's outbound message queue
- * @l_ptr: pointer to link
- */
-static void link_release_outqueue(struct tipc_link *l_ptr)
-{
-    kfree_skb_list(l_ptr->first_out);
-    l_ptr->first_out = NULL;
-    l_ptr->out_queue_size = 0;
-}
-
-/**
  * tipc_link_reset_fragments - purge link's inbound message fragments queue
  * @l_ptr: pointer to link
  */
@@ -418,11 +399,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-    kfree_skb_list(l_ptr->oldest_deferred_in);
-    kfree_skb_list(l_ptr->first_out);
+    __skb_queue_purge(&l_ptr->deferred_queue);
+    __skb_queue_purge(&l_ptr->outqueue);
     tipc_link_reset_fragments(l_ptr);
-    kfree_skb(l_ptr->proto_msg_queue);
-    l_ptr->proto_msg_queue = NULL;
 }
 
 void tipc_link_reset(struct tipc_link *l_ptr)
@@ -454,25 +433,16 @@ void tipc_link_reset(struct tipc_link *l_ptr)
     }
 
     /* Clean up all queues: */
-    link_release_outqueue(l_ptr);
-    kfree_skb(l_ptr->proto_msg_queue);
-    l_ptr->proto_msg_queue = NULL;
-    kfree_skb_list(l_ptr->oldest_deferred_in);
+    __skb_queue_purge(&l_ptr->outqueue);
+    __skb_queue_purge(&l_ptr->deferred_queue);
     if (!skb_queue_empty(&l_ptr->waiting_sks)) {
         skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
         owner->action_flags |= TIPC_WAKEUP_USERS;
     }
-    l_ptr->retransm_queue_head = 0;
-    l_ptr->retransm_queue_size = 0;
-    l_ptr->last_out = NULL;
-    l_ptr->first_out = NULL;
     l_ptr->next_out = NULL;
     l_ptr->unacked_window = 0;
     l_ptr->checkpoint = 1;
     l_ptr->next_out_no = 1;
-    l_ptr->deferred_inqueue_sz = 0;
-    l_ptr->oldest_deferred_in = NULL;
-    l_ptr->newest_deferred_in = NULL;
     l_ptr->fsm_msg_cnt = 0;
     l_ptr->stale_count = 0;
     link_reset_statistics(l_ptr);
@@ -694,9 +664,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
  * - For all other messages we discard the buffer and return -EHOSTUNREACH
  * - For TIPC internal messages we also reset the link
  */
-static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
 {
-    struct tipc_msg *msg = buf_msg(buf);
+    struct sk_buff *skb = skb_peek(list);
+    struct tipc_msg *msg = buf_msg(skb);
     uint imp = tipc_msg_tot_importance(msg);
     u32 oport = msg_tot_origport(msg);
 
@@ -709,30 +680,30 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
         goto drop;
     if (unlikely(msg_reroute_cnt(msg)))
         goto drop;
-    if (TIPC_SKB_CB(buf)->wakeup_pending)
+    if (TIPC_SKB_CB(skb)->wakeup_pending)
         return -ELINKCONG;
-    if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
+    if (link_schedule_user(link, oport, skb_queue_len(list), imp))
         return -ELINKCONG;
 drop:
-    kfree_skb_list(buf);
+    __skb_queue_purge(list);
     return -EHOSTUNREACH;
 }
 
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
+ *
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
  * user data messages) or -EHOSTUNREACH (all other messages/senders)
  * Only the socket functions tipc_send_stream() and tipc_send_packet() need
  * to act on the return value, since they may need to do more send attempts.
  */
-int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list)
 {
-    struct tipc_msg *msg = buf_msg(buf);
+    struct tipc_msg *msg = buf_msg(skb_peek(list));
     uint psz = msg_size(msg);
-    uint qsz = link->out_queue_size;
     uint sndlim = link->queue_limit[0];
     uint imp = tipc_msg_tot_importance(msg);
     uint mtu = link->max_pkt;
@@ -740,71 +711,83 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
     uint seqno = link->next_out_no;
     uint bc_last_in = link->owner->bclink.last_in;
     struct tipc_media_addr *addr = &link->media_addr;
-    struct sk_buff *next = buf->next;
+    struct sk_buff_head *outqueue = &link->outqueue;
+    struct sk_buff *skb, *tmp;
 
     /* Match queue limits against msg importance: */
-    if (unlikely(qsz >= link->queue_limit[imp]))
-        return tipc_link_cong(link, buf);
+    if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+        return tipc_link_cong(link, list);
 
     /* Has valid packet limit been used ? */
     if (unlikely(psz > mtu)) {
-        kfree_skb_list(buf);
+        __skb_queue_purge(list);
         return -EMSGSIZE;
     }
 
     /* Prepare each packet for sending, and add to outqueue: */
-    while (buf) {
-        next = buf->next;
-        msg = buf_msg(buf);
+    skb_queue_walk_safe(list, skb, tmp) {
+        __skb_unlink(skb, list);
+        msg = buf_msg(skb);
         msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
         msg_set_bcast_ack(msg, bc_last_in);
 
-        if (!link->first_out) {
-            link->first_out = buf;
-        } else if (qsz < sndlim) {
-            link->last_out->next = buf;
-        } else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+        if (skb_queue_len(outqueue) < sndlim) {
+            __skb_queue_tail(outqueue, skb);
+            tipc_bearer_send(link->bearer_id, skb, addr);
+            link->next_out = NULL;
+            link->unacked_window = 0;
+        } else if (tipc_msg_bundle(outqueue, skb, mtu)) {
             link->stats.sent_bundled++;
-            buf = next;
-            next = buf->next;
             continue;
-        } else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+        } else if (tipc_msg_make_bundle(outqueue, skb, mtu,
+                                        link->addr)) {
             link->stats.sent_bundled++;
             link->stats.sent_bundles++;
-            link->last_out->next = buf;
             if (!link->next_out)
-                link->next_out = buf;
+                link->next_out = skb_peek_tail(outqueue);
         } else {
-            link->last_out->next = buf;
+            __skb_queue_tail(outqueue, skb);
             if (!link->next_out)
-                link->next_out = buf;
-        }
-
-        /* Send packet if possible: */
-        if (likely(++qsz <= sndlim)) {
-            tipc_bearer_send(link->bearer_id, buf, addr);
-            link->next_out = next;
-            link->unacked_window = 0;
+                link->next_out = skb;
         }
         seqno++;
-        link->last_out = buf;
-        buf = next;
     }
     link->next_out_no = seqno;
-    link->out_queue_size = qsz;
     return 0;
 }
 
+static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
+{
+    __skb_queue_head_init(list);
+    __skb_queue_tail(list, skb);
+}
+
+static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
+{
+    struct sk_buff_head head;
+
+    skb2list(skb, &head);
+    return __tipc_link_xmit(link, &head);
+}
+
+int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector)
+{
+    struct sk_buff_head head;
+
+    skb2list(skb, &head);
+    return tipc_link_xmit(&head, dnode, selector);
+}
+
 /**
  * tipc_link_xmit() is the general link level function for message sending
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
  * @dsz: amount of user data to be sent
  * @dnode: address of destination node
  * @selector: a number used for deterministic link selection
  * Consumes the buffer chain, except when returning -ELINKCONG
  * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
  */
-int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
+int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector)
 {
     struct tipc_link *link = NULL;
     struct tipc_node *node;
@@ -815,17 +798,22 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
         tipc_node_lock(node);
         link = node->active_links[selector & 1];
         if (link)
-            rc = __tipc_link_xmit(link, buf);
+            rc = __tipc_link_xmit(link, list);
         tipc_node_unlock(node);
     }
 
     if (link)
         return rc;
 
-    if (likely(in_own_node(dnode)))
-        return tipc_sk_rcv(buf);
+    if (likely(in_own_node(dnode))) {
+        /* As a node local message chain never contains more than one
+         * buffer, we just need to dequeue one SKB buffer from the
+         * head list.
+         */
+        return tipc_sk_rcv(__skb_dequeue(list));
+    }
+    __skb_queue_purge(list);
 
-    kfree_skb_list(buf);
     return rc;
 }
 
@@ -839,17 +827,17 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
  */
 static void tipc_link_sync_xmit(struct tipc_link *link)
 {
-    struct sk_buff *buf;
+    struct sk_buff *skb;
     struct tipc_msg *msg;
 
-    buf = tipc_buf_acquire(INT_H_SIZE);
-    if (!buf)
+    skb = tipc_buf_acquire(INT_H_SIZE);
+    if (!skb)
         return;
 
-    msg = buf_msg(buf);
+    msg = buf_msg(skb);
     tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
     msg_set_last_bcast(msg, link->owner->bclink.acked);
-    __tipc_link_xmit(link, buf);
+    __tipc_link_xmit_skb(link, skb);
 }
 
 /*
@@ -869,85 +857,46 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
     kfree_skb(buf);
 }
 
+struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
+                                    const struct sk_buff *skb)
+{
+    if (skb_queue_is_last(list, skb))
+        return NULL;
+    return skb->next;
+}
+
 /*
- * tipc_link_push_packet: Push one unsent packet to the media
+ * tipc_link_push_packets - push unsent packets to bearer
+ *
+ * Push out the unsent messages of a link where congestion
+ * has abated. Node is locked.
+ *
+ * Called with node locked
  */
-static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
+void tipc_link_push_packets(struct tipc_link *l_ptr)
 {
-    struct sk_buff *buf = l_ptr->first_out;
-    u32 r_q_size = l_ptr->retransm_queue_size;
-    u32 r_q_head = l_ptr->retransm_queue_head;
-
-    /* Step to position where retransmission failed, if any, */
-    /* consider that buffers may have been released in meantime */
-    if (r_q_size && buf) {
-        u32 last = lesser(mod(r_q_head + r_q_size),
-                          link_last_sent(l_ptr));
-        u32 first = buf_seqno(buf);
-
-        while (buf && less(first, r_q_head)) {
-            first = mod(first + 1);
-            buf = buf->next;
-        }
-        l_ptr->retransm_queue_head = r_q_head = first;
-        l_ptr->retransm_queue_size = r_q_size = mod(last - first);
-    }
-
-    /* Continue retransmission now, if there is anything: */
-    if (r_q_size && buf) {
-        msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
-        msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-        tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-        l_ptr->retransm_queue_head = mod(++r_q_head);
-        l_ptr->retransm_queue_size = --r_q_size;
-        l_ptr->stats.retransmitted++;
-        return 0;
-    }
-
-    /* Send deferred protocol message, if any: */
-    buf = l_ptr->proto_msg_queue;
-    if (buf) {
-        msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
-        msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-        tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-        l_ptr->unacked_window = 0;
-        kfree_skb(buf);
-        l_ptr->proto_msg_queue = NULL;
-        return 0;
-    }
+    struct sk_buff_head *outqueue = &l_ptr->outqueue;
+    struct sk_buff *skb = l_ptr->next_out;
+    struct tipc_msg *msg;
+    u32 next, first;
 
-    /* Send one deferred data message, if send window not full: */
-    buf = l_ptr->next_out;
-    if (buf) {
-        struct tipc_msg *msg = buf_msg(buf);
-        u32 next = msg_seqno(msg);
-        u32 first = buf_seqno(l_ptr->first_out);
+    skb_queue_walk_from(outqueue, skb) {
+        msg = buf_msg(skb);
+        next = msg_seqno(msg);
+        first = buf_seqno(skb_peek(outqueue));
 
         if (mod(next - first) < l_ptr->queue_limit[0]) {
             msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
             msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-            tipc_bearer_send(l_ptr->bearer_id, buf,
-                             &l_ptr->media_addr);
             if (msg_user(msg) == MSG_BUNDLER)
-                msg_set_type(msg, BUNDLE_CLOSED);
-            l_ptr->next_out = buf->next;
-            return 0;
+                TIPC_SKB_CB(skb)->bundling = false;
+            tipc_bearer_send(l_ptr->bearer_id, skb,
+                             &l_ptr->media_addr);
+            l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
+        } else {
+            break;
         }
     }
-    return 1;
-}
-
-/*
- * push_queue(): push out the unsent messages of a link where
- * congestion has abated. Node is locked
- */
-void tipc_link_push_queue(struct tipc_link *l_ptr)
-{
-    u32 res;
-
-    do {
-        res = tipc_link_push_packet(l_ptr);
-    } while (!res);
 }
 
 void tipc_link_reset_all(struct tipc_node *node)
@@ -1011,20 +960,20 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
     }
 }
 
-void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
+void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
                           u32 retransmits)
 {
     struct tipc_msg *msg;
 
-    if (!buf)
+    if (!skb)
         return;
 
-    msg = buf_msg(buf);
+    msg = buf_msg(skb);
 
     /* Detect repeated retransmit failures */
     if (l_ptr->last_retransmitted == msg_seqno(msg)) {
         if (++l_ptr->stale_count > 100) {
-            link_retransmit_failure(l_ptr, buf);
+            link_retransmit_failure(l_ptr, skb);
             return;
         }
     } else {
@@ -1032,38 +981,29 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
         l_ptr->stale_count = 1;
     }
 
-    while (retransmits && (buf != l_ptr->next_out) && buf) {
-        msg = buf_msg(buf);
+    skb_queue_walk_from(&l_ptr->outqueue, skb) {
+        if (!retransmits || skb == l_ptr->next_out)
+            break;
+        msg = buf_msg(skb);
         msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
         msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-        tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-        buf = buf->next;
+        tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr);
         retransmits--;
         l_ptr->stats.retransmitted++;
     }
-
-    l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
 }
 
-/**
- * link_insert_deferred_queue - insert deferred messages back into receive chain
- */
-static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
-                                                  struct sk_buff *buf)
+static void link_retrieve_defq(struct tipc_link *link,
+                               struct sk_buff_head *list)
 {
     u32 seq_no;
 
-    if (l_ptr->oldest_deferred_in == NULL)
-        return buf;
+    if (skb_queue_empty(&link->deferred_queue))
+        return;
 
-    seq_no = buf_seqno(l_ptr->oldest_deferred_in);
-    if (seq_no == mod(l_ptr->next_in_no)) {
-        l_ptr->newest_deferred_in->next = buf;
-        buf = l_ptr->oldest_deferred_in;
-        l_ptr->oldest_deferred_in = NULL;
-        l_ptr->deferred_inqueue_sz = 0;
-    }
-    return buf;
+    seq_no = buf_seqno(skb_peek(&link->deferred_queue));
+    if (seq_no == mod(link->next_in_no))
+        skb_queue_splice_tail_init(&link->deferred_queue, list);
 }
 
 /**
@@ -1123,43 +1063,42 @@ static int link_recv_buf_validate(struct sk_buff *buf)
 
 /**
  * tipc_rcv - process TIPC packets/messages arriving from off-node
- * @head: pointer to message buffer chain
+ * @skb: TIPC packet
  * @b_ptr: pointer to bearer message arrived on
  *
  * Invoked with no locks held. Bearer pointer must point to a valid bearer
  * structure (i.e. cannot be NULL), but bearer can be inactive.
  */
-void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
+void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr)
 {
-    while (head) {
-        struct tipc_node *n_ptr;
-        struct tipc_link *l_ptr;
-        struct sk_buff *crs;
-        struct sk_buff *buf = head;
-        struct tipc_msg *msg;
-        u32 seq_no;
-        u32 ackd;
-        u32 released = 0;
+    struct sk_buff_head head;
+    struct tipc_node *n_ptr;
+    struct tipc_link *l_ptr;
+    struct sk_buff *skb1, *tmp;
+    struct tipc_msg *msg;
+    u32 seq_no;
+    u32 ackd;
+    u32 released;
 
-        head = head->next;
-        buf->next = NULL;
+    skb2list(skb, &head);
 
+    while ((skb = __skb_dequeue(&head))) {
         /* Ensure message is well-formed */
-        if (unlikely(!link_recv_buf_validate(buf)))
+        if (unlikely(!link_recv_buf_validate(skb)))
             goto discard;
 
         /* Ensure message data is a single contiguous unit */
-        if (unlikely(skb_linearize(buf)))
+        if (unlikely(skb_linearize(skb)))
             goto discard;
 
         /* Handle arrival of a non-unicast link message */
-        msg = buf_msg(buf);
+        msg = buf_msg(skb);
 
         if (unlikely(msg_non_seq(msg))) {
             if (msg_user(msg) == LINK_CONFIG)
-                tipc_disc_rcv(buf, b_ptr);
+                tipc_disc_rcv(skb, b_ptr);
             else
-                tipc_bclink_rcv(buf);
+                tipc_bclink_rcv(skb);
             continue;
         }
 
@@ -1198,22 +1137,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
         if (n_ptr->bclink.recv_permitted)
             tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
-        crs = l_ptr->first_out;
-        while ((crs != l_ptr->next_out) &&
-               less_eq(buf_seqno(crs), ackd)) {
-            struct sk_buff *next = crs->next;
-            kfree_skb(crs);
-            crs = next;
-            released++;
-        }
-        if (released) {
-            l_ptr->first_out = crs;
-            l_ptr->out_queue_size -= released;
+        released = 0;
+        skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
+            if (skb1 == l_ptr->next_out ||
+                more(buf_seqno(skb1), ackd))
+                break;
+            __skb_unlink(skb1, &l_ptr->outqueue);
+            kfree_skb(skb1);
+            released = 1;
         }
 
         /* Try sending any messages link endpoint has pending */
         if (unlikely(l_ptr->next_out))
-            tipc_link_push_queue(l_ptr);
+            tipc_link_push_packets(l_ptr);
 
         if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
             link_prepare_wakeup(l_ptr);
@@ -1223,8 +1159,8 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
         /* Process the incoming packet */
         if (unlikely(!link_working_working(l_ptr))) {
             if (msg_user(msg) == LINK_PROTOCOL) {
-                tipc_link_proto_rcv(l_ptr, buf);
-                head = link_insert_deferred_queue(l_ptr, head);
+                tipc_link_proto_rcv(l_ptr, skb);
+                link_retrieve_defq(l_ptr, &head);
                 tipc_node_unlock(n_ptr);
                 continue;
             }
@@ -1234,8 +1170,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 
             if (link_working_working(l_ptr)) {
                 /* Re-insert buffer in front of queue */
-                buf->next = head;
-                head = buf;
+                __skb_queue_head(&head, skb);
                 tipc_node_unlock(n_ptr);
                 continue;
             }
@@ -1244,33 +1179,33 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 
         /* Link is now in state WORKING_WORKING */
         if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
-            link_handle_out_of_seq_msg(l_ptr, buf);
-            head = link_insert_deferred_queue(l_ptr, head);
+            link_handle_out_of_seq_msg(l_ptr, skb);
+            link_retrieve_defq(l_ptr, &head);
             tipc_node_unlock(n_ptr);
             continue;
         }
         l_ptr->next_in_no++;
-        if (unlikely(l_ptr->oldest_deferred_in))
-            head = link_insert_deferred_queue(l_ptr, head);
+        if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
+            link_retrieve_defq(l_ptr, &head);
 
         if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
             l_ptr->stats.sent_acks++;
             tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
         }
 
-        if (tipc_link_prepare_input(l_ptr, &buf)) {
+        if (tipc_link_prepare_input(l_ptr, &skb)) {
             tipc_node_unlock(n_ptr);
             continue;
         }
         tipc_node_unlock(n_ptr);
-        msg = buf_msg(buf);
-        if (tipc_link_input(l_ptr, buf) != 0)
+
+        if (tipc_link_input(l_ptr, skb) != 0)
             goto discard;
         continue;
 unlock_discard:
         tipc_node_unlock(n_ptr);
 discard:
-        kfree_skb(buf);
+        kfree_skb(skb);
     }
 }
 
@@ -1353,48 +1288,37 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
  *
  * Returns increase in queue length (i.e. 0 or 1)
  */
-u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
-                        struct sk_buff *buf)
+u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
 {
-    struct sk_buff *queue_buf;
-    struct sk_buff **prev;
-    u32 seq_no = buf_seqno(buf);
-
-    buf->next = NULL;
+    struct sk_buff *skb1;
+    u32 seq_no = buf_seqno(skb);
 
     /* Empty queue ? */
-    if (*head == NULL) {
-        *head = *tail = buf;
+    if (skb_queue_empty(list)) {
+        __skb_queue_tail(list, skb);
         return 1;
     }
 
     /* Last ? */
-    if (less(buf_seqno(*tail), seq_no)) {
-        (*tail)->next = buf;
-        *tail = buf;
+    if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
+        __skb_queue_tail(list, skb);
         return 1;
     }
 
     /* Locate insertion point in queue, then insert; discard if duplicate */
-    prev = head;
-    queue_buf = *head;
-    for (;;) {
-        u32 curr_seqno = buf_seqno(queue_buf);
+    skb_queue_walk(list, skb1) {
+        u32 curr_seqno = buf_seqno(skb1);
 
         if (seq_no == curr_seqno) {
-            kfree_skb(buf);
+            kfree_skb(skb);
             return 0;
         }
 
         if (less(seq_no, curr_seqno))
             break;
-
-        prev = &queue_buf->next;
-        queue_buf = queue_buf->next;
     }
 
-    buf->next = queue_buf;
-    *prev = buf;
+    __skb_queue_before(list, skb1, skb);
     return 1;
 }
 
@@ -1424,15 +1348,14 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
         return;
     }
 
-    if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
-                            &l_ptr->newest_deferred_in, buf)) {
-        l_ptr->deferred_inqueue_sz++;
+    if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
         l_ptr->stats.deferred_recv++;
         TIPC_SKB_CB(buf)->deferred = true;
-        if ((l_ptr->deferred_inqueue_sz % 16) == 1)
+        if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
             tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-    } else
+    } else {
         l_ptr->stats.duplicates++;
+    }
 }
 
 /*
@@ -1446,12 +1369,6 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
     u32 msg_size = sizeof(l_ptr->proto_msg);
     int r_flag;
 
-    /* Discard any previous message that was deferred due to congestion */
-    if (l_ptr->proto_msg_queue) {
-        kfree_skb(l_ptr->proto_msg_queue);
-        l_ptr->proto_msg_queue = NULL;
-    }
-
     /* Don't send protocol message during link changeover */
     if (l_ptr->exp_msg_count)
         return;
@@ -1474,8 +1391,8 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
     if (l_ptr->next_out)
         next_sent = buf_seqno(l_ptr->next_out);
     msg_set_next_sent(msg, next_sent);
-    if (l_ptr->oldest_deferred_in) {
-        u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
+    if (!skb_queue_empty(&l_ptr->deferred_queue)) {
+        u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
         gap = mod(rec - mod(l_ptr->next_in_no));
     }
     msg_set_seq_gap(msg, gap);
@@ -1663,7 +1580,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
         }
         if (msg_seq_gap(msg)) {
             l_ptr->stats.recv_nacks++;
-            tipc_link_retransmit(l_ptr, l_ptr->first_out,
+            tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
                                  msg_seq_gap(msg));
         }
         break;
@@ -1682,7 +1599,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
                       u32 selector)
 {
     struct tipc_link *tunnel;
-    struct sk_buff *buf;
+    struct sk_buff *skb;
     u32 length = msg_size(msg);
 
     tunnel = l_ptr->owner->active_links[selector & 1];
@@ -1691,14 +1608,14 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
         return;
     }
     msg_set_size(tunnel_hdr, length + INT_H_SIZE);
-    buf = tipc_buf_acquire(length + INT_H_SIZE);
-    if (!buf) {
+    skb = tipc_buf_acquire(length + INT_H_SIZE);
+    if (!skb) {
         pr_warn("%sunable to send tunnel msg\n", link_co_err);
         return;
     }
-    skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
-    skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
-    __tipc_link_xmit(tunnel, buf);
+    skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
+    skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
+    __tipc_link_xmit_skb(tunnel, skb);
 }
 
 
@@ -1710,10 +1627,10 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
  */
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
-    u32 msgcount = l_ptr->out_queue_size;
-    struct sk_buff *crs = l_ptr->first_out;
+    u32 msgcount = skb_queue_len(&l_ptr->outqueue);
     struct tipc_link *tunnel = l_ptr->owner->active_links[0];
     struct tipc_msg tunnel_hdr;
+    struct sk_buff *skb;
     int split_bundles;
 
     if (!tunnel)
@@ -1724,14 +1641,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
     msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
     msg_set_msgcnt(&tunnel_hdr, msgcount);
 
-    if (!l_ptr->first_out) {
-        struct sk_buff *buf;
-
-        buf = tipc_buf_acquire(INT_H_SIZE);
-        if (buf) {
-            skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
+    if (skb_queue_empty(&l_ptr->outqueue)) {
+        skb = tipc_buf_acquire(INT_H_SIZE);
+        if (skb) {
+            skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
             msg_set_size(&tunnel_hdr, INT_H_SIZE);
-            __tipc_link_xmit(tunnel, buf);
+            __tipc_link_xmit_skb(tunnel, skb);
         } else {
             pr_warn("%sunable to send changeover msg\n",
                     link_co_err);
@@ -1742,8 +1657,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
     split_bundles = (l_ptr->owner->active_links[0] !=
                      l_ptr->owner->active_links[1]);
 
-    while (crs) {
-        struct tipc_msg *msg = buf_msg(crs);
+    skb_queue_walk(&l_ptr->outqueue, skb) {
+        struct tipc_msg *msg = buf_msg(skb);
 
         if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
             struct tipc_msg *m = msg_get_wrapped(msg);
@@ -1761,7 +1676,6 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
             tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
                                   msg_link_selector(msg));
         }
-        crs = crs->next;
     }
 }
 
@@ -1777,17 +1691,16 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
                               struct tipc_link *tunnel)
 {
-    struct sk_buff *iter;
+    struct sk_buff *skb;
     struct tipc_msg tunnel_hdr;
 
     tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
                   DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
-    msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
+    msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
     msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
-    iter = l_ptr->first_out;
-    while (iter) {
-        struct sk_buff *outbuf;
-        struct tipc_msg *msg = buf_msg(iter);
+    skb_queue_walk(&l_ptr->outqueue, skb) {
+        struct sk_buff *outskb;
+        struct tipc_msg *msg = buf_msg(skb);
         u32 length = msg_size(msg);
 
         if (msg_user(msg) == MSG_BUNDLER)
@@ -1795,19 +1708,18 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
1795 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */ 1708 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); /* Update */
1796 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1709 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1797 msg_set_size(&tunnel_hdr, length + INT_H_SIZE); 1710 msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
1798 outbuf = tipc_buf_acquire(length + INT_H_SIZE); 1711 outskb = tipc_buf_acquire(length + INT_H_SIZE);
1799 if (outbuf == NULL) { 1712 if (outskb == NULL) {
1800 pr_warn("%sunable to send duplicate msg\n", 1713 pr_warn("%sunable to send duplicate msg\n",
1801 link_co_err); 1714 link_co_err);
1802 return; 1715 return;
1803 } 1716 }
1804 skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE); 1717 skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
1805 skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data, 1718 skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
1806 length); 1719 length);
1807 __tipc_link_xmit(tunnel, outbuf); 1720 __tipc_link_xmit_skb(tunnel, outskb);
1808 if (!tipc_link_is_up(l_ptr)) 1721 if (!tipc_link_is_up(l_ptr))
1809 return; 1722 return;
1810 iter = iter->next;
1811 } 1723 }
1812} 1724}
1813 1725
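
Both tunnel functions above lose their manual "iter = iter->next" bookkeeping because the queue head now tracks everything the old trio of fields did by hand. A sketch of the field-by-field equivalence, assuming the caller already holds whatever lock protects the queue (hence the unlocked "__" primitives):

	#include <linux/skbuff.h>

	static void outqueue_demo(struct sk_buff *skb)
	{
		struct sk_buff_head outqueue;

		__skb_queue_head_init(&outqueue); /* was: first_out = last_out = NULL, size = 0 */
		__skb_queue_tail(&outqueue, skb); /* was: hand-chaining via last_out->next */

		WARN_ON(skb_queue_len(&outqueue) != 1);	/* was: out_queue_size */
		WARN_ON(skb_peek(&outqueue) != skb);	/* was: first_out */

		__skb_queue_purge(&outqueue);	/* was: kfree_skb_list(first_out) */
	}
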
diff --git a/net/tipc/link.h b/net/tipc/link.h
index f463e7be801c..55812e87ca1e 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -119,20 +119,13 @@ struct tipc_stats {
119 * @max_pkt: current maximum packet size for this link 119 * @max_pkt: current maximum packet size for this link
120 * @max_pkt_target: desired maximum packet size for this link 120 * @max_pkt_target: desired maximum packet size for this link
121 * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target) 121 * @max_pkt_probes: # of probes based on current (max_pkt, max_pkt_target)
122 * @out_queue_size: # of messages in outbound message queue 122 * @outqueue: outbound message queue
123 * @first_out: ptr to first outbound message in queue
124 * @last_out: ptr to last outbound message in queue
125 * @next_out_no: next sequence number to use for outbound messages 123 * @next_out_no: next sequence number to use for outbound messages
126 * @last_retransmitted: sequence number of most recently retransmitted message 124 * @last_retransmitted: sequence number of most recently retransmitted message
127 * @stale_count: # of identical retransmit requests made by peer 125 * @stale_count: # of identical retransmit requests made by peer
128 * @next_in_no: next sequence number to expect for inbound messages 126 * @next_in_no: next sequence number to expect for inbound messages
129 * @deferred_inqueue_sz: # of messages in inbound message queue 127 * @deferred_queue: deferred queue of saved OOS b'cast messages from node
130 * @oldest_deferred_in: ptr to first inbound message in queue
131 * @newest_deferred_in: ptr to last inbound message in queue
132 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer 128 * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
133 * @proto_msg_queue: ptr to (single) outbound control message
134 * @retransm_queue_size: number of messages to retransmit
135 * @retransm_queue_head: sequence number of first message to retransmit
136 * @next_out: ptr to first unsent outbound message in queue 129 * @next_out: ptr to first unsent outbound message in queue
137 * @waiting_sks: linked list of sockets waiting for link congestion to abate 130 * @waiting_sks: linked list of sockets waiting for link congestion to abate
138 * @long_msg_seq_no: next identifier to use for outbound fragmented messages 131 * @long_msg_seq_no: next identifier to use for outbound fragmented messages
@@ -176,24 +169,17 @@ struct tipc_link {
176 u32 max_pkt_probes; 169 u32 max_pkt_probes;
177 170
178 /* Sending */ 171 /* Sending */
179 u32 out_queue_size; 172 struct sk_buff_head outqueue;
180 struct sk_buff *first_out;
181 struct sk_buff *last_out;
182 u32 next_out_no; 173 u32 next_out_no;
183 u32 last_retransmitted; 174 u32 last_retransmitted;
184 u32 stale_count; 175 u32 stale_count;
185 176
186 /* Reception */ 177 /* Reception */
187 u32 next_in_no; 178 u32 next_in_no;
188 u32 deferred_inqueue_sz; 179 struct sk_buff_head deferred_queue;
189 struct sk_buff *oldest_deferred_in;
190 struct sk_buff *newest_deferred_in;
191 u32 unacked_window; 180 u32 unacked_window;
192 181
193 /* Congestion handling */ 182 /* Congestion handling */
194 struct sk_buff *proto_msg_queue;
195 u32 retransm_queue_size;
196 u32 retransm_queue_head;
197 struct sk_buff *next_out; 183 struct sk_buff *next_out;
198 struct sk_buff_head waiting_sks; 184 struct sk_buff_head waiting_sks;
199 185
@@ -227,18 +213,20 @@ struct sk_buff *tipc_link_cmd_reset_stats(const void *req_tlv_area,
227void tipc_link_reset_all(struct tipc_node *node); 213void tipc_link_reset_all(struct tipc_node *node);
228void tipc_link_reset(struct tipc_link *l_ptr); 214void tipc_link_reset(struct tipc_link *l_ptr);
229void tipc_link_reset_list(unsigned int bearer_id); 215void tipc_link_reset_list(unsigned int bearer_id);
230int tipc_link_xmit(struct sk_buff *buf, u32 dest, u32 selector); 216int tipc_link_xmit_skb(struct sk_buff *skb, u32 dest, u32 selector);
231int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf); 217int tipc_link_xmit(struct sk_buff_head *list, u32 dest, u32 selector);
218int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list);
232u32 tipc_link_get_max_pkt(u32 dest, u32 selector); 219u32 tipc_link_get_max_pkt(u32 dest, u32 selector);
233void tipc_link_bundle_rcv(struct sk_buff *buf); 220void tipc_link_bundle_rcv(struct sk_buff *buf);
234void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob, 221void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
235 u32 gap, u32 tolerance, u32 priority, u32 acked_mtu); 222 u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
236void tipc_link_push_queue(struct tipc_link *l_ptr); 223void tipc_link_push_packets(struct tipc_link *l_ptr);
237u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, 224u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf);
238 struct sk_buff *buf);
239void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); 225void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
240void tipc_link_retransmit(struct tipc_link *l_ptr, 226void tipc_link_retransmit(struct tipc_link *l_ptr,
241 struct sk_buff *start, u32 retransmits); 227 struct sk_buff *start, u32 retransmits);
228struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
229 const struct sk_buff *skb);
242 230
243int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb); 231int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb);
244int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info); 232int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info);
@@ -259,18 +247,14 @@ static inline u32 mod(u32 x)
259 return x & 0xffffu; 247 return x & 0xffffu;
260} 248}
261 249
262static inline int between(u32 lower, u32 upper, u32 n) 250static inline int less_eq(u32 left, u32 right)
263{ 251{
264 if ((lower < n) && (n < upper)) 252 return mod(right - left) < 32768u;
265 return 1;
266 if ((upper < lower) && ((n > lower) || (n < upper)))
267 return 1;
268 return 0;
269} 253}
270 254
271static inline int less_eq(u32 left, u32 right) 255static inline int more(u32 left, u32 right)
272{ 256{
273 return mod(right - left) < 32768u; 257 return !less_eq(left, right);
274} 258}
275 259
276static inline int less(u32 left, u32 right) 260static inline int less(u32 left, u32 right)
@@ -309,7 +293,7 @@ static inline int link_reset_reset(struct tipc_link *l_ptr)
309 293
310static inline int link_congested(struct tipc_link *l_ptr) 294static inline int link_congested(struct tipc_link *l_ptr)
311{ 295{
312 return l_ptr->out_queue_size >= l_ptr->queue_limit[0]; 296 return skb_queue_len(&l_ptr->outqueue) >= l_ptr->queue_limit[0];
313} 297}
314 298
315#endif 299#endif
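
The split into tipc_link_xmit_skb()/tipc_link_xmit() declared above keeps single-buffer callers (socket rejects, connection probes) as one-liners while the real transmit path works on queues. A plausible shape for the wrapper, assuming it simply wraps the skb in a stack-allocated one-element queue (a sketch, not necessarily the committed body):

	#include <linux/skbuff.h>
	#include <linux/types.h>

	int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector);

	int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector)
	{
		struct sk_buff_head head;

		__skb_queue_head_init(&head);
		__skb_queue_tail(&head, skb);
		return tipc_link_xmit(&head, dnode, selector);
	}
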
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index 9155496b8a8a..5b0659791c07 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -166,11 +166,12 @@ err:
166 * @offset: Position in iov to start copying from 166 * @offset: Position in iov to start copying from
167 * @dsz: Total length of user data 167 * @dsz: Total length of user data
168 * @pktmax: Max packet size that can be used 168 * @pktmax: Max packet size that can be used
169 * @chain: Buffer or chain of buffers to be returned to caller 169 * @list: Buffer or chain of buffers to be returned to caller
170 *
170 * Returns message data size or errno: -ENOMEM, -EFAULT 171 * Returns message data size or errno: -ENOMEM, -EFAULT
171 */ 172 */
172int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 173int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
173 int offset, int dsz, int pktmax , struct sk_buff **chain) 174 int dsz, int pktmax, struct sk_buff_head *list)
174{ 175{
175 int mhsz = msg_hdr_sz(mhdr); 176 int mhsz = msg_hdr_sz(mhdr);
176 int msz = mhsz + dsz; 177 int msz = mhsz + dsz;
@@ -179,22 +180,22 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
179 int pktrem = pktmax; 180 int pktrem = pktmax;
180 int drem = dsz; 181 int drem = dsz;
181 struct tipc_msg pkthdr; 182 struct tipc_msg pkthdr;
182 struct sk_buff *buf, *prev; 183 struct sk_buff *skb;
183 char *pktpos; 184 char *pktpos;
184 int rc; 185 int rc;
185 uint chain_sz = 0; 186
186 msg_set_size(mhdr, msz); 187 msg_set_size(mhdr, msz);
187 188
188 /* No fragmentation needed? */ 189 /* No fragmentation needed? */
189 if (likely(msz <= pktmax)) { 190 if (likely(msz <= pktmax)) {
190 buf = tipc_buf_acquire(msz); 191 skb = tipc_buf_acquire(msz);
191 *chain = buf; 192 if (unlikely(!skb))
192 if (unlikely(!buf))
193 return -ENOMEM; 193 return -ENOMEM;
194 skb_copy_to_linear_data(buf, mhdr, mhsz); 194 __skb_queue_tail(list, skb);
195 pktpos = buf->data + mhsz; 195 skb_copy_to_linear_data(skb, mhdr, mhsz);
196 TIPC_SKB_CB(buf)->chain_sz = 1; 196 pktpos = skb->data + mhsz;
197 if (!dsz || !memcpy_fromiovecend(pktpos, m->msg_iov, offset, dsz)) 197 if (!dsz || !memcpy_fromiovecend(pktpos, m->msg_iov, offset,
198 dsz))
198 return dsz; 199 return dsz;
199 rc = -EFAULT; 200 rc = -EFAULT;
200 goto error; 201 goto error;
@@ -207,15 +208,15 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
207 msg_set_fragm_no(&pkthdr, pktno); 208 msg_set_fragm_no(&pkthdr, pktno);
208 209
209 /* Prepare first fragment */ 210 /* Prepare first fragment */
210 *chain = buf = tipc_buf_acquire(pktmax); 211 skb = tipc_buf_acquire(pktmax);
211 if (!buf) 212 if (!skb)
212 return -ENOMEM; 213 return -ENOMEM;
213 chain_sz = 1; 214 __skb_queue_tail(list, skb);
214 pktpos = buf->data; 215 pktpos = skb->data;
215 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); 216 skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
216 pktpos += INT_H_SIZE; 217 pktpos += INT_H_SIZE;
217 pktrem -= INT_H_SIZE; 218 pktrem -= INT_H_SIZE;
218 skb_copy_to_linear_data_offset(buf, INT_H_SIZE, mhdr, mhsz); 219 skb_copy_to_linear_data_offset(skb, INT_H_SIZE, mhdr, mhsz);
219 pktpos += mhsz; 220 pktpos += mhsz;
220 pktrem -= mhsz; 221 pktrem -= mhsz;
221 222
@@ -238,43 +239,41 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
238 pktsz = drem + INT_H_SIZE; 239 pktsz = drem + INT_H_SIZE;
239 else 240 else
240 pktsz = pktmax; 241 pktsz = pktmax;
241 prev = buf; 242 skb = tipc_buf_acquire(pktsz);
242 buf = tipc_buf_acquire(pktsz); 243 if (!skb) {
243 if (!buf) {
244 rc = -ENOMEM; 244 rc = -ENOMEM;
245 goto error; 245 goto error;
246 } 246 }
247 chain_sz++; 247 __skb_queue_tail(list, skb);
248 prev->next = buf;
249 msg_set_type(&pkthdr, FRAGMENT); 248 msg_set_type(&pkthdr, FRAGMENT);
250 msg_set_size(&pkthdr, pktsz); 249 msg_set_size(&pkthdr, pktsz);
251 msg_set_fragm_no(&pkthdr, ++pktno); 250 msg_set_fragm_no(&pkthdr, ++pktno);
252 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); 251 skb_copy_to_linear_data(skb, &pkthdr, INT_H_SIZE);
253 pktpos = buf->data + INT_H_SIZE; 252 pktpos = skb->data + INT_H_SIZE;
254 pktrem = pktsz - INT_H_SIZE; 253 pktrem = pktsz - INT_H_SIZE;
255 254
256 } while (1); 255 } while (1);
257 TIPC_SKB_CB(*chain)->chain_sz = chain_sz; 256 msg_set_type(buf_msg(skb), LAST_FRAGMENT);
258 msg_set_type(buf_msg(buf), LAST_FRAGMENT);
259 return dsz; 257 return dsz;
260error: 258error:
261 kfree_skb_list(*chain); 259 __skb_queue_purge(list);
262 *chain = NULL; 260 __skb_queue_head_init(list);
263 return rc; 261 return rc;
264} 262}
265 263
266/** 264/**
267 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one 265 * tipc_msg_bundle(): Append contents of a buffer to tail of an existing one
268 * @bbuf: the existing buffer ("bundle") 266 * @list: the buffer chain of the existing buffer ("bundle")
269 * @buf: buffer to be appended 267 * @skb: buffer to be appended
270 * @mtu: max allowable size for the bundle buffer 268 * @mtu: max allowable size for the bundle buffer
271 * Consumes buffer if successful 269 * Consumes buffer if successful
272 * Returns true if bundling could be performed, otherwise false 270 * Returns true if bundling could be performed, otherwise false
273 */ 271 */
274bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu) 272bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu)
275{ 273{
276 struct tipc_msg *bmsg = buf_msg(bbuf); 274 struct sk_buff *bskb = skb_peek_tail(list);
277 struct tipc_msg *msg = buf_msg(buf); 275 struct tipc_msg *bmsg = buf_msg(bskb);
276 struct tipc_msg *msg = buf_msg(skb);
278 unsigned int bsz = msg_size(bmsg); 277 unsigned int bsz = msg_size(bmsg);
279 unsigned int msz = msg_size(msg); 278 unsigned int msz = msg_size(msg);
280 u32 start = align(bsz); 279 u32 start = align(bsz);
@@ -289,35 +288,36 @@ bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu)
289 return false; 288 return false;
290 if (likely(msg_user(bmsg) != MSG_BUNDLER)) 289 if (likely(msg_user(bmsg) != MSG_BUNDLER))
291 return false; 290 return false;
292 if (likely(msg_type(bmsg) != BUNDLE_OPEN)) 291 if (likely(!TIPC_SKB_CB(bskb)->bundling))
293 return false; 292 return false;
294 if (unlikely(skb_tailroom(bbuf) < (pad + msz))) 293 if (unlikely(skb_tailroom(bskb) < (pad + msz)))
295 return false; 294 return false;
296 if (unlikely(max < (start + msz))) 295 if (unlikely(max < (start + msz)))
297 return false; 296 return false;
298 297
299 skb_put(bbuf, pad + msz); 298 skb_put(bskb, pad + msz);
300 skb_copy_to_linear_data_offset(bbuf, start, buf->data, msz); 299 skb_copy_to_linear_data_offset(bskb, start, skb->data, msz);
301 msg_set_size(bmsg, start + msz); 300 msg_set_size(bmsg, start + msz);
302 msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1); 301 msg_set_msgcnt(bmsg, msg_msgcnt(bmsg) + 1);
303 bbuf->next = buf->next; 302 kfree_skb(skb);
304 kfree_skb(buf);
305 return true; 303 return true;
306} 304}
307 305
308/** 306/**
309 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail 307 * tipc_msg_make_bundle(): Create bundle buf and append message to its tail
310 * @buf: buffer to be appended and replaced 308 * @list: the buffer chain
311 * @mtu: max allowable size for the bundle buffer, including header 309 * @skb: buffer to be appended and replaced
 310 * @mtu: max allowable size for the bundle buffer, including header
312 * @dnode: destination node for message. (Not always present in header) 311 * @dnode: destination node for message. (Not always present in header)
313 * Replaces buffer if successful 312 * Replaces buffer if successful
314 * Returns true if success, otherwise false 313 * Returns true if success, otherwise false
315 */ 314 */
316bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode) 315bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb,
316 u32 mtu, u32 dnode)
317{ 317{
318 struct sk_buff *bbuf; 318 struct sk_buff *bskb;
319 struct tipc_msg *bmsg; 319 struct tipc_msg *bmsg;
320 struct tipc_msg *msg = buf_msg(*buf); 320 struct tipc_msg *msg = buf_msg(skb);
321 u32 msz = msg_size(msg); 321 u32 msz = msg_size(msg);
322 u32 max = mtu - INT_H_SIZE; 322 u32 max = mtu - INT_H_SIZE;
323 323
@@ -330,20 +330,19 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode)
330 if (msz > (max / 2)) 330 if (msz > (max / 2))
331 return false; 331 return false;
332 332
333 bbuf = tipc_buf_acquire(max); 333 bskb = tipc_buf_acquire(max);
334 if (!bbuf) 334 if (!bskb)
335 return false; 335 return false;
336 336
337 skb_trim(bbuf, INT_H_SIZE); 337 skb_trim(bskb, INT_H_SIZE);
338 bmsg = buf_msg(bbuf); 338 bmsg = buf_msg(bskb);
339 tipc_msg_init(bmsg, MSG_BUNDLER, BUNDLE_OPEN, INT_H_SIZE, dnode); 339 tipc_msg_init(bmsg, MSG_BUNDLER, 0, INT_H_SIZE, dnode);
340 msg_set_seqno(bmsg, msg_seqno(msg)); 340 msg_set_seqno(bmsg, msg_seqno(msg));
341 msg_set_ack(bmsg, msg_ack(msg)); 341 msg_set_ack(bmsg, msg_ack(msg));
342 msg_set_bcast_ack(bmsg, msg_bcast_ack(msg)); 342 msg_set_bcast_ack(bmsg, msg_bcast_ack(msg));
343 bbuf->next = (*buf)->next; 343 TIPC_SKB_CB(bskb)->bundling = true;
344 tipc_msg_bundle(bbuf, *buf, mtu); 344 __skb_queue_tail(list, bskb);
345 *buf = bbuf; 345 return tipc_msg_bundle(list, skb, mtu);
346 return true;
347} 346}
348 347
349/** 348/**
@@ -429,22 +428,23 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
429/* tipc_msg_reassemble() - clone a buffer chain of fragments and 428/* tipc_msg_reassemble() - clone a buffer chain of fragments and
430 * reassemble the clones into one message 429 * reassemble the clones into one message
431 */ 430 */
432struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain) 431struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list)
433{ 432{
434 struct sk_buff *buf = chain; 433 struct sk_buff *skb;
435 struct sk_buff *frag = buf; 434 struct sk_buff *frag = NULL;
436 struct sk_buff *head = NULL; 435 struct sk_buff *head = NULL;
437 int hdr_sz; 436 int hdr_sz;
438 437
439 /* Copy header if single buffer */ 438 /* Copy header if single buffer */
440 if (!buf->next) { 439 if (skb_queue_len(list) == 1) {
441 hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf)); 440 skb = skb_peek(list);
442 return __pskb_copy(buf, hdr_sz, GFP_ATOMIC); 441 hdr_sz = skb_headroom(skb) + msg_hdr_sz(buf_msg(skb));
442 return __pskb_copy(skb, hdr_sz, GFP_ATOMIC);
443 } 443 }
444 444
445 /* Clone all fragments and reassemble */ 445 /* Clone all fragments and reassemble */
446 while (buf) { 446 skb_queue_walk(list, skb) {
447 frag = skb_clone(buf, GFP_ATOMIC); 447 frag = skb_clone(skb, GFP_ATOMIC);
448 if (!frag) 448 if (!frag)
449 goto error; 449 goto error;
450 frag->next = NULL; 450 frag->next = NULL;
@@ -452,7 +452,6 @@ struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
452 break; 452 break;
453 if (!head) 453 if (!head)
454 goto error; 454 goto error;
455 buf = buf->next;
456 } 455 }
457 return frag; 456 return frag;
458error: 457error:
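
One contract change above is easy to miss: the caller now owns the queue head, so on failure tipc_msg_build() purges whatever fragments it already queued and re-initializes the head, handing back an empty but valid list. The calling convention, sketched with this patch's signatures (mhdr/m/dsz/mtu are whatever the caller has on hand, as in socket.c further down):

	static int build_and_send(struct tipc_msg *mhdr, struct msghdr *m,
				  int dsz, int mtu, u32 dnode, u32 selector)
	{
		struct sk_buff_head head;
		int rc;

		__skb_queue_head_init(&head);	/* caller initializes the head */
		rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
		if (rc < 0)
			return rc;	/* head was purged and re-inited */
		return tipc_link_xmit(&head, dnode, selector);	/* consumes head */
	}
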
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index d7d2ba2afe6c..d5c83d7ecb47 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -464,11 +464,6 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
464#define FRAGMENT 1 464#define FRAGMENT 1
465#define LAST_FRAGMENT 2 465#define LAST_FRAGMENT 2
466 466
467/* Bundling protocol message types
468 */
469#define BUNDLE_OPEN 0
470#define BUNDLE_CLOSED 1
471
472/* 467/*
473 * Link management protocol message types 468 * Link management protocol message types
474 */ 469 */
@@ -739,13 +734,14 @@ struct sk_buff *tipc_msg_create(uint user, uint type, uint hdr_sz,
739 734
740int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf); 735int tipc_buf_append(struct sk_buff **headbuf, struct sk_buff **buf);
741 736
742bool tipc_msg_bundle(struct sk_buff *bbuf, struct sk_buff *buf, u32 mtu); 737bool tipc_msg_bundle(struct sk_buff_head *list, struct sk_buff *skb, u32 mtu);
743 738
744bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode); 739bool tipc_msg_make_bundle(struct sk_buff_head *list, struct sk_buff *skb,
740 u32 mtu, u32 dnode);
745 741
746int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 742int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
747 int offset, int dsz, int mtu , struct sk_buff **chain); 743 int dsz, int mtu, struct sk_buff_head *list);
748 744
749struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain); 745struct sk_buff *tipc_msg_reassemble(struct sk_buff_head *list);
750 746
751#endif 747#endif
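
Dropping BUNDLE_OPEN/BUNDLE_CLOSED in the msg.h hunk above works because "may more messages be appended?" is purely local state that never needs to travel on the wire; it now lives in the buffer's control block instead of the message type field. TIPC_SKB_CB follows the usual kernel pattern of overlaying a private struct on skb->cb; a sketch of the shape assumed here (the real struct tipc_skb_cb in net/tipc/core.h carries more fields, e.g. handle and wakeup_pending):

	#include <linux/skbuff.h>

	struct tipc_skb_cb_sketch {
		bool bundling;	/* bundle still open for appends? */
	};

	#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb_sketch *)&((__skb)->cb[0]))

	static void bundle_open(struct sk_buff *bskb)
	{
		TIPC_SKB_CB(bskb)->bundling = true;	/* set by tipc_msg_make_bundle() */
	}

	static bool bundle_is_open(struct sk_buff *bskb)
	{
		return TIPC_SKB_CB(bskb)->bundling;	/* tested by tipc_msg_bundle() */
	}
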
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 376d2bb51d8d..56248db75274 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -114,9 +114,9 @@ static struct sk_buff *named_prepare_buf(u32 type, u32 size, u32 dest)
114 return buf; 114 return buf;
115} 115}
116 116
117void named_cluster_distribute(struct sk_buff *buf) 117void named_cluster_distribute(struct sk_buff *skb)
118{ 118{
119 struct sk_buff *obuf; 119 struct sk_buff *oskb;
120 struct tipc_node *node; 120 struct tipc_node *node;
121 u32 dnode; 121 u32 dnode;
122 122
@@ -127,15 +127,15 @@ void named_cluster_distribute(struct sk_buff *buf)
127 continue; 127 continue;
128 if (!tipc_node_active_links(node)) 128 if (!tipc_node_active_links(node))
129 continue; 129 continue;
130 obuf = skb_copy(buf, GFP_ATOMIC); 130 oskb = skb_copy(skb, GFP_ATOMIC);
131 if (!obuf) 131 if (!oskb)
132 break; 132 break;
133 msg_set_destnode(buf_msg(obuf), dnode); 133 msg_set_destnode(buf_msg(oskb), dnode);
134 tipc_link_xmit(obuf, dnode, dnode); 134 tipc_link_xmit_skb(oskb, dnode, dnode);
135 } 135 }
136 rcu_read_unlock(); 136 rcu_read_unlock();
137 137
138 kfree_skb(buf); 138 kfree_skb(skb);
139} 139}
140 140
141/** 141/**
@@ -190,15 +190,15 @@ struct sk_buff *tipc_named_withdraw(struct publication *publ)
190 190
191/** 191/**
192 * named_distribute - prepare name info for bulk distribution to another node 192 * named_distribute - prepare name info for bulk distribution to another node
193 * @msg_list: list of messages (buffers) to be returned from this function 193 * @list: list of messages (buffers) to be returned from this function
194 * @dnode: node to be updated 194 * @dnode: node to be updated
195 * @pls: linked list of publication items to be packed into buffer chain 195 * @pls: linked list of publication items to be packed into buffer chain
196 */ 196 */
197static void named_distribute(struct list_head *msg_list, u32 dnode, 197static void named_distribute(struct sk_buff_head *list, u32 dnode,
198 struct publ_list *pls) 198 struct publ_list *pls)
199{ 199{
200 struct publication *publ; 200 struct publication *publ;
201 struct sk_buff *buf = NULL; 201 struct sk_buff *skb = NULL;
202 struct distr_item *item = NULL; 202 struct distr_item *item = NULL;
203 uint dsz = pls->size * ITEM_SIZE; 203 uint dsz = pls->size * ITEM_SIZE;
204 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE; 204 uint msg_dsz = (tipc_node_get_mtu(dnode, 0) / ITEM_SIZE) * ITEM_SIZE;
@@ -207,15 +207,15 @@ static void named_distribute(struct list_head *msg_list, u32 dnode,
207 207
208 list_for_each_entry(publ, &pls->list, local_list) { 208 list_for_each_entry(publ, &pls->list, local_list) {
209 /* Prepare next buffer: */ 209 /* Prepare next buffer: */
210 if (!buf) { 210 if (!skb) {
211 msg_rem = min_t(uint, rem, msg_dsz); 211 msg_rem = min_t(uint, rem, msg_dsz);
212 rem -= msg_rem; 212 rem -= msg_rem;
213 buf = named_prepare_buf(PUBLICATION, msg_rem, dnode); 213 skb = named_prepare_buf(PUBLICATION, msg_rem, dnode);
214 if (!buf) { 214 if (!skb) {
215 pr_warn("Bulk publication failure\n"); 215 pr_warn("Bulk publication failure\n");
216 return; 216 return;
217 } 217 }
218 item = (struct distr_item *)msg_data(buf_msg(buf)); 218 item = (struct distr_item *)msg_data(buf_msg(skb));
219 } 219 }
220 220
221 /* Pack publication into message: */ 221 /* Pack publication into message: */
@@ -225,8 +225,8 @@ static void named_distribute(struct list_head *msg_list, u32 dnode,
225 225
226 /* Append full buffer to list: */ 226 /* Append full buffer to list: */
227 if (!msg_rem) { 227 if (!msg_rem) {
228 list_add_tail((struct list_head *)buf, msg_list); 228 __skb_queue_tail(list, skb);
229 buf = NULL; 229 skb = NULL;
230 } 230 }
231 } 231 }
232} 232}
@@ -236,27 +236,57 @@ static void named_distribute(struct list_head *msg_list, u32 dnode,
236 */ 236 */
237void tipc_named_node_up(u32 dnode) 237void tipc_named_node_up(u32 dnode)
238{ 238{
239 LIST_HEAD(msg_list); 239 struct sk_buff_head head;
240 struct sk_buff *buf_chain; 240
241 __skb_queue_head_init(&head);
241 242
242 read_lock_bh(&tipc_nametbl_lock); 243 read_lock_bh(&tipc_nametbl_lock);
243 named_distribute(&msg_list, dnode, &publ_cluster); 244 named_distribute(&head, dnode, &publ_cluster);
244 named_distribute(&msg_list, dnode, &publ_zone); 245 named_distribute(&head, dnode, &publ_zone);
245 read_unlock_bh(&tipc_nametbl_lock); 246 read_unlock_bh(&tipc_nametbl_lock);
246 247
247 /* Convert circular list to linear list and send: */ 248 tipc_link_xmit(&head, dnode, dnode);
248 buf_chain = (struct sk_buff *)msg_list.next; 249}
249 ((struct sk_buff *)msg_list.prev)->next = NULL; 250
250 tipc_link_xmit(buf_chain, dnode, dnode); 251static void tipc_publ_subscribe(struct publication *publ, u32 addr)
252{
253 struct tipc_node *node;
254
255 if (in_own_node(addr))
256 return;
257
258 node = tipc_node_find(addr);
259 if (!node) {
260 pr_warn("Node subscription rejected, unknown node 0x%x\n",
261 addr);
262 return;
263 }
264
265 tipc_node_lock(node);
266 list_add_tail(&publ->nodesub_list, &node->publ_list);
267 tipc_node_unlock(node);
268}
269
270static void tipc_publ_unsubscribe(struct publication *publ, u32 addr)
271{
272 struct tipc_node *node;
273
274 node = tipc_node_find(addr);
275 if (!node)
276 return;
277
278 tipc_node_lock(node);
279 list_del_init(&publ->nodesub_list);
280 tipc_node_unlock(node);
251} 281}
252 282
253/** 283/**
254 * named_purge_publ - remove publication associated with a failed node 284 * tipc_publ_purge - remove publication associated with a failed node
255 * 285 *
256 * Invoked for each publication issued by a newly failed node. 286 * Invoked for each publication issued by a newly failed node.
257 * Removes publication structure from name table & deletes it. 287 * Removes publication structure from name table & deletes it.
258 */ 288 */
259static void named_purge_publ(struct publication *publ) 289static void tipc_publ_purge(struct publication *publ, u32 addr)
260{ 290{
261 struct publication *p; 291 struct publication *p;
262 292
@@ -264,7 +294,7 @@ static void named_purge_publ(struct publication *publ)
264 p = tipc_nametbl_remove_publ(publ->type, publ->lower, 294 p = tipc_nametbl_remove_publ(publ->type, publ->lower,
265 publ->node, publ->ref, publ->key); 295 publ->node, publ->ref, publ->key);
266 if (p) 296 if (p)
267 tipc_nodesub_unsubscribe(&p->subscr); 297 tipc_publ_unsubscribe(p, addr);
268 write_unlock_bh(&tipc_nametbl_lock); 298 write_unlock_bh(&tipc_nametbl_lock);
269 299
270 if (p != publ) { 300 if (p != publ) {
@@ -277,6 +307,14 @@ static void named_purge_publ(struct publication *publ)
277 kfree(p); 307 kfree(p);
278} 308}
279 309
310void tipc_publ_notify(struct list_head *nsub_list, u32 addr)
311{
312 struct publication *publ, *tmp;
313
314 list_for_each_entry_safe(publ, tmp, nsub_list, nodesub_list)
315 tipc_publ_purge(publ, addr);
316}
317
280/** 318/**
281 * tipc_update_nametbl - try to process a nametable update and notify 319 * tipc_update_nametbl - try to process a nametable update and notify
282 * subscribers 320 * subscribers
@@ -294,9 +332,7 @@ static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype)
294 TIPC_CLUSTER_SCOPE, node, 332 TIPC_CLUSTER_SCOPE, node,
295 ntohl(i->ref), ntohl(i->key)); 333 ntohl(i->ref), ntohl(i->key));
296 if (publ) { 334 if (publ) {
297 tipc_nodesub_subscribe(&publ->subscr, node, publ, 335 tipc_publ_subscribe(publ, node);
298 (net_ev_handler)
299 named_purge_publ);
300 return true; 336 return true;
301 } 337 }
302 } else if (dtype == WITHDRAWAL) { 338 } else if (dtype == WITHDRAWAL) {
@@ -304,7 +340,7 @@ static bool tipc_update_nametbl(struct distr_item *i, u32 node, u32 dtype)
304 node, ntohl(i->ref), 340 node, ntohl(i->ref),
305 ntohl(i->key)); 341 ntohl(i->key));
306 if (publ) { 342 if (publ) {
307 tipc_nodesub_unsubscribe(&publ->subscr); 343 tipc_publ_unsubscribe(publ, node);
308 kfree(publ); 344 kfree(publ);
309 return true; 345 return true;
310 } 346 }
diff --git a/net/tipc/name_distr.h b/net/tipc/name_distr.h
index b9e75feb3434..cef55cedcfb2 100644
--- a/net/tipc/name_distr.h
+++ b/net/tipc/name_distr.h
@@ -74,5 +74,6 @@ void tipc_named_node_up(u32 dnode);
74void tipc_named_rcv(struct sk_buff *buf); 74void tipc_named_rcv(struct sk_buff *buf);
75void tipc_named_reinit(void); 75void tipc_named_reinit(void);
76void tipc_named_process_backlog(void); 76void tipc_named_process_backlog(void);
77void tipc_publ_notify(struct list_head *nsub_list, u32 addr);
77 78
78#endif 79#endif
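
The deleted "convert circular list to linear list" code in name_distr.c deserves a note: it threaded a struct list_head through sk_buffs and then cast between the two types, which only worked because both layouts begin with next/prev pointers. struct sk_buff_head makes that shared layout official (next, prev, then qlen) and supplies real primitives on top. A compile-time restatement of the assumption the old cast leaned on, true for the sk_buff layout of this era (a sketch, not part of the patch):

	#include <linux/kernel.h>
	#include <linux/skbuff.h>

	static void layout_note(void)
	{
		/* Both queue types start with the same two pointers. */
		BUILD_BUG_ON(offsetof(struct sk_buff, next) != 0);
		BUILD_BUG_ON(offsetof(struct sk_buff_head, next) != 0);
		BUILD_BUG_ON(offsetof(struct sk_buff, prev) !=
			     offsetof(struct sk_buff_head, prev));
	}
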
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 7cfb7a4aa58f..772be1cd8bf6 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -144,7 +144,7 @@ static struct publication *publ_create(u32 type, u32 lower, u32 upper,
144 publ->key = key; 144 publ->key = key;
145 INIT_LIST_HEAD(&publ->local_list); 145 INIT_LIST_HEAD(&publ->local_list);
146 INIT_LIST_HEAD(&publ->pport_list); 146 INIT_LIST_HEAD(&publ->pport_list);
147 INIT_LIST_HEAD(&publ->subscr.nodesub_list); 147 INIT_LIST_HEAD(&publ->nodesub_list);
148 return publ; 148 return publ;
149} 149}
150 150
diff --git a/net/tipc/name_table.h b/net/tipc/name_table.h
index b38ebecac766..c62877826655 100644
--- a/net/tipc/name_table.h
+++ b/net/tipc/name_table.h
@@ -37,8 +37,6 @@
37#ifndef _TIPC_NAME_TABLE_H 37#ifndef _TIPC_NAME_TABLE_H
38#define _TIPC_NAME_TABLE_H 38#define _TIPC_NAME_TABLE_H
39 39
40#include "node_subscr.h"
41
42struct tipc_subscription; 40struct tipc_subscription;
43struct tipc_port_list; 41struct tipc_port_list;
44 42
@@ -56,7 +54,7 @@ struct tipc_port_list;
56 * @node: network address of publishing port's node 54 * @node: network address of publishing port's node
57 * @ref: publishing port 55 * @ref: publishing port
58 * @key: publication key 56 * @key: publication key
59 * @subscr: subscription to "node down" event (for off-node publications only) 57 * @nodesub_list: subscription to "node down" event (off-node publication only)
60 * @local_list: adjacent entries in list of publications made by this node 58 * @local_list: adjacent entries in list of publications made by this node
61 * @pport_list: adjacent entries in list of publications made by this port 59 * @pport_list: adjacent entries in list of publications made by this port
62 * @node_list: adjacent matching name seq publications with >= node scope 60 * @node_list: adjacent matching name seq publications with >= node scope
@@ -73,7 +71,7 @@ struct publication {
73 u32 node; 71 u32 node;
74 u32 ref; 72 u32 ref;
75 u32 key; 73 u32 key;
76 struct tipc_node_subscr subscr; 74 struct list_head nodesub_list;
77 struct list_head local_list; 75 struct list_head local_list;
78 struct list_head pport_list; 76 struct list_head pport_list;
79 struct list_head node_list; 77 struct list_head node_list;
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 82e5edddc376..69b96be09a86 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -113,9 +113,10 @@ struct tipc_node *tipc_node_create(u32 addr)
113 spin_lock_init(&n_ptr->lock); 113 spin_lock_init(&n_ptr->lock);
114 INIT_HLIST_NODE(&n_ptr->hash); 114 INIT_HLIST_NODE(&n_ptr->hash);
115 INIT_LIST_HEAD(&n_ptr->list); 115 INIT_LIST_HEAD(&n_ptr->list);
116 INIT_LIST_HEAD(&n_ptr->nsub); 116 INIT_LIST_HEAD(&n_ptr->publ_list);
117 INIT_LIST_HEAD(&n_ptr->conn_sks); 117 INIT_LIST_HEAD(&n_ptr->conn_sks);
118 __skb_queue_head_init(&n_ptr->waiting_sks); 118 __skb_queue_head_init(&n_ptr->waiting_sks);
119 __skb_queue_head_init(&n_ptr->bclink.deferred_queue);
119 120
120 hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); 121 hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
121 122
@@ -381,8 +382,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
381 382
382 /* Flush broadcast link info associated with lost node */ 383 /* Flush broadcast link info associated with lost node */
383 if (n_ptr->bclink.recv_permitted) { 384 if (n_ptr->bclink.recv_permitted) {
384 kfree_skb_list(n_ptr->bclink.deferred_head); 385 __skb_queue_purge(&n_ptr->bclink.deferred_queue);
385 n_ptr->bclink.deferred_size = 0;
386 386
387 if (n_ptr->bclink.reasm_buf) { 387 if (n_ptr->bclink.reasm_buf) {
388 kfree_skb(n_ptr->bclink.reasm_buf); 388 kfree_skb(n_ptr->bclink.reasm_buf);
@@ -574,7 +574,7 @@ void tipc_node_unlock(struct tipc_node *node)
574 skb_queue_splice_init(&node->waiting_sks, &waiting_sks); 574 skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
575 575
576 if (flags & TIPC_NOTIFY_NODE_DOWN) { 576 if (flags & TIPC_NOTIFY_NODE_DOWN) {
577 list_replace_init(&node->nsub, &nsub_list); 577 list_replace_init(&node->publ_list, &nsub_list);
578 list_replace_init(&node->conn_sks, &conn_sks); 578 list_replace_init(&node->conn_sks, &conn_sks);
579 } 579 }
580 node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN | 580 node->action_flags &= ~(TIPC_WAKEUP_USERS | TIPC_NOTIFY_NODE_DOWN |
@@ -591,7 +591,7 @@ void tipc_node_unlock(struct tipc_node *node)
591 tipc_node_abort_sock_conns(&conn_sks); 591 tipc_node_abort_sock_conns(&conn_sks);
592 592
593 if (!list_empty(&nsub_list)) 593 if (!list_empty(&nsub_list))
594 tipc_nodesub_notify(&nsub_list); 594 tipc_publ_notify(&nsub_list, addr);
595 595
596 if (flags & TIPC_WAKEUP_BCAST_USERS) 596 if (flags & TIPC_WAKEUP_BCAST_USERS)
597 tipc_bclink_wakeup_users(); 597 tipc_bclink_wakeup_users();
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 005fbcef3212..cbe0e950f1cc 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -37,7 +37,6 @@
37#ifndef _TIPC_NODE_H 37#ifndef _TIPC_NODE_H
38#define _TIPC_NODE_H 38#define _TIPC_NODE_H
39 39
40#include "node_subscr.h"
41#include "addr.h" 40#include "addr.h"
42#include "net.h" 41#include "net.h"
43#include "bearer.h" 42#include "bearer.h"
@@ -72,9 +71,7 @@ enum {
72 * @last_in: sequence # of last in-sequence b'cast message received from node 71 * @last_in: sequence # of last in-sequence b'cast message received from node
73 * @last_sent: sequence # of last b'cast message sent by node 72 * @last_sent: sequence # of last b'cast message sent by node
74 * @oos_state: state tracker for handling OOS b'cast messages 73 * @oos_state: state tracker for handling OOS b'cast messages
75 * @deferred_size: number of OOS b'cast messages in deferred queue 74 * @deferred_queue: deferred queue of saved OOS b'cast messages from node
76 * @deferred_head: oldest OOS b'cast message received from node
77 * @deferred_tail: newest OOS b'cast message received from node
78 * @reasm_buf: broadcast reassembly queue head from node 75 * @reasm_buf: broadcast reassembly queue head from node
79 * @recv_permitted: true if node is allowed to receive b'cast messages 76 * @recv_permitted: true if node is allowed to receive b'cast messages
80 */ 77 */
@@ -84,8 +81,7 @@ struct tipc_node_bclink {
84 u32 last_sent; 81 u32 last_sent;
85 u32 oos_state; 82 u32 oos_state;
86 u32 deferred_size; 83 u32 deferred_size;
87 struct sk_buff *deferred_head; 84 struct sk_buff_head deferred_queue;
88 struct sk_buff *deferred_tail;
89 struct sk_buff *reasm_buf; 85 struct sk_buff *reasm_buf;
90 bool recv_permitted; 86 bool recv_permitted;
91}; 87};
@@ -104,7 +100,7 @@ struct tipc_node_bclink {
104 * @link_cnt: number of links to node 100 * @link_cnt: number of links to node
105 * @signature: node instance identifier 101 * @signature: node instance identifier
106 * @link_id: local and remote bearer ids of changing link, if any 102 * @link_id: local and remote bearer ids of changing link, if any
107 * @nsub: list of "node down" subscriptions monitoring node 103 * @publ_list: list of publications
108 * @rcu: rcu struct for tipc_node 104 * @rcu: rcu struct for tipc_node
109 */ 105 */
110struct tipc_node { 106struct tipc_node {
@@ -121,7 +117,7 @@ struct tipc_node {
121 int working_links; 117 int working_links;
122 u32 signature; 118 u32 signature;
123 u32 link_id; 119 u32 link_id;
124 struct list_head nsub; 120 struct list_head publ_list;
125 struct sk_buff_head waiting_sks; 121 struct sk_buff_head waiting_sks;
126 struct list_head conn_sks; 122 struct list_head conn_sks;
127 struct rcu_head rcu; 123 struct rcu_head rcu;
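
node_lost_contact() in the node.c hunk above shows the disposal half of the conversion: __skb_queue_purge() frees every buffer and leaves the head empty and immediately reusable, replacing kfree_skb_list() plus a manual counter reset. Sketched:

	#include <linux/skbuff.h>

	static void flush_deferred(struct sk_buff_head *deferred_queue)
	{
		/* was: kfree_skb_list(deferred_head);
		 *      deferred_size = 0;  (head/tail pointers also reset)
		 */
		__skb_queue_purge(deferred_queue);	/* frees all, qlen back to 0 */
	}
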
diff --git a/net/tipc/node_subscr.c b/net/tipc/node_subscr.c
deleted file mode 100644
index 2d13eea8574a..000000000000
--- a/net/tipc/node_subscr.c
+++ /dev/null
@@ -1,96 +0,0 @@
1/*
2 * net/tipc/node_subscr.c: TIPC "node down" subscription handling
3 *
4 * Copyright (c) 1995-2006, Ericsson AB
5 * Copyright (c) 2005, 2010-2011, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#include "core.h"
38#include "node_subscr.h"
39#include "node.h"
40
41/**
42 * tipc_nodesub_subscribe - create "node down" subscription for specified node
43 */
44void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
45 void *usr_handle, net_ev_handler handle_down)
46{
47 if (in_own_node(addr)) {
48 node_sub->node = NULL;
49 return;
50 }
51
52 node_sub->node = tipc_node_find(addr);
53 if (!node_sub->node) {
54 pr_warn("Node subscription rejected, unknown node 0x%x\n",
55 addr);
56 return;
57 }
58 node_sub->handle_node_down = handle_down;
59 node_sub->usr_handle = usr_handle;
60
61 tipc_node_lock(node_sub->node);
62 list_add_tail(&node_sub->nodesub_list, &node_sub->node->nsub);
63 tipc_node_unlock(node_sub->node);
64}
65
66/**
67 * tipc_nodesub_unsubscribe - cancel "node down" subscription (if any)
68 */
69void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub)
70{
71 if (!node_sub->node)
72 return;
73
74 tipc_node_lock(node_sub->node);
75 list_del_init(&node_sub->nodesub_list);
76 tipc_node_unlock(node_sub->node);
77}
78
79/**
80 * tipc_nodesub_notify - notify subscribers that a node is unreachable
81 *
82 * Note: node is locked by caller
83 */
84void tipc_nodesub_notify(struct list_head *nsub_list)
85{
86 struct tipc_node_subscr *ns, *safe;
87 net_ev_handler handle_node_down;
88
89 list_for_each_entry_safe(ns, safe, nsub_list, nodesub_list) {
90 handle_node_down = ns->handle_node_down;
91 if (handle_node_down) {
92 ns->handle_node_down = NULL;
93 handle_node_down(ns->usr_handle);
94 }
95 }
96}
diff --git a/net/tipc/node_subscr.h b/net/tipc/node_subscr.h
deleted file mode 100644
index d91b8cc81e3d..000000000000
--- a/net/tipc/node_subscr.h
+++ /dev/null
@@ -1,63 +0,0 @@
1/*
2 * net/tipc/node_subscr.h: Include file for TIPC "node down" subscription handling
3 *
4 * Copyright (c) 1995-2006, Ericsson AB
5 * Copyright (c) 2005, 2010-2011, Wind River Systems
6 * All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 * 3. Neither the names of the copyright holders nor the names of its
17 * contributors may be used to endorse or promote products derived from
18 * this software without specific prior written permission.
19 *
20 * Alternatively, this software may be distributed under the terms of the
21 * GNU General Public License ("GPL") version 2 as published by the Free
22 * Software Foundation.
23 *
24 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
25 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
26 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
27 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
28 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
29 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
30 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
31 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
32 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
33 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
34 * POSSIBILITY OF SUCH DAMAGE.
35 */
36
37#ifndef _TIPC_NODE_SUBSCR_H
38#define _TIPC_NODE_SUBSCR_H
39
40#include "addr.h"
41
42typedef void (*net_ev_handler) (void *usr_handle);
43
44/**
45 * struct tipc_node_subscr - "node down" subscription entry
46 * @node: ptr to node structure of interest (or NULL, if none)
47 * @handle_node_down: routine to invoke when node fails
48 * @usr_handle: argument to pass to routine when node fails
49 * @nodesub_list: adjacent entries in list of subscriptions for the node
50 */
51struct tipc_node_subscr {
52 struct tipc_node *node;
53 net_ev_handler handle_node_down;
54 void *usr_handle;
55 struct list_head nodesub_list;
56};
57
58void tipc_nodesub_subscribe(struct tipc_node_subscr *node_sub, u32 addr,
59 void *usr_handle, net_ev_handler handle_down);
60void tipc_nodesub_unsubscribe(struct tipc_node_subscr *node_sub);
61void tipc_nodesub_notify(struct list_head *nsub_list);
62
63#endif
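
With node_subscr.[ch] gone, the per-subscription function pointer and opaque usr_handle collapse into a direct list of publications on the node, and tipc_publ_notify() walks it with list_for_each_entry_safe() because each tipc_publ_purge() call unlinks the entry it is handed; the plain walker would chase freed memory. Restated as a sketch (tipc_publ_purge() as defined in the name_distr.c hunk above):

	#include <linux/list.h>
	#include <linux/types.h>

	static void notify_sketch(struct list_head *nsub_list, u32 addr)
	{
		struct publication *publ, *tmp;

		/* _safe: the body removes publ from nsub_list and frees it */
		list_for_each_entry_safe(publ, tmp, nsub_list, nodesub_list)
			tipc_publ_purge(publ, addr);
	}
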
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 341fbd1b5f74..9658d9b63876 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -244,12 +244,12 @@ static void tsk_advance_rx_queue(struct sock *sk)
244 */ 244 */
245static void tsk_rej_rx_queue(struct sock *sk) 245static void tsk_rej_rx_queue(struct sock *sk)
246{ 246{
247 struct sk_buff *buf; 247 struct sk_buff *skb;
248 u32 dnode; 248 u32 dnode;
249 249
250 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { 250 while ((skb = __skb_dequeue(&sk->sk_receive_queue))) {
251 if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) 251 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
252 tipc_link_xmit(buf, dnode, 0); 252 tipc_link_xmit_skb(skb, dnode, 0);
253 } 253 }
254} 254}
255 255
@@ -462,7 +462,7 @@ static int tipc_release(struct socket *sock)
462{ 462{
463 struct sock *sk = sock->sk; 463 struct sock *sk = sock->sk;
464 struct tipc_sock *tsk; 464 struct tipc_sock *tsk;
465 struct sk_buff *buf; 465 struct sk_buff *skb;
466 u32 dnode; 466 u32 dnode;
467 467
468 /* 468 /*
@@ -481,11 +481,11 @@ static int tipc_release(struct socket *sock)
481 */ 481 */
482 dnode = tsk_peer_node(tsk); 482 dnode = tsk_peer_node(tsk);
483 while (sock->state != SS_DISCONNECTING) { 483 while (sock->state != SS_DISCONNECTING) {
484 buf = __skb_dequeue(&sk->sk_receive_queue); 484 skb = __skb_dequeue(&sk->sk_receive_queue);
485 if (buf == NULL) 485 if (skb == NULL)
486 break; 486 break;
487 if (TIPC_SKB_CB(buf)->handle != NULL) 487 if (TIPC_SKB_CB(skb)->handle != NULL)
488 kfree_skb(buf); 488 kfree_skb(skb);
489 else { 489 else {
490 if ((sock->state == SS_CONNECTING) || 490 if ((sock->state == SS_CONNECTING) ||
491 (sock->state == SS_CONNECTED)) { 491 (sock->state == SS_CONNECTED)) {
@@ -493,8 +493,8 @@ static int tipc_release(struct socket *sock)
493 tsk->connected = 0; 493 tsk->connected = 0;
494 tipc_node_remove_conn(dnode, tsk->ref); 494 tipc_node_remove_conn(dnode, tsk->ref);
495 } 495 }
496 if (tipc_msg_reverse(buf, &dnode, TIPC_ERR_NO_PORT)) 496 if (tipc_msg_reverse(skb, &dnode, TIPC_ERR_NO_PORT))
497 tipc_link_xmit(buf, dnode, 0); 497 tipc_link_xmit_skb(skb, dnode, 0);
498 } 498 }
499 } 499 }
500 500
@@ -502,12 +502,12 @@ static int tipc_release(struct socket *sock)
502 tipc_sk_ref_discard(tsk->ref); 502 tipc_sk_ref_discard(tsk->ref);
503 k_cancel_timer(&tsk->timer); 503 k_cancel_timer(&tsk->timer);
504 if (tsk->connected) { 504 if (tsk->connected) {
505 buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG, 505 skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
506 SHORT_H_SIZE, 0, dnode, tipc_own_addr, 506 SHORT_H_SIZE, 0, dnode, tipc_own_addr,
507 tsk_peer_port(tsk), 507 tsk_peer_port(tsk),
508 tsk->ref, TIPC_ERR_NO_PORT); 508 tsk->ref, TIPC_ERR_NO_PORT);
509 if (buf) 509 if (skb)
510 tipc_link_xmit(buf, dnode, tsk->ref); 510 tipc_link_xmit_skb(skb, dnode, tsk->ref);
511 tipc_node_remove_conn(dnode, tsk->ref); 511 tipc_node_remove_conn(dnode, tsk->ref);
512 } 512 }
513 k_term_timer(&tsk->timer); 513 k_term_timer(&tsk->timer);
@@ -712,7 +712,7 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
712{ 712{
713 struct sock *sk = sock->sk; 713 struct sock *sk = sock->sk;
714 struct tipc_msg *mhdr = &tipc_sk(sk)->phdr; 714 struct tipc_msg *mhdr = &tipc_sk(sk)->phdr;
715 struct sk_buff *buf; 715 struct sk_buff_head head;
716 uint mtu; 716 uint mtu;
717 int rc; 717 int rc;
718 718
@@ -727,12 +727,13 @@ static int tipc_sendmcast(struct socket *sock, struct tipc_name_seq *seq,
727 727
728new_mtu: 728new_mtu:
729 mtu = tipc_bclink_get_mtu(); 729 mtu = tipc_bclink_get_mtu();
730 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &buf); 730 __skb_queue_head_init(&head);
731 rc = tipc_msg_build(mhdr, msg, 0, dsz, mtu, &head);
731 if (unlikely(rc < 0)) 732 if (unlikely(rc < 0))
732 return rc; 733 return rc;
733 734
734 do { 735 do {
735 rc = tipc_bclink_xmit(buf); 736 rc = tipc_bclink_xmit(&head);
736 if (likely(rc >= 0)) { 737 if (likely(rc >= 0)) {
737 rc = dsz; 738 rc = dsz;
738 break; 739 break;
@@ -744,7 +745,7 @@ new_mtu:
744 tipc_sk(sk)->link_cong = 1; 745 tipc_sk(sk)->link_cong = 1;
745 rc = tipc_wait_for_sndmsg(sock, &timeo); 746 rc = tipc_wait_for_sndmsg(sock, &timeo);
746 if (rc) 747 if (rc)
747 kfree_skb_list(buf); 748 __skb_queue_purge(&head);
748 } while (!rc); 749 } while (!rc);
749 return rc; 750 return rc;
750} 751}
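
tipc_sendmcast() above sets the loop that every send path in this file now follows: build the message into a caller-owned queue, attempt transmit, and if waiting out congestion fails, purge the queue (on success the link layer consumed it). Sketched generically, with ELINKCONG being TIPC's private congestion errno from core.h:

	static int send_loop_sketch(struct socket *sock, struct sk_buff_head *head,
				    u32 dnode, u32 ref, long *timeo)
	{
		int rc;

		do {
			rc = tipc_link_xmit(head, dnode, ref);	/* consumes on success */
			if (rc != -ELINKCONG)
				break;
			rc = tipc_wait_for_sndmsg(sock, timeo);
			if (rc)
				__skb_queue_purge(head);	/* still ours: free it */
		} while (!rc);
		return rc;
	}
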
@@ -906,7 +907,8 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
906 struct tipc_sock *tsk = tipc_sk(sk); 907 struct tipc_sock *tsk = tipc_sk(sk);
907 struct tipc_msg *mhdr = &tsk->phdr; 908 struct tipc_msg *mhdr = &tsk->phdr;
908 u32 dnode, dport; 909 u32 dnode, dport;
909 struct sk_buff *buf; 910 struct sk_buff_head head;
911 struct sk_buff *skb;
910 struct tipc_name_seq *seq = &dest->addr.nameseq; 912 struct tipc_name_seq *seq = &dest->addr.nameseq;
911 u32 mtu; 913 u32 mtu;
912 long timeo; 914 long timeo;
@@ -981,13 +983,15 @@ static int tipc_sendmsg(struct kiocb *iocb, struct socket *sock,
981 983
982new_mtu: 984new_mtu:
983 mtu = tipc_node_get_mtu(dnode, tsk->ref); 985 mtu = tipc_node_get_mtu(dnode, tsk->ref);
984 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &buf); 986 __skb_queue_head_init(&head);
987 rc = tipc_msg_build(mhdr, m, 0, dsz, mtu, &head);
985 if (rc < 0) 988 if (rc < 0)
986 goto exit; 989 goto exit;
987 990
988 do { 991 do {
989 TIPC_SKB_CB(buf)->wakeup_pending = tsk->link_cong; 992 skb = skb_peek(&head);
990 rc = tipc_link_xmit(buf, dnode, tsk->ref); 993 TIPC_SKB_CB(skb)->wakeup_pending = tsk->link_cong;
994 rc = tipc_link_xmit(&head, dnode, tsk->ref);
991 if (likely(rc >= 0)) { 995 if (likely(rc >= 0)) {
992 if (sock->state != SS_READY) 996 if (sock->state != SS_READY)
993 sock->state = SS_CONNECTING; 997 sock->state = SS_CONNECTING;
@@ -1001,7 +1005,7 @@ new_mtu:
1001 tsk->link_cong = 1; 1005 tsk->link_cong = 1;
1002 rc = tipc_wait_for_sndmsg(sock, &timeo); 1006 rc = tipc_wait_for_sndmsg(sock, &timeo);
1003 if (rc) 1007 if (rc)
1004 kfree_skb_list(buf); 1008 __skb_queue_purge(&head);
1005 } while (!rc); 1009 } while (!rc);
1006exit: 1010exit:
1007 if (iocb) 1011 if (iocb)
@@ -1058,7 +1062,7 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1058 struct sock *sk = sock->sk; 1062 struct sock *sk = sock->sk;
1059 struct tipc_sock *tsk = tipc_sk(sk); 1063 struct tipc_sock *tsk = tipc_sk(sk);
1060 struct tipc_msg *mhdr = &tsk->phdr; 1064 struct tipc_msg *mhdr = &tsk->phdr;
1061 struct sk_buff *buf; 1065 struct sk_buff_head head;
1062 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name); 1066 DECLARE_SOCKADDR(struct sockaddr_tipc *, dest, m->msg_name);
1063 u32 ref = tsk->ref; 1067 u32 ref = tsk->ref;
1064 int rc = -EINVAL; 1068 int rc = -EINVAL;
@@ -1093,12 +1097,13 @@ static int tipc_send_stream(struct kiocb *iocb, struct socket *sock,
1093next: 1097next:
1094 mtu = tsk->max_pkt; 1098 mtu = tsk->max_pkt;
1095 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE); 1099 send = min_t(uint, dsz - sent, TIPC_MAX_USER_MSG_SIZE);
1096 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &buf); 1100 __skb_queue_head_init(&head);
1101 rc = tipc_msg_build(mhdr, m, sent, send, mtu, &head);
1097 if (unlikely(rc < 0)) 1102 if (unlikely(rc < 0))
1098 goto exit; 1103 goto exit;
1099 do { 1104 do {
1100 if (likely(!tsk_conn_cong(tsk))) { 1105 if (likely(!tsk_conn_cong(tsk))) {
1101 rc = tipc_link_xmit(buf, dnode, ref); 1106 rc = tipc_link_xmit(&head, dnode, ref);
1102 if (likely(!rc)) { 1107 if (likely(!rc)) {
1103 tsk->sent_unacked++; 1108 tsk->sent_unacked++;
1104 sent += send; 1109 sent += send;
@@ -1116,7 +1121,7 @@ next:
1116 } 1121 }
1117 rc = tipc_wait_for_sndpkt(sock, &timeo); 1122 rc = tipc_wait_for_sndpkt(sock, &timeo);
1118 if (rc) 1123 if (rc)
1119 kfree_skb_list(buf); 1124 __skb_queue_purge(&head);
1120 } while (!rc); 1125 } while (!rc);
1121exit: 1126exit:
1122 if (iocb) 1127 if (iocb)
@@ -1261,20 +1266,20 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
1261 1266
1262static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack) 1267static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
1263{ 1268{
1264 struct sk_buff *buf = NULL; 1269 struct sk_buff *skb = NULL;
1265 struct tipc_msg *msg; 1270 struct tipc_msg *msg;
1266 u32 peer_port = tsk_peer_port(tsk); 1271 u32 peer_port = tsk_peer_port(tsk);
1267 u32 dnode = tsk_peer_node(tsk); 1272 u32 dnode = tsk_peer_node(tsk);
1268 1273
1269 if (!tsk->connected) 1274 if (!tsk->connected)
1270 return; 1275 return;
1271 buf = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode, 1276 skb = tipc_msg_create(CONN_MANAGER, CONN_ACK, INT_H_SIZE, 0, dnode,
1272 tipc_own_addr, peer_port, tsk->ref, TIPC_OK); 1277 tipc_own_addr, peer_port, tsk->ref, TIPC_OK);
1273 if (!buf) 1278 if (!skb)
1274 return; 1279 return;
1275 msg = buf_msg(buf); 1280 msg = buf_msg(skb);
1276 msg_set_msgcnt(msg, ack); 1281 msg_set_msgcnt(msg, ack);
1277 tipc_link_xmit(buf, dnode, msg_link_selector(msg)); 1282 tipc_link_xmit_skb(skb, dnode, msg_link_selector(msg));
1278} 1283}
1279 1284
1280static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop) 1285static int tipc_wait_for_rcvmsg(struct socket *sock, long *timeop)
@@ -1729,20 +1734,20 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1729/** 1734/**
1730 * tipc_backlog_rcv - handle incoming message from backlog queue 1735 * tipc_backlog_rcv - handle incoming message from backlog queue
1731 * @sk: socket 1736 * @sk: socket
1732 * @buf: message 1737 * @skb: message
1733 * 1738 *
1734 * Caller must hold socket lock, but not port lock. 1739 * Caller must hold socket lock, but not port lock.
1735 * 1740 *
1736 * Returns 0 1741 * Returns 0
1737 */ 1742 */
1738static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf) 1743static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *skb)
1739{ 1744{
1740 int rc; 1745 int rc;
1741 u32 onode; 1746 u32 onode;
1742 struct tipc_sock *tsk = tipc_sk(sk); 1747 struct tipc_sock *tsk = tipc_sk(sk);
1743 uint truesize = buf->truesize; 1748 uint truesize = skb->truesize;
1744 1749
1745 rc = filter_rcv(sk, buf); 1750 rc = filter_rcv(sk, skb);
1746 1751
1747 if (likely(!rc)) { 1752 if (likely(!rc)) {
1748 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT) 1753 if (atomic_read(&tsk->dupl_rcvcnt) < TIPC_CONN_OVERLOAD_LIMIT)
@@ -1750,25 +1755,25 @@ static int tipc_backlog_rcv(struct sock *sk, struct sk_buff *buf)
1750 return 0; 1755 return 0;
1751 } 1756 }
1752 1757
1753 if ((rc < 0) && !tipc_msg_reverse(buf, &onode, -rc)) 1758 if ((rc < 0) && !tipc_msg_reverse(skb, &onode, -rc))
1754 return 0; 1759 return 0;
1755 1760
1756 tipc_link_xmit(buf, onode, 0); 1761 tipc_link_xmit_skb(skb, onode, 0);
1757 1762
1758 return 0; 1763 return 0;
1759} 1764}
1760 1765
1761/** 1766/**
1762 * tipc_sk_rcv - handle incoming message 1767 * tipc_sk_rcv - handle incoming message
1763 * @buf: buffer containing arriving message 1768 * @skb: buffer containing arriving message
1764 * Consumes buffer 1769 * Consumes buffer
1765 * Returns 0 if success, or errno: -EHOSTUNREACH 1770 * Returns 0 if success, or errno: -EHOSTUNREACH
1766 */ 1771 */
1767int tipc_sk_rcv(struct sk_buff *buf) 1772int tipc_sk_rcv(struct sk_buff *skb)
1768{ 1773{
1769 struct tipc_sock *tsk; 1774 struct tipc_sock *tsk;
1770 struct sock *sk; 1775 struct sock *sk;
1771 u32 dport = msg_destport(buf_msg(buf)); 1776 u32 dport = msg_destport(buf_msg(skb));
1772 int rc = TIPC_OK; 1777 int rc = TIPC_OK;
1773 uint limit; 1778 uint limit;
1774 u32 dnode; 1779 u32 dnode;
@@ -1776,7 +1781,7 @@ int tipc_sk_rcv(struct sk_buff *buf)
1776 /* Validate destination and message */ 1781 /* Validate destination and message */
1777 tsk = tipc_sk_get(dport); 1782 tsk = tipc_sk_get(dport);
1778 if (unlikely(!tsk)) { 1783 if (unlikely(!tsk)) {
1779 rc = tipc_msg_eval(buf, &dnode); 1784 rc = tipc_msg_eval(skb, &dnode);
1780 goto exit; 1785 goto exit;
1781 } 1786 }
1782 sk = &tsk->sk; 1787 sk = &tsk->sk;
@@ -1785,12 +1790,12 @@ int tipc_sk_rcv(struct sk_buff *buf)
 	spin_lock_bh(&sk->sk_lock.slock);
 
 	if (!sock_owned_by_user(sk)) {
-		rc = filter_rcv(sk, buf);
+		rc = filter_rcv(sk, skb);
 	} else {
 		if (sk->sk_backlog.len == 0)
 			atomic_set(&tsk->dupl_rcvcnt, 0);
-		limit = rcvbuf_limit(sk, buf) + atomic_read(&tsk->dupl_rcvcnt);
-		if (sk_add_backlog(sk, buf, limit))
+		limit = rcvbuf_limit(sk, skb) + atomic_read(&tsk->dupl_rcvcnt);
+		if (sk_add_backlog(sk, skb, limit))
 			rc = -TIPC_ERR_OVERLOAD;
 	}
 	spin_unlock_bh(&sk->sk_lock.slock);
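
The else branch above is the usual lock-or-backlog dispatch: when a user context owns the socket, the skb is parked on sk->sk_backlog instead of being filtered inline, and the admission limit is the base receive-buffer limit widened by dupl_rcvcnt (truesize already force-accepted via the backlog, reset whenever the backlog drains). A small user-space model of the admission arithmetic, with all names and numbers illustrative rather than kernel ABI:

	#include <stdbool.h>
	#include <stdio.h>

	/* Mirrors the sk_add_backlog() admission check: refuse a buffer once
	 * queued truesize would exceed the dynamically widened limit. */
	struct backlog_model {
		unsigned int len;		/* plays sk->sk_backlog.len */
	};

	static bool add_backlog(struct backlog_model *b, unsigned int truesize,
				unsigned int limit)
	{
		if (b->len + truesize > limit)
			return false;	/* caller maps this to TIPC_ERR_OVERLOAD */
		b->len += truesize;
		return true;
	}

	int main(void)
	{
		struct backlog_model b = { .len = 60000 };
		unsigned int base_limit = 65536;  /* stands in for rcvbuf_limit() */
		unsigned int dupl_rcvcnt = 4096;  /* already force-accepted bytes */

		printf("accepted: %d\n",
		       add_backlog(&b, 8000, base_limit + dupl_rcvcnt));
		return 0;
	}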
@@ -1798,10 +1803,10 @@ int tipc_sk_rcv(struct sk_buff *buf)
 	if (likely(!rc))
 		return 0;
 exit:
-	if ((rc < 0) && !tipc_msg_reverse(buf, &dnode, -rc))
+	if ((rc < 0) && !tipc_msg_reverse(skb, &dnode, -rc))
 		return -EHOSTUNREACH;
 
-	tipc_link_xmit(buf, dnode, 0);
+	tipc_link_xmit_skb(skb, dnode, 0);
 	return (rc < 0) ? -EHOSTUNREACH : 0;
 }
 
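Both this exit path and the one in tipc_backlog_rcv() share the same reject shape: a negative rc is negated back into a TIPC error code, tipc_msg_reverse() swaps originator and destination in the header (reporting the origin node in dnode), and the rejected buffer is bounced straight back through the new single-skb helper. Condensed into one hypothetical function:

	/* Sketch of the shared reject path; helpers as used in this file,
	 * control flow condensed for illustration only. */
	static int tipc_sk_bounce_sketch(struct sk_buff *skb, int rc)
	{
		u32 dnode;

		if (rc >= 0)
			return 0;			/* nothing to bounce */
		if (!tipc_msg_reverse(skb, &dnode, -rc))
			return -EHOSTUNREACH;		/* not reversible, skb consumed */
		tipc_link_xmit_skb(skb, dnode, 0);	/* return to sender */
		return -EHOSTUNREACH;
	}
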
@@ -2059,7 +2064,7 @@ static int tipc_shutdown(struct socket *sock, int how)
 {
 	struct sock *sk = sock->sk;
 	struct tipc_sock *tsk = tipc_sk(sk);
-	struct sk_buff *buf;
+	struct sk_buff *skb;
 	u32 dnode;
 	int res;
 
@@ -2074,23 +2079,23 @@ static int tipc_shutdown(struct socket *sock, int how)
 
 restart:
 	/* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
-	buf = __skb_dequeue(&sk->sk_receive_queue);
-	if (buf) {
-		if (TIPC_SKB_CB(buf)->handle != NULL) {
-			kfree_skb(buf);
+	skb = __skb_dequeue(&sk->sk_receive_queue);
+	if (skb) {
+		if (TIPC_SKB_CB(skb)->handle != NULL) {
+			kfree_skb(skb);
 			goto restart;
 		}
-		if (tipc_msg_reverse(buf, &dnode, TIPC_CONN_SHUTDOWN))
-			tipc_link_xmit(buf, dnode, tsk->ref);
+		if (tipc_msg_reverse(skb, &dnode, TIPC_CONN_SHUTDOWN))
+			tipc_link_xmit_skb(skb, dnode, tsk->ref);
 		tipc_node_remove_conn(dnode, tsk->ref);
 	} else {
 		dnode = tsk_peer_node(tsk);
-		buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
+		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,
 				      TIPC_CONN_MSG, SHORT_H_SIZE,
 				      0, dnode, tipc_own_addr,
 				      tsk_peer_port(tsk),
 				      tsk->ref, TIPC_CONN_SHUTDOWN);
-		tipc_link_xmit(buf, dnode, tsk->ref);
+		tipc_link_xmit_skb(skb, dnode, tsk->ref);
 	}
 	tsk->connected = 0;
 	sock->state = SS_DISCONNECTING;
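
When the receive queue yields nothing reversible, the FIN is fabricated from scratch. The positional arguments of tipc_msg_create() are easy to misread; the annotation below restates the call above with the roles inferred from how the two call sites in this file use them:

	skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE,	/* importance       */
			      TIPC_CONN_MSG,		/* user/msg type    */
			      SHORT_H_SIZE,		/* header size      */
			      0,			/* no payload       */
			      dnode,			/* destination node */
			      tipc_own_addr,		/* origin node      */
			      tsk_peer_port(tsk),	/* destination port */
			      tsk->ref,			/* origin port      */
			      TIPC_CONN_SHUTDOWN);	/* error code       */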
@@ -2119,7 +2124,7 @@ static void tipc_sk_timeout(unsigned long ref)
 {
 	struct tipc_sock *tsk;
 	struct sock *sk;
-	struct sk_buff *buf = NULL;
+	struct sk_buff *skb = NULL;
 	u32 peer_port, peer_node;
 
 	tsk = tipc_sk_get(ref);
@@ -2137,20 +2142,20 @@ static void tipc_sk_timeout(unsigned long ref)
 
 	if (tsk->probing_state == TIPC_CONN_PROBING) {
 		/* Previous probe not answered -> self abort */
-		buf = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
+		skb = tipc_msg_create(TIPC_CRITICAL_IMPORTANCE, TIPC_CONN_MSG,
 				      SHORT_H_SIZE, 0, tipc_own_addr,
 				      peer_node, ref, peer_port,
 				      TIPC_ERR_NO_PORT);
 	} else {
-		buf = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
+		skb = tipc_msg_create(CONN_MANAGER, CONN_PROBE, INT_H_SIZE,
 				      0, peer_node, tipc_own_addr,
 				      peer_port, ref, TIPC_OK);
 		tsk->probing_state = TIPC_CONN_PROBING;
 		k_start_timer(&tsk->timer, tsk->probing_interval);
 	}
 	bh_unlock_sock(sk);
-	if (buf)
-		tipc_link_xmit(buf, peer_node, ref);
+	if (skb)
+		tipc_link_xmit_skb(skb, peer_node, ref);
 exit:
 	tipc_sk_put(tsk);
 }
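
The timer handler is a two-strike probe: the first expiry sends CONN_PROBE, sets probing_state to TIPC_CONN_PROBING and rearms the timer; incoming traffic elsewhere resets the state, so if it is still TIPC_CONN_PROBING at the next expiry the peer never answered and the connection self-aborts via a fabricated TIPC_ERR_NO_PORT message. A tiny user-space model of that state machine (enum and variable names illustrative, not TIPC's):

	#include <stdio.h>

	enum probing_state { PROBE_IDLE, PROBE_SENT };

	int main(void)
	{
		enum probing_state state = PROBE_IDLE;
		int peer_answered = 0;		/* pretend the peer stays silent */

		for (int expiry = 1; ; expiry++) {
			if (peer_answered)
				state = PROBE_IDLE;	/* traffic resets the state */
			if (state == PROBE_SENT) {
				printf("expiry %d: probe unanswered -> self abort\n",
				       expiry);
				break;			/* TIPC_ERR_NO_PORT path */
			}
			state = PROBE_SENT;		/* send CONN_PROBE, rearm timer */
			printf("expiry %d: probe sent, timer rearmed\n", expiry);
		}
		return 0;
	}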