path: root/net/tipc/link.c
Diffstat (limited to 'net/tipc/link.c'):
 net/tipc/link.c | 981 ++++++++++++++++++++++++++++++++-----------------------
 1 file changed, 676 insertions(+), 305 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 1db162aa64a5..23bcc1132365 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -36,10 +36,12 @@
 
 #include "core.h"
 #include "link.h"
+#include "bcast.h"
 #include "socket.h"
 #include "name_distr.h"
 #include "discover.h"
 #include "config.h"
+#include "netlink.h"
 
 #include <linux/pkt_sched.h>
 
@@ -50,6 +52,30 @@ static const char *link_co_err = "Link changeover error, ";
 static const char *link_rst_msg = "Resetting link ";
 static const char *link_unk_evt = "Unknown link event ";
 
+static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
+	[TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC },
+	[TIPC_NLA_LINK_NAME] = {
+		.type = NLA_STRING,
+		.len = TIPC_MAX_LINK_NAME
+	},
+	[TIPC_NLA_LINK_MTU] = { .type = NLA_U32 },
+	[TIPC_NLA_LINK_BROADCAST] = { .type = NLA_FLAG },
+	[TIPC_NLA_LINK_UP] = { .type = NLA_FLAG },
+	[TIPC_NLA_LINK_ACTIVE] = { .type = NLA_FLAG },
+	[TIPC_NLA_LINK_PROP] = { .type = NLA_NESTED },
+	[TIPC_NLA_LINK_STATS] = { .type = NLA_NESTED },
+	[TIPC_NLA_LINK_RX] = { .type = NLA_U32 },
+	[TIPC_NLA_LINK_TX] = { .type = NLA_U32 }
+};
+
+/* Properties valid for media, bearer and link */
+static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
+	[TIPC_NLA_PROP_UNSPEC] = { .type = NLA_UNSPEC },
+	[TIPC_NLA_PROP_PRIO] = { .type = NLA_U32 },
+	[TIPC_NLA_PROP_TOL] = { .type = NLA_U32 },
+	[TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
+};
+
 /*
  * Out-of-range value for link session numbers
  */
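The two policy tables above are the validation half of the new netlink v2 interface: nla_parse_nested() checks every attribute in a nest against the policy and fills a lookup array indexed by attribute id. A minimal sketch of a consumer, assuming the same 4-argument nla_parse_nested() form this patch uses elsewhere (the handler name is hypothetical):

	static int example_parse_link(struct nlattr *link_attr)
	{
		struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
		int err;

		/* Type-checks each attribute against tipc_nl_link_policy;
		 * attributes absent from the message stay NULL. */
		err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX, link_attr,
				       tipc_nl_link_policy);
		if (err)
			return err;

		if (attrs[TIPC_NLA_LINK_NAME])
			pr_info("link: %s\n",
				(char *)nla_data(attrs[TIPC_NLA_LINK_NAME]));
		return 0;
	}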
@@ -123,18 +149,6 @@ static void link_init_max_pkt(struct tipc_link *l_ptr)
 	l_ptr->max_pkt_probes = 0;
 }
 
-static u32 link_next_sent(struct tipc_link *l_ptr)
-{
-	if (l_ptr->next_out)
-		return buf_seqno(l_ptr->next_out);
-	return mod(l_ptr->next_out_no);
-}
-
-static u32 link_last_sent(struct tipc_link *l_ptr)
-{
-	return mod(link_next_sent(l_ptr) - 1);
-}
-
 /*
  * Simple non-static link routines (i.e. referenced outside this file)
  */
@@ -157,14 +171,17 @@ int tipc_link_is_active(struct tipc_link *l_ptr)
  */
 static void link_timeout(struct tipc_link *l_ptr)
 {
+	struct sk_buff *skb;
+
 	tipc_node_lock(l_ptr->owner);
 
 	/* update counters used in statistical profiling of send traffic */
-	l_ptr->stats.accu_queue_sz += l_ptr->out_queue_size;
+	l_ptr->stats.accu_queue_sz += skb_queue_len(&l_ptr->outqueue);
 	l_ptr->stats.queue_sz_counts++;
 
-	if (l_ptr->first_out) {
-		struct tipc_msg *msg = buf_msg(l_ptr->first_out);
+	skb = skb_peek(&l_ptr->outqueue);
+	if (skb) {
+		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
 
 		if ((msg_user(msg) == MSG_FRAGMENTER) &&
@@ -192,11 +209,10 @@ static void link_timeout(struct tipc_link *l_ptr)
 	}
 
 	/* do all other link processing performed on a periodic basis */
-
 	link_state_event(l_ptr, TIMEOUT_EVT);
 
 	if (l_ptr->next_out)
-		tipc_link_push_queue(l_ptr);
+		tipc_link_push_packets(l_ptr);
 
 	tipc_node_unlock(l_ptr->owner);
 }
@@ -224,9 +240,10 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	char addr_string[16];
 	u32 peer = n_ptr->addr;
 
-	if (n_ptr->link_cnt >= 2) {
+	if (n_ptr->link_cnt >= MAX_BEARERS) {
 		tipc_addr_string_fill(addr_string, n_ptr->addr);
-		pr_err("Attempt to establish third link to %s\n", addr_string);
+		pr_err("Attempt to establish %uth link to %s. Max %u allowed.\n",
+		       n_ptr->link_cnt, addr_string, MAX_BEARERS);
 		return NULL;
 	}
 
@@ -274,7 +291,9 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
-	__skb_queue_head_init(&l_ptr->waiting_sks);
+	__skb_queue_head_init(&l_ptr->outqueue);
+	__skb_queue_head_init(&l_ptr->deferred_queue);
+	skb_queue_head_init(&l_ptr->waiting_sks);
 
 	link_reset_statistics(l_ptr);
 
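Note the two different initializers above, both from <linux/skbuff.h>: skb_queue_head_init() also initializes the queue's internal spinlock, while the double-underscore variant skips it and is meant for queues that are only ever manipulated under some external lock (here, the node lock). A sketch of the distinction:

	struct example {
		spinlock_t lock;		/* external lock guarding outq */
		struct sk_buff_head outq;	/* __-prefixed ops only, under ->lock */
		struct sk_buff_head waitq;	/* relies on its own internal lock */
	};

	static void example_init(struct example *e)
	{
		spin_lock_init(&e->lock);
		__skb_queue_head_init(&e->outq);	/* list head only */
		skb_queue_head_init(&e->waitq);		/* list head + spinlock */
	}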
@@ -339,7 +358,7 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
 		return false;
 	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
 	TIPC_SKB_CB(buf)->chain_imp = imp;
-	__skb_queue_tail(&link->waiting_sks, buf);
+	skb_queue_tail(&link->waiting_sks, buf);
 	link->stats.link_congs++;
 	return true;
 }
@@ -352,30 +371,19 @@ static bool link_schedule_user(struct tipc_link *link, u32 oport,
  */
 static void link_prepare_wakeup(struct tipc_link *link)
 {
-	struct sk_buff_head *wq = &link->waiting_sks;
-	struct sk_buff *buf;
-	uint pend_qsz = link->out_queue_size;
+	uint pend_qsz = skb_queue_len(&link->outqueue);
+	struct sk_buff *skb, *tmp;
 
-	for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
-		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
+	skb_queue_walk_safe(&link->waiting_sks, skb, tmp) {
+		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(skb)->chain_imp])
 			break;
-		pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
-		__skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
+		pend_qsz += TIPC_SKB_CB(skb)->chain_sz;
+		skb_unlink(skb, &link->waiting_sks);
+		skb_queue_tail(&link->owner->waiting_sks, skb);
 	}
 }
 
 /**
- * link_release_outqueue - purge link's outbound message queue
- * @l_ptr: pointer to link
- */
-static void link_release_outqueue(struct tipc_link *l_ptr)
-{
-	kfree_skb_list(l_ptr->first_out);
-	l_ptr->first_out = NULL;
-	l_ptr->out_queue_size = 0;
-}
-
-/**
  * tipc_link_reset_fragments - purge link's inbound message fragments queue
  * @l_ptr: pointer to link
  */
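The _safe walker matters in link_prepare_wakeup() because the loop body unlinks the buffer it is looking at; plain skb_queue_walk() would then follow a stale next pointer. The pattern in isolation (a generic sketch, not TIPC-specific):

	static void move_first_n(struct sk_buff_head *src,
				 struct sk_buff_head *dst, int n)
	{
		struct sk_buff *skb, *tmp;

		/* 'tmp' caches skb->next before the body runs, so unlinking
		 * 'skb' from 'src' cannot break the traversal. */
		skb_queue_walk_safe(src, skb, tmp) {
			if (n-- <= 0)
				break;
			skb_unlink(skb, src);		/* locked variants, */
			skb_queue_tail(dst, skb);	/* as in the patch  */
		}
	}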
@@ -391,11 +399,9 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-	kfree_skb_list(l_ptr->oldest_deferred_in);
-	kfree_skb_list(l_ptr->first_out);
+	__skb_queue_purge(&l_ptr->deferred_queue);
+	__skb_queue_purge(&l_ptr->outqueue);
 	tipc_link_reset_fragments(l_ptr);
-	kfree_skb(l_ptr->proto_msg_queue);
-	l_ptr->proto_msg_queue = NULL;
 }
 
 void tipc_link_reset(struct tipc_link *l_ptr)
@@ -427,25 +433,16 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	}
 
 	/* Clean up all queues: */
-	link_release_outqueue(l_ptr);
-	kfree_skb(l_ptr->proto_msg_queue);
-	l_ptr->proto_msg_queue = NULL;
-	kfree_skb_list(l_ptr->oldest_deferred_in);
+	__skb_queue_purge(&l_ptr->outqueue);
+	__skb_queue_purge(&l_ptr->deferred_queue);
 	if (!skb_queue_empty(&l_ptr->waiting_sks)) {
 		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
 		owner->action_flags |= TIPC_WAKEUP_USERS;
 	}
-	l_ptr->retransm_queue_head = 0;
-	l_ptr->retransm_queue_size = 0;
-	l_ptr->last_out = NULL;
-	l_ptr->first_out = NULL;
 	l_ptr->next_out = NULL;
 	l_ptr->unacked_window = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
-	l_ptr->deferred_inqueue_sz = 0;
-	l_ptr->oldest_deferred_in = NULL;
-	l_ptr->newest_deferred_in = NULL;
 	l_ptr->fsm_msg_cnt = 0;
 	l_ptr->stale_count = 0;
 	link_reset_statistics(l_ptr);
@@ -667,9 +664,10 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
  * - For all other messages we discard the buffer and return -EHOSTUNREACH
  * - For TIPC internal messages we also reset the link
  */
-static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
+static int tipc_link_cong(struct tipc_link *link, struct sk_buff_head *list)
 {
-	struct tipc_msg *msg = buf_msg(buf);
+	struct sk_buff *skb = skb_peek(list);
+	struct tipc_msg *msg = buf_msg(skb);
 	uint imp = tipc_msg_tot_importance(msg);
 	u32 oport = msg_tot_origport(msg);
 
@@ -682,30 +680,30 @@ static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 		goto drop;
 	if (unlikely(msg_reroute_cnt(msg)))
 		goto drop;
-	if (TIPC_SKB_CB(buf)->wakeup_pending)
+	if (TIPC_SKB_CB(skb)->wakeup_pending)
 		return -ELINKCONG;
-	if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
+	if (link_schedule_user(link, oport, skb_queue_len(list), imp))
 		return -ELINKCONG;
 drop:
-	kfree_skb_list(buf);
+	__skb_queue_purge(list);
 	return -EHOSTUNREACH;
 }
 
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
+ *
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG, -EMSGSIZE (plain socket
 * user data messages) or -EHOSTUNREACH (all other messages/senders)
 * Only the socket functions tipc_send_stream() and tipc_send_packet() need
 * to act on the return value, since they may need to do more send attempts.
 */
-int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
+int __tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list)
 {
-	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_msg *msg = buf_msg(skb_peek(list));
 	uint psz = msg_size(msg);
-	uint qsz = link->out_queue_size;
 	uint sndlim = link->queue_limit[0];
 	uint imp = tipc_msg_tot_importance(msg);
 	uint mtu = link->max_pkt;
@@ -713,71 +711,83 @@ int __tipc_link_xmit(struct tipc_link *link, struct sk_buff *buf)
 	uint seqno = link->next_out_no;
 	uint bc_last_in = link->owner->bclink.last_in;
 	struct tipc_media_addr *addr = &link->media_addr;
-	struct sk_buff *next = buf->next;
+	struct sk_buff_head *outqueue = &link->outqueue;
+	struct sk_buff *skb, *tmp;
 
 	/* Match queue limits against msg importance: */
-	if (unlikely(qsz >= link->queue_limit[imp]))
-		return tipc_link_cong(link, buf);
+	if (unlikely(skb_queue_len(outqueue) >= link->queue_limit[imp]))
+		return tipc_link_cong(link, list);
 
 	/* Has valid packet limit been used ? */
 	if (unlikely(psz > mtu)) {
-		kfree_skb_list(buf);
+		__skb_queue_purge(list);
 		return -EMSGSIZE;
 	}
 
 	/* Prepare each packet for sending, and add to outqueue: */
-	while (buf) {
-		next = buf->next;
-		msg = buf_msg(buf);
+	skb_queue_walk_safe(list, skb, tmp) {
+		__skb_unlink(skb, list);
+		msg = buf_msg(skb);
 		msg_set_word(msg, 2, ((ack << 16) | mod(seqno)));
 		msg_set_bcast_ack(msg, bc_last_in);
 
-		if (!link->first_out) {
-			link->first_out = buf;
-		} else if (qsz < sndlim) {
-			link->last_out->next = buf;
-		} else if (tipc_msg_bundle(link->last_out, buf, mtu)) {
+		if (skb_queue_len(outqueue) < sndlim) {
+			__skb_queue_tail(outqueue, skb);
+			tipc_bearer_send(link->bearer_id, skb, addr);
+			link->next_out = NULL;
+			link->unacked_window = 0;
+		} else if (tipc_msg_bundle(outqueue, skb, mtu)) {
 			link->stats.sent_bundled++;
-			buf = next;
-			next = buf->next;
 			continue;
-		} else if (tipc_msg_make_bundle(&buf, mtu, link->addr)) {
+		} else if (tipc_msg_make_bundle(outqueue, skb, mtu,
+						link->addr)) {
 			link->stats.sent_bundled++;
 			link->stats.sent_bundles++;
-			link->last_out->next = buf;
 			if (!link->next_out)
-				link->next_out = buf;
+				link->next_out = skb_peek_tail(outqueue);
 		} else {
-			link->last_out->next = buf;
+			__skb_queue_tail(outqueue, skb);
 			if (!link->next_out)
-				link->next_out = buf;
-		}
-
-		/* Send packet if possible: */
-		if (likely(++qsz <= sndlim)) {
-			tipc_bearer_send(link->bearer_id, buf, addr);
-			link->next_out = next;
-			link->unacked_window = 0;
+				link->next_out = skb;
 		}
 		seqno++;
-		link->last_out = buf;
-		buf = next;
 	}
 	link->next_out_no = seqno;
-	link->out_queue_size = qsz;
 	return 0;
 }
 
+static void skb2list(struct sk_buff *skb, struct sk_buff_head *list)
+{
+	__skb_queue_head_init(list);
+	__skb_queue_tail(list, skb);
+}
+
+static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
+{
+	struct sk_buff_head head;
+
+	skb2list(skb, &head);
+	return __tipc_link_xmit(link, &head);
+}
+
+int tipc_link_xmit_skb(struct sk_buff *skb, u32 dnode, u32 selector)
+{
+	struct sk_buff_head head;
+
+	skb2list(skb, &head);
+	return tipc_link_xmit(&head, dnode, selector);
+}
+
 /**
  * tipc_link_xmit() is the general link level function for message sending
- * @buf: chain of buffers containing message
+ * @list: chain of buffers containing message
 * @dsz: amount of user data to be sent
 * @dnode: address of destination node
 * @selector: a number used for deterministic link selection
 * Consumes the buffer chain, except when returning -ELINKCONG
 * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
 */
-int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
+int tipc_link_xmit(struct sk_buff_head *list, u32 dnode, u32 selector)
 {
 	struct tipc_link *link = NULL;
 	struct tipc_node *node;
@@ -788,17 +798,22 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
 		tipc_node_lock(node);
 		link = node->active_links[selector & 1];
 		if (link)
-			rc = __tipc_link_xmit(link, buf);
+			rc = __tipc_link_xmit(link, list);
 		tipc_node_unlock(node);
 	}
 
 	if (link)
 		return rc;
 
-	if (likely(in_own_node(dnode)))
-		return tipc_sk_rcv(buf);
+	if (likely(in_own_node(dnode))) {
+		/* As a node local message chain never contains more than one
+		 * buffer, we just need to dequeue one SKB buffer from the
+		 * head list.
+		 */
+		return tipc_sk_rcv(__skb_dequeue(list));
+	}
+	__skb_queue_purge(list);
 
-	kfree_skb_list(buf);
 	return rc;
 }
 
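The single-buffer helpers tipc_link_xmit_skb()/__tipc_link_xmit_skb() exist because most callers send exactly one skb; they simply wrap it in an on-stack sk_buff_head via skb2list(). Per the comment on __tipc_link_xmit(), only the socket send paths must react to -ELINKCONG, the one case where the chain is not consumed. A hedged sketch of such a caller (the wakeup-wait helper is hypothetical; the real retry logic lives in socket.c):

	static int example_send_one(struct sk_buff *skb, u32 dnode, u32 portid)
	{
		int rc = tipc_link_xmit_skb(skb, dnode, portid);

		if (rc == -ELINKCONG) {
			/* skb was NOT consumed; the sender has been queued on
			 * the link's waiting_sks and should retry on wakeup. */
			rc = example_wait_for_wakeup();	/* hypothetical */
		}
		return rc;
	}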
@@ -812,17 +827,17 @@ int tipc_link_xmit(struct sk_buff *buf, u32 dnode, u32 selector)
  */
 static void tipc_link_sync_xmit(struct tipc_link *link)
 {
-	struct sk_buff *buf;
+	struct sk_buff *skb;
 	struct tipc_msg *msg;
 
-	buf = tipc_buf_acquire(INT_H_SIZE);
-	if (!buf)
+	skb = tipc_buf_acquire(INT_H_SIZE);
+	if (!skb)
 		return;
 
-	msg = buf_msg(buf);
+	msg = buf_msg(skb);
 	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, link->addr);
 	msg_set_last_bcast(msg, link->owner->bclink.acked);
-	__tipc_link_xmit(link, buf);
+	__tipc_link_xmit_skb(link, skb);
 }
 
 /*
@@ -842,85 +857,46 @@ static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
 	kfree_skb(buf);
 }
 
+struct sk_buff *tipc_skb_queue_next(const struct sk_buff_head *list,
+				    const struct sk_buff *skb)
+{
+	if (skb_queue_is_last(list, skb))
+		return NULL;
+	return skb->next;
+}
+
 /*
- * tipc_link_push_packet: Push one unsent packet to the media
+ * tipc_link_push_packets - push unsent packets to bearer
+ *
+ * Push out the unsent messages of a link where congestion
+ * has abated.
+ *
+ * Called with node locked
  */
-static u32 tipc_link_push_packet(struct tipc_link *l_ptr)
+void tipc_link_push_packets(struct tipc_link *l_ptr)
 {
-	struct sk_buff *buf = l_ptr->first_out;
-	u32 r_q_size = l_ptr->retransm_queue_size;
-	u32 r_q_head = l_ptr->retransm_queue_head;
-
-	/* Step to position where retransmission failed, if any, */
-	/* consider that buffers may have been released in meantime */
-	if (r_q_size && buf) {
-		u32 last = lesser(mod(r_q_head + r_q_size),
-				  link_last_sent(l_ptr));
-		u32 first = buf_seqno(buf);
-
-		while (buf && less(first, r_q_head)) {
-			first = mod(first + 1);
-			buf = buf->next;
-		}
-		l_ptr->retransm_queue_head = r_q_head = first;
-		l_ptr->retransm_queue_size = r_q_size = mod(last - first);
-	}
-
-	/* Continue retransmission now, if there is anything: */
-	if (r_q_size && buf) {
-		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
-		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-		l_ptr->retransm_queue_head = mod(++r_q_head);
-		l_ptr->retransm_queue_size = --r_q_size;
-		l_ptr->stats.retransmitted++;
-		return 0;
-	}
-
-	/* Send deferred protocol message, if any: */
-	buf = l_ptr->proto_msg_queue;
-	if (buf) {
-		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
-		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-		l_ptr->unacked_window = 0;
-		kfree_skb(buf);
-		l_ptr->proto_msg_queue = NULL;
-		return 0;
-	}
+	struct sk_buff_head *outqueue = &l_ptr->outqueue;
+	struct sk_buff *skb = l_ptr->next_out;
+	struct tipc_msg *msg;
+	u32 next, first;
 
-	/* Send one deferred data message, if send window not full: */
-	buf = l_ptr->next_out;
-	if (buf) {
-		struct tipc_msg *msg = buf_msg(buf);
-		u32 next = msg_seqno(msg);
-		u32 first = buf_seqno(l_ptr->first_out);
+	skb_queue_walk_from(outqueue, skb) {
+		msg = buf_msg(skb);
+		next = msg_seqno(msg);
+		first = buf_seqno(skb_peek(outqueue));
 
 		if (mod(next - first) < l_ptr->queue_limit[0]) {
 			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
 			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-			tipc_bearer_send(l_ptr->bearer_id, buf,
-					 &l_ptr->media_addr);
 			if (msg_user(msg) == MSG_BUNDLER)
-				msg_set_type(msg, BUNDLE_CLOSED);
-			l_ptr->next_out = buf->next;
-			return 0;
+				TIPC_SKB_CB(skb)->bundling = false;
+			tipc_bearer_send(l_ptr->bearer_id, skb,
					 &l_ptr->media_addr);
+			l_ptr->next_out = tipc_skb_queue_next(outqueue, skb);
+		} else {
+			break;
 		}
 	}
-	return 1;
-}
-
-/*
- * push_queue(): push out the unsent messages of a link where
- * congestion has abated. Node is locked
- */
-void tipc_link_push_queue(struct tipc_link *l_ptr)
-{
-	u32 res;
-
-	do {
-		res = tipc_link_push_packet(l_ptr);
-	} while (!res);
 }
 
 void tipc_link_reset_all(struct tipc_node *node)
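The window test mod(next - first) < l_ptr->queue_limit[0] above only works because TIPC link sequence numbers are 16-bit values compared through wraparound-aware helpers (mod(), less(), more() in link.h). A simplified sketch of the idea, under that assumption and not the exact kernel definitions:

	/* Assumption: seqnos live in 0..0xffff and differences are taken
	 * modulo 2^16, so ordering survives the 0xffff -> 0 wrap. */
	static inline u32 seq_mod(u32 x)
	{
		return x & 0xffffu;
	}

	static inline int seq_less(u32 a, u32 b)
	{
		return seq_mod(b - a) && (seq_mod(b - a) < 0x8000u);
	}

	/* e.g. seq_mod(3 - 0xfffe) == 5: packet 3 is five packets past
	 * 0xfffe, so a window limit of 50 still admits it. */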
@@ -984,20 +960,20 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 	}
 }
 
-void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
+void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 			  u32 retransmits)
 {
 	struct tipc_msg *msg;
 
-	if (!buf)
+	if (!skb)
 		return;
 
-	msg = buf_msg(buf);
+	msg = buf_msg(skb);
 
 	/* Detect repeated retransmit failures */
 	if (l_ptr->last_retransmitted == msg_seqno(msg)) {
 		if (++l_ptr->stale_count > 100) {
-			link_retransmit_failure(l_ptr, buf);
+			link_retransmit_failure(l_ptr, skb);
 			return;
 		}
 	} else {
@@ -1005,38 +981,29 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
 		l_ptr->stale_count = 1;
 	}
 
-	while (retransmits && (buf != l_ptr->next_out) && buf) {
-		msg = buf_msg(buf);
+	skb_queue_walk_from(&l_ptr->outqueue, skb) {
+		if (!retransmits || skb == l_ptr->next_out)
+			break;
+		msg = buf_msg(skb);
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-		tipc_bearer_send(l_ptr->bearer_id, buf, &l_ptr->media_addr);
-		buf = buf->next;
+		tipc_bearer_send(l_ptr->bearer_id, skb, &l_ptr->media_addr);
 		retransmits--;
 		l_ptr->stats.retransmitted++;
 	}
-
-	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
 }
 
-/**
- * link_insert_deferred_queue - insert deferred messages back into receive chain
- */
-static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
-						  struct sk_buff *buf)
+static void link_retrieve_defq(struct tipc_link *link,
+			       struct sk_buff_head *list)
 {
 	u32 seq_no;
 
-	if (l_ptr->oldest_deferred_in == NULL)
-		return buf;
+	if (skb_queue_empty(&link->deferred_queue))
+		return;
 
-	seq_no = buf_seqno(l_ptr->oldest_deferred_in);
-	if (seq_no == mod(l_ptr->next_in_no)) {
-		l_ptr->newest_deferred_in->next = buf;
-		buf = l_ptr->oldest_deferred_in;
-		l_ptr->oldest_deferred_in = NULL;
-		l_ptr->deferred_inqueue_sz = 0;
-	}
-	return buf;
+	seq_no = buf_seqno(skb_peek(&link->deferred_queue));
+	if (seq_no == mod(link->next_in_no))
+		skb_queue_splice_tail_init(&link->deferred_queue, list);
 }
 
 /**
@@ -1096,43 +1063,42 @@ static int link_recv_buf_validate(struct sk_buff *buf)
 
 /**
  * tipc_rcv - process TIPC packets/messages arriving from off-node
- * @head: pointer to message buffer chain
+ * @skb: TIPC packet
  * @b_ptr: pointer to bearer message arrived on
 *
 * Invoked with no locks held. Bearer pointer must point to a valid bearer
 * structure (i.e. cannot be NULL), but bearer can be inactive.
 */
-void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
+void tipc_rcv(struct sk_buff *skb, struct tipc_bearer *b_ptr)
 {
-	while (head) {
-		struct tipc_node *n_ptr;
-		struct tipc_link *l_ptr;
-		struct sk_buff *crs;
-		struct sk_buff *buf = head;
-		struct tipc_msg *msg;
-		u32 seq_no;
-		u32 ackd;
-		u32 released = 0;
+	struct sk_buff_head head;
+	struct tipc_node *n_ptr;
+	struct tipc_link *l_ptr;
+	struct sk_buff *skb1, *tmp;
+	struct tipc_msg *msg;
+	u32 seq_no;
+	u32 ackd;
+	u32 released;
 
-		head = head->next;
-		buf->next = NULL;
+	skb2list(skb, &head);
 
+	while ((skb = __skb_dequeue(&head))) {
 		/* Ensure message is well-formed */
-		if (unlikely(!link_recv_buf_validate(buf)))
+		if (unlikely(!link_recv_buf_validate(skb)))
 			goto discard;
 
 		/* Ensure message data is a single contiguous unit */
-		if (unlikely(skb_linearize(buf)))
+		if (unlikely(skb_linearize(skb)))
 			goto discard;
 
 		/* Handle arrival of a non-unicast link message */
-		msg = buf_msg(buf);
+		msg = buf_msg(skb);
 
 		if (unlikely(msg_non_seq(msg))) {
 			if (msg_user(msg) == LINK_CONFIG)
-				tipc_disc_rcv(buf, b_ptr);
+				tipc_disc_rcv(skb, b_ptr);
 			else
-				tipc_bclink_rcv(buf);
+				tipc_bclink_rcv(skb);
 			continue;
 		}
 
@@ -1171,22 +1137,19 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (n_ptr->bclink.recv_permitted)
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
-		crs = l_ptr->first_out;
-		while ((crs != l_ptr->next_out) &&
-		       less_eq(buf_seqno(crs), ackd)) {
-			struct sk_buff *next = crs->next;
-			kfree_skb(crs);
-			crs = next;
-			released++;
-		}
-		if (released) {
-			l_ptr->first_out = crs;
-			l_ptr->out_queue_size -= released;
+		released = 0;
+		skb_queue_walk_safe(&l_ptr->outqueue, skb1, tmp) {
+			if (skb1 == l_ptr->next_out ||
+			    more(buf_seqno(skb1), ackd))
+				break;
+			__skb_unlink(skb1, &l_ptr->outqueue);
+			kfree_skb(skb1);
+			released = 1;
 		}
 
 		/* Try sending any messages link endpoint has pending */
 		if (unlikely(l_ptr->next_out))
-			tipc_link_push_queue(l_ptr);
+			tipc_link_push_packets(l_ptr);
 
 		if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
 			link_prepare_wakeup(l_ptr);
@@ -1196,8 +1159,8 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		/* Process the incoming packet */
 		if (unlikely(!link_working_working(l_ptr))) {
 			if (msg_user(msg) == LINK_PROTOCOL) {
-				tipc_link_proto_rcv(l_ptr, buf);
-				head = link_insert_deferred_queue(l_ptr, head);
+				tipc_link_proto_rcv(l_ptr, skb);
+				link_retrieve_defq(l_ptr, &head);
 				tipc_node_unlock(n_ptr);
 				continue;
 			}
@@ -1207,8 +1170,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 
 			if (link_working_working(l_ptr)) {
 				/* Re-insert buffer in front of queue */
-				buf->next = head;
-				head = buf;
+				__skb_queue_head(&head, skb);
 				tipc_node_unlock(n_ptr);
 				continue;
 			}
@@ -1217,33 +1179,33 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 
 		/* Link is now in state WORKING_WORKING */
 		if (unlikely(seq_no != mod(l_ptr->next_in_no))) {
-			link_handle_out_of_seq_msg(l_ptr, buf);
-			head = link_insert_deferred_queue(l_ptr, head);
+			link_handle_out_of_seq_msg(l_ptr, skb);
+			link_retrieve_defq(l_ptr, &head);
 			tipc_node_unlock(n_ptr);
 			continue;
 		}
 		l_ptr->next_in_no++;
-		if (unlikely(l_ptr->oldest_deferred_in))
-			head = link_insert_deferred_queue(l_ptr, head);
+		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
+			link_retrieve_defq(l_ptr, &head);
 
 		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
 			l_ptr->stats.sent_acks++;
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
 		}
 
-		if (tipc_link_prepare_input(l_ptr, &buf)) {
+		if (tipc_link_prepare_input(l_ptr, &skb)) {
 			tipc_node_unlock(n_ptr);
 			continue;
 		}
 		tipc_node_unlock(n_ptr);
-		msg = buf_msg(buf);
-		if (tipc_link_input(l_ptr, buf) != 0)
+
+		if (tipc_link_input(l_ptr, skb) != 0)
 			goto discard;
 		continue;
 unlock_discard:
 		tipc_node_unlock(n_ptr);
 discard:
-		kfree_skb(buf);
+		kfree_skb(skb);
 	}
 }
 
@@ -1326,48 +1288,37 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
 *
 * Returns increase in queue length (i.e. 0 or 1)
 */
-u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
-			struct sk_buff *buf)
+u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
 {
-	struct sk_buff *queue_buf;
-	struct sk_buff **prev;
-	u32 seq_no = buf_seqno(buf);
-
-	buf->next = NULL;
+	struct sk_buff *skb1;
+	u32 seq_no = buf_seqno(skb);
 
 	/* Empty queue ? */
-	if (*head == NULL) {
-		*head = *tail = buf;
+	if (skb_queue_empty(list)) {
+		__skb_queue_tail(list, skb);
 		return 1;
 	}
 
 	/* Last ? */
-	if (less(buf_seqno(*tail), seq_no)) {
-		(*tail)->next = buf;
-		*tail = buf;
+	if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
+		__skb_queue_tail(list, skb);
 		return 1;
 	}
 
 	/* Locate insertion point in queue, then insert; discard if duplicate */
-	prev = head;
-	queue_buf = *head;
-	for (;;) {
-		u32 curr_seqno = buf_seqno(queue_buf);
+	skb_queue_walk(list, skb1) {
+		u32 curr_seqno = buf_seqno(skb1);
 
 		if (seq_no == curr_seqno) {
-			kfree_skb(buf);
+			kfree_skb(skb);
 			return 0;
 		}
 
 		if (less(seq_no, curr_seqno))
 			break;
-
-		prev = &queue_buf->next;
-		queue_buf = queue_buf->next;
 	}
 
-	buf->next = queue_buf;
-	*prev = buf;
+	__skb_queue_before(list, skb1, skb);
 	return 1;
 }
 
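The rewritten tipc_link_defer_pkt() keeps the deferred queue sorted by sequence number, using __skb_queue_before() to splice the new buffer in front of the first queued packet with a higher seqno. The same idiom in isolation (generic sketch built on the file's less()/buf_seqno() helpers):

	static void sorted_insert(struct sk_buff_head *list, struct sk_buff *skb)
	{
		struct sk_buff *pos;

		skb_queue_walk(list, pos) {
			if (less(buf_seqno(skb), buf_seqno(pos))) {
				__skb_queue_before(list, pos, skb);
				return;
			}
		}
		__skb_queue_tail(list, skb);	/* highest seqno: append */
	}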
@@ -1397,15 +1348,14 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 		return;
 	}
 
-	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
-				&l_ptr->newest_deferred_in, buf)) {
-		l_ptr->deferred_inqueue_sz++;
+	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
 		l_ptr->stats.deferred_recv++;
 		TIPC_SKB_CB(buf)->deferred = true;
-		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
+		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-	} else
+	} else {
 		l_ptr->stats.duplicates++;
+	}
 }
 
 /*
@@ -1419,12 +1369,6 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 	u32 msg_size = sizeof(l_ptr->proto_msg);
 	int r_flag;
 
-	/* Discard any previous message that was deferred due to congestion */
-	if (l_ptr->proto_msg_queue) {
-		kfree_skb(l_ptr->proto_msg_queue);
-		l_ptr->proto_msg_queue = NULL;
-	}
-
 	/* Don't send protocol message during link changeover */
 	if (l_ptr->exp_msg_count)
 		return;
@@ -1447,8 +1391,8 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 	if (l_ptr->next_out)
 		next_sent = buf_seqno(l_ptr->next_out);
 	msg_set_next_sent(msg, next_sent);
-	if (l_ptr->oldest_deferred_in) {
-		u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
+	if (!skb_queue_empty(&l_ptr->deferred_queue)) {
+		u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
 		gap = mod(rec - mod(l_ptr->next_in_no));
 	}
 	msg_set_seq_gap(msg, gap);
@@ -1636,7 +1580,7 @@ static void tipc_link_proto_rcv(struct tipc_link *l_ptr, struct sk_buff *buf)
 		}
 		if (msg_seq_gap(msg)) {
 			l_ptr->stats.recv_nacks++;
-			tipc_link_retransmit(l_ptr, l_ptr->first_out,
+			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->outqueue),
 					     msg_seq_gap(msg));
 		}
 		break;
@@ -1655,7 +1599,7 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
 			       u32 selector)
 {
 	struct tipc_link *tunnel;
-	struct sk_buff *buf;
+	struct sk_buff *skb;
 	u32 length = msg_size(msg);
 
 	tunnel = l_ptr->owner->active_links[selector & 1];
@@ -1664,14 +1608,14 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
 		return;
 	}
 	msg_set_size(tunnel_hdr, length + INT_H_SIZE);
-	buf = tipc_buf_acquire(length + INT_H_SIZE);
-	if (!buf) {
+	skb = tipc_buf_acquire(length + INT_H_SIZE);
+	if (!skb) {
 		pr_warn("%sunable to send tunnel msg\n", link_co_err);
 		return;
 	}
-	skb_copy_to_linear_data(buf, tunnel_hdr, INT_H_SIZE);
-	skb_copy_to_linear_data_offset(buf, INT_H_SIZE, msg, length);
-	__tipc_link_xmit(tunnel, buf);
+	skb_copy_to_linear_data(skb, tunnel_hdr, INT_H_SIZE);
+	skb_copy_to_linear_data_offset(skb, INT_H_SIZE, msg, length);
+	__tipc_link_xmit_skb(tunnel, skb);
 }
 
 
@@ -1683,10 +1627,10 @@ static void tipc_link_tunnel_xmit(struct tipc_link *l_ptr,
 */
 void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 {
-	u32 msgcount = l_ptr->out_queue_size;
-	struct sk_buff *crs = l_ptr->first_out;
+	u32 msgcount = skb_queue_len(&l_ptr->outqueue);
 	struct tipc_link *tunnel = l_ptr->owner->active_links[0];
 	struct tipc_msg tunnel_hdr;
+	struct sk_buff *skb;
 	int split_bundles;
 
 	if (!tunnel)
@@ -1697,14 +1641,12 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
 	msg_set_msgcnt(&tunnel_hdr, msgcount);
 
-	if (!l_ptr->first_out) {
-		struct sk_buff *buf;
-
-		buf = tipc_buf_acquire(INT_H_SIZE);
-		if (buf) {
-			skb_copy_to_linear_data(buf, &tunnel_hdr, INT_H_SIZE);
+	if (skb_queue_empty(&l_ptr->outqueue)) {
+		skb = tipc_buf_acquire(INT_H_SIZE);
+		if (skb) {
+			skb_copy_to_linear_data(skb, &tunnel_hdr, INT_H_SIZE);
 			msg_set_size(&tunnel_hdr, INT_H_SIZE);
-			__tipc_link_xmit(tunnel, buf);
+			__tipc_link_xmit_skb(tunnel, skb);
 		} else {
 			pr_warn("%sunable to send changeover msg\n",
 				link_co_err);
@@ -1715,8 +1657,8 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 	split_bundles = (l_ptr->owner->active_links[0] !=
 			 l_ptr->owner->active_links[1]);
 
-	while (crs) {
-		struct tipc_msg *msg = buf_msg(crs);
+	skb_queue_walk(&l_ptr->outqueue, skb) {
+		struct tipc_msg *msg = buf_msg(skb);
 
 		if ((msg_user(msg) == MSG_BUNDLER) && split_bundles) {
 			struct tipc_msg *m = msg_get_wrapped(msg);
@@ -1734,7 +1676,6 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 			tipc_link_tunnel_xmit(l_ptr, &tunnel_hdr, msg,
 					      msg_link_selector(msg));
 		}
-		crs = crs->next;
 	}
 }
 
@@ -1750,17 +1691,16 @@ void tipc_link_failover_send_queue(struct tipc_link *l_ptr)
 void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
 			      struct tipc_link *tunnel)
 {
-	struct sk_buff *iter;
+	struct sk_buff *skb;
 	struct tipc_msg tunnel_hdr;
 
 	tipc_msg_init(&tunnel_hdr, CHANGEOVER_PROTOCOL,
 		      DUPLICATE_MSG, INT_H_SIZE, l_ptr->addr);
-	msg_set_msgcnt(&tunnel_hdr, l_ptr->out_queue_size);
+	msg_set_msgcnt(&tunnel_hdr, skb_queue_len(&l_ptr->outqueue));
 	msg_set_bearer_id(&tunnel_hdr, l_ptr->peer_bearer_id);
-	iter = l_ptr->first_out;
-	while (iter) {
-		struct sk_buff *outbuf;
-		struct tipc_msg *msg = buf_msg(iter);
+	skb_queue_walk(&l_ptr->outqueue, skb) {
+		struct sk_buff *outskb;
+		struct tipc_msg *msg = buf_msg(skb);
 		u32 length = msg_size(msg);
 
 		if (msg_user(msg) == MSG_BUNDLER)
@@ -1768,19 +1708,18 @@ void tipc_link_dup_queue_xmit(struct tipc_link *l_ptr,
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));	/* Update */
 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
 		msg_set_size(&tunnel_hdr, length + INT_H_SIZE);
-		outbuf = tipc_buf_acquire(length + INT_H_SIZE);
-		if (outbuf == NULL) {
+		outskb = tipc_buf_acquire(length + INT_H_SIZE);
+		if (outskb == NULL) {
 			pr_warn("%sunable to send duplicate msg\n",
 				link_co_err);
 			return;
 		}
-		skb_copy_to_linear_data(outbuf, &tunnel_hdr, INT_H_SIZE);
-		skb_copy_to_linear_data_offset(outbuf, INT_H_SIZE, iter->data,
-					       length);
-		__tipc_link_xmit(tunnel, outbuf);
+		skb_copy_to_linear_data(outskb, &tunnel_hdr, INT_H_SIZE);
+		skb_copy_to_linear_data_offset(outskb, INT_H_SIZE, skb->data,
+					       length);
+		__tipc_link_xmit_skb(tunnel, outskb);
 		if (!tipc_link_is_up(l_ptr))
 			return;
-		iter = iter->next;
 	}
 }
 
@@ -2375,3 +2314,435 @@ static void link_print(struct tipc_link *l_ptr, const char *str)
 	else
 		pr_cont("\n");
 }
+
+/* Parse and validate nested (link) properties valid for media, bearer and link
+ */
+int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[])
+{
+	int err;
+
+	err = nla_parse_nested(props, TIPC_NLA_PROP_MAX, prop,
+			       tipc_nl_prop_policy);
+	if (err)
+		return err;
+
+	if (props[TIPC_NLA_PROP_PRIO]) {
+		u32 prio;
+
+		prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
+		if (prio > TIPC_MAX_LINK_PRI)
+			return -EINVAL;
+	}
+
+	if (props[TIPC_NLA_PROP_TOL]) {
+		u32 tol;
+
+		tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
+		if ((tol < TIPC_MIN_LINK_TOL) || (tol > TIPC_MAX_LINK_TOL))
+			return -EINVAL;
+	}
+
+	if (props[TIPC_NLA_PROP_WIN]) {
+		u32 win;
+
+		win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+		if ((win < TIPC_MIN_LINK_WIN) || (win > TIPC_MAX_LINK_WIN))
+			return -EINVAL;
+	}
+
+	return 0;
+}
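As the comment says, the PROP nest layout is shared by media, bearers and links, so the validator above is deliberately reusable. A hedged sketch of how another handler could call it (the function name is hypothetical; the bearer and media handlers in this series do something very similar):

	static int example_bearer_props(struct nlattr *prop_attr)
	{
		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
		int err;

		err = tipc_nl_parse_link_prop(prop_attr, props);
		if (err)
			return err;

		if (props[TIPC_NLA_PROP_PRIO])
			pr_info("prio: %u\n",
				nla_get_u32(props[TIPC_NLA_PROP_PRIO]));
		return 0;
	}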
+
+int tipc_nl_link_set(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	int res = 0;
+	int bearer_id;
+	char *name;
+	struct tipc_link *link;
+	struct tipc_node *node;
+	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
+
+	if (!info->attrs[TIPC_NLA_LINK])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
+			       info->attrs[TIPC_NLA_LINK],
+			       tipc_nl_link_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_LINK_NAME])
+		return -EINVAL;
+
+	name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
+
+	node = tipc_link_find_owner(name, &bearer_id);
+	if (!node)
+		return -EINVAL;
+
+	tipc_node_lock(node);
+
+	link = node->links[bearer_id];
+	if (!link) {
+		res = -EINVAL;
+		goto out;
+	}
+
+	if (attrs[TIPC_NLA_LINK_PROP]) {
+		struct nlattr *props[TIPC_NLA_PROP_MAX + 1];
+
+		err = tipc_nl_parse_link_prop(attrs[TIPC_NLA_LINK_PROP],
+					      props);
+		if (err) {
+			res = err;
+			goto out;
+		}
+
+		if (props[TIPC_NLA_PROP_TOL]) {
+			u32 tol;
+
+			tol = nla_get_u32(props[TIPC_NLA_PROP_TOL]);
+			link_set_supervision_props(link, tol);
+			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, tol, 0, 0);
+		}
+		if (props[TIPC_NLA_PROP_PRIO]) {
+			u32 prio;
+
+			prio = nla_get_u32(props[TIPC_NLA_PROP_PRIO]);
+			link->priority = prio;
+			tipc_link_proto_xmit(link, STATE_MSG, 0, 0, 0, prio, 0);
+		}
+		if (props[TIPC_NLA_PROP_WIN]) {
+			u32 win;
+
+			win = nla_get_u32(props[TIPC_NLA_PROP_WIN]);
+			tipc_link_set_queue_limits(link, win);
+		}
+	}
+
+out:
+	tipc_node_unlock(node);
+
+	return res;
+}
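For orientation, this is roughly what a userspace caller of the new TIPC_NL_LINK_SET command looks like when built with libnl-3's generic netlink API. A sketch under the assumption that the TIPC genl family id has already been resolved into family_id; allocation failures and message cleanup are omitted:

	#include <netlink/netlink.h>
	#include <netlink/genl/genl.h>
	#include <linux/tipc_netlink.h>

	static int set_link_tolerance(struct nl_sock *sk, int family_id,
				      const char *name, uint32_t tol)
	{
		struct nl_msg *msg = nlmsg_alloc();
		struct nlattr *attrs, *prop;

		genlmsg_put(msg, NL_AUTO_PORT, NL_AUTO_SEQ, family_id, 0, 0,
			    TIPC_NL_LINK_SET, 1);

		attrs = nla_nest_start(msg, TIPC_NLA_LINK);
		nla_put_string(msg, TIPC_NLA_LINK_NAME, name);
		prop = nla_nest_start(msg, TIPC_NLA_LINK_PROP);
		nla_put_u32(msg, TIPC_NLA_PROP_TOL, tol);
		nla_nest_end(msg, prop);
		nla_nest_end(msg, attrs);

		return nl_send_auto(sk, msg);	/* then wait for the ACK */
	}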
+
+static int __tipc_nl_add_stats(struct sk_buff *skb, struct tipc_stats *s)
+{
+	int i;
+	struct nlattr *stats;
+
+	struct nla_map {
+		u32 key;
+		u32 val;
+	};
+
+	struct nla_map map[] = {
+		{TIPC_NLA_STATS_RX_INFO, s->recv_info},
+		{TIPC_NLA_STATS_RX_FRAGMENTS, s->recv_fragments},
+		{TIPC_NLA_STATS_RX_FRAGMENTED, s->recv_fragmented},
+		{TIPC_NLA_STATS_RX_BUNDLES, s->recv_bundles},
+		{TIPC_NLA_STATS_RX_BUNDLED, s->recv_bundled},
+		{TIPC_NLA_STATS_TX_INFO, s->sent_info},
+		{TIPC_NLA_STATS_TX_FRAGMENTS, s->sent_fragments},
+		{TIPC_NLA_STATS_TX_FRAGMENTED, s->sent_fragmented},
+		{TIPC_NLA_STATS_TX_BUNDLES, s->sent_bundles},
+		{TIPC_NLA_STATS_TX_BUNDLED, s->sent_bundled},
+		{TIPC_NLA_STATS_MSG_PROF_TOT, (s->msg_length_counts) ?
+			s->msg_length_counts : 1},
+		{TIPC_NLA_STATS_MSG_LEN_CNT, s->msg_length_counts},
+		{TIPC_NLA_STATS_MSG_LEN_TOT, s->msg_lengths_total},
+		{TIPC_NLA_STATS_MSG_LEN_P0, s->msg_length_profile[0]},
+		{TIPC_NLA_STATS_MSG_LEN_P1, s->msg_length_profile[1]},
+		{TIPC_NLA_STATS_MSG_LEN_P2, s->msg_length_profile[2]},
+		{TIPC_NLA_STATS_MSG_LEN_P3, s->msg_length_profile[3]},
+		{TIPC_NLA_STATS_MSG_LEN_P4, s->msg_length_profile[4]},
+		{TIPC_NLA_STATS_MSG_LEN_P5, s->msg_length_profile[5]},
+		{TIPC_NLA_STATS_MSG_LEN_P6, s->msg_length_profile[6]},
+		{TIPC_NLA_STATS_RX_STATES, s->recv_states},
+		{TIPC_NLA_STATS_RX_PROBES, s->recv_probes},
+		{TIPC_NLA_STATS_RX_NACKS, s->recv_nacks},
+		{TIPC_NLA_STATS_RX_DEFERRED, s->deferred_recv},
+		{TIPC_NLA_STATS_TX_STATES, s->sent_states},
+		{TIPC_NLA_STATS_TX_PROBES, s->sent_probes},
+		{TIPC_NLA_STATS_TX_NACKS, s->sent_nacks},
+		{TIPC_NLA_STATS_TX_ACKS, s->sent_acks},
+		{TIPC_NLA_STATS_RETRANSMITTED, s->retransmitted},
+		{TIPC_NLA_STATS_DUPLICATES, s->duplicates},
+		{TIPC_NLA_STATS_LINK_CONGS, s->link_congs},
+		{TIPC_NLA_STATS_MAX_QUEUE, s->max_queue_sz},
+		{TIPC_NLA_STATS_AVG_QUEUE, s->queue_sz_counts ?
+			(s->accu_queue_sz / s->queue_sz_counts) : 0}
+	};
+
+	stats = nla_nest_start(skb, TIPC_NLA_LINK_STATS);
+	if (!stats)
+		return -EMSGSIZE;
+
+	for (i = 0; i < ARRAY_SIZE(map); i++)
+		if (nla_put_u32(skb, map[i].key, map[i].val))
+			goto msg_full;
+
+	nla_nest_end(skb, stats);
+
+	return 0;
+msg_full:
+	nla_nest_cancel(skb, stats);
+
+	return -EMSGSIZE;
+}
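A peer parsing the reply unpacks the same nest generically. A short sketch of reading one counter back out of a TIPC_NLA_LINK_STATS nest with the kernel-style API (no policy is passed, so only the lookup array is filled):

	static int example_read_rx_info(struct nlattr *stats_attr, u32 *val)
	{
		struct nlattr *st[TIPC_NLA_STATS_MAX + 1];
		int err;

		err = nla_parse_nested(st, TIPC_NLA_STATS_MAX, stats_attr, NULL);
		if (err)
			return err;

		if (!st[TIPC_NLA_STATS_RX_INFO])
			return -EINVAL;

		*val = nla_get_u32(st[TIPC_NLA_STATS_RX_INFO]);
		return 0;
	}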
+
+/* Caller should hold appropriate locks to protect the link */
+static int __tipc_nl_add_link(struct tipc_nl_msg *msg, struct tipc_link *link)
+{
+	int err;
+	void *hdr;
+	struct nlattr *attrs;
+	struct nlattr *prop;
+
+	hdr = genlmsg_put(msg->skb, msg->portid, msg->seq, &tipc_genl_v2_family,
+			  NLM_F_MULTI, TIPC_NL_LINK_GET);
+	if (!hdr)
+		return -EMSGSIZE;
+
+	attrs = nla_nest_start(msg->skb, TIPC_NLA_LINK);
+	if (!attrs)
+		goto msg_full;
+
+	if (nla_put_string(msg->skb, TIPC_NLA_LINK_NAME, link->name))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_DEST,
+			tipc_cluster_mask(tipc_own_addr)))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_MTU, link->max_pkt))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_RX, link->next_in_no))
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_LINK_TX, link->next_out_no))
+		goto attr_msg_full;
+
+	if (tipc_link_is_up(link))
+		if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
+			goto attr_msg_full;
+	if (tipc_link_is_active(link))
+		if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
+			goto attr_msg_full;
+
+	prop = nla_nest_start(msg->skb, TIPC_NLA_LINK_PROP);
+	if (!prop)
+		goto attr_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
+		goto prop_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_TOL, link->tolerance))
+		goto prop_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_WIN,
+			link->queue_limit[TIPC_LOW_IMPORTANCE]))
+		goto prop_msg_full;
+	if (nla_put_u32(msg->skb, TIPC_NLA_PROP_PRIO, link->priority))
+		goto prop_msg_full;
+	nla_nest_end(msg->skb, prop);
+
+	err = __tipc_nl_add_stats(msg->skb, &link->stats);
+	if (err)
+		goto attr_msg_full;
+
+	nla_nest_end(msg->skb, attrs);
+	genlmsg_end(msg->skb, hdr);
+
+	return 0;
+
+prop_msg_full:
+	nla_nest_cancel(msg->skb, prop);
+attr_msg_full:
+	nla_nest_cancel(msg->skb, attrs);
+msg_full:
+	genlmsg_cancel(msg->skb, hdr);
+
+	return -EMSGSIZE;
+}
+
+/* Caller should hold node lock */
+static int __tipc_nl_add_node_links(struct tipc_nl_msg *msg,
+				    struct tipc_node *node,
+				    u32 *prev_link)
+{
+	u32 i;
+	int err;
+
+	for (i = *prev_link; i < MAX_BEARERS; i++) {
+		*prev_link = i;
+
+		if (!node->links[i])
+			continue;
+
+		err = __tipc_nl_add_link(msg, node->links[i]);
+		if (err)
+			return err;
+	}
+	*prev_link = 0;
+
+	return 0;
+}
+
+int tipc_nl_link_dump(struct sk_buff *skb, struct netlink_callback *cb)
+{
+	struct tipc_node *node;
+	struct tipc_nl_msg msg;
+	u32 prev_node = cb->args[0];
+	u32 prev_link = cb->args[1];
+	int done = cb->args[2];
+	int err;
+
+	if (done)
+		return 0;
+
+	msg.skb = skb;
+	msg.portid = NETLINK_CB(cb->skb).portid;
+	msg.seq = cb->nlh->nlmsg_seq;
+
+	rcu_read_lock();
+
+	if (prev_node) {
+		node = tipc_node_find(prev_node);
+		if (!node) {
+			/* We never set seq or call nl_dump_check_consistent();
+			 * this means that setting prev_seq here will cause the
+			 * consistency check to fail in the netlink callback
+			 * handler, resulting in the last NLMSG_DONE message
+			 * having the NLM_F_DUMP_INTR flag set.
+			 */
+			cb->prev_seq = 1;
+			goto out;
+		}
+
+		list_for_each_entry_continue_rcu(node, &tipc_node_list, list) {
+			tipc_node_lock(node);
+			err = __tipc_nl_add_node_links(&msg, node, &prev_link);
+			tipc_node_unlock(node);
+			if (err)
+				goto out;
+
+			prev_node = node->addr;
+		}
+	} else {
+		err = tipc_nl_add_bc_link(&msg);
+		if (err)
+			goto out;
+
+		list_for_each_entry_rcu(node, &tipc_node_list, list) {
+			tipc_node_lock(node);
+			err = __tipc_nl_add_node_links(&msg, node, &prev_link);
+			tipc_node_unlock(node);
+			if (err)
+				goto out;
+
+			prev_node = node->addr;
+		}
+	}
+	done = 1;
+out:
+	rcu_read_unlock();
+
+	cb->args[0] = prev_node;
+	cb->args[1] = prev_link;
+	cb->args[2] = done;
+
+	return skb->len;
+}
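tipc_nl_link_dump() follows the standard netlink dump contract: the callback is invoked repeatedly until it returns 0, and any resume state must be parked in cb->args[] between invocations (here: last node address, last link index, and a done flag). The bare mechanism, with hypothetical count/fill helpers:

	static int example_dump(struct sk_buff *skb, struct netlink_callback *cb)
	{
		unsigned long pos = cb->args[0];   /* where the last call stopped */

		for (; pos < example_count(); pos++) {
			if (example_fill_one(skb, pos) < 0)
				break;	/* skb full: resume at 'pos' next call */
		}
		cb->args[0] = pos;
		return skb->len;	/* 0 once nothing was added: dump ends */
	}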
+
+int tipc_nl_link_get(struct sk_buff *skb, struct genl_info *info)
+{
+	struct sk_buff *ans_skb;
+	struct tipc_nl_msg msg;
+	struct tipc_link *link;
+	struct tipc_node *node;
+	char *name;
+	int bearer_id;
+	int err;
+
+	if (!info->attrs[TIPC_NLA_LINK_NAME])
+		return -EINVAL;
+
+	name = nla_data(info->attrs[TIPC_NLA_LINK_NAME]);
+	node = tipc_link_find_owner(name, &bearer_id);
+	if (!node)
+		return -EINVAL;
+
+	ans_skb = nlmsg_new(NLMSG_GOODSIZE, GFP_KERNEL);
+	if (!ans_skb)
+		return -ENOMEM;
+
+	msg.skb = ans_skb;
+	msg.portid = info->snd_portid;
+	msg.seq = info->snd_seq;
+
+	tipc_node_lock(node);
+	link = node->links[bearer_id];
+	if (!link) {
+		err = -EINVAL;
+		goto err_out;
+	}
+
+	err = __tipc_nl_add_link(&msg, link);
+	if (err)
+		goto err_out;
+
+	tipc_node_unlock(node);
+
+	return genlmsg_reply(ans_skb, info);
+
+err_out:
+	tipc_node_unlock(node);
+	nlmsg_free(ans_skb);
+
+	return err;
+}
+
+int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info)
+{
+	int err;
+	char *link_name;
+	unsigned int bearer_id;
+	struct tipc_link *link;
+	struct tipc_node *node;
+	struct nlattr *attrs[TIPC_NLA_LINK_MAX + 1];
+
+	if (!info->attrs[TIPC_NLA_LINK])
+		return -EINVAL;
+
+	err = nla_parse_nested(attrs, TIPC_NLA_LINK_MAX,
+			       info->attrs[TIPC_NLA_LINK],
+			       tipc_nl_link_policy);
+	if (err)
+		return err;
+
+	if (!attrs[TIPC_NLA_LINK_NAME])
+		return -EINVAL;
+
+	link_name = nla_data(attrs[TIPC_NLA_LINK_NAME]);
+
+	if (strcmp(link_name, tipc_bclink_name) == 0) {
+		err = tipc_bclink_reset_stats();
+		if (err)
+			return err;
+		return 0;
+	}
+
+	node = tipc_link_find_owner(link_name, &bearer_id);
+	if (!node)
+		return -EINVAL;
+
+	tipc_node_lock(node);
+
+	link = node->links[bearer_id];
+	if (!link) {
+		tipc_node_unlock(node);
+		return -EINVAL;
+	}
+
+	link_reset_statistics(link);
+
+	tipc_node_unlock(node);
+
+	return 0;
+}