aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2015-10-24 09:56:54 -0400
committerDavid S. Miller <davem@davemloft.net>2015-10-24 09:56:54 -0400
commit687f079addba1ac7f97ce97080c2291bbe8c8dce (patch)
tree5da9c2e91de35b9111a3badb947416deba5083d8 /net/tipc/link.c
parentba3e2084f268bdfed7627046e58a2218037e15af (diff)
parent2af5ae372a4b6d6e2d3314af0e9c865d6d64f8d3 (diff)
Merge branch 'tipc-next'
Jon Maloy says: ==================== tipc: improve broadcast implementation The TIPC broadcast link implementation is currently complex and hard to follow. It also incurs some amount of code and structure duplication, something that can be reduced significantly with a little effort. This commit series introduces a number of improvements which address both the locking structure, the code/structure duplication issue, and the overall readbility of the code. The series consists of three main parts: 1-7: Adaptation to the new link structure, and preparation for the next step. In particular, we want the broadcast transmission link to have a life cycle that is longer than any of its potential (unicast and broadcast receive links) users. This eliminates the need to always test for the presence of this link before accessing it. 8-10: This is what is really new in this series. Commit #9 is by far the largest and most important one, because it moves most of the broadcast functionality into link.c, partially reusing the fields and functionality of the unicast link. The removal of the "node_map" infrastructure in commit #10 is also an important achievement. 11-16: Some improvements leveraging the changes made in the previous commits. The series needs commit 53387c4e22ac ("tipc: extend broadcast link window size") and commit e53567948f82 ("tipc: conditionally expand buffer headroom over udp tunnel") which are both present in 'net' but not yet in 'net-next', to apply cleanly. ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c775
1 files changed, 419 insertions, 356 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ff9b0b92e62e..4449fa01e232 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -50,6 +50,7 @@
50 */ 50 */
51static const char *link_co_err = "Link tunneling error, "; 51static const char *link_co_err = "Link tunneling error, ";
52static const char *link_rst_msg = "Resetting link "; 52static const char *link_rst_msg = "Resetting link ";
53static const char tipc_bclink_name[] = "broadcast-link";
53 54
54static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = { 55static const struct nla_policy tipc_nl_link_policy[TIPC_NLA_LINK_MAX + 1] = {
55 [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC }, 56 [TIPC_NLA_LINK_UNSPEC] = { .type = NLA_UNSPEC },
@@ -75,6 +76,14 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
75 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 } 76 [TIPC_NLA_PROP_WIN] = { .type = NLA_U32 }
76}; 77};
77 78
79/* Send states for broadcast NACKs
80 */
81enum {
82 BC_NACK_SND_CONDITIONAL,
83 BC_NACK_SND_UNCONDITIONAL,
84 BC_NACK_SND_SUPPRESS,
85};
86
78/* 87/*
79 * Interval between NACKs when packets arrive out of order 88 * Interval between NACKs when packets arrive out of order
80 */ 89 */
@@ -110,7 +119,11 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
110 struct sk_buff_head *xmitq); 119 struct sk_buff_head *xmitq);
111static void link_reset_statistics(struct tipc_link *l_ptr); 120static void link_reset_statistics(struct tipc_link *l_ptr);
112static void link_print(struct tipc_link *l_ptr, const char *str); 121static void link_print(struct tipc_link *l_ptr, const char *str);
113static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf); 122static void tipc_link_build_nack_msg(struct tipc_link *l,
123 struct sk_buff_head *xmitq);
124static void tipc_link_build_bc_init_msg(struct tipc_link *l,
125 struct sk_buff_head *xmitq);
126static bool tipc_link_release_pkts(struct tipc_link *l, u16 to);
114 127
115/* 128/*
116 * Simple non-static link routines (i.e. referenced outside this file) 129 * Simple non-static link routines (i.e. referenced outside this file)
@@ -150,11 +163,66 @@ bool tipc_link_is_blocked(struct tipc_link *l)
150 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER); 163 return l->state & (LINK_RESETTING | LINK_PEER_RESET | LINK_FAILINGOVER);
151} 164}
152 165
166bool link_is_bc_sndlink(struct tipc_link *l)
167{
168 return !l->bc_sndlink;
169}
170
171bool link_is_bc_rcvlink(struct tipc_link *l)
172{
173 return ((l->bc_rcvlink == l) && !link_is_bc_sndlink(l));
174}
175
153int tipc_link_is_active(struct tipc_link *l) 176int tipc_link_is_active(struct tipc_link *l)
154{ 177{
155 struct tipc_node *n = l->owner; 178 return l->active;
179}
180
181void tipc_link_set_active(struct tipc_link *l, bool active)
182{
183 l->active = active;
184}
185
186void tipc_link_add_bc_peer(struct tipc_link *snd_l,
187 struct tipc_link *uc_l,
188 struct sk_buff_head *xmitq)
189{
190 struct tipc_link *rcv_l = uc_l->bc_rcvlink;
156 191
157 return (node_active_link(n, 0) == l) || (node_active_link(n, 1) == l); 192 snd_l->ackers++;
193 rcv_l->acked = snd_l->snd_nxt - 1;
194 tipc_link_build_bc_init_msg(uc_l, xmitq);
195}
196
197void tipc_link_remove_bc_peer(struct tipc_link *snd_l,
198 struct tipc_link *rcv_l,
199 struct sk_buff_head *xmitq)
200{
201 u16 ack = snd_l->snd_nxt - 1;
202
203 snd_l->ackers--;
204 tipc_link_bc_ack_rcv(rcv_l, ack, xmitq);
205 tipc_link_reset(rcv_l);
206 rcv_l->state = LINK_RESET;
207 if (!snd_l->ackers) {
208 tipc_link_reset(snd_l);
209 __skb_queue_purge(xmitq);
210 }
211}
212
213int tipc_link_bc_peers(struct tipc_link *l)
214{
215 return l->ackers;
216}
217
218void tipc_link_set_mtu(struct tipc_link *l, int mtu)
219{
220 l->mtu = mtu;
221}
222
223int tipc_link_mtu(struct tipc_link *l)
224{
225 return l->mtu;
158} 226}
159 227
160static u32 link_own_addr(struct tipc_link *l) 228static u32 link_own_addr(struct tipc_link *l)
@@ -165,57 +233,72 @@ static u32 link_own_addr(struct tipc_link *l)
165/** 233/**
166 * tipc_link_create - create a new link 234 * tipc_link_create - create a new link
167 * @n: pointer to associated node 235 * @n: pointer to associated node
168 * @b: pointer to associated bearer 236 * @if_name: associated interface name
237 * @bearer_id: id (index) of associated bearer
238 * @tolerance: link tolerance to be used by link
239 * @net_plane: network plane (A,B,c..) this link belongs to
240 * @mtu: mtu to be advertised by link
241 * @priority: priority to be used by link
242 * @window: send window to be used by link
243 * @session: session to be used by link
169 * @ownnode: identity of own node 244 * @ownnode: identity of own node
170 * @peer: identity of peer node 245 * @peer: node id of peer node
171 * @maddr: media address to be used 246 * @peer_caps: bitmap describing peer node capabilities
247 * @bc_sndlink: the namespace global link used for broadcast sending
248 * @bc_rcvlink: the peer specific link used for broadcast reception
172 * @inputq: queue to put messages ready for delivery 249 * @inputq: queue to put messages ready for delivery
173 * @namedq: queue to put binding table update messages ready for delivery 250 * @namedq: queue to put binding table update messages ready for delivery
174 * @link: return value, pointer to put the created link 251 * @link: return value, pointer to put the created link
175 * 252 *
176 * Returns true if link was created, otherwise false 253 * Returns true if link was created, otherwise false
177 */ 254 */
178bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session, 255bool tipc_link_create(struct net *net, char *if_name, int bearer_id,
179 u32 ownnode, u32 peer, struct tipc_media_addr *maddr, 256 int tolerance, char net_plane, u32 mtu, int priority,
180 struct sk_buff_head *inputq, struct sk_buff_head *namedq, 257 int window, u32 session, u32 ownnode, u32 peer,
258 u16 peer_caps,
259 struct tipc_link *bc_sndlink,
260 struct tipc_link *bc_rcvlink,
261 struct sk_buff_head *inputq,
262 struct sk_buff_head *namedq,
181 struct tipc_link **link) 263 struct tipc_link **link)
182{ 264{
183 struct tipc_link *l; 265 struct tipc_link *l;
184 struct tipc_msg *hdr; 266 struct tipc_msg *hdr;
185 char *if_name;
186 267
187 l = kzalloc(sizeof(*l), GFP_ATOMIC); 268 l = kzalloc(sizeof(*l), GFP_ATOMIC);
188 if (!l) 269 if (!l)
189 return false; 270 return false;
190 *link = l; 271 *link = l;
272 l->pmsg = (struct tipc_msg *)&l->proto_msg;
273 hdr = l->pmsg;
274 tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer);
275 msg_set_size(hdr, sizeof(l->proto_msg));
276 msg_set_session(hdr, session);
277 msg_set_bearer_id(hdr, l->bearer_id);
191 278
192 /* Note: peer i/f name is completed by reset/activate message */ 279 /* Note: peer i/f name is completed by reset/activate message */
193 if_name = strchr(b->name, ':') + 1;
194 sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown", 280 sprintf(l->name, "%u.%u.%u:%s-%u.%u.%u:unknown",
195 tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode), 281 tipc_zone(ownnode), tipc_cluster(ownnode), tipc_node(ownnode),
196 if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer)); 282 if_name, tipc_zone(peer), tipc_cluster(peer), tipc_node(peer));
283 strcpy((char *)msg_data(hdr), if_name);
197 284
198 l->addr = peer; 285 l->addr = peer;
199 l->media_addr = maddr; 286 l->peer_caps = peer_caps;
200 l->owner = n; 287 l->net = net;
201 l->peer_session = WILDCARD_SESSION; 288 l->peer_session = WILDCARD_SESSION;
202 l->bearer_id = b->identity; 289 l->bearer_id = bearer_id;
203 l->tolerance = b->tolerance; 290 l->tolerance = tolerance;
204 l->net_plane = b->net_plane; 291 l->net_plane = net_plane;
205 l->advertised_mtu = b->mtu; 292 l->advertised_mtu = mtu;
206 l->mtu = b->mtu; 293 l->mtu = mtu;
207 l->priority = b->priority; 294 l->priority = priority;
208 tipc_link_set_queue_limits(l, b->window); 295 tipc_link_set_queue_limits(l, window);
296 l->ackers = 1;
297 l->bc_sndlink = bc_sndlink;
298 l->bc_rcvlink = bc_rcvlink;
209 l->inputq = inputq; 299 l->inputq = inputq;
210 l->namedq = namedq; 300 l->namedq = namedq;
211 l->state = LINK_RESETTING; 301 l->state = LINK_RESETTING;
212 l->pmsg = (struct tipc_msg *)&l->proto_msg;
213 hdr = l->pmsg;
214 tipc_msg_init(ownnode, hdr, LINK_PROTOCOL, RESET_MSG, INT_H_SIZE, peer);
215 msg_set_size(hdr, sizeof(l->proto_msg));
216 msg_set_session(hdr, session);
217 msg_set_bearer_id(hdr, l->bearer_id);
218 strcpy((char *)msg_data(hdr), if_name);
219 __skb_queue_head_init(&l->transmq); 302 __skb_queue_head_init(&l->transmq);
220 __skb_queue_head_init(&l->backlogq); 303 __skb_queue_head_init(&l->backlogq);
221 __skb_queue_head_init(&l->deferdq); 304 __skb_queue_head_init(&l->deferdq);
@@ -224,27 +307,43 @@ bool tipc_link_create(struct tipc_node *n, struct tipc_bearer *b, u32 session,
224 return true; 307 return true;
225} 308}
226 309
227/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints. 310/**
311 * tipc_link_bc_create - create new link to be used for broadcast
312 * @n: pointer to associated node
313 * @mtu: mtu to be used
314 * @window: send window to be used
315 * @inputq: queue to put messages ready for delivery
316 * @namedq: queue to put binding table update messages ready for delivery
317 * @link: return value, pointer to put the created link
228 * 318 *
229 * Give a newly added peer node the sequence number where it should 319 * Returns true if link was created, otherwise false
230 * start receiving and acking broadcast packets.
231 */ 320 */
232void tipc_link_build_bcast_sync_msg(struct tipc_link *l, 321bool tipc_link_bc_create(struct net *net, u32 ownnode, u32 peer,
233 struct sk_buff_head *xmitq) 322 int mtu, int window, u16 peer_caps,
323 struct sk_buff_head *inputq,
324 struct sk_buff_head *namedq,
325 struct tipc_link *bc_sndlink,
326 struct tipc_link **link)
234{ 327{
235 struct sk_buff *skb; 328 struct tipc_link *l;
236 struct sk_buff_head list;
237 u16 last_sent;
238 329
239 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, 330 if (!tipc_link_create(net, "", MAX_BEARERS, 0, 'Z', mtu, 0, window,
240 0, l->addr, link_own_addr(l), 0, 0, 0); 331 0, ownnode, peer, peer_caps, bc_sndlink,
241 if (!skb) 332 NULL, inputq, namedq, link))
242 return; 333 return false;
243 last_sent = tipc_bclink_get_last_sent(l->owner->net); 334
244 msg_set_last_bcast(buf_msg(skb), last_sent); 335 l = *link;
245 __skb_queue_head_init(&list); 336 strcpy(l->name, tipc_bclink_name);
246 __skb_queue_tail(&list, skb); 337 tipc_link_reset(l);
247 tipc_link_xmit(l, &list, xmitq); 338 l->state = LINK_RESET;
339 l->ackers = 0;
340 l->bc_rcvlink = l;
341
342 /* Broadcast send link is always up */
343 if (link_is_bc_sndlink(l))
344 l->state = LINK_ESTABLISHED;
345
346 return true;
248} 347}
249 348
250/** 349/**
@@ -451,12 +550,17 @@ static void link_profile_stats(struct tipc_link *l)
451 550
452/* tipc_link_timeout - perform periodic task as instructed from node timeout 551/* tipc_link_timeout - perform periodic task as instructed from node timeout
453 */ 552 */
553/* tipc_link_timeout - perform periodic task as instructed from node timeout
554 */
454int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq) 555int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
455{ 556{
456 int rc = 0; 557 int rc = 0;
457 int mtyp = STATE_MSG; 558 int mtyp = STATE_MSG;
458 bool xmit = false; 559 bool xmit = false;
459 bool prb = false; 560 bool prb = false;
561 u16 bc_snt = l->bc_sndlink->snd_nxt - 1;
562 u16 bc_acked = l->bc_rcvlink->acked;
563 bool bc_up = link_is_up(l->bc_rcvlink);
460 564
461 link_profile_stats(l); 565 link_profile_stats(l);
462 566
@@ -464,7 +568,7 @@ int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq)
464 case LINK_ESTABLISHED: 568 case LINK_ESTABLISHED:
465 case LINK_SYNCHING: 569 case LINK_SYNCHING:
466 if (!l->silent_intv_cnt) { 570 if (!l->silent_intv_cnt) {
467 if (tipc_bclink_acks_missing(l->owner)) 571 if (bc_up && (bc_acked != bc_snt))
468 xmit = true; 572 xmit = true;
469 } else if (l->silent_intv_cnt <= l->abort_limit) { 573 } else if (l->silent_intv_cnt <= l->abort_limit) {
470 xmit = true; 574 xmit = true;
@@ -555,38 +659,6 @@ void link_prepare_wakeup(struct tipc_link *l)
555 } 659 }
556} 660}
557 661
558/**
559 * tipc_link_reset_fragments - purge link's inbound message fragments queue
560 * @l_ptr: pointer to link
561 */
562void tipc_link_reset_fragments(struct tipc_link *l_ptr)
563{
564 kfree_skb(l_ptr->reasm_buf);
565 l_ptr->reasm_buf = NULL;
566}
567
568void tipc_link_purge_backlog(struct tipc_link *l)
569{
570 __skb_queue_purge(&l->backlogq);
571 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
572 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
573 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
574 l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
575 l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
576}
577
578/**
579 * tipc_link_purge_queues - purge all pkt queues associated with link
580 * @l_ptr: pointer to link
581 */
582void tipc_link_purge_queues(struct tipc_link *l_ptr)
583{
584 __skb_queue_purge(&l_ptr->deferdq);
585 __skb_queue_purge(&l_ptr->transmq);
586 tipc_link_purge_backlog(l_ptr);
587 tipc_link_reset_fragments(l_ptr);
588}
589
590void tipc_link_reset(struct tipc_link *l) 662void tipc_link_reset(struct tipc_link *l)
591{ 663{
592 /* Link is down, accept any session */ 664 /* Link is down, accept any session */
@@ -598,12 +670,16 @@ void tipc_link_reset(struct tipc_link *l)
598 /* Prepare for renewed mtu size negotiation */ 670 /* Prepare for renewed mtu size negotiation */
599 l->mtu = l->advertised_mtu; 671 l->mtu = l->advertised_mtu;
600 672
601 /* Clean up all queues: */ 673 /* Clean up all queues and counters: */
602 __skb_queue_purge(&l->transmq); 674 __skb_queue_purge(&l->transmq);
603 __skb_queue_purge(&l->deferdq); 675 __skb_queue_purge(&l->deferdq);
604 skb_queue_splice_init(&l->wakeupq, l->inputq); 676 skb_queue_splice_init(&l->wakeupq, l->inputq);
605 677 __skb_queue_purge(&l->backlogq);
606 tipc_link_purge_backlog(l); 678 l->backlog[TIPC_LOW_IMPORTANCE].len = 0;
679 l->backlog[TIPC_MEDIUM_IMPORTANCE].len = 0;
680 l->backlog[TIPC_HIGH_IMPORTANCE].len = 0;
681 l->backlog[TIPC_CRITICAL_IMPORTANCE].len = 0;
682 l->backlog[TIPC_SYSTEM_IMPORTANCE].len = 0;
607 kfree_skb(l->reasm_buf); 683 kfree_skb(l->reasm_buf);
608 kfree_skb(l->failover_reasm_skb); 684 kfree_skb(l->failover_reasm_skb);
609 l->reasm_buf = NULL; 685 l->reasm_buf = NULL;
@@ -611,81 +687,15 @@ void tipc_link_reset(struct tipc_link *l)
611 l->rcv_unacked = 0; 687 l->rcv_unacked = 0;
612 l->snd_nxt = 1; 688 l->snd_nxt = 1;
613 l->rcv_nxt = 1; 689 l->rcv_nxt = 1;
690 l->acked = 0;
614 l->silent_intv_cnt = 0; 691 l->silent_intv_cnt = 0;
615 l->stats.recv_info = 0; 692 l->stats.recv_info = 0;
616 l->stale_count = 0; 693 l->stale_count = 0;
694 l->bc_peer_is_up = false;
617 link_reset_statistics(l); 695 link_reset_statistics(l);
618} 696}
619 697
620/** 698/**
621 * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
622 * @link: link to use
623 * @list: chain of buffers containing message
624 *
625 * Consumes the buffer chain, except when returning an error code,
626 * Returns 0 if success, or errno: -ELINKCONG, -EMSGSIZE or -ENOBUFS
627 * Messages at TIPC_SYSTEM_IMPORTANCE are always accepted
628 */
629int __tipc_link_xmit(struct net *net, struct tipc_link *link,
630 struct sk_buff_head *list)
631{
632 struct tipc_msg *msg = buf_msg(skb_peek(list));
633 unsigned int maxwin = link->window;
634 unsigned int i, imp = msg_importance(msg);
635 uint mtu = link->mtu;
636 u16 ack = mod(link->rcv_nxt - 1);
637 u16 seqno = link->snd_nxt;
638 u16 bc_last_in = link->owner->bclink.last_in;
639 struct tipc_media_addr *addr = link->media_addr;
640 struct sk_buff_head *transmq = &link->transmq;
641 struct sk_buff_head *backlogq = &link->backlogq;
642 struct sk_buff *skb, *bskb;
643
644 /* Match msg importance against this and all higher backlog limits: */
645 for (i = imp; i <= TIPC_SYSTEM_IMPORTANCE; i++) {
646 if (unlikely(link->backlog[i].len >= link->backlog[i].limit))
647 return link_schedule_user(link, list);
648 }
649 if (unlikely(msg_size(msg) > mtu))
650 return -EMSGSIZE;
651
652 /* Prepare each packet for sending, and add to relevant queue: */
653 while (skb_queue_len(list)) {
654 skb = skb_peek(list);
655 msg = buf_msg(skb);
656 msg_set_seqno(msg, seqno);
657 msg_set_ack(msg, ack);
658 msg_set_bcast_ack(msg, bc_last_in);
659
660 if (likely(skb_queue_len(transmq) < maxwin)) {
661 __skb_dequeue(list);
662 __skb_queue_tail(transmq, skb);
663 tipc_bearer_send(net, link->bearer_id, skb, addr);
664 link->rcv_unacked = 0;
665 seqno++;
666 continue;
667 }
668 if (tipc_msg_bundle(skb_peek_tail(backlogq), msg, mtu)) {
669 kfree_skb(__skb_dequeue(list));
670 link->stats.sent_bundled++;
671 continue;
672 }
673 if (tipc_msg_make_bundle(&bskb, msg, mtu, link->addr)) {
674 kfree_skb(__skb_dequeue(list));
675 __skb_queue_tail(backlogq, bskb);
676 link->backlog[msg_importance(buf_msg(bskb))].len++;
677 link->stats.sent_bundled++;
678 link->stats.sent_bundles++;
679 continue;
680 }
681 link->backlog[imp].len += skb_queue_len(list);
682 skb_queue_splice_tail_init(list, backlogq);
683 }
684 link->snd_nxt = seqno;
685 return 0;
686}
687
688/**
689 * tipc_link_xmit(): enqueue buffer list according to queue situation 699 * tipc_link_xmit(): enqueue buffer list according to queue situation
690 * @link: link to use 700 * @link: link to use
691 * @list: chain of buffers containing message 701 * @list: chain of buffers containing message
@@ -705,7 +715,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
705 unsigned int mtu = l->mtu; 715 unsigned int mtu = l->mtu;
706 u16 ack = l->rcv_nxt - 1; 716 u16 ack = l->rcv_nxt - 1;
707 u16 seqno = l->snd_nxt; 717 u16 seqno = l->snd_nxt;
708 u16 bc_last_in = l->owner->bclink.last_in; 718 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
709 struct sk_buff_head *transmq = &l->transmq; 719 struct sk_buff_head *transmq = &l->transmq;
710 struct sk_buff_head *backlogq = &l->backlogq; 720 struct sk_buff_head *backlogq = &l->backlogq;
711 struct sk_buff *skb, *_skb, *bskb; 721 struct sk_buff *skb, *_skb, *bskb;
@@ -724,7 +734,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
724 hdr = buf_msg(skb); 734 hdr = buf_msg(skb);
725 msg_set_seqno(hdr, seqno); 735 msg_set_seqno(hdr, seqno);
726 msg_set_ack(hdr, ack); 736 msg_set_ack(hdr, ack);
727 msg_set_bcast_ack(hdr, bc_last_in); 737 msg_set_bcast_ack(hdr, bc_ack);
728 738
729 if (likely(skb_queue_len(transmq) < maxwin)) { 739 if (likely(skb_queue_len(transmq) < maxwin)) {
730 _skb = skb_clone(skb, GFP_ATOMIC); 740 _skb = skb_clone(skb, GFP_ATOMIC);
@@ -733,6 +743,7 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
733 __skb_dequeue(list); 743 __skb_dequeue(list);
734 __skb_queue_tail(transmq, skb); 744 __skb_queue_tail(transmq, skb);
735 __skb_queue_tail(xmitq, _skb); 745 __skb_queue_tail(xmitq, _skb);
746 TIPC_SKB_CB(skb)->ackers = l->ackers;
736 l->rcv_unacked = 0; 747 l->rcv_unacked = 0;
737 seqno++; 748 seqno++;
738 continue; 749 continue;
@@ -757,62 +768,13 @@ int tipc_link_xmit(struct tipc_link *l, struct sk_buff_head *list,
757 return 0; 768 return 0;
758} 769}
759 770
760/*
761 * tipc_link_sync_rcv - synchronize broadcast link endpoints.
762 * Receive the sequence number where we should start receiving and
763 * acking broadcast packets from a newly added peer node, and open
764 * up for reception of such packets.
765 *
766 * Called with node locked
767 */
768static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf)
769{
770 struct tipc_msg *msg = buf_msg(buf);
771
772 n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
773 n->bclink.recv_permitted = true;
774 kfree_skb(buf);
775}
776
777/*
778 * tipc_link_push_packets - push unsent packets to bearer
779 *
780 * Push out the unsent messages of a link where congestion
781 * has abated. Node is locked.
782 *
783 * Called with node locked
784 */
785void tipc_link_push_packets(struct tipc_link *link)
786{
787 struct sk_buff *skb;
788 struct tipc_msg *msg;
789 u16 seqno = link->snd_nxt;
790 u16 ack = mod(link->rcv_nxt - 1);
791
792 while (skb_queue_len(&link->transmq) < link->window) {
793 skb = __skb_dequeue(&link->backlogq);
794 if (!skb)
795 break;
796 msg = buf_msg(skb);
797 link->backlog[msg_importance(msg)].len--;
798 msg_set_ack(msg, ack);
799 msg_set_seqno(msg, seqno);
800 seqno = mod(seqno + 1);
801 msg_set_bcast_ack(msg, link->owner->bclink.last_in);
802 link->rcv_unacked = 0;
803 __skb_queue_tail(&link->transmq, skb);
804 tipc_bearer_send(link->owner->net, link->bearer_id,
805 skb, link->media_addr);
806 }
807 link->snd_nxt = seqno;
808}
809
810void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq) 771void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
811{ 772{
812 struct sk_buff *skb, *_skb; 773 struct sk_buff *skb, *_skb;
813 struct tipc_msg *hdr; 774 struct tipc_msg *hdr;
814 u16 seqno = l->snd_nxt; 775 u16 seqno = l->snd_nxt;
815 u16 ack = l->rcv_nxt - 1; 776 u16 ack = l->rcv_nxt - 1;
777 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
816 778
817 while (skb_queue_len(&l->transmq) < l->window) { 779 while (skb_queue_len(&l->transmq) < l->window) {
818 skb = skb_peek(&l->backlogq); 780 skb = skb_peek(&l->backlogq);
@@ -826,96 +788,35 @@ void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
826 l->backlog[msg_importance(hdr)].len--; 788 l->backlog[msg_importance(hdr)].len--;
827 __skb_queue_tail(&l->transmq, skb); 789 __skb_queue_tail(&l->transmq, skb);
828 __skb_queue_tail(xmitq, _skb); 790 __skb_queue_tail(xmitq, _skb);
829 msg_set_ack(hdr, ack); 791 TIPC_SKB_CB(skb)->ackers = l->ackers;
830 msg_set_seqno(hdr, seqno); 792 msg_set_seqno(hdr, seqno);
831 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 793 msg_set_ack(hdr, ack);
794 msg_set_bcast_ack(hdr, bc_ack);
832 l->rcv_unacked = 0; 795 l->rcv_unacked = 0;
833 seqno++; 796 seqno++;
834 } 797 }
835 l->snd_nxt = seqno; 798 l->snd_nxt = seqno;
836} 799}
837 800
838static void link_retransmit_failure(struct tipc_link *l_ptr, 801static void link_retransmit_failure(struct tipc_link *l, struct sk_buff *skb)
839 struct sk_buff *buf)
840{
841 struct tipc_msg *msg = buf_msg(buf);
842 struct net *net = l_ptr->owner->net;
843
844 pr_warn("Retransmission failure on link <%s>\n", l_ptr->name);
845
846 if (l_ptr->addr) {
847 /* Handle failure on standard link */
848 link_print(l_ptr, "Resetting link ");
849 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
850 msg_user(msg), msg_type(msg), msg_size(msg),
851 msg_errcode(msg));
852 pr_info("sqno %u, prev: %x, src: %x\n",
853 msg_seqno(msg), msg_prevnode(msg), msg_orignode(msg));
854 } else {
855 /* Handle failure on broadcast link */
856 struct tipc_node *n_ptr;
857 char addr_string[16];
858
859 pr_info("Msg seq number: %u, ", msg_seqno(msg));
860 pr_cont("Outstanding acks: %lu\n",
861 (unsigned long) TIPC_SKB_CB(buf)->handle);
862
863 n_ptr = tipc_bclink_retransmit_to(net);
864
865 tipc_addr_string_fill(addr_string, n_ptr->addr);
866 pr_info("Broadcast link info for %s\n", addr_string);
867 pr_info("Reception permitted: %d, Acked: %u\n",
868 n_ptr->bclink.recv_permitted,
869 n_ptr->bclink.acked);
870 pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
871 n_ptr->bclink.last_in,
872 n_ptr->bclink.oos_state,
873 n_ptr->bclink.last_sent);
874
875 n_ptr->action_flags |= TIPC_BCAST_RESET;
876 l_ptr->stale_count = 0;
877 }
878}
879
880void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
881 u32 retransmits)
882{ 802{
883 struct tipc_msg *msg; 803 struct tipc_msg *hdr = buf_msg(skb);
884
885 if (!skb)
886 return;
887
888 msg = buf_msg(skb);
889
890 /* Detect repeated retransmit failures */
891 if (l_ptr->last_retransm == msg_seqno(msg)) {
892 if (++l_ptr->stale_count > 100) {
893 link_retransmit_failure(l_ptr, skb);
894 return;
895 }
896 } else {
897 l_ptr->last_retransm = msg_seqno(msg);
898 l_ptr->stale_count = 1;
899 }
900 804
901 skb_queue_walk_from(&l_ptr->transmq, skb) { 805 pr_warn("Retransmission failure on link <%s>\n", l->name);
902 if (!retransmits) 806 link_print(l, "Resetting link ");
903 break; 807 pr_info("Failed msg: usr %u, typ %u, len %u, err %u\n",
904 msg = buf_msg(skb); 808 msg_user(hdr), msg_type(hdr), msg_size(hdr), msg_errcode(hdr));
905 msg_set_ack(msg, mod(l_ptr->rcv_nxt - 1)); 809 pr_info("sqno %u, prev: %x, src: %x\n",
906 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 810 msg_seqno(hdr), msg_prevnode(hdr), msg_orignode(hdr));
907 tipc_bearer_send(l_ptr->owner->net, l_ptr->bearer_id, skb,
908 l_ptr->media_addr);
909 retransmits--;
910 l_ptr->stats.retransmitted++;
911 }
912} 811}
913 812
914static int tipc_link_retransm(struct tipc_link *l, int retransm, 813int tipc_link_retrans(struct tipc_link *l, u16 from, u16 to,
915 struct sk_buff_head *xmitq) 814 struct sk_buff_head *xmitq)
916{ 815{
917 struct sk_buff *_skb, *skb = skb_peek(&l->transmq); 816 struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
918 struct tipc_msg *hdr; 817 struct tipc_msg *hdr;
818 u16 ack = l->rcv_nxt - 1;
819 u16 bc_ack = l->bc_rcvlink->rcv_nxt - 1;
919 820
920 if (!skb) 821 if (!skb)
921 return 0; 822 return 0;
@@ -928,19 +829,25 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
928 link_retransmit_failure(l, skb); 829 link_retransmit_failure(l, skb);
929 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 830 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
930 } 831 }
832
833 /* Move forward to where retransmission should start */
931 skb_queue_walk(&l->transmq, skb) { 834 skb_queue_walk(&l->transmq, skb) {
932 if (!retransm) 835 if (!less(buf_seqno(skb), from))
933 return 0; 836 break;
837 }
838
839 skb_queue_walk_from(&l->transmq, skb) {
840 if (more(buf_seqno(skb), to))
841 break;
934 hdr = buf_msg(skb); 842 hdr = buf_msg(skb);
935 _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC); 843 _skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
936 if (!_skb) 844 if (!_skb)
937 return 0; 845 return 0;
938 hdr = buf_msg(_skb); 846 hdr = buf_msg(_skb);
939 msg_set_ack(hdr, l->rcv_nxt - 1); 847 msg_set_ack(hdr, ack);
940 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 848 msg_set_bcast_ack(hdr, bc_ack);
941 _skb->priority = TC_PRIO_CONTROL; 849 _skb->priority = TC_PRIO_CONTROL;
942 __skb_queue_tail(xmitq, _skb); 850 __skb_queue_tail(xmitq, _skb);
943 retransm--;
944 l->stats.retransmitted++; 851 l->stats.retransmitted++;
945 } 852 }
946 return 0; 853 return 0;
@@ -951,11 +858,9 @@ static int tipc_link_retransm(struct tipc_link *l, int retransm,
951 * Consumes buffer if message is of right type 858 * Consumes buffer if message is of right type
952 * Node lock must be held 859 * Node lock must be held
953 */ 860 */
954static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb, 861static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb,
955 struct sk_buff_head *inputq) 862 struct sk_buff_head *inputq)
956{ 863{
957 struct tipc_node *node = link->owner;
958
959 switch (msg_user(buf_msg(skb))) { 864 switch (msg_user(buf_msg(skb))) {
960 case TIPC_LOW_IMPORTANCE: 865 case TIPC_LOW_IMPORTANCE:
961 case TIPC_MEDIUM_IMPORTANCE: 866 case TIPC_MEDIUM_IMPORTANCE:
@@ -965,8 +870,8 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
965 skb_queue_tail(inputq, skb); 870 skb_queue_tail(inputq, skb);
966 return true; 871 return true;
967 case NAME_DISTRIBUTOR: 872 case NAME_DISTRIBUTOR:
968 node->bclink.recv_permitted = true; 873 l->bc_rcvlink->state = LINK_ESTABLISHED;
969 skb_queue_tail(link->namedq, skb); 874 skb_queue_tail(l->namedq, skb);
970 return true; 875 return true;
971 case MSG_BUNDLER: 876 case MSG_BUNDLER:
972 case TUNNEL_PROTOCOL: 877 case TUNNEL_PROTOCOL:
@@ -987,7 +892,6 @@ static bool tipc_data_input(struct tipc_link *link, struct sk_buff *skb,
987static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb, 892static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
988 struct sk_buff_head *inputq) 893 struct sk_buff_head *inputq)
989{ 894{
990 struct tipc_node *node = l->owner;
991 struct tipc_msg *hdr = buf_msg(skb); 895 struct tipc_msg *hdr = buf_msg(skb);
992 struct sk_buff **reasm_skb = &l->reasm_buf; 896 struct sk_buff **reasm_skb = &l->reasm_buf;
993 struct sk_buff *iskb; 897 struct sk_buff *iskb;
@@ -1028,13 +932,15 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *skb,
1028 if (tipc_buf_append(reasm_skb, &skb)) { 932 if (tipc_buf_append(reasm_skb, &skb)) {
1029 l->stats.recv_fragmented++; 933 l->stats.recv_fragmented++;
1030 tipc_data_input(l, skb, inputq); 934 tipc_data_input(l, skb, inputq);
1031 } else if (!*reasm_skb) { 935 } else if (!*reasm_skb && !link_is_bc_rcvlink(l)) {
936 pr_warn_ratelimited("Unable to build fragment list\n");
1032 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT); 937 return tipc_link_fsm_evt(l, LINK_FAILURE_EVT);
1033 } 938 }
1034 return 0; 939 return 0;
1035 } else if (usr == BCAST_PROTOCOL) { 940 } else if (usr == BCAST_PROTOCOL) {
1036 tipc_link_sync_rcv(node, skb); 941 tipc_bcast_lock(l->net);
1037 return 0; 942 tipc_link_bc_init_rcv(l->bc_rcvlink, hdr);
943 tipc_bcast_unlock(l->net);
1038 } 944 }
1039drop: 945drop:
1040 kfree_skb(skb); 946 kfree_skb(skb);
@@ -1057,12 +963,28 @@ static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
1057} 963}
1058 964
1059/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission 965/* tipc_link_build_ack_msg: prepare link acknowledge message for transmission
966 *
967 * Note that sending of broadcast ack is coordinated among nodes, to reduce
968 * risk of ack storms towards the sender
1060 */ 969 */
1061void tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq) 970int tipc_link_build_ack_msg(struct tipc_link *l, struct sk_buff_head *xmitq)
1062{ 971{
972 if (!l)
973 return 0;
974
975 /* Broadcast ACK must be sent via a unicast link => defer to caller */
976 if (link_is_bc_rcvlink(l)) {
977 if (((l->rcv_nxt ^ link_own_addr(l)) & 0xf) != 0xf)
978 return 0;
979 l->rcv_unacked = 0;
980 return TIPC_LINK_SND_BC_ACK;
981 }
982
983 /* Unicast ACK */
1063 l->rcv_unacked = 0; 984 l->rcv_unacked = 0;
1064 l->stats.sent_acks++; 985 l->stats.sent_acks++;
1065 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 986 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
987 return 0;
1066} 988}
1067 989
1068/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message 990/* tipc_link_build_reset_msg: prepare link RESET or ACTIVATE message
@@ -1084,6 +1006,9 @@ static void tipc_link_build_nack_msg(struct tipc_link *l,
1084{ 1006{
1085 u32 def_cnt = ++l->stats.deferred_recv; 1007 u32 def_cnt = ++l->stats.deferred_recv;
1086 1008
1009 if (link_is_bc_rcvlink(l))
1010 return;
1011
1087 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV)) 1012 if ((skb_queue_len(&l->deferdq) == 1) || !(def_cnt % TIPC_NACK_INTV))
1088 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq); 1013 tipc_link_build_proto_msg(l, STATE_MSG, 0, 0, 0, 0, xmitq);
1089} 1014}
@@ -1144,12 +1069,11 @@ int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
1144 l->rcv_nxt++; 1069 l->rcv_nxt++;
1145 l->stats.recv_info++; 1070 l->stats.recv_info++;
1146 if (!tipc_data_input(l, skb, l->inputq)) 1071 if (!tipc_data_input(l, skb, l->inputq))
1147 rc = tipc_link_input(l, skb, l->inputq); 1072 rc |= tipc_link_input(l, skb, l->inputq);
1148 if (unlikely(rc))
1149 break;
1150 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) 1073 if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN))
1151 tipc_link_build_ack_msg(l, xmitq); 1074 rc |= tipc_link_build_ack_msg(l, xmitq);
1152 1075 if (unlikely(rc & ~TIPC_LINK_SND_BC_ACK))
1076 break;
1153 } while ((skb = __skb_dequeue(defq))); 1077 } while ((skb = __skb_dequeue(defq)));
1154 1078
1155 return rc; 1079 return rc;
@@ -1158,45 +1082,6 @@ drop:
1158 return rc; 1082 return rc;
1159} 1083}
1160 1084
1161/**
1162 * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
1163 *
1164 * Returns increase in queue length (i.e. 0 or 1)
1165 */
1166u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
1167{
1168 struct sk_buff *skb1;
1169 u16 seq_no = buf_seqno(skb);
1170
1171 /* Empty queue ? */
1172 if (skb_queue_empty(list)) {
1173 __skb_queue_tail(list, skb);
1174 return 1;
1175 }
1176
1177 /* Last ? */
1178 if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
1179 __skb_queue_tail(list, skb);
1180 return 1;
1181 }
1182
1183 /* Locate insertion point in queue, then insert; discard if duplicate */
1184 skb_queue_walk(list, skb1) {
1185 u16 curr_seqno = buf_seqno(skb1);
1186
1187 if (seq_no == curr_seqno) {
1188 kfree_skb(skb);
1189 return 0;
1190 }
1191
1192 if (less(seq_no, curr_seqno))
1193 break;
1194 }
1195
1196 __skb_queue_before(list, skb1, skb);
1197 return 1;
1198}
1199
1200/* 1085/*
1201 * Send protocol message to the other endpoint. 1086 * Send protocol message to the other endpoint.
1202 */ 1087 */
@@ -1212,23 +1097,17 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
1212 skb = __skb_dequeue(&xmitq); 1097 skb = __skb_dequeue(&xmitq);
1213 if (!skb) 1098 if (!skb)
1214 return; 1099 return;
1215 tipc_bearer_send(l->owner->net, l->bearer_id, skb, l->media_addr); 1100 tipc_bearer_xmit_skb(l->net, l->bearer_id, skb, l->media_addr);
1216 l->rcv_unacked = 0; 1101 l->rcv_unacked = 0;
1217 kfree_skb(skb);
1218} 1102}
1219 1103
1220/* tipc_link_build_proto_msg: prepare link protocol message for transmission
1221 */
1222static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe, 1104static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1223 u16 rcvgap, int tolerance, int priority, 1105 u16 rcvgap, int tolerance, int priority,
1224 struct sk_buff_head *xmitq) 1106 struct sk_buff_head *xmitq)
1225{ 1107{
1226 struct sk_buff *skb = NULL; 1108 struct sk_buff *skb = NULL;
1227 struct tipc_msg *hdr = l->pmsg; 1109 struct tipc_msg *hdr = l->pmsg;
1228 u16 snd_nxt = l->snd_nxt; 1110 bool node_up = link_is_up(l->bc_rcvlink);
1229 u16 rcv_nxt = l->rcv_nxt;
1230 u16 rcv_last = rcv_nxt - 1;
1231 int node_up = l->owner->bclink.recv_permitted;
1232 1111
1233 /* Don't send protocol message during reset or link failover */ 1112 /* Don't send protocol message during reset or link failover */
1234 if (tipc_link_is_blocked(l)) 1113 if (tipc_link_is_blocked(l))
@@ -1236,33 +1115,34 @@ static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
1236 1115
1237 msg_set_type(hdr, mtyp); 1116 msg_set_type(hdr, mtyp);
1238 msg_set_net_plane(hdr, l->net_plane); 1117 msg_set_net_plane(hdr, l->net_plane);
1239 msg_set_bcast_ack(hdr, l->owner->bclink.last_in); 1118 msg_set_next_sent(hdr, l->snd_nxt);
1240 msg_set_last_bcast(hdr, tipc_bclink_get_last_sent(l->owner->net)); 1119 msg_set_ack(hdr, l->rcv_nxt - 1);
1120 msg_set_bcast_ack(hdr, l->bc_rcvlink->rcv_nxt - 1);
1121 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1241 msg_set_link_tolerance(hdr, tolerance); 1122 msg_set_link_tolerance(hdr, tolerance);
1242 msg_set_linkprio(hdr, priority); 1123 msg_set_linkprio(hdr, priority);
1243 msg_set_redundant_link(hdr, node_up); 1124 msg_set_redundant_link(hdr, node_up);
1244 msg_set_seq_gap(hdr, 0); 1125 msg_set_seq_gap(hdr, 0);
1245 1126
1246 /* Compatibility: created msg must not be in sequence with pkt flow */ 1127 /* Compatibility: created msg must not be in sequence with pkt flow */
1247 msg_set_seqno(hdr, snd_nxt + U16_MAX / 2); 1128 msg_set_seqno(hdr, l->snd_nxt + U16_MAX / 2);
1248 1129
1249 if (mtyp == STATE_MSG) { 1130 if (mtyp == STATE_MSG) {
1250 if (!tipc_link_is_up(l)) 1131 if (!tipc_link_is_up(l))
1251 return; 1132 return;
1252 msg_set_next_sent(hdr, snd_nxt);
1253 1133
1254 /* Override rcvgap if there are packets in deferred queue */ 1134 /* Override rcvgap if there are packets in deferred queue */
1255 if (!skb_queue_empty(&l->deferdq)) 1135 if (!skb_queue_empty(&l->deferdq))
1256 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - rcv_nxt; 1136 rcvgap = buf_seqno(skb_peek(&l->deferdq)) - l->rcv_nxt;
1257 if (rcvgap) { 1137 if (rcvgap) {
1258 msg_set_seq_gap(hdr, rcvgap); 1138 msg_set_seq_gap(hdr, rcvgap);
1259 l->stats.sent_nacks++; 1139 l->stats.sent_nacks++;
1260 } 1140 }
1261 msg_set_ack(hdr, rcv_last);
1262 msg_set_probe(hdr, probe); 1141 msg_set_probe(hdr, probe);
1263 if (probe) 1142 if (probe)
1264 l->stats.sent_probes++; 1143 l->stats.sent_probes++;
1265 l->stats.sent_states++; 1144 l->stats.sent_states++;
1145 l->rcv_unacked = 0;
1266 } else { 1146 } else {
1267 /* RESET_MSG or ACTIVATE_MSG */ 1147 /* RESET_MSG or ACTIVATE_MSG */
1268 msg_set_max_pkt(hdr, l->advertised_mtu); 1148 msg_set_max_pkt(hdr, l->advertised_mtu);
@@ -1354,7 +1234,8 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1354{ 1234{
1355 struct tipc_msg *hdr = buf_msg(skb); 1235 struct tipc_msg *hdr = buf_msg(skb);
1356 u16 rcvgap = 0; 1236 u16 rcvgap = 0;
1357 u16 nacked_gap = msg_seq_gap(hdr); 1237 u16 ack = msg_ack(hdr);
1238 u16 gap = msg_seq_gap(hdr);
1358 u16 peers_snd_nxt = msg_next_sent(hdr); 1239 u16 peers_snd_nxt = msg_next_sent(hdr);
1359 u16 peers_tol = msg_link_tolerance(hdr); 1240 u16 peers_tol = msg_link_tolerance(hdr);
1360 u16 peers_prio = msg_linkprio(hdr); 1241 u16 peers_prio = msg_linkprio(hdr);
@@ -1363,7 +1244,7 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1363 char *if_name; 1244 char *if_name;
1364 int rc = 0; 1245 int rc = 0;
1365 1246
1366 if (tipc_link_is_blocked(l)) 1247 if (tipc_link_is_blocked(l) || !xmitq)
1367 goto exit; 1248 goto exit;
1368 1249
1369 if (link_own_addr(l) > msg_prevnode(hdr)) 1250 if (link_own_addr(l) > msg_prevnode(hdr))
@@ -1433,11 +1314,11 @@ static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
1433 if (rcvgap || (msg_probe(hdr))) 1314 if (rcvgap || (msg_probe(hdr)))
1434 tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap, 1315 tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
1435 0, 0, xmitq); 1316 0, 0, xmitq);
1436 tipc_link_release_pkts(l, msg_ack(hdr)); 1317 tipc_link_release_pkts(l, ack);
1437 1318
1438 /* If NACK, retransmit will now start at right position */ 1319 /* If NACK, retransmit will now start at right position */
1439 if (nacked_gap) { 1320 if (gap) {
1440 rc = tipc_link_retransm(l, nacked_gap, xmitq); 1321 rc = tipc_link_retrans(l, ack + 1, ack + gap, xmitq);
1441 l->stats.recv_nacks++; 1322 l->stats.recv_nacks++;
1442 } 1323 }
1443 1324
@@ -1450,6 +1331,188 @@ exit:
1450 return rc; 1331 return rc;
1451} 1332}
1452 1333
1334/* tipc_link_build_bc_proto_msg() - create broadcast protocol message
1335 */
1336static bool tipc_link_build_bc_proto_msg(struct tipc_link *l, bool bcast,
1337 u16 peers_snd_nxt,
1338 struct sk_buff_head *xmitq)
1339{
1340 struct sk_buff *skb;
1341 struct tipc_msg *hdr;
1342 struct sk_buff *dfrd_skb = skb_peek(&l->deferdq);
1343 u16 ack = l->rcv_nxt - 1;
1344 u16 gap_to = peers_snd_nxt - 1;
1345
1346 skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
1347 0, l->addr, link_own_addr(l), 0, 0, 0);
1348 if (!skb)
1349 return false;
1350 hdr = buf_msg(skb);
1351 msg_set_last_bcast(hdr, l->bc_sndlink->snd_nxt - 1);
1352 msg_set_bcast_ack(hdr, ack);
1353 msg_set_bcgap_after(hdr, ack);
1354 if (dfrd_skb)
1355 gap_to = buf_seqno(dfrd_skb) - 1;
1356 msg_set_bcgap_to(hdr, gap_to);
1357 msg_set_non_seq(hdr, bcast);
1358 __skb_queue_tail(xmitq, skb);
1359 return true;
1360}
1361
1362/* tipc_link_build_bc_init_msg() - synchronize broadcast link endpoints.
1363 *
1364 * Give a newly added peer node the sequence number where it should
1365 * start receiving and acking broadcast packets.
1366 */
1367void tipc_link_build_bc_init_msg(struct tipc_link *l,
1368 struct sk_buff_head *xmitq)
1369{
1370 struct sk_buff_head list;
1371
1372 __skb_queue_head_init(&list);
1373 if (!tipc_link_build_bc_proto_msg(l->bc_rcvlink, false, 0, &list))
1374 return;
1375 tipc_link_xmit(l, &list, xmitq);
1376}
1377
1378/* tipc_link_bc_init_rcv - receive initial broadcast synch data from peer
1379 */
1380void tipc_link_bc_init_rcv(struct tipc_link *l, struct tipc_msg *hdr)
1381{
1382 int mtyp = msg_type(hdr);
1383 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1384
1385 if (link_is_up(l))
1386 return;
1387
1388 if (msg_user(hdr) == BCAST_PROTOCOL) {
1389 l->rcv_nxt = peers_snd_nxt;
1390 l->state = LINK_ESTABLISHED;
1391 return;
1392 }
1393
1394 if (l->peer_caps & TIPC_BCAST_SYNCH)
1395 return;
1396
1397 if (msg_peer_node_is_up(hdr))
1398 return;
1399
1400 /* Compatibility: accept older, less safe initial synch data */
1401 if ((mtyp == RESET_MSG) || (mtyp == ACTIVATE_MSG))
1402 l->rcv_nxt = peers_snd_nxt;
1403}
1404
1405/* tipc_link_bc_sync_rcv - update rcv link according to peer's send state
1406 */
1407void tipc_link_bc_sync_rcv(struct tipc_link *l, struct tipc_msg *hdr,
1408 struct sk_buff_head *xmitq)
1409{
1410 u16 peers_snd_nxt = msg_bc_snd_nxt(hdr);
1411
1412 if (!link_is_up(l))
1413 return;
1414
1415 if (!msg_peer_node_is_up(hdr))
1416 return;
1417
1418 l->bc_peer_is_up = true;
1419
1420 /* Ignore if peers_snd_nxt goes beyond receive window */
1421 if (more(peers_snd_nxt, l->rcv_nxt + l->window))
1422 return;
1423
1424 if (!more(peers_snd_nxt, l->rcv_nxt)) {
1425 l->nack_state = BC_NACK_SND_CONDITIONAL;
1426 return;
1427 }
1428
1429 /* Don't NACK if one was recently sent or peeked */
1430 if (l->nack_state == BC_NACK_SND_SUPPRESS) {
1431 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1432 return;
1433 }
1434
1435 /* Conditionally delay NACK sending until next synch rcv */
1436 if (l->nack_state == BC_NACK_SND_CONDITIONAL) {
1437 l->nack_state = BC_NACK_SND_UNCONDITIONAL;
1438 if ((peers_snd_nxt - l->rcv_nxt) < TIPC_MIN_LINK_WIN)
1439 return;
1440 }
1441
1442 /* Send NACK now but suppress next one */
1443 tipc_link_build_bc_proto_msg(l, true, peers_snd_nxt, xmitq);
1444 l->nack_state = BC_NACK_SND_SUPPRESS;
1445}
1446
1447void tipc_link_bc_ack_rcv(struct tipc_link *l, u16 acked,
1448 struct sk_buff_head *xmitq)
1449{
1450 struct sk_buff *skb, *tmp;
1451 struct tipc_link *snd_l = l->bc_sndlink;
1452
1453 if (!link_is_up(l) || !l->bc_peer_is_up)
1454 return;
1455
1456 if (!more(acked, l->acked))
1457 return;
1458
1459 /* Skip over packets peer has already acked */
1460 skb_queue_walk(&snd_l->transmq, skb) {
1461 if (more(buf_seqno(skb), l->acked))
1462 break;
1463 }
1464
1465 /* Update/release the packets peer is acking now */
1466 skb_queue_walk_from_safe(&snd_l->transmq, skb, tmp) {
1467 if (more(buf_seqno(skb), acked))
1468 break;
1469 if (!--TIPC_SKB_CB(skb)->ackers) {
1470 __skb_unlink(skb, &snd_l->transmq);
1471 kfree_skb(skb);
1472 }
1473 }
1474 l->acked = acked;
1475 tipc_link_advance_backlog(snd_l, xmitq);
1476 if (unlikely(!skb_queue_empty(&snd_l->wakeupq)))
1477 link_prepare_wakeup(snd_l);
1478}
1479
1480/* tipc_link_bc_nack_rcv(): receive broadcast nack message
1481 */
1482int tipc_link_bc_nack_rcv(struct tipc_link *l, struct sk_buff *skb,
1483 struct sk_buff_head *xmitq)
1484{
1485 struct tipc_msg *hdr = buf_msg(skb);
1486 u32 dnode = msg_destnode(hdr);
1487 int mtyp = msg_type(hdr);
1488 u16 acked = msg_bcast_ack(hdr);
1489 u16 from = acked + 1;
1490 u16 to = msg_bcgap_to(hdr);
1491 u16 peers_snd_nxt = to + 1;
1492 int rc = 0;
1493
1494 kfree_skb(skb);
1495
1496 if (!tipc_link_is_up(l) || !l->bc_peer_is_up)
1497 return 0;
1498
1499 if (mtyp != STATE_MSG)
1500 return 0;
1501
1502 if (dnode == link_own_addr(l)) {
1503 tipc_link_bc_ack_rcv(l, acked, xmitq);
1504 rc = tipc_link_retrans(l->bc_sndlink, from, to, xmitq);
1505 l->stats.recv_nacks++;
1506 return rc;
1507 }
1508
1509 /* Msg for other node => suppress own NACK at next sync if applicable */
1510 if (more(peers_snd_nxt, l->rcv_nxt) && !less(l->rcv_nxt, from))
1511 l->nack_state = BC_NACK_SND_SUPPRESS;
1512
1513 return 0;
1514}
1515
1453void tipc_link_set_queue_limits(struct tipc_link *l, u32 win) 1516void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
1454{ 1517{
1455 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE); 1518 int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
@@ -1514,7 +1577,7 @@ static void link_reset_statistics(struct tipc_link *l_ptr)
1514static void link_print(struct tipc_link *l, const char *str) 1577static void link_print(struct tipc_link *l, const char *str)
1515{ 1578{
1516 struct sk_buff *hskb = skb_peek(&l->transmq); 1579 struct sk_buff *hskb = skb_peek(&l->transmq);
1517 u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt; 1580 u16 head = hskb ? msg_seqno(buf_msg(hskb)) : l->snd_nxt - 1;
1518 u16 tail = l->snd_nxt - 1; 1581 u16 tail = l->snd_nxt - 1;
1519 1582
1520 pr_info("%s Link <%s> state %x\n", str, l->name, l->state); 1583 pr_info("%s Link <%s> state %x\n", str, l->name, l->state);
@@ -1738,7 +1801,7 @@ static int __tipc_nl_add_link(struct net *net, struct tipc_nl_msg *msg,
1738 if (tipc_link_is_up(link)) 1801 if (tipc_link_is_up(link))
1739 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP)) 1802 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_UP))
1740 goto attr_msg_full; 1803 goto attr_msg_full;
1741 if (tipc_link_is_active(link)) 1804 if (link->active)
1742 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE)) 1805 if (nla_put_flag(msg->skb, TIPC_NLA_LINK_ACTIVE))
1743 goto attr_msg_full; 1806 goto attr_msg_full;
1744 1807