aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/Kconfig13
-rw-r--r--net/tipc/bcast.c27
-rw-r--r--net/tipc/bearer.c110
-rw-r--r--net/tipc/bearer.h24
-rw-r--r--net/tipc/core.c5
-rw-r--r--net/tipc/discover.c2
-rw-r--r--net/tipc/link.c232
-rw-r--r--net/tipc/link.h4
-rw-r--r--net/tipc/name_distr.c2
-rw-r--r--net/tipc/node.c15
-rw-r--r--net/tipc/node.h6
-rw-r--r--net/tipc/port.c32
-rw-r--r--net/tipc/port.h6
-rw-r--r--net/tipc/socket.c411
-rw-r--r--net/tipc/subscr.c2
15 files changed, 410 insertions, 481 deletions
diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig
index 585460180ffb..bc41bd31eadc 100644
--- a/net/tipc/Kconfig
+++ b/net/tipc/Kconfig
@@ -20,18 +20,9 @@ menuconfig TIPC
20 20
21 If in doubt, say N. 21 If in doubt, say N.
22 22
23if TIPC
24
25config TIPC_ADVANCED
26 bool "Advanced TIPC configuration"
27 default n
28 help
29 Saying Y here will open some advanced configuration for TIPC.
30 Most users do not need to bother; if unsure, just say N.
31
32config TIPC_PORTS 23config TIPC_PORTS
33 int "Maximum number of ports in a node" 24 int "Maximum number of ports in a node"
34 depends on TIPC_ADVANCED 25 depends on TIPC
35 range 127 65535 26 range 127 65535
36 default "8191" 27 default "8191"
37 help 28 help
@@ -40,5 +31,3 @@ config TIPC_PORTS
40 31
41 Setting this to a smaller value saves some memory, 32 Setting this to a smaller value saves some memory,
42 setting it to higher allows for more ports. 33 setting it to higher allows for more ports.
43
44endif # TIPC
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index e4e6d8cd47e6..54f89f90ac33 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -347,7 +347,7 @@ static void bclink_peek_nack(struct tipc_msg *msg)
347 347
348 tipc_node_lock(n_ptr); 348 tipc_node_lock(n_ptr);
349 349
350 if (n_ptr->bclink.supported && 350 if (n_ptr->bclink.recv_permitted &&
351 (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) && 351 (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
352 (n_ptr->bclink.last_in == msg_bcgap_after(msg))) 352 (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
353 n_ptr->bclink.oos_state = 2; 353 n_ptr->bclink.oos_state = 2;
@@ -429,7 +429,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
429 goto exit; 429 goto exit;
430 430
431 tipc_node_lock(node); 431 tipc_node_lock(node);
432 if (unlikely(!node->bclink.supported)) 432 if (unlikely(!node->bclink.recv_permitted))
433 goto unlock; 433 goto unlock;
434 434
435 /* Handle broadcast protocol message */ 435 /* Handle broadcast protocol message */
@@ -564,7 +564,7 @@ exit:
564 564
565u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr) 565u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
566{ 566{
567 return (n_ptr->bclink.supported && 567 return (n_ptr->bclink.recv_permitted &&
568 (tipc_bclink_get_last_sent() != n_ptr->bclink.acked)); 568 (tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
569} 569}
570 570
@@ -619,16 +619,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
619 if (bcbearer->remains_new.count == bcbearer->remains.count) 619 if (bcbearer->remains_new.count == bcbearer->remains.count)
620 continue; /* bearer pair doesn't add anything */ 620 continue; /* bearer pair doesn't add anything */
621 621
622 if (p->blocked || 622 if (!tipc_bearer_blocked(p))
623 p->media->send_msg(buf, p, &p->media->bcast_addr)) { 623 tipc_bearer_send(p, buf, &p->media->bcast_addr);
624 else if (s && !tipc_bearer_blocked(s))
624 /* unable to send on primary bearer */ 625 /* unable to send on primary bearer */
625 if (!s || s->blocked || 626 tipc_bearer_send(s, buf, &s->media->bcast_addr);
626 s->media->send_msg(buf, s, 627 else
627 &s->media->bcast_addr)) { 628 /* unable to send on either bearer */
628 /* unable to send on either bearer */ 629 continue;
629 continue;
630 }
631 }
632 630
633 if (s) { 631 if (s) {
634 bcbearer->bpairs[bp_index].primary = s; 632 bcbearer->bpairs[bp_index].primary = s;
@@ -731,8 +729,8 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
731 " TX naks:%u acks:%u dups:%u\n", 729 " TX naks:%u acks:%u dups:%u\n",
732 s->sent_nacks, s->sent_acks, s->retransmitted); 730 s->sent_nacks, s->sent_acks, s->retransmitted);
733 ret += tipc_snprintf(buf + ret, buf_size - ret, 731 ret += tipc_snprintf(buf + ret, buf_size - ret,
734 " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n", 732 " Congestion link:%u Send queue max:%u avg:%u\n",
735 s->bearer_congs, s->link_congs, s->max_queue_sz, 733 s->link_congs, s->max_queue_sz,
736 s->queue_sz_counts ? 734 s->queue_sz_counts ?
737 (s->accu_queue_sz / s->queue_sz_counts) : 0); 735 (s->accu_queue_sz / s->queue_sz_counts) : 0);
738 736
@@ -766,7 +764,6 @@ int tipc_bclink_set_queue_limits(u32 limit)
766 764
767void tipc_bclink_init(void) 765void tipc_bclink_init(void)
768{ 766{
769 INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
770 bcbearer->bearer.media = &bcbearer->media; 767 bcbearer->bearer.media = &bcbearer->media;
771 bcbearer->media.send_msg = tipc_bcbearer_send; 768 bcbearer->media.send_msg = tipc_bcbearer_send;
772 sprintf(bcbearer->media.name, "tipc-broadcast"); 769 sprintf(bcbearer->media.name, "tipc-broadcast");
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 4ec5c80e8a7c..aa62f93a9127 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -279,116 +279,31 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
279} 279}
280 280
281/* 281/*
282 * bearer_push(): Resolve bearer congestion. Force the waiting 282 * Interrupt enabling new requests after bearer blocking:
283 * links to push out their unsent packets, one packet per link
284 * per iteration, until all packets are gone or congestion reoccurs.
285 * 'tipc_net_lock' is read_locked when this function is called
286 * bearer.lock must be taken before calling
287 * Returns binary true(1) ore false(0)
288 */
289static int bearer_push(struct tipc_bearer *b_ptr)
290{
291 u32 res = 0;
292 struct tipc_link *ln, *tln;
293
294 if (b_ptr->blocked)
295 return 0;
296
297 while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) {
298 list_for_each_entry_safe(ln, tln, &b_ptr->cong_links, link_list) {
299 res = tipc_link_push_packet(ln);
300 if (res == PUSH_FAILED)
301 break;
302 if (res == PUSH_FINISHED)
303 list_move_tail(&ln->link_list, &b_ptr->links);
304 }
305 }
306 return list_empty(&b_ptr->cong_links);
307}
308
309void tipc_bearer_lock_push(struct tipc_bearer *b_ptr)
310{
311 spin_lock_bh(&b_ptr->lock);
312 bearer_push(b_ptr);
313 spin_unlock_bh(&b_ptr->lock);
314}
315
316
317/*
318 * Interrupt enabling new requests after bearer congestion or blocking:
319 * See bearer_send(). 283 * See bearer_send().
320 */ 284 */
321void tipc_continue(struct tipc_bearer *b_ptr) 285void tipc_continue(struct tipc_bearer *b)
322{ 286{
323 spin_lock_bh(&b_ptr->lock); 287 spin_lock_bh(&b->lock);
324 if (!list_empty(&b_ptr->cong_links)) 288 b->blocked = 0;
325 tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr); 289 spin_unlock_bh(&b->lock);
326 b_ptr->blocked = 0;
327 spin_unlock_bh(&b_ptr->lock);
328} 290}
329 291
330/* 292/*
331 * Schedule link for sending of messages after the bearer 293 * tipc_bearer_blocked - determines if bearer is currently blocked
332 * has been deblocked by 'continue()'. This method is called
333 * when somebody tries to send a message via this link while
334 * the bearer is congested. 'tipc_net_lock' is in read_lock here
335 * bearer.lock is busy
336 */ 294 */
337static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr, 295int tipc_bearer_blocked(struct tipc_bearer *b)
338 struct tipc_link *l_ptr)
339{ 296{
340 list_move_tail(&l_ptr->link_list, &b_ptr->cong_links); 297 int res;
341}
342
343/*
344 * Schedule link for sending of messages after the bearer
345 * has been deblocked by 'continue()'. This method is called
346 * when somebody tries to send a message via this link while
347 * the bearer is congested. 'tipc_net_lock' is in read_lock here,
348 * bearer.lock is free
349 */
350void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
351{
352 spin_lock_bh(&b_ptr->lock);
353 tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
354 spin_unlock_bh(&b_ptr->lock);
355}
356
357 298
358/* 299 spin_lock_bh(&b->lock);
359 * tipc_bearer_resolve_congestion(): Check if there is bearer congestion, 300 res = b->blocked;
360 * and if there is, try to resolve it before returning. 301 spin_unlock_bh(&b->lock);
361 * 'tipc_net_lock' is read_locked when this function is called
362 */
363int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
364 struct tipc_link *l_ptr)
365{
366 int res = 1;
367 302
368 if (list_empty(&b_ptr->cong_links))
369 return 1;
370 spin_lock_bh(&b_ptr->lock);
371 if (!bearer_push(b_ptr)) {
372 tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
373 res = 0;
374 }
375 spin_unlock_bh(&b_ptr->lock);
376 return res; 303 return res;
377} 304}
378 305
379/** 306/**
380 * tipc_bearer_congested - determines if bearer is currently congested
381 */
382int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
383{
384 if (unlikely(b_ptr->blocked))
385 return 1;
386 if (likely(list_empty(&b_ptr->cong_links)))
387 return 0;
388 return !tipc_bearer_resolve_congestion(b_ptr, l_ptr);
389}
390
391/**
392 * tipc_enable_bearer - enable bearer with the given name 307 * tipc_enable_bearer - enable bearer with the given name
393 */ 308 */
394int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority) 309int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
@@ -489,7 +404,6 @@ restart:
489 b_ptr->net_plane = bearer_id + 'A'; 404 b_ptr->net_plane = bearer_id + 'A';
490 b_ptr->active = 1; 405 b_ptr->active = 1;
491 b_ptr->priority = priority; 406 b_ptr->priority = priority;
492 INIT_LIST_HEAD(&b_ptr->cong_links);
493 INIT_LIST_HEAD(&b_ptr->links); 407 INIT_LIST_HEAD(&b_ptr->links);
494 spin_lock_init(&b_ptr->lock); 408 spin_lock_init(&b_ptr->lock);
495 409
@@ -528,7 +442,6 @@ int tipc_block_bearer(const char *name)
528 pr_info("Blocking bearer <%s>\n", name); 442 pr_info("Blocking bearer <%s>\n", name);
529 spin_lock_bh(&b_ptr->lock); 443 spin_lock_bh(&b_ptr->lock);
530 b_ptr->blocked = 1; 444 b_ptr->blocked = 1;
531 list_splice_init(&b_ptr->cong_links, &b_ptr->links);
532 list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { 445 list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
533 struct tipc_node *n_ptr = l_ptr->owner; 446 struct tipc_node *n_ptr = l_ptr->owner;
534 447
@@ -555,7 +468,6 @@ static void bearer_disable(struct tipc_bearer *b_ptr)
555 spin_lock_bh(&b_ptr->lock); 468 spin_lock_bh(&b_ptr->lock);
556 b_ptr->blocked = 1; 469 b_ptr->blocked = 1;
557 b_ptr->media->disable_bearer(b_ptr); 470 b_ptr->media->disable_bearer(b_ptr);
558 list_splice_init(&b_ptr->cong_links, &b_ptr->links);
559 list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) { 471 list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
560 tipc_link_delete(l_ptr); 472 tipc_link_delete(l_ptr);
561 } 473 }
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index dd4c2abf08e7..39f1192d04bf 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -120,7 +120,6 @@ struct tipc_media {
120 * @identity: array index of this bearer within TIPC bearer array 120 * @identity: array index of this bearer within TIPC bearer array
121 * @link_req: ptr to (optional) structure making periodic link setup requests 121 * @link_req: ptr to (optional) structure making periodic link setup requests
122 * @links: list of non-congested links associated with bearer 122 * @links: list of non-congested links associated with bearer
123 * @cong_links: list of congested links associated with bearer
124 * @active: non-zero if bearer structure is represents a bearer 123 * @active: non-zero if bearer structure is represents a bearer
125 * @net_plane: network plane ('A' through 'H') currently associated with bearer 124 * @net_plane: network plane ('A' through 'H') currently associated with bearer
126 * @nodes: indicates which nodes in cluster can be reached through bearer 125 * @nodes: indicates which nodes in cluster can be reached through bearer
@@ -143,7 +142,6 @@ struct tipc_bearer {
143 u32 identity; 142 u32 identity;
144 struct tipc_link_req *link_req; 143 struct tipc_link_req *link_req;
145 struct list_head links; 144 struct list_head links;
146 struct list_head cong_links;
147 int active; 145 int active;
148 char net_plane; 146 char net_plane;
149 struct tipc_node_map nodes; 147 struct tipc_node_map nodes;
@@ -185,39 +183,23 @@ struct sk_buff *tipc_media_get_names(void);
185struct sk_buff *tipc_bearer_get_names(void); 183struct sk_buff *tipc_bearer_get_names(void);
186void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest); 184void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);
187void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest); 185void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
188void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
189struct tipc_bearer *tipc_bearer_find(const char *name); 186struct tipc_bearer *tipc_bearer_find(const char *name);
190struct tipc_bearer *tipc_bearer_find_interface(const char *if_name); 187struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);
191struct tipc_media *tipc_media_find(const char *name); 188struct tipc_media *tipc_media_find(const char *name);
192int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr, 189int tipc_bearer_blocked(struct tipc_bearer *b_ptr);
193 struct tipc_link *l_ptr);
194int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
195void tipc_bearer_stop(void); 190void tipc_bearer_stop(void);
196void tipc_bearer_lock_push(struct tipc_bearer *b_ptr);
197
198 191
199/** 192/**
200 * tipc_bearer_send- sends buffer to destination over bearer 193 * tipc_bearer_send- sends buffer to destination over bearer
201 * 194 *
202 * Returns true (1) if successful, or false (0) if unable to send
203 *
204 * IMPORTANT: 195 * IMPORTANT:
205 * The media send routine must not alter the buffer being passed in 196 * The media send routine must not alter the buffer being passed in
206 * as it may be needed for later retransmission! 197 * as it may be needed for later retransmission!
207 *
208 * If the media send routine returns a non-zero value (indicating that
209 * it was unable to send the buffer), it must:
210 * 1) mark the bearer as blocked,
211 * 2) call tipc_continue() once the bearer is able to send again.
212 * Media types that are unable to meet these two critera must ensure their
213 * send routine always returns success -- even if the buffer was not sent --
214 * and let TIPC's link code deal with the undelivered message.
215 */ 198 */
216static inline int tipc_bearer_send(struct tipc_bearer *b_ptr, 199static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
217 struct sk_buff *buf,
218 struct tipc_media_addr *dest) 200 struct tipc_media_addr *dest)
219{ 201{
220 return !b_ptr->media->send_msg(buf, b_ptr, dest); 202 b->media->send_msg(buf, b, dest);
221} 203}
222 204
223#endif /* _TIPC_BEARER_H */ 205#endif /* _TIPC_BEARER_H */
diff --git a/net/tipc/core.c b/net/tipc/core.c
index bfe8af88469a..fc05cecd7481 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -42,11 +42,6 @@
42 42
43#include <linux/module.h> 43#include <linux/module.h>
44 44
45#ifndef CONFIG_TIPC_PORTS
46#define CONFIG_TIPC_PORTS 8191
47#endif
48
49
50/* global variables used by multiple sub-systems within TIPC */ 45/* global variables used by multiple sub-systems within TIPC */
51int tipc_random __read_mostly; 46int tipc_random __read_mostly;
52 47
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 50eaa403eb6e..1074b9587e81 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -243,7 +243,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
243 if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) { 243 if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
244 rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr); 244 rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
245 if (rbuf) { 245 if (rbuf) {
246 b_ptr->media->send_msg(rbuf, b_ptr, &media_addr); 246 tipc_bearer_send(b_ptr, rbuf, &media_addr);
247 kfree_skb(rbuf); 247 kfree_skb(rbuf);
248 } 248 }
249 } 249 }
diff --git a/net/tipc/link.c b/net/tipc/link.c
index a79c755cb417..daa6080a2a0c 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/link.c: TIPC link code 2 * net/tipc/link.c: TIPC link code
3 * 3 *
4 * Copyright (c) 1996-2007, Ericsson AB 4 * Copyright (c) 1996-2007, 2012, Ericsson AB
5 * Copyright (c) 2004-2007, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -97,12 +97,13 @@ static int link_send_sections_long(struct tipc_port *sender,
97 struct iovec const *msg_sect, 97 struct iovec const *msg_sect,
98 u32 num_sect, unsigned int total_len, 98 u32 num_sect, unsigned int total_len,
99 u32 destnode); 99 u32 destnode);
100static void link_check_defragm_bufs(struct tipc_link *l_ptr);
101static void link_state_event(struct tipc_link *l_ptr, u32 event); 100static void link_state_event(struct tipc_link *l_ptr, u32 event);
102static void link_reset_statistics(struct tipc_link *l_ptr); 101static void link_reset_statistics(struct tipc_link *l_ptr);
103static void link_print(struct tipc_link *l_ptr, const char *str); 102static void link_print(struct tipc_link *l_ptr, const char *str);
104static void link_start(struct tipc_link *l_ptr); 103static void link_start(struct tipc_link *l_ptr);
105static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf); 104static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
105static void tipc_link_send_sync(struct tipc_link *l);
106static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
106 107
107/* 108/*
108 * Simple link routines 109 * Simple link routines
@@ -269,7 +270,6 @@ static void link_timeout(struct tipc_link *l_ptr)
269 } 270 }
270 271
271 /* do all other link processing performed on a periodic basis */ 272 /* do all other link processing performed on a periodic basis */
272 link_check_defragm_bufs(l_ptr);
273 273
274 link_state_event(l_ptr, TIMEOUT_EVT); 274 link_state_event(l_ptr, TIMEOUT_EVT);
275 275
@@ -712,6 +712,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
712 link_activate(l_ptr); 712 link_activate(l_ptr);
713 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 713 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
714 l_ptr->fsm_msg_cnt++; 714 l_ptr->fsm_msg_cnt++;
715 if (l_ptr->owner->working_links == 1)
716 tipc_link_send_sync(l_ptr);
715 link_set_timer(l_ptr, cont_intv); 717 link_set_timer(l_ptr, cont_intv);
716 break; 718 break;
717 case RESET_MSG: 719 case RESET_MSG:
@@ -745,6 +747,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
745 link_activate(l_ptr); 747 link_activate(l_ptr);
746 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0); 748 tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
747 l_ptr->fsm_msg_cnt++; 749 l_ptr->fsm_msg_cnt++;
750 if (l_ptr->owner->working_links == 1)
751 tipc_link_send_sync(l_ptr);
748 link_set_timer(l_ptr, cont_intv); 752 link_set_timer(l_ptr, cont_intv);
749 break; 753 break;
750 case RESET_MSG: 754 case RESET_MSG:
@@ -872,17 +876,12 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
872 return link_send_long_buf(l_ptr, buf); 876 return link_send_long_buf(l_ptr, buf);
873 877
874 /* Packet can be queued or sent. */ 878 /* Packet can be queued or sent. */
875 if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) && 879 if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
876 !link_congested(l_ptr))) { 880 !link_congested(l_ptr))) {
877 link_add_to_outqueue(l_ptr, buf, msg); 881 link_add_to_outqueue(l_ptr, buf, msg);
878 882
879 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) { 883 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
880 l_ptr->unacked_window = 0; 884 l_ptr->unacked_window = 0;
881 } else {
882 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
883 l_ptr->stats.bearer_congs++;
884 l_ptr->next_out = buf;
885 }
886 return dsz; 885 return dsz;
887 } 886 }
888 /* Congestion: can message be bundled ? */ 887 /* Congestion: can message be bundled ? */
@@ -891,10 +890,8 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
891 890
892 /* Try adding message to an existing bundle */ 891 /* Try adding message to an existing bundle */
893 if (l_ptr->next_out && 892 if (l_ptr->next_out &&
894 link_bundle_buf(l_ptr, l_ptr->last_out, buf)) { 893 link_bundle_buf(l_ptr, l_ptr->last_out, buf))
895 tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
896 return dsz; 894 return dsz;
897 }
898 895
899 /* Try creating a new bundle */ 896 /* Try creating a new bundle */
900 if (size <= max_packet * 2 / 3) { 897 if (size <= max_packet * 2 / 3) {
@@ -917,7 +914,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
917 if (!l_ptr->next_out) 914 if (!l_ptr->next_out)
918 l_ptr->next_out = buf; 915 l_ptr->next_out = buf;
919 link_add_to_outqueue(l_ptr, buf, msg); 916 link_add_to_outqueue(l_ptr, buf, msg);
920 tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
921 return dsz; 917 return dsz;
922} 918}
923 919
@@ -949,7 +945,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
949 return res; 945 return res;
950} 946}
951 947
952/** 948/*
949 * tipc_link_send_sync - synchronize broadcast link endpoints.
950 *
951 * Give a newly added peer node the sequence number where it should
952 * start receiving and acking broadcast packets.
953 *
954 * Called with node locked
955 */
956static void tipc_link_send_sync(struct tipc_link *l)
957{
958 struct sk_buff *buf;
959 struct tipc_msg *msg;
960
961 buf = tipc_buf_acquire(INT_H_SIZE);
962 if (!buf)
963 return;
964
965 msg = buf_msg(buf);
966 tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
967 msg_set_last_bcast(msg, l->owner->bclink.acked);
968 link_add_chain_to_outqueue(l, buf, 0);
969 tipc_link_push_queue(l);
970}
971
972/*
973 * tipc_link_recv_sync - synchronize broadcast link endpoints.
974 * Receive the sequence number where we should start receiving and
975 * acking broadcast packets from a newly added peer node, and open
976 * up for reception of such packets.
977 *
978 * Called with node locked
979 */
980static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
981{
982 struct tipc_msg *msg = buf_msg(buf);
983
984 n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
985 n->bclink.recv_permitted = true;
986 kfree_skb(buf);
987}
988
989/*
953 * tipc_link_send_names - send name table entries to new neighbor 990 * tipc_link_send_names - send name table entries to new neighbor
954 * 991 *
955 * Send routine for bulk delivery of name table messages when contact 992 * Send routine for bulk delivery of name table messages when contact
@@ -1006,16 +1043,11 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
1006 1043
1007 if (likely(!link_congested(l_ptr))) { 1044 if (likely(!link_congested(l_ptr))) {
1008 if (likely(msg_size(msg) <= l_ptr->max_pkt)) { 1045 if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
1009 if (likely(list_empty(&l_ptr->b_ptr->cong_links))) { 1046 if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
1010 link_add_to_outqueue(l_ptr, buf, msg); 1047 link_add_to_outqueue(l_ptr, buf, msg);
1011 if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, 1048 tipc_bearer_send(l_ptr->b_ptr, buf,
1012 &l_ptr->media_addr))) { 1049 &l_ptr->media_addr);
1013 l_ptr->unacked_window = 0; 1050 l_ptr->unacked_window = 0;
1014 return res;
1015 }
1016 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1017 l_ptr->stats.bearer_congs++;
1018 l_ptr->next_out = buf;
1019 return res; 1051 return res;
1020 } 1052 }
1021 } else 1053 } else
@@ -1106,7 +1138,7 @@ exit:
1106 1138
1107 /* Exit if link (or bearer) is congested */ 1139 /* Exit if link (or bearer) is congested */
1108 if (link_congested(l_ptr) || 1140 if (link_congested(l_ptr) ||
1109 !list_empty(&l_ptr->b_ptr->cong_links)) { 1141 tipc_bearer_blocked(l_ptr->b_ptr)) {
1110 res = link_schedule_port(l_ptr, 1142 res = link_schedule_port(l_ptr,
1111 sender->ref, res); 1143 sender->ref, res);
1112 goto exit; 1144 goto exit;
@@ -1329,15 +1361,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
1329 if (r_q_size && buf) { 1361 if (r_q_size && buf) {
1330 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); 1362 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1331 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); 1363 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1332 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1364 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1333 l_ptr->retransm_queue_head = mod(++r_q_head); 1365 l_ptr->retransm_queue_head = mod(++r_q_head);
1334 l_ptr->retransm_queue_size = --r_q_size; 1366 l_ptr->retransm_queue_size = --r_q_size;
1335 l_ptr->stats.retransmitted++; 1367 l_ptr->stats.retransmitted++;
1336 return 0; 1368 return 0;
1337 } else {
1338 l_ptr->stats.bearer_congs++;
1339 return PUSH_FAILED;
1340 }
1341 } 1369 }
1342 1370
1343 /* Send deferred protocol message, if any: */ 1371 /* Send deferred protocol message, if any: */
@@ -1345,15 +1373,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
1345 if (buf) { 1373 if (buf) {
1346 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1)); 1374 msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
1347 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in); 1375 msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
1348 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1376 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1349 l_ptr->unacked_window = 0; 1377 l_ptr->unacked_window = 0;
1350 kfree_skb(buf); 1378 kfree_skb(buf);
1351 l_ptr->proto_msg_queue = NULL; 1379 l_ptr->proto_msg_queue = NULL;
1352 return 0; 1380 return 0;
1353 } else {
1354 l_ptr->stats.bearer_congs++;
1355 return PUSH_FAILED;
1356 }
1357 } 1381 }
1358 1382
1359 /* Send one deferred data message, if send window not full: */ 1383 /* Send one deferred data message, if send window not full: */
@@ -1366,18 +1390,14 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
1366 if (mod(next - first) < l_ptr->queue_limit[0]) { 1390 if (mod(next - first) < l_ptr->queue_limit[0]) {
1367 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 1391 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1368 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1392 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1369 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1393 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1370 if (msg_user(msg) == MSG_BUNDLER) 1394 if (msg_user(msg) == MSG_BUNDLER)
1371 msg_set_type(msg, CLOSED_MSG); 1395 msg_set_type(msg, CLOSED_MSG);
1372 l_ptr->next_out = buf->next; 1396 l_ptr->next_out = buf->next;
1373 return 0; 1397 return 0;
1374 } else {
1375 l_ptr->stats.bearer_congs++;
1376 return PUSH_FAILED;
1377 }
1378 } 1398 }
1379 } 1399 }
1380 return PUSH_FINISHED; 1400 return 1;
1381} 1401}
1382 1402
1383/* 1403/*
@@ -1388,15 +1408,12 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
1388{ 1408{
1389 u32 res; 1409 u32 res;
1390 1410
1391 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) 1411 if (tipc_bearer_blocked(l_ptr->b_ptr))
1392 return; 1412 return;
1393 1413
1394 do { 1414 do {
1395 res = tipc_link_push_packet(l_ptr); 1415 res = tipc_link_push_packet(l_ptr);
1396 } while (!res); 1416 } while (!res);
1397
1398 if (res == PUSH_FAILED)
1399 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1400} 1417}
1401 1418
1402static void link_reset_all(unsigned long addr) 1419static void link_reset_all(unsigned long addr)
@@ -1454,9 +1471,8 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
1454 1471
1455 tipc_addr_string_fill(addr_string, n_ptr->addr); 1472 tipc_addr_string_fill(addr_string, n_ptr->addr);
1456 pr_info("Broadcast link info for %s\n", addr_string); 1473 pr_info("Broadcast link info for %s\n", addr_string);
1457 pr_info("Supportable: %d, Supported: %d, Acked: %u\n", 1474 pr_info("Reception permitted: %d, Acked: %u\n",
1458 n_ptr->bclink.supportable, 1475 n_ptr->bclink.recv_permitted,
1459 n_ptr->bclink.supported,
1460 n_ptr->bclink.acked); 1476 n_ptr->bclink.acked);
1461 pr_info("Last in: %u, Oos state: %u, Last sent: %u\n", 1477 pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
1462 n_ptr->bclink.last_in, 1478 n_ptr->bclink.last_in,
@@ -1481,7 +1497,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1481 1497
1482 msg = buf_msg(buf); 1498 msg = buf_msg(buf);
1483 1499
1484 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { 1500 if (tipc_bearer_blocked(l_ptr->b_ptr)) {
1485 if (l_ptr->retransm_queue_size == 0) { 1501 if (l_ptr->retransm_queue_size == 0) {
1486 l_ptr->retransm_queue_head = msg_seqno(msg); 1502 l_ptr->retransm_queue_head = msg_seqno(msg);
1487 l_ptr->retransm_queue_size = retransmits; 1503 l_ptr->retransm_queue_size = retransmits;
@@ -1491,7 +1507,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1491 } 1507 }
1492 return; 1508 return;
1493 } else { 1509 } else {
1494 /* Detect repeated retransmit failures on uncongested bearer */ 1510 /* Detect repeated retransmit failures on unblocked bearer */
1495 if (l_ptr->last_retransmitted == msg_seqno(msg)) { 1511 if (l_ptr->last_retransmitted == msg_seqno(msg)) {
1496 if (++l_ptr->stale_count > 100) { 1512 if (++l_ptr->stale_count > 100) {
1497 link_retransmit_failure(l_ptr, buf); 1513 link_retransmit_failure(l_ptr, buf);
@@ -1507,17 +1523,10 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
1507 msg = buf_msg(buf); 1523 msg = buf_msg(buf);
1508 msg_set_ack(msg, mod(l_ptr->next_in_no - 1)); 1524 msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
1509 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in); 1525 msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
1510 if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) { 1526 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1511 buf = buf->next; 1527 buf = buf->next;
1512 retransmits--; 1528 retransmits--;
1513 l_ptr->stats.retransmitted++; 1529 l_ptr->stats.retransmitted++;
1514 } else {
1515 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1516 l_ptr->stats.bearer_congs++;
1517 l_ptr->retransm_queue_head = buf_seqno(buf);
1518 l_ptr->retransm_queue_size = retransmits;
1519 return;
1520 }
1521 } 1530 }
1522 1531
1523 l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0; 1532 l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
@@ -1676,7 +1685,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
1676 ackd = msg_ack(msg); 1685 ackd = msg_ack(msg);
1677 1686
1678 /* Release acked messages */ 1687 /* Release acked messages */
1679 if (n_ptr->bclink.supported) 1688 if (n_ptr->bclink.recv_permitted)
1680 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg)); 1689 tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
1681 1690
1682 crs = l_ptr->first_out; 1691 crs = l_ptr->first_out;
@@ -1727,9 +1736,14 @@ deliver:
1727 tipc_link_recv_bundle(buf); 1736 tipc_link_recv_bundle(buf);
1728 continue; 1737 continue;
1729 case NAME_DISTRIBUTOR: 1738 case NAME_DISTRIBUTOR:
1739 n_ptr->bclink.recv_permitted = true;
1730 tipc_node_unlock(n_ptr); 1740 tipc_node_unlock(n_ptr);
1731 tipc_named_recv(buf); 1741 tipc_named_recv(buf);
1732 continue; 1742 continue;
1743 case BCAST_PROTOCOL:
1744 tipc_link_recv_sync(n_ptr, buf);
1745 tipc_node_unlock(n_ptr);
1746 continue;
1733 case CONN_MANAGER: 1747 case CONN_MANAGER:
1734 tipc_node_unlock(n_ptr); 1748 tipc_node_unlock(n_ptr);
1735 tipc_port_recv_proto_msg(buf); 1749 tipc_port_recv_proto_msg(buf);
@@ -1772,16 +1786,19 @@ deliver:
1772 continue; 1786 continue;
1773 } 1787 }
1774 1788
1789 /* Link is not in state WORKING_WORKING */
1775 if (msg_user(msg) == LINK_PROTOCOL) { 1790 if (msg_user(msg) == LINK_PROTOCOL) {
1776 link_recv_proto_msg(l_ptr, buf); 1791 link_recv_proto_msg(l_ptr, buf);
1777 head = link_insert_deferred_queue(l_ptr, head); 1792 head = link_insert_deferred_queue(l_ptr, head);
1778 tipc_node_unlock(n_ptr); 1793 tipc_node_unlock(n_ptr);
1779 continue; 1794 continue;
1780 } 1795 }
1796
1797 /* Traffic message. Conditionally activate link */
1781 link_state_event(l_ptr, TRAFFIC_MSG_EVT); 1798 link_state_event(l_ptr, TRAFFIC_MSG_EVT);
1782 1799
1783 if (link_working_working(l_ptr)) { 1800 if (link_working_working(l_ptr)) {
1784 /* Re-insert in front of queue */ 1801 /* Re-insert buffer in front of queue */
1785 buf->next = head; 1802 buf->next = head;
1786 head = buf; 1803 head = buf;
1787 tipc_node_unlock(n_ptr); 1804 tipc_node_unlock(n_ptr);
@@ -1972,21 +1989,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
1972 1989
1973 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg)); 1990 skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
1974 1991
1975 /* Defer message if bearer is already congested */ 1992 /* Defer message if bearer is already blocked */
1976 if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) { 1993 if (tipc_bearer_blocked(l_ptr->b_ptr)) {
1977 l_ptr->proto_msg_queue = buf;
1978 return;
1979 }
1980
1981 /* Defer message if attempting to send results in bearer congestion */
1982 if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
1983 tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
1984 l_ptr->proto_msg_queue = buf; 1994 l_ptr->proto_msg_queue = buf;
1985 l_ptr->stats.bearer_congs++;
1986 return; 1995 return;
1987 } 1996 }
1988 1997
1989 /* Discard message if it was sent successfully */ 1998 tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
1990 l_ptr->unacked_window = 0; 1999 l_ptr->unacked_window = 0;
1991 kfree_skb(buf); 2000 kfree_skb(buf);
1992} 2001}
@@ -2057,7 +2066,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
2057 } else { 2066 } else {
2058 l_ptr->max_pkt = l_ptr->max_pkt_target; 2067 l_ptr->max_pkt = l_ptr->max_pkt_target;
2059 } 2068 }
2060 l_ptr->owner->bclink.supportable = (max_pkt_info != 0);
2061 2069
2062 /* Synchronize broadcast link info, if not done previously */ 2070 /* Synchronize broadcast link info, if not done previously */
2063 if (!tipc_node_is_up(l_ptr->owner)) { 2071 if (!tipc_node_is_up(l_ptr->owner)) {
@@ -2112,7 +2120,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
2112 } 2120 }
2113 2121
2114 /* Protocol message before retransmits, reduce loss risk */ 2122 /* Protocol message before retransmits, reduce loss risk */
2115 if (l_ptr->owner->bclink.supported) 2123 if (l_ptr->owner->bclink.recv_permitted)
2116 tipc_bclink_update_link_state(l_ptr->owner, 2124 tipc_bclink_update_link_state(l_ptr->owner,
2117 msg_last_bcast(msg)); 2125 msg_last_bcast(msg));
2118 2126
@@ -2487,16 +2495,6 @@ static void set_expected_frags(struct sk_buff *buf, u32 exp)
2487 msg_set_bcast_ack(buf_msg(buf), exp); 2495 msg_set_bcast_ack(buf_msg(buf), exp);
2488} 2496}
2489 2497
2490static u32 get_timer_cnt(struct sk_buff *buf)
2491{
2492 return msg_reroute_cnt(buf_msg(buf));
2493}
2494
2495static void incr_timer_cnt(struct sk_buff *buf)
2496{
2497 msg_incr_reroute_cnt(buf_msg(buf));
2498}
2499
2500/* 2498/*
2501 * tipc_link_recv_fragment(): Called with node lock on. Returns 2499 * tipc_link_recv_fragment(): Called with node lock on. Returns
2502 * the reassembled buffer if message is complete. 2500 * the reassembled buffer if message is complete.
@@ -2575,38 +2573,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2575 return 0; 2573 return 0;
2576} 2574}
2577 2575
2578/**
2579 * link_check_defragm_bufs - flush stale incoming message fragments
2580 * @l_ptr: pointer to link
2581 */
2582static void link_check_defragm_bufs(struct tipc_link *l_ptr)
2583{
2584 struct sk_buff *prev = NULL;
2585 struct sk_buff *next = NULL;
2586 struct sk_buff *buf = l_ptr->defragm_buf;
2587
2588 if (!buf)
2589 return;
2590 if (!link_working_working(l_ptr))
2591 return;
2592 while (buf) {
2593 u32 cnt = get_timer_cnt(buf);
2594
2595 next = buf->next;
2596 if (cnt < 4) {
2597 incr_timer_cnt(buf);
2598 prev = buf;
2599 } else {
2600 if (prev)
2601 prev->next = buf->next;
2602 else
2603 l_ptr->defragm_buf = buf->next;
2604 kfree_skb(buf);
2605 }
2606 buf = next;
2607 }
2608}
2609
2610static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) 2576static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
2611{ 2577{
2612 if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) 2578 if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
@@ -2937,8 +2903,8 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
2937 s->sent_nacks, s->sent_acks, s->retransmitted); 2903 s->sent_nacks, s->sent_acks, s->retransmitted);
2938 2904
2939 ret += tipc_snprintf(buf + ret, buf_size - ret, 2905 ret += tipc_snprintf(buf + ret, buf_size - ret,
2940 " Congestion bearer:%u link:%u Send queue" 2906 " Congestion link:%u Send queue"
2941 " max:%u avg:%u\n", s->bearer_congs, s->link_congs, 2907 " max:%u avg:%u\n", s->link_congs,
2942 s->max_queue_sz, s->queue_sz_counts ? 2908 s->max_queue_sz, s->queue_sz_counts ?
2943 (s->accu_queue_sz / s->queue_sz_counts) : 0); 2909 (s->accu_queue_sz / s->queue_sz_counts) : 0);
2944 2910
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 6e921121be06..c048ed1cbd76 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -40,9 +40,6 @@
40#include "msg.h" 40#include "msg.h"
41#include "node.h" 41#include "node.h"
42 42
43#define PUSH_FAILED 1
44#define PUSH_FINISHED 2
45
46/* 43/*
47 * Out-of-range value for link sequence numbers 44 * Out-of-range value for link sequence numbers
48 */ 45 */
@@ -82,7 +79,6 @@ struct tipc_stats {
82 u32 recv_fragmented; 79 u32 recv_fragmented;
83 u32 recv_fragments; 80 u32 recv_fragments;
84 u32 link_congs; /* # port sends blocked by congestion */ 81 u32 link_congs; /* # port sends blocked by congestion */
85 u32 bearer_congs;
86 u32 deferred_recv; 82 u32 deferred_recv;
87 u32 duplicates; 83 u32 duplicates;
88 u32 max_queue_sz; /* send queue size high water mark */ 84 u32 max_queue_sz; /* send queue size high water mark */
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 55d3928dfd67..e0d08055754e 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -262,7 +262,7 @@ void tipc_named_node_up(unsigned long nodearg)
262 named_distribute(&message_list, node, &publ_zone, max_item_buf); 262 named_distribute(&message_list, node, &publ_zone, max_item_buf);
263 read_unlock_bh(&tipc_nametbl_lock); 263 read_unlock_bh(&tipc_nametbl_lock);
264 264
265 tipc_link_send_names(&message_list, (u32)node); 265 tipc_link_send_names(&message_list, node);
266} 266}
267 267
268/** 268/**
diff --git a/net/tipc/node.c b/net/tipc/node.c
index d21db204e25a..48f39dd3eae8 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/node.c: TIPC node management routines 2 * net/tipc/node.c: TIPC node management routines
3 * 3 *
4 * Copyright (c) 2000-2006, Ericsson AB 4 * Copyright (c) 2000-2006, 2012 Ericsson AB
5 * Copyright (c) 2005-2006, 2010-2011, Wind River Systems 5 * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -263,12 +263,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
263static void node_established_contact(struct tipc_node *n_ptr) 263static void node_established_contact(struct tipc_node *n_ptr)
264{ 264{
265 tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr); 265 tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
266 266 n_ptr->bclink.oos_state = 0;
267 if (n_ptr->bclink.supportable) { 267 n_ptr->bclink.acked = tipc_bclink_get_last_sent();
268 n_ptr->bclink.acked = tipc_bclink_get_last_sent(); 268 tipc_bclink_add_node(n_ptr->addr);
269 tipc_bclink_add_node(n_ptr->addr);
270 n_ptr->bclink.supported = 1;
271 }
272} 269}
273 270
274static void node_name_purge_complete(unsigned long node_addr) 271static void node_name_purge_complete(unsigned long node_addr)
@@ -294,7 +291,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
294 tipc_addr_string_fill(addr_string, n_ptr->addr)); 291 tipc_addr_string_fill(addr_string, n_ptr->addr));
295 292
296 /* Flush broadcast link info associated with lost node */ 293 /* Flush broadcast link info associated with lost node */
297 if (n_ptr->bclink.supported) { 294 if (n_ptr->bclink.recv_permitted) {
298 while (n_ptr->bclink.deferred_head) { 295 while (n_ptr->bclink.deferred_head) {
299 struct sk_buff *buf = n_ptr->bclink.deferred_head; 296 struct sk_buff *buf = n_ptr->bclink.deferred_head;
300 n_ptr->bclink.deferred_head = buf->next; 297 n_ptr->bclink.deferred_head = buf->next;
@@ -310,7 +307,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
310 tipc_bclink_remove_node(n_ptr->addr); 307 tipc_bclink_remove_node(n_ptr->addr);
311 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ); 308 tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
312 309
313 n_ptr->bclink.supported = 0; 310 n_ptr->bclink.recv_permitted = false;
314 } 311 }
315 312
316 /* Abort link changeover */ 313 /* Abort link changeover */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index cfcaf4d6e480..3c189b35b102 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -67,8 +67,6 @@
67 * @permit_changeover: non-zero if node has redundant links to this system 67 * @permit_changeover: non-zero if node has redundant links to this system
68 * @signature: node instance identifier 68 * @signature: node instance identifier
69 * @bclink: broadcast-related info 69 * @bclink: broadcast-related info
70 * @supportable: non-zero if node supports TIPC b'cast link capability
71 * @supported: non-zero if node supports TIPC b'cast capability
72 * @acked: sequence # of last outbound b'cast message acknowledged by node 70 * @acked: sequence # of last outbound b'cast message acknowledged by node
73 * @last_in: sequence # of last in-sequence b'cast message received from node 71 * @last_in: sequence # of last in-sequence b'cast message received from node
74 * @last_sent: sequence # of last b'cast message sent by node 72 * @last_sent: sequence # of last b'cast message sent by node
@@ -77,6 +75,7 @@
77 * @deferred_head: oldest OOS b'cast message received from node 75 * @deferred_head: oldest OOS b'cast message received from node
78 * @deferred_tail: newest OOS b'cast message received from node 76 * @deferred_tail: newest OOS b'cast message received from node
79 * @defragm: list of partially reassembled b'cast message fragments from node 77 * @defragm: list of partially reassembled b'cast message fragments from node
78 * @recv_permitted: true if node is allowed to receive b'cast messages
80 */ 79 */
81struct tipc_node { 80struct tipc_node {
82 u32 addr; 81 u32 addr;
@@ -92,8 +91,6 @@ struct tipc_node {
92 int permit_changeover; 91 int permit_changeover;
93 u32 signature; 92 u32 signature;
94 struct { 93 struct {
95 u8 supportable;
96 u8 supported;
97 u32 acked; 94 u32 acked;
98 u32 last_in; 95 u32 last_in;
99 u32 last_sent; 96 u32 last_sent;
@@ -102,6 +99,7 @@ struct tipc_node {
102 struct sk_buff *deferred_head; 99 struct sk_buff *deferred_head;
103 struct sk_buff *deferred_tail; 100 struct sk_buff *deferred_tail;
104 struct sk_buff *defragm; 101 struct sk_buff *defragm;
102 bool recv_permitted;
105 } bclink; 103 } bclink;
106}; 104};
107 105
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 07c42fba672b..18098cac62f2 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -726,7 +726,7 @@ static void port_dispatcher_sigh(void *dummy)
726 if (unlikely(!cb)) 726 if (unlikely(!cb))
727 goto reject; 727 goto reject;
728 if (unlikely(!connected)) { 728 if (unlikely(!connected)) {
729 if (tipc_connect2port(dref, &orig)) 729 if (tipc_connect(dref, &orig))
730 goto reject; 730 goto reject;
731 } else if (peer_invalid) 731 } else if (peer_invalid)
732 goto reject; 732 goto reject;
@@ -1036,15 +1036,30 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1036 return res; 1036 return res;
1037} 1037}
1038 1038
1039int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1039int tipc_connect(u32 ref, struct tipc_portid const *peer)
1040{ 1040{
1041 struct tipc_port *p_ptr; 1041 struct tipc_port *p_ptr;
1042 struct tipc_msg *msg; 1042 int res;
1043 int res = -EINVAL;
1044 1043
1045 p_ptr = tipc_port_lock(ref); 1044 p_ptr = tipc_port_lock(ref);
1046 if (!p_ptr) 1045 if (!p_ptr)
1047 return -EINVAL; 1046 return -EINVAL;
1047 res = __tipc_connect(ref, p_ptr, peer);
1048 tipc_port_unlock(p_ptr);
1049 return res;
1050}
1051
1052/*
1053 * __tipc_connect - connect to a remote peer
1054 *
1055 * Port must be locked.
1056 */
1057int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
1058 struct tipc_portid const *peer)
1059{
1060 struct tipc_msg *msg;
1061 int res = -EINVAL;
1062
1048 if (p_ptr->published || p_ptr->connected) 1063 if (p_ptr->published || p_ptr->connected)
1049 goto exit; 1064 goto exit;
1050 if (!peer->ref) 1065 if (!peer->ref)
@@ -1067,17 +1082,16 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1067 (net_ev_handler)port_handle_node_down); 1082 (net_ev_handler)port_handle_node_down);
1068 res = 0; 1083 res = 0;
1069exit: 1084exit:
1070 tipc_port_unlock(p_ptr);
1071 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1085 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1072 return res; 1086 return res;
1073} 1087}
1074 1088
1075/** 1089/*
1076 * tipc_disconnect_port - disconnect port from peer 1090 * __tipc_disconnect - disconnect port from peer
1077 * 1091 *
1078 * Port must be locked. 1092 * Port must be locked.
1079 */ 1093 */
1080int tipc_disconnect_port(struct tipc_port *tp_ptr) 1094int __tipc_disconnect(struct tipc_port *tp_ptr)
1081{ 1095{
1082 int res; 1096 int res;
1083 1097
@@ -1104,7 +1118,7 @@ int tipc_disconnect(u32 ref)
1104 p_ptr = tipc_port_lock(ref); 1118 p_ptr = tipc_port_lock(ref);
1105 if (!p_ptr) 1119 if (!p_ptr)
1106 return -EINVAL; 1120 return -EINVAL;
1107 res = tipc_disconnect_port(p_ptr); 1121 res = __tipc_disconnect(p_ptr);
1108 tipc_port_unlock(p_ptr); 1122 tipc_port_unlock(p_ptr);
1109 return res; 1123 return res;
1110} 1124}
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 4660e3065790..fb66e2e5f4d1 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -190,7 +190,7 @@ int tipc_publish(u32 portref, unsigned int scope,
190int tipc_withdraw(u32 portref, unsigned int scope, 190int tipc_withdraw(u32 portref, unsigned int scope,
191 struct tipc_name_seq const *name_seq); 191 struct tipc_name_seq const *name_seq);
192 192
193int tipc_connect2port(u32 portref, struct tipc_portid const *port); 193int tipc_connect(u32 portref, struct tipc_portid const *port);
194 194
195int tipc_disconnect(u32 portref); 195int tipc_disconnect(u32 portref);
196 196
@@ -200,7 +200,9 @@ int tipc_shutdown(u32 ref);
200/* 200/*
201 * The following routines require that the port be locked on entry 201 * The following routines require that the port be locked on entry
202 */ 202 */
203int tipc_disconnect_port(struct tipc_port *tp_ptr); 203int __tipc_disconnect(struct tipc_port *tp_ptr);
204int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
205 struct tipc_portid const *peer);
204int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); 206int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
205 207
206/* 208/*
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index fd5f042dbff4..9b4e4833a484 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,8 +1,8 @@
1/* 1/*
2 * net/tipc/socket.c: TIPC socket API 2 * net/tipc/socket.c: TIPC socket API
3 * 3 *
4 * Copyright (c) 2001-2007, Ericsson AB 4 * Copyright (c) 2001-2007, 2012 Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2012, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -43,7 +43,7 @@
43#define SS_LISTENING -1 /* socket is listening */ 43#define SS_LISTENING -1 /* socket is listening */
44#define SS_READY -2 /* socket is connectionless */ 44#define SS_READY -2 /* socket is connectionless */
45 45
46#define OVERLOAD_LIMIT_BASE 5000 46#define OVERLOAD_LIMIT_BASE 10000
47#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 47#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
48 48
49struct tipc_sock { 49struct tipc_sock {
@@ -62,6 +62,8 @@ struct tipc_sock {
62static int backlog_rcv(struct sock *sk, struct sk_buff *skb); 62static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
63static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); 63static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
64static void wakeupdispatch(struct tipc_port *tport); 64static void wakeupdispatch(struct tipc_port *tport);
65static void tipc_data_ready(struct sock *sk, int len);
66static void tipc_write_space(struct sock *sk);
65 67
66static const struct proto_ops packet_ops; 68static const struct proto_ops packet_ops;
67static const struct proto_ops stream_ops; 69static const struct proto_ops stream_ops;
@@ -71,8 +73,6 @@ static struct proto tipc_proto;
71 73
72static int sockets_enabled; 74static int sockets_enabled;
73 75
74static atomic_t tipc_queue_size = ATOMIC_INIT(0);
75
76/* 76/*
77 * Revised TIPC socket locking policy: 77 * Revised TIPC socket locking policy:
78 * 78 *
@@ -126,7 +126,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0);
126static void advance_rx_queue(struct sock *sk) 126static void advance_rx_queue(struct sock *sk)
127{ 127{
128 kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); 128 kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
129 atomic_dec(&tipc_queue_size);
130} 129}
131 130
132/** 131/**
@@ -138,10 +137,8 @@ static void discard_rx_queue(struct sock *sk)
138{ 137{
139 struct sk_buff *buf; 138 struct sk_buff *buf;
140 139
141 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { 140 while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
142 atomic_dec(&tipc_queue_size);
143 kfree_skb(buf); 141 kfree_skb(buf);
144 }
145} 142}
146 143
147/** 144/**
@@ -153,10 +150,8 @@ static void reject_rx_queue(struct sock *sk)
153{ 150{
154 struct sk_buff *buf; 151 struct sk_buff *buf;
155 152
156 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { 153 while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
157 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 154 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
158 atomic_dec(&tipc_queue_size);
159 }
160} 155}
161 156
162/** 157/**
@@ -221,6 +216,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
221 sock_init_data(sock, sk); 216 sock_init_data(sock, sk);
222 sk->sk_backlog_rcv = backlog_rcv; 217 sk->sk_backlog_rcv = backlog_rcv;
223 sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2; 218 sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
219 sk->sk_data_ready = tipc_data_ready;
220 sk->sk_write_space = tipc_write_space;
224 tipc_sk(sk)->p = tp_ptr; 221 tipc_sk(sk)->p = tp_ptr;
225 tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; 222 tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
226 223
@@ -276,7 +273,6 @@ static int release(struct socket *sock)
276 buf = __skb_dequeue(&sk->sk_receive_queue); 273 buf = __skb_dequeue(&sk->sk_receive_queue);
277 if (buf == NULL) 274 if (buf == NULL)
278 break; 275 break;
279 atomic_dec(&tipc_queue_size);
280 if (TIPC_SKB_CB(buf)->handle != 0) 276 if (TIPC_SKB_CB(buf)->handle != 0)
281 kfree_skb(buf); 277 kfree_skb(buf);
282 else { 278 else {
@@ -408,7 +404,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr,
408 * socket state flags set 404 * socket state flags set
409 * ------------ --------- 405 * ------------ ---------
410 * unconnected no read flags 406 * unconnected no read flags
411 * no write flags 407 * POLLOUT if port is not congested
412 * 408 *
413 * connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue 409 * connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue
414 * no write flags 410 * no write flags
@@ -435,9 +431,13 @@ static unsigned int poll(struct file *file, struct socket *sock,
435 struct sock *sk = sock->sk; 431 struct sock *sk = sock->sk;
436 u32 mask = 0; 432 u32 mask = 0;
437 433
438 poll_wait(file, sk_sleep(sk), wait); 434 sock_poll_wait(file, sk_sleep(sk), wait);
439 435
440 switch ((int)sock->state) { 436 switch ((int)sock->state) {
437 case SS_UNCONNECTED:
438 if (!tipc_sk_port(sk)->congested)
439 mask |= POLLOUT;
440 break;
441 case SS_READY: 441 case SS_READY:
442 case SS_CONNECTED: 442 case SS_CONNECTED:
443 if (!tipc_sk_port(sk)->congested) 443 if (!tipc_sk_port(sk)->congested)
@@ -775,16 +775,19 @@ exit:
775static int auto_connect(struct socket *sock, struct tipc_msg *msg) 775static int auto_connect(struct socket *sock, struct tipc_msg *msg)
776{ 776{
777 struct tipc_sock *tsock = tipc_sk(sock->sk); 777 struct tipc_sock *tsock = tipc_sk(sock->sk);
778 778 struct tipc_port *p_ptr;
779 if (msg_errcode(msg)) {
780 sock->state = SS_DISCONNECTING;
781 return -ECONNREFUSED;
782 }
783 779
784 tsock->peer_name.ref = msg_origport(msg); 780 tsock->peer_name.ref = msg_origport(msg);
785 tsock->peer_name.node = msg_orignode(msg); 781 tsock->peer_name.node = msg_orignode(msg);
786 tipc_connect2port(tsock->p->ref, &tsock->peer_name); 782 p_ptr = tipc_port_deref(tsock->p->ref);
787 tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); 783 if (!p_ptr)
784 return -EINVAL;
785
786 __tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name);
787
788 if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
789 return -EINVAL;
790 msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg));
788 sock->state = SS_CONNECTED; 791 sock->state = SS_CONNECTED;
789 return 0; 792 return 0;
790} 793}
@@ -943,13 +946,6 @@ restart:
943 sz = msg_data_sz(msg); 946 sz = msg_data_sz(msg);
944 err = msg_errcode(msg); 947 err = msg_errcode(msg);
945 948
946 /* Complete connection setup for an implied connect */
947 if (unlikely(sock->state == SS_CONNECTING)) {
948 res = auto_connect(sock, msg);
949 if (res)
950 goto exit;
951 }
952
953 /* Discard an empty non-errored message & try again */ 949 /* Discard an empty non-errored message & try again */
954 if ((!sz) && (!err)) { 950 if ((!sz) && (!err)) {
955 advance_rx_queue(sk); 951 advance_rx_queue(sk);
@@ -1126,6 +1122,39 @@ exit:
1126} 1122}
1127 1123
1128/** 1124/**
1125 * tipc_write_space - wake up thread if port congestion is released
1126 * @sk: socket
1127 */
1128static void tipc_write_space(struct sock *sk)
1129{
1130 struct socket_wq *wq;
1131
1132 rcu_read_lock();
1133 wq = rcu_dereference(sk->sk_wq);
1134 if (wq_has_sleeper(wq))
1135 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1136 POLLWRNORM | POLLWRBAND);
1137 rcu_read_unlock();
1138}
1139
1140/**
1141 * tipc_data_ready - wake up threads to indicate messages have been received
1142 * @sk: socket
1143 * @len: the length of messages
1144 */
1145static void tipc_data_ready(struct sock *sk, int len)
1146{
1147 struct socket_wq *wq;
1148
1149 rcu_read_lock();
1150 wq = rcu_dereference(sk->sk_wq);
1151 if (wq_has_sleeper(wq))
1152 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1153 POLLRDNORM | POLLRDBAND);
1154 rcu_read_unlock();
1155}
1156
1157/**
1129 * rx_queue_full - determine if receive queue can accept another message 1158 * rx_queue_full - determine if receive queue can accept another message
1130 * @msg: message to be added to queue 1159 * @msg: message to be added to queue
1131 * @queue_size: current size of queue 1160 * @queue_size: current size of queue
@@ -1154,6 +1183,83 @@ static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1154} 1183}
1155 1184
1156/** 1185/**
1186 * filter_connect - Handle all incoming messages for a connection-based socket
1187 * @tsock: TIPC socket
1188 * @msg: message
1189 *
1190 * Returns TIPC error status code and socket error status code
1191 * once it encounters some errors
1192 */
1193static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
1194{
1195 struct socket *sock = tsock->sk.sk_socket;
1196 struct tipc_msg *msg = buf_msg(*buf);
1197 struct sock *sk = &tsock->sk;
1198 u32 retval = TIPC_ERR_NO_PORT;
1199 int res;
1200
1201 if (msg_mcast(msg))
1202 return retval;
1203
1204 switch ((int)sock->state) {
1205 case SS_CONNECTED:
1206 /* Accept only connection-based messages sent by peer */
1207 if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) {
1208 if (unlikely(msg_errcode(msg))) {
1209 sock->state = SS_DISCONNECTING;
1210 __tipc_disconnect(tsock->p);
1211 }
1212 retval = TIPC_OK;
1213 }
1214 break;
1215 case SS_CONNECTING:
1216 /* Accept only ACK or NACK message */
1217 if (unlikely(msg_errcode(msg))) {
1218 sock->state = SS_DISCONNECTING;
1219 sk->sk_err = -ECONNREFUSED;
1220 retval = TIPC_OK;
1221 break;
1222 }
1223
1224 if (unlikely(!msg_connected(msg)))
1225 break;
1226
1227 res = auto_connect(sock, msg);
1228 if (res) {
1229 sock->state = SS_DISCONNECTING;
1230 sk->sk_err = res;
1231 retval = TIPC_OK;
1232 break;
1233 }
1234
1235 /* If an incoming message is an 'ACK-', it should be
1236 * discarded here because it doesn't contain useful
1237 * data. In addition, we should try to wake up
1238 * connect() routine if sleeping.
1239 */
1240 if (msg_data_sz(msg) == 0) {
1241 kfree_skb(*buf);
1242 *buf = NULL;
1243 if (waitqueue_active(sk_sleep(sk)))
1244 wake_up_interruptible(sk_sleep(sk));
1245 }
1246 retval = TIPC_OK;
1247 break;
1248 case SS_LISTENING:
1249 case SS_UNCONNECTED:
1250 /* Accept only SYN message */
1251 if (!msg_connected(msg) && !(msg_errcode(msg)))
1252 retval = TIPC_OK;
1253 break;
1254 case SS_DISCONNECTING:
1255 break;
1256 default:
1257 pr_err("Unknown socket state %u\n", sock->state);
1258 }
1259 return retval;
1260}
1261
1262/**
1157 * filter_rcv - validate incoming message 1263 * filter_rcv - validate incoming message
1158 * @sk: socket 1264 * @sk: socket
1159 * @buf: message 1265 * @buf: message
@@ -1170,6 +1276,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1170 struct socket *sock = sk->sk_socket; 1276 struct socket *sock = sk->sk_socket;
1171 struct tipc_msg *msg = buf_msg(buf); 1277 struct tipc_msg *msg = buf_msg(buf);
1172 u32 recv_q_len; 1278 u32 recv_q_len;
1279 u32 res = TIPC_OK;
1173 1280
1174 /* Reject message if it is wrong sort of message for socket */ 1281 /* Reject message if it is wrong sort of message for socket */
1175 if (msg_type(msg) > TIPC_DIRECT_MSG) 1282 if (msg_type(msg) > TIPC_DIRECT_MSG)
@@ -1179,32 +1286,12 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1179 if (msg_connected(msg)) 1286 if (msg_connected(msg))
1180 return TIPC_ERR_NO_PORT; 1287 return TIPC_ERR_NO_PORT;
1181 } else { 1288 } else {
1182 if (msg_mcast(msg)) 1289 res = filter_connect(tipc_sk(sk), &buf);
1183 return TIPC_ERR_NO_PORT; 1290 if (res != TIPC_OK || buf == NULL)
1184 if (sock->state == SS_CONNECTED) { 1291 return res;
1185 if (!msg_connected(msg) ||
1186 !tipc_port_peer_msg(tipc_sk_port(sk), msg))
1187 return TIPC_ERR_NO_PORT;
1188 } else if (sock->state == SS_CONNECTING) {
1189 if (!msg_connected(msg) && (msg_errcode(msg) == 0))
1190 return TIPC_ERR_NO_PORT;
1191 } else if (sock->state == SS_LISTENING) {
1192 if (msg_connected(msg) || msg_errcode(msg))
1193 return TIPC_ERR_NO_PORT;
1194 } else if (sock->state == SS_DISCONNECTING) {
1195 return TIPC_ERR_NO_PORT;
1196 } else /* (sock->state == SS_UNCONNECTED) */ {
1197 if (msg_connected(msg) || msg_errcode(msg))
1198 return TIPC_ERR_NO_PORT;
1199 }
1200 } 1292 }
1201 1293
1202 /* Reject message if there isn't room to queue it */ 1294 /* Reject message if there isn't room to queue it */
1203 recv_q_len = (u32)atomic_read(&tipc_queue_size);
1204 if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
1205 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1206 return TIPC_ERR_OVERLOAD;
1207 }
1208 recv_q_len = skb_queue_len(&sk->sk_receive_queue); 1295 recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1209 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) { 1296 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1210 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2)) 1297 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
@@ -1213,17 +1300,9 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1213 1300
1214 /* Enqueue message (finally!) */ 1301 /* Enqueue message (finally!) */
1215 TIPC_SKB_CB(buf)->handle = 0; 1302 TIPC_SKB_CB(buf)->handle = 0;
1216 atomic_inc(&tipc_queue_size);
1217 __skb_queue_tail(&sk->sk_receive_queue, buf); 1303 __skb_queue_tail(&sk->sk_receive_queue, buf);
1218 1304
1219 /* Initiate connection termination for an incoming 'FIN' */ 1305 sk->sk_data_ready(sk, 0);
1220 if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1221 sock->state = SS_DISCONNECTING;
1222 tipc_disconnect_port(tipc_sk_port(sk));
1223 }
1224
1225 if (waitqueue_active(sk_sleep(sk)))
1226 wake_up_interruptible(sk_sleep(sk));
1227 return TIPC_OK; 1306 return TIPC_OK;
1228} 1307}
1229 1308
@@ -1290,8 +1369,7 @@ static void wakeupdispatch(struct tipc_port *tport)
1290{ 1369{
1291 struct sock *sk = (struct sock *)tport->usr_handle; 1370 struct sock *sk = (struct sock *)tport->usr_handle;
1292 1371
1293 if (waitqueue_active(sk_sleep(sk))) 1372 sk->sk_write_space(sk);
1294 wake_up_interruptible(sk_sleep(sk));
1295} 1373}
1296 1374
1297/** 1375/**
@@ -1309,8 +1387,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1309 struct sock *sk = sock->sk; 1387 struct sock *sk = sock->sk;
1310 struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest; 1388 struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1311 struct msghdr m = {NULL,}; 1389 struct msghdr m = {NULL,};
1312 struct sk_buff *buf;
1313 struct tipc_msg *msg;
1314 unsigned int timeout; 1390 unsigned int timeout;
1315 int res; 1391 int res;
1316 1392
@@ -1322,26 +1398,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1322 goto exit; 1398 goto exit;
1323 } 1399 }
1324 1400
1325 /* For now, TIPC does not support the non-blocking form of connect() */
1326 if (flags & O_NONBLOCK) {
1327 res = -EOPNOTSUPP;
1328 goto exit;
1329 }
1330
1331 /* Issue Posix-compliant error code if socket is in the wrong state */
1332 if (sock->state == SS_LISTENING) {
1333 res = -EOPNOTSUPP;
1334 goto exit;
1335 }
1336 if (sock->state == SS_CONNECTING) {
1337 res = -EALREADY;
1338 goto exit;
1339 }
1340 if (sock->state != SS_UNCONNECTED) {
1341 res = -EISCONN;
1342 goto exit;
1343 }
1344
1345 /* 1401 /*
1346 * Reject connection attempt using multicast address 1402 * Reject connection attempt using multicast address
1347 * 1403 *
@@ -1353,49 +1409,66 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1353 goto exit; 1409 goto exit;
1354 } 1410 }
1355 1411
1356 /* Reject any messages already in receive queue (very unlikely) */ 1412 timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1357 reject_rx_queue(sk);
1358 1413
1359 /* Send a 'SYN-' to destination */ 1414 switch (sock->state) {
1360 m.msg_name = dest; 1415 case SS_UNCONNECTED:
1361 m.msg_namelen = destlen; 1416 /* Send a 'SYN-' to destination */
1362 res = send_msg(NULL, sock, &m, 0); 1417 m.msg_name = dest;
1363 if (res < 0) 1418 m.msg_namelen = destlen;
1419
1420 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1421 * indicate send_msg() is never blocked.
1422 */
1423 if (!timeout)
1424 m.msg_flags = MSG_DONTWAIT;
1425
1426 res = send_msg(NULL, sock, &m, 0);
1427 if ((res < 0) && (res != -EWOULDBLOCK))
1428 goto exit;
1429
1430 /* Just entered SS_CONNECTING state; the only
1431 * difference is that return value in non-blocking
1432 * case is EINPROGRESS, rather than EALREADY.
1433 */
1434 res = -EINPROGRESS;
1435 break;
1436 case SS_CONNECTING:
1437 res = -EALREADY;
1438 break;
1439 case SS_CONNECTED:
1440 res = -EISCONN;
1441 break;
1442 default:
1443 res = -EINVAL;
1364 goto exit; 1444 goto exit;
1445 }
1365 1446
1366 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ 1447 if (sock->state == SS_CONNECTING) {
1367 timeout = tipc_sk(sk)->conn_timeout; 1448 if (!timeout)
1368 release_sock(sk); 1449 goto exit;
1369 res = wait_event_interruptible_timeout(*sk_sleep(sk),
1370 (!skb_queue_empty(&sk->sk_receive_queue) ||
1371 (sock->state != SS_CONNECTING)),
1372 timeout ? (long)msecs_to_jiffies(timeout)
1373 : MAX_SCHEDULE_TIMEOUT);
1374 lock_sock(sk);
1375 1450
1376 if (res > 0) { 1451 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1377 buf = skb_peek(&sk->sk_receive_queue); 1452 release_sock(sk);
1378 if (buf != NULL) { 1453 res = wait_event_interruptible_timeout(*sk_sleep(sk),
1379 msg = buf_msg(buf); 1454 sock->state != SS_CONNECTING,
1380 res = auto_connect(sock, msg); 1455 timeout ? (long)msecs_to_jiffies(timeout)
1381 if (!res) { 1456 : MAX_SCHEDULE_TIMEOUT);
1382 if (!msg_data_sz(msg)) 1457 lock_sock(sk);
1383 advance_rx_queue(sk); 1458 if (res <= 0) {
1384 } 1459 if (res == 0)
1385 } else { 1460 res = -ETIMEDOUT;
1386 if (sock->state == SS_CONNECTED)
1387 res = -EISCONN;
1388 else 1461 else
1389 res = -ECONNREFUSED; 1462 ; /* leave "res" unchanged */
1463 goto exit;
1390 } 1464 }
1391 } else {
1392 if (res == 0)
1393 res = -ETIMEDOUT;
1394 else
1395 ; /* leave "res" unchanged */
1396 sock->state = SS_DISCONNECTING;
1397 } 1465 }
1398 1466
1467 if (unlikely(sock->state == SS_DISCONNECTING))
1468 res = sock_error(sk);
1469 else
1470 res = 0;
1471
1399exit: 1472exit:
1400 release_sock(sk); 1473 release_sock(sk);
1401 return res; 1474 return res;
@@ -1436,8 +1509,13 @@ static int listen(struct socket *sock, int len)
1436 */ 1509 */
1437static int accept(struct socket *sock, struct socket *new_sock, int flags) 1510static int accept(struct socket *sock, struct socket *new_sock, int flags)
1438{ 1511{
1439 struct sock *sk = sock->sk; 1512 struct sock *new_sk, *sk = sock->sk;
1440 struct sk_buff *buf; 1513 struct sk_buff *buf;
1514 struct tipc_sock *new_tsock;
1515 struct tipc_port *new_tport;
1516 struct tipc_msg *msg;
1517 u32 new_ref;
1518
1441 int res; 1519 int res;
1442 1520
1443 lock_sock(sk); 1521 lock_sock(sk);
@@ -1463,48 +1541,51 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
1463 buf = skb_peek(&sk->sk_receive_queue); 1541 buf = skb_peek(&sk->sk_receive_queue);
1464 1542
1465 res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); 1543 res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
1466 if (!res) { 1544 if (res)
1467 struct sock *new_sk = new_sock->sk; 1545 goto exit;
1468 struct tipc_sock *new_tsock = tipc_sk(new_sk);
1469 struct tipc_port *new_tport = new_tsock->p;
1470 u32 new_ref = new_tport->ref;
1471 struct tipc_msg *msg = buf_msg(buf);
1472
1473 lock_sock(new_sk);
1474
1475 /*
1476 * Reject any stray messages received by new socket
1477 * before the socket lock was taken (very, very unlikely)
1478 */
1479 reject_rx_queue(new_sk);
1480
1481 /* Connect new socket to it's peer */
1482 new_tsock->peer_name.ref = msg_origport(msg);
1483 new_tsock->peer_name.node = msg_orignode(msg);
1484 tipc_connect2port(new_ref, &new_tsock->peer_name);
1485 new_sock->state = SS_CONNECTED;
1486
1487 tipc_set_portimportance(new_ref, msg_importance(msg));
1488 if (msg_named(msg)) {
1489 new_tport->conn_type = msg_nametype(msg);
1490 new_tport->conn_instance = msg_nameinst(msg);
1491 }
1492 1546
1493 /* 1547 new_sk = new_sock->sk;
1494 * Respond to 'SYN-' by discarding it & returning 'ACK'-. 1548 new_tsock = tipc_sk(new_sk);
1495 * Respond to 'SYN+' by queuing it on new socket. 1549 new_tport = new_tsock->p;
1496 */ 1550 new_ref = new_tport->ref;
1497 if (!msg_data_sz(msg)) { 1551 msg = buf_msg(buf);
1498 struct msghdr m = {NULL,};
1499 1552
1500 advance_rx_queue(sk); 1553 /* we lock on new_sk; but lockdep sees the lock on sk */
1501 send_packet(NULL, new_sock, &m, 0); 1554 lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1502 } else { 1555
1503 __skb_dequeue(&sk->sk_receive_queue); 1556 /*
1504 __skb_queue_head(&new_sk->sk_receive_queue, buf); 1557 * Reject any stray messages received by new socket
1505 } 1558 * before the socket lock was taken (very, very unlikely)
1506 release_sock(new_sk); 1559 */
1560 reject_rx_queue(new_sk);
1561
1562 /* Connect new socket to it's peer */
1563 new_tsock->peer_name.ref = msg_origport(msg);
1564 new_tsock->peer_name.node = msg_orignode(msg);
1565 tipc_connect(new_ref, &new_tsock->peer_name);
1566 new_sock->state = SS_CONNECTED;
1567
1568 tipc_set_portimportance(new_ref, msg_importance(msg));
1569 if (msg_named(msg)) {
1570 new_tport->conn_type = msg_nametype(msg);
1571 new_tport->conn_instance = msg_nameinst(msg);
1507 } 1572 }
1573
1574 /*
1575 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1576 * Respond to 'SYN+' by queuing it on new socket.
1577 */
1578 if (!msg_data_sz(msg)) {
1579 struct msghdr m = {NULL,};
1580
1581 advance_rx_queue(sk);
1582 send_packet(NULL, new_sock, &m, 0);
1583 } else {
1584 __skb_dequeue(&sk->sk_receive_queue);
1585 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1586 }
1587 release_sock(new_sk);
1588
1508exit: 1589exit:
1509 release_sock(sk); 1590 release_sock(sk);
1510 return res; 1591 return res;
@@ -1539,7 +1620,6 @@ restart:
1539 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ 1620 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1540 buf = __skb_dequeue(&sk->sk_receive_queue); 1621 buf = __skb_dequeue(&sk->sk_receive_queue);
1541 if (buf) { 1622 if (buf) {
1542 atomic_dec(&tipc_queue_size);
1543 if (TIPC_SKB_CB(buf)->handle != 0) { 1623 if (TIPC_SKB_CB(buf)->handle != 0) {
1544 kfree_skb(buf); 1624 kfree_skb(buf);
1545 goto restart; 1625 goto restart;
@@ -1556,10 +1636,11 @@ restart:
1556 1636
1557 case SS_DISCONNECTING: 1637 case SS_DISCONNECTING:
1558 1638
1559 /* Discard any unreceived messages; wake up sleeping tasks */ 1639 /* Discard any unreceived messages */
1560 discard_rx_queue(sk); 1640 discard_rx_queue(sk);
1561 if (waitqueue_active(sk_sleep(sk))) 1641
1562 wake_up_interruptible(sk_sleep(sk)); 1642 /* Wake up anyone sleeping in poll */
1643 sk->sk_state_change(sk);
1563 res = 0; 1644 res = 0;
1564 break; 1645 break;
1565 1646
@@ -1677,7 +1758,7 @@ static int getsockopt(struct socket *sock,
1677 /* no need to set "res", since already 0 at this point */ 1758 /* no need to set "res", since already 0 at this point */
1678 break; 1759 break;
1679 case TIPC_NODE_RECVQ_DEPTH: 1760 case TIPC_NODE_RECVQ_DEPTH:
1680 value = (u32)atomic_read(&tipc_queue_size); 1761 value = 0; /* was tipc_queue_size, now obsolete */
1681 break; 1762 break;
1682 case TIPC_SOCK_RECVQ_DEPTH: 1763 case TIPC_SOCK_RECVQ_DEPTH:
1683 value = skb_queue_len(&sk->sk_receive_queue); 1764 value = skb_queue_len(&sk->sk_receive_queue);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 0f7d0d007e22..6b42d47029af 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -462,7 +462,7 @@ static void subscr_named_msg_event(void *usr_handle,
462 kfree(subscriber); 462 kfree(subscriber);
463 return; 463 return;
464 } 464 }
465 tipc_connect2port(subscriber->port_ref, orig); 465 tipc_connect(subscriber->port_ref, orig);
466 466
467 /* Lock server port (& save lock address for future use) */ 467 /* Lock server port (& save lock address for future use) */
468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; 468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;