aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2015-10-22 08:51:41 -0400
committerDavid S. Miller <davem@davemloft.net>2015-10-24 09:56:37 -0400
commit5266698661401afc5e4a1a521cf9ba10724d10dd (patch)
treecf3d466a2d9982f403a689e8a0c819c7e3693bde /net/tipc/link.c
parentfd556f209af53b9cdc45df8c467feb235376c4df (diff)
tipc: let broadcast packet reception use new link receive function
The code path for receiving broadcast packets is currently distinct from the unicast path. This leads to unnecessary code and data duplication, something that can be avoided with some effort. We now introduce separate per-peer tipc_link instances for handling broadcast packet reception. Each receive link keeps a pointer to the common, single, broadcast link instance, and can hence handle release and retransmission of send buffers as if they belonged to their own instance. Furthermore, we let each unicast link instance keep a reference to both the pertaining broadcast receive link, and to the common send link. This makes it possible for the unicast links to easily access data for broadcast link synchronization, as well as for carrying acknowledgments for received broadcast packets. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c435
1 file changed, 312 insertions, 123 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 6a1a9d9239ae..ff725c398914 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -76,6 +76,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
76 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } 76 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
77}; 77};
78 78
79/* Send states for broadcast NACKs
80 */
81enum {
82 BC_NACK_SND_CONDITIONAL,
83 BC_NACK_SND_UNCONDITIONAL,
84 BC_NACK_SND_SUPPRESS,
85};
86
79/* 87/*
80 * Interval between NACKs when packets arrive out of order 88 * Interval between NACKs when packets arrive out of order
81 */ 89 */
@@ -111,7 +119,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
111 struct sk_buff_head *xmitq); 119 struct sk_buff_head *xmitq);
112static void link_reset_statistics(struct tipc_link *l_ptr); 120static void link_reset_statistics(struct tipc_link *l_ptr);
113static void link_print(struct tipc_link *l_ptr, const char *str); 121static void link_print(struct tipc_link *l_ptr, const char *str);
114static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 122static void tipc_link_build_nack_msg(struct tipc_link *l,
123 struct sk_buff_head *xmitq);
124static void tipc_link_build_bc_init_msg(struct tipc_link *l,
125 struct sk_buff_head *xmitq);
126static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
115 127
116/* 128/*
117 * Simple non-static link routines (i.e. referenced outside this file) 129 * Simple non-static link routines (i.e. referenced outside this file)
@@ -151,6 +163,16 @@ bool tipc_link_is_blocked(struct tipc_link *l)
151 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); 163 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER);
152} 164}
153 165
166bool link_is_bc_sndlink(struct tipc_link *l)
167{
168 return !l->bc_sndlink;
169}
170
171bool link_is_bc_rcvlink(struct tipc_link *l)
172{
173 return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l));
174}
175
154int tipc_link_is_active(struct tipc_link *l) 176int tipc_link_is_active(struct tipc_link *l)
155{ 177{
156 struct tipc_node *n = l->owner; 178 struct tipc_node *n = l->owner;
@@ -158,14 +180,31 @@ int tipc_link_is_active(struct tipc_link *l)
158 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); 180 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l);
159} 181}
160 182
161void tipc_link_add_bc_peer(struct tipc_link *l) 183void tipc_link_add_bc_peer(struct tipc_link *snd_l,
184 struct tipc_link *uc_l,
185 struct sk_buff_head *xmitq)
162{ 186{
163 l->ackers++; 187 struct tipc_link *rcv_l = uc_l->bc_rcvlink;
188
189 snd_l->ackers++;
190 rcv_l->acked = snd_l->snd_nxt - 1;
191 tipc_link_build_bc_init_msg(uc_l, xmitq);
164} 192}
165 193
166void tipc_link_remove_bc_peer(struct tipc_link *l) 194void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
195 struct tipc_link *rcv_l,
196 struct sk_buff_head *xmitq)
167{ 197{
168 l->ackers--; 198 u16 ack = snd_l->snd_nxt - 1;
199
200 snd_l->ackers--;
201 tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
202 tipc_link_reset(rcv_l);
203 rcv_l->state = LINK_RESET;
204 if (!snd_l->ackers) {
205 tipc_link_reset(snd_l);
206 __skb_queue_purge(xmitq);
207 }
169} 208}
170 209
171int tipc_link_bc_peers(struct tipc_link *l) 210int tipc_link_bc_peers(struct tipc_link *l)
@@ -193,6 +232,8 @@ static u32 link_own_addr(struct tipc_link *l)
193 * @peer: node id of peer node 232 * @peer: node id of peer node
194 * @peer_caps: bitmap describing peer node capabilities 233 * @peer_caps: bitmap describing peer node capabilities
195 * @maddr: media address to be used 234 * @maddr: media address to be used
235 * @bc_sndlink: the namespace global link used for broadcast sending
236 * @bc_rcvlink: the peer specific link used for broadcast reception
196 * @inputq: queue to put messages ready for delivery 237 * @inputq: queue to put messages ready for delivery
197 * @namedq: queue to put binding table update messages ready for delivery 238 * @namedq: queue to put binding table update messages ready for delivery
198 * @link: return value, pointer to put the created link 239 * @link: return value, pointer to put the created link
@@ -202,8 +243,12 @@ static u32 link_own_addr(struct tipc_link *l)
202bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id, 243bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
203 int tolerance, char net_plane, u32 mtu, int priority, 244 int tolerance, char net_plane, u32 mtu, int priority,
204 int window, u32 session, u32 ownnode, u32 peer, 245 int window, u32 session, u32 ownnode, u32 peer,
205 u16 peer_caps, struct tipc_media_addr *maddr, 246 u16 peer_caps,
206 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 247 struct tipc_media_addr *maddr,
248 struct tipc_link *bc_sndlink,
249 struct tipc_link *bc_rcvlink,
250 struct sk_buff_head *inputq,
251 struct sk_buff_head *namedq,
207 struct tipc_link **link) 252 struct tipc_link **link)
208{ 253{
209 struct tipc_link *l; 254 struct tipc_link *l;
@@ -239,6 +284,8 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
239 l->priority = priority; 284 l->priority = priority;
240 tipc_link_set_queue_limits(l, window); 285 tipc_link_set_queue_limits(l, window);
241 l->ackers = 1; 286 l->ackers = 1;
287 l->bc_sndlink = bc_sndlink;
288 l->bc_rcvlink = bc_rcvlink;
242 l->inputq = inputq; 289 l->inputq = inputq;
243 l->namedq = namedq; 290 l->namedq = namedq;
244 l->state = LINK_RESETTING; 291 l->state = LINK_RESETTING;
@@ -261,46 +308,32 @@ bool tipc_link_create(struct tipc_node *n, char *if_name, int bearer_id,
261 * 308 *
262 * Returns true if link was created, otherwise false 309 * Returns true if link was created, otherwise false
263 */ 310 */
264bool tipc_link_bc_create(struct tipc_node *n, int mtu, int window, 311bool tipc_link_bc_create(struct tipc_node *n, u32 ownnode, u32 peer,
265 u16 peer_caps, 312 int mtu, int window, u16 peer_caps,
266 struct sk_buff_head *inputq, 313 struct sk_buff_head *inputq,
267 struct sk_buff_head *namedq, 314 struct sk_buff_head *namedq,
315 struct tipc_link *bc_sndlink,
268 struct tipc_link **link) 316 struct tipc_link **link)
269{ 317{
270 struct tipc_link *l; 318 struct tipc_link *l;
271 319
272 if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window, 320 if (!tipc_link_create(n, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
273 0, 0, 0, peer_caps, NULL, inputq, namedq, link)) 321 0, ownnode, peer, peer_caps, NULL, bc_sndlink,
322 NULL, inputq, namedq, link))
274 return false; 323 return false;
275 324
276 l = *link; 325 l = *link;
277 strcpy(l->name, tipc_bclink_name); 326 strcpy(l->name, tipc_bclink_name);
278 tipc_link_reset(l); 327 tipc_link_reset(l);
328 l->state = LINK_RESET;
279 l->ackers = 0; 329 l->ackers = 0;
280 return true; 330 l->bc_rcvlink = l;
281}
282 331
283/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. 332 /* Broadcast send link is always up */
284 * 333 if (link_is_bc_sndlink(l))
285 * Give a newly added peer node the sequence number where it should 334 l->state = LINK_ESTABLISHED;
286 * start receiving and acking broadcast packets.
287 */
288void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
289 struct sk_buff_head *xmitq)
290{
291 struct sk_buff *skb;
292 struct sk_buff_head list;
293 u16 last_sent;
294 335
295 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, 336 return true;
296 0, l->addr, link_own_addr(l), 0, 0, 0);
297 if (!skb)
298 return;
299 last_sent = tipc_bclink_get_last_sent(l->owner->net);
300 msg_set_last_bcast(buf_msg(skb), last_sent);
301 __skb_queue_head_init(&list);
302 __skb_queue_tail(&list, skb);
303 tipc_link_xmit(l, &list, xmitq);
304} 337}
305 338
306/** 339/**
@@ -507,12 +540,17 @@ static void link_profile_stats(struct tipc_link *l)
507 540
508/* tipc_link_timeout - perform periodic task as instructed from node timeout 541/* tipc_link_timeout - perform periodic task as instructed from node timeout
509 */ 542 */
543/* tipc_link_timeout - perform periodic task as instructed from node timeout
544 */
510int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) 545int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
511{ 546{
512 int rc = 0; 547 int rc = 0;
513 int mtyp = STATE_MSG; 548 int mtyp = STATE_MSG;
514 bool xmit = false; 549 bool xmit = false;
515 bool prb = false; 550 bool prb = false;
551 u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
552 u16 bc_acked = l->bc_rcvlink->acked;
553 bool bc_up = link_is_up(l->bc_rcvlink);
516 554
517 link_profile_stats(l); 555 link_profile_stats(l);
518 556
@@ -520,7 +558,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
520 case LINK_ESTABLISHED: 558 case LINK_ESTABLISHED:
521 case LINK_SYNCHING: 559 case LINK_SYNCHING:
522 if (!l->silent_intv_cnt) { 560 if (!l->silent_intv_cnt) {
523 if (tipc_bclink_acks_missing(l->owner)) 561 if (bc_up && (bc_acked != bc_snt))
524 xmit = true; 562 xmit = true;
525 } else if (l->silent_intv_cnt <= l->abort_limit) { 563 } else if (l->silent_intv_cnt <= l->abort_limit) {
526 xmit = true; 564 xmit = true;
@@ -671,6 +709,7 @@ void tipc_link_reset(struct tipc_link *l)
671 l->silent_intv_cnt = 0; 709 l->silent_intv_cnt = 0;
672 l->stats.recv_info = 0; 710 l->stats.recv_info = 0;
673 l->stale_count = 0; 711 l->stale_count = 0;
712 l->bc_peer_is_up = false;
674 link_reset_statistics(l); 713 link_reset_statistics(l);
675} 714}
676 715
@@ -692,7 +731,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
692 uint mtu = link->mtu; 731 uint mtu = link->mtu;
693 u16 ack = mod(link->rcv_nxt - 1); 732 u16 ack = mod(link->rcv_nxt - 1);
694 u16 seqno = link->snd_nxt; 733 u16 seqno = link->snd_nxt;
695 u16 bc_last_in = link->owner->bclink.last_in; 734 u16 bc_ack = link->bc_rcvlink->rcv_nxt - 1;
696 struct tipc_media_addr *addr = link->media_addr; 735 struct tipc_media_addr *addr = link->media_addr;
697 struct sk_buff_head *transmq = &link->transmq; 736 struct sk_buff_head *transmq = &link->transmq;
698 struct sk_buff_head *backlogq = &link->backlogq; 737 struct sk_buff_head *backlogq = &link->backlogq;
@@ -712,7 +751,7 @@ int __tipc_link_xmit(struct net *net, struct tipc_link *link,
712 msg = buf_msg(skb); 751 msg = buf_msg(skb);
713 msg_set_seqno(msg, seqno); 752 msg_set_seqno(msg, seqno);
714 msg_set_ack(msg, ack); 753 msg_set_ack(msg, ack);
715 msg_set_bcast_ack(msg, bc_last_in); 754 msg_set_bcast_ack(msg, bc_ack);
716 755
717 if (likely(skb_queue_len(transmq) < maxwin)) { 756 if (likely(skb_queue_len(transmq) < maxwin)) {
718 __skb_dequeue(list); 757 __skb_dequeue(list);
@@ -762,7 +801,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
762 unsigned int mtu = l->mtu; 801 unsigned int mtu = l->mtu;
763 u16 ack = l->rcv_nxt - 1; 802 u16 ack = l->rcv_nxt - 1;
764 u16 seqno = l->snd_nxt; 803 u16 seqno = l->snd_nxt;
765 u16 bc_last_in = l->owner->bclink.last_in; 804 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
766 struct sk_buff_head *transmq = &l->transmq; 805 struct sk_buff_head *transmq = &l->transmq;
767 struct sk_buff_head *backlogq = &l->backlogq; 806 struct sk_buff_head *backlogq = &l->backlogq;
768 struct sk_buff *skb, *_skb, *bskb; 807 struct sk_buff *skb, *_skb, *bskb;
@@ -781,7 +820,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
781 hdr = buf_msg(skb); 820 hdr = buf_msg(skb);
782 msg_set_seqno(hdr, seqno); 821 msg_set_seqno(hdr, seqno);
783 msg_set_ack(hdr, ack); 822 msg_set_ack(hdr, ack);
784 msg_set_bcast_ack(hdr, bc_last_in); 823 msg_set_bcast_ack(hdr, bc_ack);
785 824
786 if (likely(skb_queue_len(transmq) < maxwin)) { 825 if (likely(skb_queue_len(transmq) < maxwin)) {
787 _skb = skb_clone(skb, GFP_ATOMIC); 826 _skb = skb_clone(skb, GFP_ATOMIC);
@@ -816,23 +855,6 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
816} 855}
817 856
818/* 857/*
819 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
820 * Receive the sequence number where we should start receiving and
821 * acking broadcast packets from a newly added peer node, and open
822 * up for reception of such packets.
823 *
824 * Called with node locked
825 */
826static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
827{
828 struct tipc_msg *msg = buf_msg(buf);
829
830 n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
831 n->bclink.recv_permitted = true;
832 kfree_skb(buf);
833}
834
835/*
836 * tipc_link_push_packets - push unsent packets to bearer 858 * tipc_link_push_packets - push unsent packets to bearer
837 * 859 *
838 * Push out the unsent messages of a link where congestion 860 * Push out the unsent messages of a link where congestion
@@ -872,6 +894,7 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
872 struct tipc_msg *hdr; 894 struct tipc_msg *hdr;
873 u16 seqno = l->snd_nxt; 895 u16 seqno = l->snd_nxt;
874 u16 ack = l->rcv_nxt - 1; 896 u16 ack = l->rcv_nxt - 1;
897 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
875 898
876 while (skb_queue_len(&l->transmq) < l->window) { 899 while (skb_queue_len(&l->transmq) < l->window) {
877 skb = skb_peek(&l->backlogq); 900 skb = skb_peek(&l->backlogq);
@@ -886,54 +909,25 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
886 __skb_queue_tail(&l->transmq, skb); 909 __skb_queue_tail(&l->transmq, skb);
887 __skb_queue_tail(xmitq, _skb); 910 __skb_queue_tail(xmitq, _skb);
888 TIPC_SKB_CB(skb)->ackers = l->ackers; 911 TIPC_SKB_CB(skb)->ackers = l->ackers;
889 msg_set_ack(hdr, ack);
890 msg_set_seqno(hdr, seqno); 912 msg_set_seqno(hdr, seqno);
891 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 913 msg_set_ack(hdr, ack);
914 msg_set_bcast_ack(hdr, bc_ack);
892 l->rcv_unacked = 0; 915 l->rcv_unacked = 0;
893 seqno++; 916 seqno++;
894 } 917 }
895 l->snd_nxt = seqno; 918 l->snd_nxt = seqno;
896} 919}
897 920
898static void link_retransmit_failure(struct tipc_link *l_ptr, 921static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
899 struct sk_buff *buf)
900{ 922{
901 struct tipc_msg *msg = buf_msg(buf); 923 struct tipc_msg *hdr = buf_msg(skb);
902 struct net *net = l_ptr->owner->net; 924
903 925 pr_warn("Retransmission failure on link <%s>\n", l->name);
904 pr_warn("Retransmission failure on link <%s>\n", l_ptr->name); 926 link_print(l, "Resetting link ");
905 927 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
906 if (l_ptr->addr) { 928 msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
907 /* Handle failure on standard link */ 929 pr_info("sqno %u, prev: %x, src: %x\n",
908 link_print(l_ptr, "Resetting link "); 930 msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
909 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
910 msg_user(msg), msg_type(msg), msg_size(msg),
911 msg_errcode(msg));
912 pr_info("sqno %u, prev: %x, src: %x\n",
913 msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
914 } else {
915 /* Handle failure on broadcast link */
916 struct tipc_node *n_ptr;
917 char addr_string[16];
918
919 pr_info("Msg seq number: %u, ", msg_seqno(msg));
920 pr_cont("Outstanding acks: %u\n", TIPC_SKB_CB(buf)->ackers);
921
922 n_ptr = tipc_bclink_retransmit_to(net);
923
924 tipc_addr_string_fill(addr_string, n_ptr->addr);
925 pr_info("Broadcast link info for %s\n", addr_string);
926 pr_info("Reception permitted: %d, Acked: %u\n",
927 n_ptr->bclink.recv_permitted,
928 n_ptr->bclink.acked);
929 pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
930 n_ptr->bclink.last_in,
931 n_ptr->bclink.oos_state,
932 n_ptr->bclink.last_sent);
933
934 n_ptr->action_flags |= TIPC_BCAST_RESET;
935 l_ptr->stale_count = 0;
936 }
937} 931}
938 932
939void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb, 933void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
@@ -976,7 +970,7 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
976 struct sk_buff *_skb, *skb = skb_peek(&l->transmq); 970 struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
977 struct tipc_msg *hdr; 971 struct tipc_msg *hdr;
978 u16 ack = l->rcv_nxt - 1; 972 u16 ack = l->rcv_nxt - 1;
979 u16 bc_ack = l->owner->bclink.last_in; 973 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
980 974
981 if (!skb) 975 if (!skb)
982 return 0; 976 return 0;
@@ -1018,11 +1012,9 @@ int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
1018 * Consumes buffer if message is of right type 1012 * Consumes buffer if message is of right type
1019 * Node lock must be held 1013 * Node lock must be held
1020 */ 1014 */
1021static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, 1015static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
1022 struct sk_buff_head *inputq) 1016 struct sk_buff_head *inputq)
1023{ 1017{
1024 struct tipc_node *node = link->owner;
1025
1026 switch (msg_user(buf_msg(skb))) { 1018 switch (msg_user(buf_msg(skb))) {
1027 case TIPC_LOW_IMPORTANCE: 1019 case TIPC_LOW_IMPORTANCE:
1028 case TIPC_MEDIUM_IMPORTANCE: 1020 case TIPC_MEDIUM_IMPORTANCE:
@@ -1032,8 +1024,8 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
1032 skb_queue_tail(inputq, skb); 1024 skb_queue_tail(inputq, skb);
1033 return true; 1025 return true;
1034 case NAME_DISTRIBUTOR: 1026 case NAME_DISTRIBUTOR:
1035 node->bclink.recv_permitted = true; 1027 l->bc_rcvlink->state = LINK_ESTABLISHED;
1036 skb_queue_tail(link->namedq, skb); 1028 skb_queue_tail(l->namedq, skb);
1037 return true; 1029 return true;
1038 case MSG_BUNDLER: 1030 case MSG_BUNDLER:
1039 case TUNNEL_PROTOCOL: 1031 case TUNNEL_PROTOCOL:
@@ -1054,7 +1046,6 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
1054static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, 1046static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1055 struct sk_buff_head *inputq) 1047 struct sk_buff_head *inputq)
1056{ 1048{
1057 struct tipc_node *node = l->owner;
1058 struct tipc_msg *hdr = buf_msg(skb); 1049 struct tipc_msg *hdr = buf_msg(skb);
1059 struct sk_buff **reasm_skb = &l->reasm_buf; 1050 struct sk_buff **reasm_skb = &l->reasm_buf;
1060 struct sk_buff *iskb; 1051 struct sk_buff *iskb;
@@ -1095,13 +1086,15 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1095 if (tipc_buf_append(reasm_skb, &skb)) { 1086 if (tipc_buf_append(reasm_skb, &skb)) {
1096 l->stats.recv_fragmented++; 1087 l->stats.recv_fragmented++;
1097 tipc_data_input(l, skb, inputq); 1088 tipc_data_input(l, skb, inputq);
1098 } else if (!*reasm_skb) { 1089 } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) {
1090 pr_warn_ratelimited("Unable to build fragment list\n");
1099 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 1091 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
1100 } 1092 }
1101 return 0; 1093 return 0;
1102 } else if (usr == BCAST_PROTOCOL) { 1094 } else if (usr == BCAST_PROTOCOL) {
1103 tipc_link_sync_rcv(node, skb); 1095 tipc_bcast_lock(l->owner->net);
1104 return 0; 1096 tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
1097 tipc_bcast_unlock(l->owner->net);
1105 } 1098 }
1106drop: 1099drop:
1107 kfree_skb(skb); 1100 kfree_skb(skb);
@@ -1124,12 +1117,28 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
1124} 1117}
1125 1118
1126/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission 1119/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission
1120 *
1121 * Note that sending of broadcast ack is coordinated among nodes, to reduce
1122 * risk of ack storms towards the sender
1127 */ 1123 */
1128void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) 1124int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
1129{ 1125{
1126 if (!l)
1127 return 0;
1128
1129 /* Broadcast ACK must be sent via a unicast link => defer to caller */
1130 if (link_is_bc_rcvlink(l)) {
1131 if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf)
1132 return 0;
1133 l->rcv_unacked = 0;
1134 return TIPC_LINK_SND_BC_ACK;
1135 }
1136
1137 /* Unicast ACK */
1130 l->rcv_unacked = 0; 1138 l->rcv_unacked = 0;
1131 l->stats.sent_acks++; 1139 l->stats.sent_acks++;
1132 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1140 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
1141 return 0;
1133} 1142}
1134 1143
1135/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message 1144/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message
@@ -1151,6 +1160,9 @@ static void tipc_link_build_nack_msg(struct tipc_link *l,
1151{ 1160{
1152 u32 def_cnt = ++l->stats.deferred_recv; 1161 u32 def_cnt = ++l->stats.deferred_recv;
1153 1162
1163 if (link_is_bc_rcvlink(l))
1164 return;
1165
1154 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) 1166 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
1155 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1167 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
1156} 1168}
@@ -1211,12 +1223,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
1211 l->rcv_nxt++; 1223 l->rcv_nxt++;
1212 l->stats.recv_info++; 1224 l->stats.recv_info++;
1213 if (!tipc_data_input(l, skb, l->inputq)) 1225 if (!tipc_data_input(l, skb, l->inputq))
1214 rc = tipc_link_input(l, skb, l->inputq); 1226 rc |= tipc_link_input(l, skb, l->inputq);
1215 if (unlikely(rc))
1216 break;
1217 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) 1227 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
1218 tipc_link_build_ack_msg(l, xmitq); 1228 rc |= tipc_link_build_ack_msg(l, xmitq);
1219 1229 if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
1230 break;
1220 } while ((skb = __skb_dequeue(defq))); 1231 } while ((skb = __skb_dequeue(defq)));
1221 1232
1222 return rc; 1233 return rc;
@@ -1284,18 +1295,13 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
1284 kfree_skb(skb); 1295 kfree_skb(skb);
1285} 1296}
1286 1297
1287/* tipc_link_build_proto_msg: prepare link protocol message for transmission
1288 */
1289static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, 1298static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1290 u16 rcvgap, int tolerance, int priority, 1299 u16 rcvgap, int tolerance, int priority,
1291 struct sk_buff_head *xmitq) 1300 struct sk_buff_head *xmitq)
1292{ 1301{
1293 struct sk_buff *skb = NULL; 1302 struct sk_buff *skb = NULL;
1294 struct tipc_msg *hdr = l->pmsg; 1303 struct tipc_msg *hdr = l->pmsg;
1295 u16 snd_nxt = l->snd_nxt; 1304 bool node_up = link_is_up(l->bc_rcvlink);
1296 u16 rcv_nxt = l->rcv_nxt;
1297 u16 rcv_last = rcv_nxt - 1;
1298 int node_up = l->owner->bclink.recv_permitted;
1299 1305
1300 /* Don't send protocol message during reset or link failover */ 1306 /* Don't send protocol message during reset or link failover */
1301 if (tipc_link_is_blocked(l)) 1307 if (tipc_link_is_blocked(l))
@@ -1303,33 +1309,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1303 1309
1304 msg_set_type(hdr, mtyp); 1310 msg_set_type(hdr, mtyp);
1305 msg_set_net_plane(hdr, l->net_plane); 1311 msg_set_net_plane(hdr, l->net_plane);
1306 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 1312 msg_set_next_sent(hdr, l->snd_nxt);
1307 msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); 1313 msg_set_ack(hdr, l->rcv_nxt - 1);
1314 msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
1315 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1308 msg_set_link_tolerance(hdr, tolerance); 1316 msg_set_link_tolerance(hdr, tolerance);
1309 msg_set_linkprio(hdr, priority); 1317 msg_set_linkprio(hdr, priority);
1310 msg_set_redundant_link(hdr, node_up); 1318 msg_set_redundant_link(hdr, node_up);
1311 msg_set_seq_gap(hdr, 0); 1319 msg_set_seq_gap(hdr, 0);
1312 1320
1313 /* Compatibility: created msg must not be in sequence with pkt flow */ 1321 /* Compatibility: created msg must not be in sequence with pkt flow */
1314 msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); 1322 msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2);
1315 1323
1316 if (mtyp == STATE_MSG) { 1324 if (mtyp == STATE_MSG) {
1317 if (!tipc_link_is_up(l)) 1325 if (!tipc_link_is_up(l))
1318 return; 1326 return;
1319 msg_set_next_sent(hdr, snd_nxt);
1320 1327
1321 /* Override rcvgap if there are packets in deferred queue */ 1328 /* Override rcvgap if there are packets in deferred queue */
1322 if (!skb_queue_empty(&l->deferdq)) 1329 if (!skb_queue_empty(&l->deferdq))
1323 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; 1330 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt;
1324 if (rcvgap) { 1331 if (rcvgap) {
1325 msg_set_seq_gap(hdr, rcvgap); 1332 msg_set_seq_gap(hdr, rcvgap);
1326 l->stats.sent_nacks++; 1333 l->stats.sent_nacks++;
1327 } 1334 }
1328 msg_set_ack(hdr, rcv_last);
1329 msg_set_probe(hdr, probe); 1335 msg_set_probe(hdr, probe);
1330 if (probe) 1336 if (probe)
1331 l->stats.sent_probes++; 1337 l->stats.sent_probes++;
1332 l->stats.sent_states++; 1338 l->stats.sent_states++;
1339 l->rcv_unacked = 0;
1333 } else { 1340 } else {
1334 /* RESET_MSG or ACTIVATE_MSG */ 1341 /* RESET_MSG or ACTIVATE_MSG */
1335 msg_set_max_pkt(hdr, l->advertised_mtu); 1342 msg_set_max_pkt(hdr, l->advertised_mtu);
@@ -1431,7 +1438,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1431 char *if_name; 1438 char *if_name;
1432 int rc = 0; 1439 int rc = 0;
1433 1440
1434 if (tipc_link_is_blocked(l)) 1441 if (tipc_link_is_blocked(l) || !xmitq)
1435 goto exit; 1442 goto exit;
1436 1443
1437 if (link_own_addr(l) > msg_prevnode(hdr)) 1444 if (link_own_addr(l) > msg_prevnode(hdr))
@@ -1518,6 +1525,188 @@ exit:
1518 return rc; 1525 return rc;
1519} 1526}
1520 1527
1528/* tipc_link_build_bc_proto_msg() - create broadcast protocol message
1529 */
1530static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast,
1531 u16 peers_snd_nxt,
1532 struct sk_buff_head *xmitq)
1533{
1534 struct sk_buff *skb;
1535 struct tipc_msg *hdr;
1536 struct sk_buff *dfrd_skb = skb_peek(&l->deferdq);
1537 u16 ack = l->rcv_nxt - 1;
1538 u16 gap_to = peers_snd_nxt - 1;
1539
1540 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
1541 0, l->addr, link_own_addr(l), 0, 0, 0);
1542 if (!skb)
1543 return false;
1544 hdr = buf_msg(skb);
1545 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1546 msg_set_bcast_ack(hdr, ack);
1547 msg_set_bcgap_after(hdr, ack);
1548 if (dfrd_skb)
1549 gap_to = buf_seqno(dfrd_skb) - 1;
1550 msg_set_bcgap_to(hdr, gap_to);
1551 msg_set_non_seq(hdr, bcast);
1552 __skb_queue_tail(xmitq, skb);
1553 return true;
1554}
1555
1556/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints.
1557 *
1558 * Give a newly added peer node the sequence number where it should
1559 * start receiving and acking broadcast packets.
1560 */
1561void tipc_link_build_bc_init_msg(struct tipc_link *l,
1562 struct sk_buff_head *xmitq)
1563{
1564 struct sk_buff_head list;
1565
1566 __skb_queue_head_init(&list);
1567 if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list))
1568 return;
1569 tipc_link_xmit(l, &list, xmitq);
1570}
1571
1572/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer
1573 */
1574void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
1575{
1576 int mtyp = msg_type(hdr);
1577 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1578
1579 if (link_is_up(l))
1580 return;
1581
1582 if (msg_user(hdr) == BCAST_PROTOCOL) {
1583 l->rcv_nxt = peers_snd_nxt;
1584 l->state = LINK_ESTABLISHED;
1585 return;
1586 }
1587
1588 if (l->peer_caps & TIPC_BCAST_SYNCH)
1589 return;
1590
1591 if (msg_peer_node_is_up(hdr))
1592 return;
1593
1594 /* Compatibility: accept older, less safe initial synch data */
1595 if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG))
1596 l->rcv_nxt = peers_snd_nxt;
1597}
1598
1599/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
1600 */
1601void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
1602 struct sk_buff_head *xmitq)
1603{
1604 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1605
1606 if (!link_is_up(l))
1607 return;
1608
1609 if (!msg_peer_node_is_up(hdr))
1610 return;
1611
1612 l->bc_peer_is_up = true;
1613
1614 /* Ignore if peers_snd_nxt goes beyond receive window */
1615 if (more(peers_snd_nxt, l->rcv_nxt + l->window))
1616 return;
1617
1618 if (!more(peers_snd_nxt, l->rcv_nxt)) {
1619 l->nack_state = BC_NACK_SND_CONDITIONAL;
1620 return;
1621 }
1622
1623 /* Don't NACK if one was recently sent or peeked */
1624 if (l->nack_state == BC_NACK_SND_SUPPRESS) {
1625 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1626 return;
1627 }
1628
1629 /* Conditionally delay NACK sending until next synch rcv */
1630 if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
1631 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1632 if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
1633 return;
1634 }
1635
1636 /* Send NACK now but suppress next one */
1637 tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
1638 l->nack_state = BC_NACK_SND_SUPPRESS;
1639}
1640
1641void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
1642 struct sk_buff_head *xmitq)
1643{
1644 struct sk_buff *skb, *tmp;
1645 struct tipc_link *snd_l = l->bc_sndlink;
1646
1647 if (!link_is_up(l) || !l->bc_peer_is_up)
1648 return;
1649
1650 if (!more(acked, l->acked))
1651 return;
1652
1653 /* Skip over packets peer has already acked */
1654 skb_queue_walk(&snd_l->transmq, skb) {
1655 if (more(buf_seqno(skb), l->acked))
1656 break;
1657 }
1658
1659 /* Update/release the packets peer is acking now */
1660 skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
1661 if (more(buf_seqno(skb), acked))
1662 break;
1663 if (!--TIPC_SKB_CB(skb)->ackers) {
1664 __skb_unlink(skb, &snd_l->transmq);
1665 kfree_skb(skb);
1666 }
1667 }
1668 l->acked = acked;
1669 tipc_link_advance_backlog(snd_l, xmitq);
1670 if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
1671 link_prepare_wakeup(snd_l);
1672}
1673
1674/* tipc_link_bc_nack_rcv(): receive broadcast nack message
1675 */
1676int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
1677 struct sk_buff_head *xmitq)
1678{
1679 struct tipc_msg *hdr = buf_msg(skb);
1680 u32 dnode = msg_destnode(hdr);
1681 int mtyp = msg_type(hdr);
1682 u16 acked = msg_bcast_ack(hdr);
1683 u16 from = acked + 1;
1684 u16 to = msg_bcgap_to(hdr);
1685 u16 peers_snd_nxt = to + 1;
1686 int rc = 0;
1687
1688 kfree_skb(skb);
1689
1690 if (!tipc_link_is_up(l) || !l->bc_peer_is_up)
1691 return 0;
1692
1693 if (mtyp != STATE_MSG)
1694 return 0;
1695
1696 if (dnode == link_own_addr(l)) {
1697 tipc_link_bc_ack_rcv(l, acked, xmitq);
1698 rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
1699 l->stats.recv_nacks++;
1700 return rc;
1701 }
1702
1703 /* Msg for other node => suppress own NACK at next sync if applicable */
1704 if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from))
1705 l->nack_state = BC_NACK_SND_SUPPRESS;
1706
1707 return 0;
1708}
1709
1521void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) 1710void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1522{ 1711{
1523 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); 1712 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);