Diffstat (limited to 'net/tipc')
-rw-r--r--  net/tipc/Kconfig      |  17
-rw-r--r--  net/tipc/bcast.c      |  28
-rw-r--r--  net/tipc/bearer.c     | 110
-rw-r--r--  net/tipc/bearer.h     |  24
-rw-r--r--  net/tipc/core.c       |   5
-rw-r--r--  net/tipc/discover.c   |   2
-rw-r--r--  net/tipc/handler.c    |   1
-rw-r--r--  net/tipc/link.c       | 232
-rw-r--r--  net/tipc/link.h       |   4
-rw-r--r--  net/tipc/name_distr.c |   2
-rw-r--r--  net/tipc/name_table.c |   8
-rw-r--r--  net/tipc/node.c       |  18
-rw-r--r--  net/tipc/node.h       |   6
-rw-r--r--  net/tipc/port.c       |  32
-rw-r--r--  net/tipc/port.h       |   6
-rw-r--r--  net/tipc/socket.c     | 501
-rw-r--r--  net/tipc/subscr.c     |   2
17 files changed, 458 insertions, 540 deletions
diff --git a/net/tipc/Kconfig b/net/tipc/Kconfig
index 585460180ffb..4f99600a5fed 100644
--- a/net/tipc/Kconfig
+++ b/net/tipc/Kconfig
@@ -3,8 +3,8 @@
 #
 
 menuconfig TIPC
-	tristate "The TIPC Protocol (EXPERIMENTAL)"
-	depends on INET && EXPERIMENTAL
+	tristate "The TIPC Protocol"
+	depends on INET
 	---help---
 	  The Transparent Inter Process Communication (TIPC) protocol is
 	  specially designed for intra cluster communication. This protocol
@@ -20,18 +20,9 @@ menuconfig TIPC
 
 	  If in doubt, say N.
 
-if TIPC
-
-config TIPC_ADVANCED
-	bool "Advanced TIPC configuration"
-	default n
-	help
-	  Saying Y here will open some advanced configuration for TIPC.
-	  Most users do not need to bother; if unsure, just say N.
-
 config TIPC_PORTS
 	int "Maximum number of ports in a node"
-	depends on TIPC_ADVANCED
+	depends on TIPC
 	range 127 65535
 	default "8191"
 	help
@@ -40,5 +31,3 @@ config TIPC_PORTS
 
 	  Setting this to a smaller value saves some memory,
 	  setting it to higher allows for more ports.
-
-endif # TIPC
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index e4e6d8cd47e6..2655c9f4ecad 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -347,7 +347,7 @@ static void bclink_peek_nack(struct tipc_msg *msg)
 
 	tipc_node_lock(n_ptr);
 
-	if (n_ptr->bclink.supported &&
+	if (n_ptr->bclink.recv_permitted &&
 	    (n_ptr->bclink.last_in != n_ptr->bclink.last_sent) &&
 	    (n_ptr->bclink.last_in == msg_bcgap_after(msg)))
 		n_ptr->bclink.oos_state = 2;
@@ -429,7 +429,7 @@ void tipc_bclink_recv_pkt(struct sk_buff *buf)
 		goto exit;
 
 	tipc_node_lock(node);
-	if (unlikely(!node->bclink.supported))
+	if (unlikely(!node->bclink.recv_permitted))
 		goto unlock;
 
 	/* Handle broadcast protocol message */
@@ -564,7 +564,7 @@ exit:
 
 u32 tipc_bclink_acks_missing(struct tipc_node *n_ptr)
 {
-	return (n_ptr->bclink.supported &&
+	return (n_ptr->bclink.recv_permitted &&
 		(tipc_bclink_get_last_sent() != n_ptr->bclink.acked));
 }
 
@@ -619,16 +619,14 @@ static int tipc_bcbearer_send(struct sk_buff *buf,
 		if (bcbearer->remains_new.count == bcbearer->remains.count)
 			continue;	/* bearer pair doesn't add anything */
 
-		if (p->blocked ||
-		    p->media->send_msg(buf, p, &p->media->bcast_addr)) {
-			/* unable to send on primary bearer */
-			if (!s || s->blocked ||
-			    s->media->send_msg(buf, s,
-					       &s->media->bcast_addr)) {
-				/* unable to send on either bearer */
-				continue;
-			}
-		}
+		if (!tipc_bearer_blocked(p))
+			tipc_bearer_send(p, buf, &p->media->bcast_addr);
+		else if (s && !tipc_bearer_blocked(s))
+			/* unable to send on primary bearer */
+			tipc_bearer_send(s, buf, &s->media->bcast_addr);
+		else
+			/* unable to send on either bearer */
+			continue;
 
 		if (s) {
 			bcbearer->bpairs[bp_index].primary = s;
@@ -731,8 +729,8 @@ int tipc_bclink_stats(char *buf, const u32 buf_size)
 			     " TX naks:%u acks:%u dups:%u\n",
 			     s->sent_nacks, s->sent_acks, s->retransmitted);
 	ret += tipc_snprintf(buf + ret, buf_size - ret,
-			     " Congestion bearer:%u link:%u Send queue max:%u avg:%u\n",
-			     s->bearer_congs, s->link_congs, s->max_queue_sz,
+			     " Congestion link:%u Send queue max:%u avg:%u\n",
+			     s->link_congs, s->max_queue_sz,
 			     s->queue_sz_counts ?
 			     (s->accu_queue_sz / s->queue_sz_counts) : 0);
 
@@ -766,7 +764,6 @@ int tipc_bclink_set_queue_limits(u32 limit)
 
 void tipc_bclink_init(void)
 {
-	INIT_LIST_HEAD(&bcbearer->bearer.cong_links);
 	bcbearer->bearer.media = &bcbearer->media;
 	bcbearer->media.send_msg = tipc_bcbearer_send;
 	sprintf(bcbearer->media.name, "tipc-broadcast");
@@ -777,6 +774,7 @@ void tipc_bclink_init(void)
 	bcl->owner = &bclink->node;
 	bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
 	tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
+	spin_lock_init(&bcbearer->bearer.lock);
 	bcl->b_ptr = &bcbearer->bearer;
 	bcl->state = WORKING_WORKING;
 	strlcpy(bcl->name, tipc_bclink_name, TIPC_MAX_LINK_NAME);
diff --git a/net/tipc/bearer.c b/net/tipc/bearer.c
index 4ec5c80e8a7c..aa62f93a9127 100644
--- a/net/tipc/bearer.c
+++ b/net/tipc/bearer.c
@@ -279,116 +279,31 @@ void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest)
 }
 
 /*
- * bearer_push(): Resolve bearer congestion. Force the waiting
- * links to push out their unsent packets, one packet per link
- * per iteration, until all packets are gone or congestion reoccurs.
- * 'tipc_net_lock' is read_locked when this function is called
- * bearer.lock must be taken before calling
- * Returns binary true(1) ore false(0)
- */
-static int bearer_push(struct tipc_bearer *b_ptr)
-{
-	u32 res = 0;
-	struct tipc_link *ln, *tln;
-
-	if (b_ptr->blocked)
-		return 0;
-
-	while (!list_empty(&b_ptr->cong_links) && (res != PUSH_FAILED)) {
-		list_for_each_entry_safe(ln, tln, &b_ptr->cong_links, link_list) {
-			res = tipc_link_push_packet(ln);
-			if (res == PUSH_FAILED)
-				break;
-			if (res == PUSH_FINISHED)
-				list_move_tail(&ln->link_list, &b_ptr->links);
-		}
-	}
-	return list_empty(&b_ptr->cong_links);
-}
-
-void tipc_bearer_lock_push(struct tipc_bearer *b_ptr)
-{
-	spin_lock_bh(&b_ptr->lock);
-	bearer_push(b_ptr);
-	spin_unlock_bh(&b_ptr->lock);
-}
-
-
-/*
- * Interrupt enabling new requests after bearer congestion or blocking:
+ * Interrupt enabling new requests after bearer blocking:
  * See bearer_send().
  */
-void tipc_continue(struct tipc_bearer *b_ptr)
+void tipc_continue(struct tipc_bearer *b)
 {
-	spin_lock_bh(&b_ptr->lock);
-	if (!list_empty(&b_ptr->cong_links))
-		tipc_k_signal((Handler)tipc_bearer_lock_push, (unsigned long)b_ptr);
-	b_ptr->blocked = 0;
-	spin_unlock_bh(&b_ptr->lock);
+	spin_lock_bh(&b->lock);
+	b->blocked = 0;
+	spin_unlock_bh(&b->lock);
 }
 
 /*
- * Schedule link for sending of messages after the bearer
- * has been deblocked by 'continue()'. This method is called
- * when somebody tries to send a message via this link while
- * the bearer is congested. 'tipc_net_lock' is in read_lock here
- * bearer.lock is busy
+ * tipc_bearer_blocked - determines if bearer is currently blocked
  */
-static void tipc_bearer_schedule_unlocked(struct tipc_bearer *b_ptr,
-					  struct tipc_link *l_ptr)
+int tipc_bearer_blocked(struct tipc_bearer *b)
 {
-	list_move_tail(&l_ptr->link_list, &b_ptr->cong_links);
-}
-
-/*
- * Schedule link for sending of messages after the bearer
- * has been deblocked by 'continue()'. This method is called
- * when somebody tries to send a message via this link while
- * the bearer is congested. 'tipc_net_lock' is in read_lock here,
- * bearer.lock is free
- */
-void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
-{
-	spin_lock_bh(&b_ptr->lock);
-	tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
-	spin_unlock_bh(&b_ptr->lock);
-}
-
+	int res;
 
-/*
- * tipc_bearer_resolve_congestion(): Check if there is bearer congestion,
- * and if there is, try to resolve it before returning.
- * 'tipc_net_lock' is read_locked when this function is called
- */
-int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
-				   struct tipc_link *l_ptr)
-{
-	int res = 1;
+	spin_lock_bh(&b->lock);
+	res = b->blocked;
+	spin_unlock_bh(&b->lock);
 
-	if (list_empty(&b_ptr->cong_links))
-		return 1;
-	spin_lock_bh(&b_ptr->lock);
-	if (!bearer_push(b_ptr)) {
-		tipc_bearer_schedule_unlocked(b_ptr, l_ptr);
-		res = 0;
-	}
-	spin_unlock_bh(&b_ptr->lock);
 	return res;
 }
 
 /**
- * tipc_bearer_congested - determines if bearer is currently congested
- */
-int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr)
-{
-	if (unlikely(b_ptr->blocked))
-		return 1;
-	if (likely(list_empty(&b_ptr->cong_links)))
-		return 0;
-	return !tipc_bearer_resolve_congestion(b_ptr, l_ptr);
-}
-
-/**
  * tipc_enable_bearer - enable bearer with the given name
  */
 int tipc_enable_bearer(const char *name, u32 disc_domain, u32 priority)
@@ -489,7 +404,6 @@ restart:
 	b_ptr->net_plane = bearer_id + 'A';
 	b_ptr->active = 1;
 	b_ptr->priority = priority;
-	INIT_LIST_HEAD(&b_ptr->cong_links);
 	INIT_LIST_HEAD(&b_ptr->links);
 	spin_lock_init(&b_ptr->lock);
 
@@ -528,7 +442,6 @@ int tipc_block_bearer(const char *name)
 	pr_info("Blocking bearer <%s>\n", name);
 	spin_lock_bh(&b_ptr->lock);
 	b_ptr->blocked = 1;
-	list_splice_init(&b_ptr->cong_links, &b_ptr->links);
 	list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
 		struct tipc_node *n_ptr = l_ptr->owner;
 
@@ -555,7 +468,6 @@ static void bearer_disable(struct tipc_bearer *b_ptr)
 	spin_lock_bh(&b_ptr->lock);
 	b_ptr->blocked = 1;
 	b_ptr->media->disable_bearer(b_ptr);
-	list_splice_init(&b_ptr->cong_links, &b_ptr->links);
 	list_for_each_entry_safe(l_ptr, temp_l_ptr, &b_ptr->links, link_list) {
 		tipc_link_delete(l_ptr);
 	}
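
With bearer_push() and the cong_links list gone, a bearer is now simply either blocked or usable, and bearer-level sending becomes fire-and-forget: a packet that cannot go out stays on the link's own send queue and is recovered by link retransmission. A minimal sketch of the caller pattern (link_try_send is a hypothetical condensation of the link.c changes further down, not kernel code):

static void link_try_send(struct tipc_link *l_ptr, struct sk_buff *buf)
{
	if (tipc_bearer_blocked(l_ptr->b_ptr))
		return;		/* buf stays queued; pushed when unblocked */

	tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
	l_ptr->unacked_window = 0;
}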
diff --git a/net/tipc/bearer.h b/net/tipc/bearer.h
index dd4c2abf08e7..39f1192d04bf 100644
--- a/net/tipc/bearer.h
+++ b/net/tipc/bearer.h
@@ -120,7 +120,6 @@ struct tipc_media {
  * @identity: array index of this bearer within TIPC bearer array
  * @link_req: ptr to (optional) structure making periodic link setup requests
  * @links: list of non-congested links associated with bearer
- * @cong_links: list of congested links associated with bearer
  * @active: non-zero if bearer structure is represents a bearer
  * @net_plane: network plane ('A' through 'H') currently associated with bearer
  * @nodes: indicates which nodes in cluster can be reached through bearer
@@ -143,7 +142,6 @@ struct tipc_bearer {
 	u32 identity;
 	struct tipc_link_req *link_req;
 	struct list_head links;
-	struct list_head cong_links;
 	int active;
 	char net_plane;
 	struct tipc_node_map nodes;
@@ -185,39 +183,23 @@ struct sk_buff *tipc_media_get_names(void);
 struct sk_buff *tipc_bearer_get_names(void);
 void tipc_bearer_add_dest(struct tipc_bearer *b_ptr, u32 dest);
 void tipc_bearer_remove_dest(struct tipc_bearer *b_ptr, u32 dest);
-void tipc_bearer_schedule(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
 struct tipc_bearer *tipc_bearer_find(const char *name);
 struct tipc_bearer *tipc_bearer_find_interface(const char *if_name);
 struct tipc_media *tipc_media_find(const char *name);
-int tipc_bearer_resolve_congestion(struct tipc_bearer *b_ptr,
-				   struct tipc_link *l_ptr);
-int tipc_bearer_congested(struct tipc_bearer *b_ptr, struct tipc_link *l_ptr);
+int tipc_bearer_blocked(struct tipc_bearer *b_ptr);
 void tipc_bearer_stop(void);
-void tipc_bearer_lock_push(struct tipc_bearer *b_ptr);
-
 
 /**
  * tipc_bearer_send- sends buffer to destination over bearer
  *
- * Returns true (1) if successful, or false (0) if unable to send
- *
  * IMPORTANT:
  * The media send routine must not alter the buffer being passed in
  * as it may be needed for later retransmission!
- *
- * If the media send routine returns a non-zero value (indicating that
- * it was unable to send the buffer), it must:
- * 1) mark the bearer as blocked,
- * 2) call tipc_continue() once the bearer is able to send again.
- * Media types that are unable to meet these two critera must ensure their
- * send routine always returns success -- even if the buffer was not sent --
- * and let TIPC's link code deal with the undelivered message.
  */
-static inline int tipc_bearer_send(struct tipc_bearer *b_ptr,
-				   struct sk_buff *buf,
-				   struct tipc_media_addr *dest)
+static inline void tipc_bearer_send(struct tipc_bearer *b, struct sk_buff *buf,
+				    struct tipc_media_addr *dest)
 {
-	return !b_ptr->media->send_msg(buf, b_ptr, dest);
+	b->media->send_msg(buf, b, dest);
 }
 
 #endif /* _TIPC_BEARER_H */
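
Since tipc_bearer_send() now returns void, a media driver's send_msg() result is ignored and the old obligation to mark the bearer blocked and later call tipc_continue() on failure disappears from this comment. A hedged sketch of a send routine under the new contract (my_media_send_msg and my_hw_xmit are hypothetical names; real TIPC media transmit a clone so the original stays available for retransmission):

static int my_media_send_msg(struct sk_buff *buf, struct tipc_bearer *b,
			     struct tipc_media_addr *dest)
{
	/* Must not modify or free 'buf': the link layer may retransmit it */
	struct sk_buff *clone = skb_clone(buf, GFP_ATOMIC);

	if (clone)
		my_hw_xmit(clone, dest);	/* hypothetical driver hook */
	return 0;			/* return value is now ignored */
}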
diff --git a/net/tipc/core.c b/net/tipc/core.c
index bfe8af88469a..fc05cecd7481 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -42,11 +42,6 @@
 
 #include <linux/module.h>
 
-#ifndef CONFIG_TIPC_PORTS
-#define CONFIG_TIPC_PORTS 8191
-#endif
-
-
 /* global variables used by multiple sub-systems within TIPC */
 int tipc_random __read_mostly;
 
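
The removed fallback was only needed while TIPC_PORTS hid behind TIPC_ADVANCED; after the Kconfig change above, CONFIG_TIPC_PORTS is always defined whenever TIPC is built. Illustrative use only (assumed shape, not a quote of the surrounding file):

/* Kconfig now guarantees a value in the range 127..65535 */
static const u32 tipc_port_limit = CONFIG_TIPC_PORTS;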
diff --git a/net/tipc/discover.c b/net/tipc/discover.c
index 50eaa403eb6e..1074b9587e81 100644
--- a/net/tipc/discover.c
+++ b/net/tipc/discover.c
@@ -243,7 +243,7 @@ void tipc_disc_recv_msg(struct sk_buff *buf, struct tipc_bearer *b_ptr)
 	if ((type == DSC_REQ_MSG) && !link_fully_up && !b_ptr->blocked) {
 		rbuf = tipc_disc_init_msg(DSC_RESP_MSG, orig, b_ptr);
 		if (rbuf) {
-			b_ptr->media->send_msg(rbuf, b_ptr, &media_addr);
+			tipc_bearer_send(b_ptr, rbuf, &media_addr);
 			kfree_skb(rbuf);
 		}
 	}
diff --git a/net/tipc/handler.c b/net/tipc/handler.c
index 111ff8300ae5..b36f0fcd9bdf 100644
--- a/net/tipc/handler.c
+++ b/net/tipc/handler.c
@@ -116,7 +116,6 @@ void tipc_handler_stop(void)
 		return;
 
 	handler_enabled = 0;
-	tasklet_disable(&tipc_tasklet);
 	tasklet_kill(&tipc_tasklet);
 
 	spin_lock_bh(&qitem_lock);
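
A likely rationale for dropping the tasklet_disable() call: tasklet_kill() waits for a pending tasklet to finish running, while tasklet_disable() prevents a pending tasklet from running at all, so the disable-then-kill sequence can spin indefinitely if the tasklet was scheduled at shutdown. In outline:

/*
 * Hazard sketch (not literal kernel code):
 *
 *	tasklet_disable(&tipc_tasklet);   pending tasklet can no longer run
 *	tasklet_kill(&tipc_tasklet);      waits for the pending run -> may hang
 *
 * tasklet_kill() alone is sufficient to tear down an enabled tasklet.
 */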
diff --git a/net/tipc/link.c b/net/tipc/link.c
index a79c755cb417..daa6080a2a0c 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/link.c: TIPC link code
  *
- * Copyright (c) 1996-2007, Ericsson AB
+ * Copyright (c) 1996-2007, 2012, Ericsson AB
  * Copyright (c) 2004-2007, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -97,12 +97,13 @@ static int link_send_sections_long(struct tipc_port *sender,
 				       struct iovec const *msg_sect,
 				       u32 num_sect, unsigned int total_len,
 				       u32 destnode);
-static void link_check_defragm_bufs(struct tipc_link *l_ptr);
 static void link_state_event(struct tipc_link *l_ptr, u32 event);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
 static void link_start(struct tipc_link *l_ptr);
 static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf);
+static void tipc_link_send_sync(struct tipc_link *l);
+static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf);
 
 /*
  * Simple link routines
@@ -269,7 +270,6 @@ static void link_timeout(struct tipc_link *l_ptr)
 	}
 
 	/* do all other link processing performed on a periodic basis */
-	link_check_defragm_bufs(l_ptr);
 
 	link_state_event(l_ptr, TIMEOUT_EVT);
 
@@ -712,6 +712,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 			link_activate(l_ptr);
 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 			l_ptr->fsm_msg_cnt++;
+			if (l_ptr->owner->working_links == 1)
+				tipc_link_send_sync(l_ptr);
 			link_set_timer(l_ptr, cont_intv);
 			break;
 		case RESET_MSG:
@@ -745,6 +747,8 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 			link_activate(l_ptr);
 			tipc_link_send_proto_msg(l_ptr, STATE_MSG, 1, 0, 0, 0, 0);
 			l_ptr->fsm_msg_cnt++;
+			if (l_ptr->owner->working_links == 1)
+				tipc_link_send_sync(l_ptr);
 			link_set_timer(l_ptr, cont_intv);
 			break;
 		case RESET_MSG:
@@ -872,17 +876,12 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 		return link_send_long_buf(l_ptr, buf);
 
 	/* Packet can be queued or sent. */
-	if (likely(!tipc_bearer_congested(l_ptr->b_ptr, l_ptr) &&
+	if (likely(!tipc_bearer_blocked(l_ptr->b_ptr) &&
 		   !link_congested(l_ptr))) {
 		link_add_to_outqueue(l_ptr, buf, msg);
 
-		if (likely(tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr))) {
-			l_ptr->unacked_window = 0;
-		} else {
-			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
-			l_ptr->stats.bearer_congs++;
-			l_ptr->next_out = buf;
-		}
+		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+		l_ptr->unacked_window = 0;
 		return dsz;
 	}
 	/* Congestion: can message be bundled ? */
@@ -891,10 +890,8 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 
 	/* Try adding message to an existing bundle */
 	if (l_ptr->next_out &&
-	    link_bundle_buf(l_ptr, l_ptr->last_out, buf)) {
-		tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
+	    link_bundle_buf(l_ptr, l_ptr->last_out, buf))
 		return dsz;
-	}
 
 	/* Try creating a new bundle */
 	if (size <= max_packet * 2 / 3) {
@@ -917,7 +914,6 @@ int tipc_link_send_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
 	if (!l_ptr->next_out)
 		l_ptr->next_out = buf;
 	link_add_to_outqueue(l_ptr, buf, msg);
-	tipc_bearer_resolve_congestion(l_ptr->b_ptr, l_ptr);
 	return dsz;
 }
 
@@ -949,7 +945,48 @@ int tipc_link_send(struct sk_buff *buf, u32 dest, u32 selector)
 	return res;
 }
 
-/**
+/*
+ * tipc_link_send_sync - synchronize broadcast link endpoints.
+ *
+ * Give a newly added peer node the sequence number where it should
+ * start receiving and acking broadcast packets.
+ *
+ * Called with node locked
+ */
+static void tipc_link_send_sync(struct tipc_link *l)
+{
+	struct sk_buff *buf;
+	struct tipc_msg *msg;
+
+	buf = tipc_buf_acquire(INT_H_SIZE);
+	if (!buf)
+		return;
+
+	msg = buf_msg(buf);
+	tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE, l->addr);
+	msg_set_last_bcast(msg, l->owner->bclink.acked);
+	link_add_chain_to_outqueue(l, buf, 0);
+	tipc_link_push_queue(l);
+}
+
+/*
+ * tipc_link_recv_sync - synchronize broadcast link endpoints.
+ * Receive the sequence number where we should start receiving and
+ * acking broadcast packets from a newly added peer node, and open
+ * up for reception of such packets.
+ *
+ * Called with node locked
+ */
+static void tipc_link_recv_sync(struct tipc_node *n, struct sk_buff *buf)
+{
+	struct tipc_msg *msg = buf_msg(buf);
+
+	n->bclink.last_sent = n->bclink.last_in = msg_last_bcast(msg);
+	n->bclink.recv_permitted = true;
+	kfree_skb(buf);
+}
+
+/*
  * tipc_link_send_names - send name table entries to new neighbor
  *
  * Send routine for bulk delivery of name table messages when contact
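
Taken together, the two new functions form a small handshake that replaces the old supportable/supported flags. A hedged outline of the exchange, summarizing only the code shown in this patch:

/*
 * Outline (summary, not additional kernel code):
 *
 *  Node A, first link to B enters WORKING_WORKING:
 *    tipc_link_send_sync() -> BCAST_PROTOCOL/STATE_MSG carrying
 *    msg_last_bcast = A's current broadcast position
 *
 *  Node B, in tipc_recv_msg() (see the BCAST_PROTOCOL case below):
 *    tipc_link_recv_sync() -> last_sent = last_in = msg_last_bcast(msg),
 *    recv_permitted = true, so B accepts A's broadcasts from that
 *    sequence number onward
 */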
@@ -1006,16 +1043,11 @@ static int link_send_buf_fast(struct tipc_link *l_ptr, struct sk_buff *buf,
 
 	if (likely(!link_congested(l_ptr))) {
 		if (likely(msg_size(msg) <= l_ptr->max_pkt)) {
-			if (likely(list_empty(&l_ptr->b_ptr->cong_links))) {
+			if (likely(!tipc_bearer_blocked(l_ptr->b_ptr))) {
 				link_add_to_outqueue(l_ptr, buf, msg);
-				if (likely(tipc_bearer_send(l_ptr->b_ptr, buf,
-							    &l_ptr->media_addr))) {
-					l_ptr->unacked_window = 0;
-					return res;
-				}
-				tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
-				l_ptr->stats.bearer_congs++;
-				l_ptr->next_out = buf;
+				tipc_bearer_send(l_ptr->b_ptr, buf,
+						 &l_ptr->media_addr);
+				l_ptr->unacked_window = 0;
 				return res;
 			}
 		} else
@@ -1106,7 +1138,7 @@ exit:
 
 			/* Exit if link (or bearer) is congested */
 			if (link_congested(l_ptr) ||
-			    !list_empty(&l_ptr->b_ptr->cong_links)) {
+			    tipc_bearer_blocked(l_ptr->b_ptr)) {
 				res = link_schedule_port(l_ptr,
 							 sender->ref, res);
 				goto exit;
@@ -1329,15 +1361,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 	if (r_q_size && buf) {
 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-			l_ptr->retransm_queue_head = mod(++r_q_head);
-			l_ptr->retransm_queue_size = --r_q_size;
-			l_ptr->stats.retransmitted++;
-			return 0;
-		} else {
-			l_ptr->stats.bearer_congs++;
-			return PUSH_FAILED;
-		}
+		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+		l_ptr->retransm_queue_head = mod(++r_q_head);
+		l_ptr->retransm_queue_size = --r_q_size;
+		l_ptr->stats.retransmitted++;
+		return 0;
 	}
 
 	/* Send deferred protocol message, if any: */
@@ -1345,15 +1373,11 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 	if (buf) {
 		msg_set_ack(buf_msg(buf), mod(l_ptr->next_in_no - 1));
 		msg_set_bcast_ack(buf_msg(buf), l_ptr->owner->bclink.last_in);
-		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-			l_ptr->unacked_window = 0;
-			kfree_skb(buf);
-			l_ptr->proto_msg_queue = NULL;
-			return 0;
-		} else {
-			l_ptr->stats.bearer_congs++;
-			return PUSH_FAILED;
-		}
+		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+		l_ptr->unacked_window = 0;
+		kfree_skb(buf);
+		l_ptr->proto_msg_queue = NULL;
+		return 0;
 	}
 
 	/* Send one deferred data message, if send window not full: */
@@ -1366,18 +1390,14 @@ u32 tipc_link_push_packet(struct tipc_link *l_ptr)
 		if (mod(next - first) < l_ptr->queue_limit[0]) {
 			msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
 			msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-			if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-				if (msg_user(msg) == MSG_BUNDLER)
-					msg_set_type(msg, CLOSED_MSG);
-				l_ptr->next_out = buf->next;
-				return 0;
-			} else {
-				l_ptr->stats.bearer_congs++;
-				return PUSH_FAILED;
-			}
+			tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+			if (msg_user(msg) == MSG_BUNDLER)
+				msg_set_type(msg, CLOSED_MSG);
+			l_ptr->next_out = buf->next;
+			return 0;
 		}
 	}
-	return PUSH_FINISHED;
+	return 1;
 }
 
 /*
@@ -1388,15 +1408,12 @@ void tipc_link_push_queue(struct tipc_link *l_ptr)
 {
 	u32 res;
 
-	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr))
+	if (tipc_bearer_blocked(l_ptr->b_ptr))
 		return;
 
 	do {
 		res = tipc_link_push_packet(l_ptr);
 	} while (!res);
-
-	if (res == PUSH_FAILED)
-		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
 }
 
 static void link_reset_all(unsigned long addr)
@@ -1454,9 +1471,8 @@ static void link_retransmit_failure(struct tipc_link *l_ptr,
 
 		tipc_addr_string_fill(addr_string, n_ptr->addr);
 		pr_info("Broadcast link info for %s\n", addr_string);
-		pr_info("Supportable: %d, Supported: %d, Acked: %u\n",
-			n_ptr->bclink.supportable,
-			n_ptr->bclink.supported,
+		pr_info("Reception permitted: %d, Acked: %u\n",
+			n_ptr->bclink.recv_permitted,
 			n_ptr->bclink.acked);
 		pr_info("Last in: %u, Oos state: %u, Last sent: %u\n",
 			n_ptr->bclink.last_in,
@@ -1481,7 +1497,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
 
 	msg = buf_msg(buf);
 
-	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
+	if (tipc_bearer_blocked(l_ptr->b_ptr)) {
 		if (l_ptr->retransm_queue_size == 0) {
 			l_ptr->retransm_queue_head = msg_seqno(msg);
 			l_ptr->retransm_queue_size = retransmits;
@@ -1491,7 +1507,7 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
 		}
 		return;
 	} else {
-		/* Detect repeated retransmit failures on uncongested bearer */
+		/* Detect repeated retransmit failures on unblocked bearer */
 		if (l_ptr->last_retransmitted == msg_seqno(msg)) {
 			if (++l_ptr->stale_count > 100) {
 				link_retransmit_failure(l_ptr, buf);
@@ -1507,17 +1523,10 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *buf,
 		msg = buf_msg(buf);
 		msg_set_ack(msg, mod(l_ptr->next_in_no - 1));
 		msg_set_bcast_ack(msg, l_ptr->owner->bclink.last_in);
-		if (tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-			buf = buf->next;
-			retransmits--;
-			l_ptr->stats.retransmitted++;
-		} else {
-			tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
-			l_ptr->stats.bearer_congs++;
-			l_ptr->retransm_queue_head = buf_seqno(buf);
-			l_ptr->retransm_queue_size = retransmits;
-			return;
-		}
+		tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
+		buf = buf->next;
+		retransmits--;
+		l_ptr->stats.retransmitted++;
 	}
 
 	l_ptr->retransm_queue_head = l_ptr->retransm_queue_size = 0;
@@ -1676,7 +1685,7 @@ void tipc_recv_msg(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		ackd = msg_ack(msg);
 
 		/* Release acked messages */
-		if (n_ptr->bclink.supported)
+		if (n_ptr->bclink.recv_permitted)
 			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
 
 		crs = l_ptr->first_out;
@@ -1727,9 +1736,14 @@ deliver:
 			tipc_link_recv_bundle(buf);
 			continue;
 		case NAME_DISTRIBUTOR:
+			n_ptr->bclink.recv_permitted = true;
 			tipc_node_unlock(n_ptr);
 			tipc_named_recv(buf);
 			continue;
+		case BCAST_PROTOCOL:
+			tipc_link_recv_sync(n_ptr, buf);
+			tipc_node_unlock(n_ptr);
+			continue;
 		case CONN_MANAGER:
 			tipc_node_unlock(n_ptr);
 			tipc_port_recv_proto_msg(buf);
@@ -1772,16 +1786,19 @@ deliver:
 			continue;
 		}
 
+		/* Link is not in state WORKING_WORKING */
 		if (msg_user(msg) == LINK_PROTOCOL) {
 			link_recv_proto_msg(l_ptr, buf);
 			head = link_insert_deferred_queue(l_ptr, head);
 			tipc_node_unlock(n_ptr);
 			continue;
 		}
+
+		/* Traffic message. Conditionally activate link */
 		link_state_event(l_ptr, TRAFFIC_MSG_EVT);
 
 		if (link_working_working(l_ptr)) {
-			/* Re-insert in front of queue */
+			/* Re-insert buffer in front of queue */
 			buf->next = head;
 			head = buf;
 			tipc_node_unlock(n_ptr);
@@ -1972,21 +1989,13 @@ void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ,
 
 	skb_copy_to_linear_data(buf, msg, sizeof(l_ptr->proto_msg));
 
-	/* Defer message if bearer is already congested */
-	if (tipc_bearer_congested(l_ptr->b_ptr, l_ptr)) {
-		l_ptr->proto_msg_queue = buf;
-		return;
-	}
-
-	/* Defer message if attempting to send results in bearer congestion */
-	if (!tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr)) {
-		tipc_bearer_schedule(l_ptr->b_ptr, l_ptr);
+	/* Defer message if bearer is already blocked */
+	if (tipc_bearer_blocked(l_ptr->b_ptr)) {
 		l_ptr->proto_msg_queue = buf;
-		l_ptr->stats.bearer_congs++;
 		return;
 	}
 
-	/* Discard message if it was sent successfully */
+	tipc_bearer_send(l_ptr->b_ptr, buf, &l_ptr->media_addr);
 	l_ptr->unacked_window = 0;
 	kfree_skb(buf);
 }
@@ -2057,7 +2066,6 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 		} else {
 			l_ptr->max_pkt = l_ptr->max_pkt_target;
 		}
-		l_ptr->owner->bclink.supportable = (max_pkt_info != 0);
 
 		/* Synchronize broadcast link info, if not done previously */
 		if (!tipc_node_is_up(l_ptr->owner)) {
@@ -2112,7 +2120,7 @@ static void link_recv_proto_msg(struct tipc_link *l_ptr, struct sk_buff *buf)
 		}
 
 		/* Protocol message before retransmits, reduce loss risk */
-		if (l_ptr->owner->bclink.supported)
+		if (l_ptr->owner->bclink.recv_permitted)
 			tipc_bclink_update_link_state(l_ptr->owner,
 						      msg_last_bcast(msg));
 
@@ -2487,16 +2495,6 @@ static void set_expected_frags(struct sk_buff *buf, u32 exp)
 	msg_set_bcast_ack(buf_msg(buf), exp);
 }
 
-static u32 get_timer_cnt(struct sk_buff *buf)
-{
-	return msg_reroute_cnt(buf_msg(buf));
-}
-
-static void incr_timer_cnt(struct sk_buff *buf)
-{
-	msg_incr_reroute_cnt(buf_msg(buf));
-}
-
 /*
  * tipc_link_recv_fragment(): Called with node lock on. Returns
  * the reassembled buffer if message is complete.
@@ -2575,38 +2573,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
 	return 0;
 }
 
-/**
- * link_check_defragm_bufs - flush stale incoming message fragments
- * @l_ptr: pointer to link
- */
-static void link_check_defragm_bufs(struct tipc_link *l_ptr)
-{
-	struct sk_buff *prev = NULL;
-	struct sk_buff *next = NULL;
-	struct sk_buff *buf = l_ptr->defragm_buf;
-
-	if (!buf)
-		return;
-	if (!link_working_working(l_ptr))
-		return;
-	while (buf) {
-		u32 cnt = get_timer_cnt(buf);
-
-		next = buf->next;
-		if (cnt < 4) {
-			incr_timer_cnt(buf);
-			prev = buf;
-		} else {
-			if (prev)
-				prev->next = buf->next;
-			else
-				l_ptr->defragm_buf = buf->next;
-			kfree_skb(buf);
-		}
-		buf = next;
-	}
-}
-
 static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
 {
 	if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
@@ -2937,8 +2903,8 @@ static int tipc_link_stats(const char *name, char *buf, const u32 buf_size)
 			     s->sent_nacks, s->sent_acks, s->retransmitted);
 
 	ret += tipc_snprintf(buf + ret, buf_size - ret,
-			     " Congestion bearer:%u link:%u Send queue"
-			     " max:%u avg:%u\n", s->bearer_congs, s->link_congs,
+			     " Congestion link:%u Send queue"
+			     " max:%u avg:%u\n", s->link_congs,
 			     s->max_queue_sz, s->queue_sz_counts ?
 			     (s->accu_queue_sz / s->queue_sz_counts) : 0);
 
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 6e921121be06..c048ed1cbd76 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -40,9 +40,6 @@
 #include "msg.h"
 #include "node.h"
 
-#define PUSH_FAILED   1
-#define PUSH_FINISHED 2
-
 /*
  * Out-of-range value for link sequence numbers
  */
@@ -82,7 +79,6 @@ struct tipc_stats {
 	u32 recv_fragmented;
 	u32 recv_fragments;
 	u32 link_congs;		/* # port sends blocked by congestion */
-	u32 bearer_congs;
 	u32 deferred_recv;
 	u32 duplicates;
 	u32 max_queue_sz;	/* send queue size high water mark */
diff --git a/net/tipc/name_distr.c b/net/tipc/name_distr.c
index 55d3928dfd67..e0d08055754e 100644
--- a/net/tipc/name_distr.c
+++ b/net/tipc/name_distr.c
@@ -262,7 +262,7 @@ void tipc_named_node_up(unsigned long nodearg)
 	named_distribute(&message_list, node, &publ_zone, max_item_buf);
 	read_unlock_bh(&tipc_nametbl_lock);
 
-	tipc_link_send_names(&message_list, (u32)node);
+	tipc_link_send_names(&message_list, node);
 }
 
 /**
diff --git a/net/tipc/name_table.c b/net/tipc/name_table.c
index 46754779fd3d..24b167914311 100644
--- a/net/tipc/name_table.c
+++ b/net/tipc/name_table.c
@@ -473,11 +473,10 @@ static void tipc_nameseq_subscribe(struct name_seq *nseq,
 static struct name_seq *nametbl_find_seq(u32 type)
 {
 	struct hlist_head *seq_head;
-	struct hlist_node *seq_node;
 	struct name_seq *ns;
 
 	seq_head = &table.types[hash(type)];
-	hlist_for_each_entry(ns, seq_node, seq_head, ns_list) {
+	hlist_for_each_entry(ns, seq_head, ns_list) {
 		if (ns->type == type)
 			return ns;
 	}
@@ -853,7 +852,6 @@ static int nametbl_list(char *buf, int len, u32 depth_info,
 			u32 type, u32 lowbound, u32 upbound)
 {
 	struct hlist_head *seq_head;
-	struct hlist_node *seq_node;
 	struct name_seq *seq;
 	int all_types;
 	int ret = 0;
@@ -873,7 +871,7 @@ static int nametbl_list(char *buf, int len, u32 depth_info,
 		upbound = ~0;
 		for (i = 0; i < TIPC_NAMETBL_SIZE; i++) {
 			seq_head = &table.types[i];
-			hlist_for_each_entry(seq, seq_node, seq_head, ns_list) {
+			hlist_for_each_entry(seq, seq_head, ns_list) {
 				ret += nameseq_list(seq, buf + ret, len - ret,
 						    depth, seq->type,
 						    lowbound, upbound, i);
@@ -889,7 +887,7 @@ static int nametbl_list(char *buf, int len, u32 depth_info,
 		ret += nametbl_header(buf + ret, len - ret, depth);
 		i = hash(type);
 		seq_head = &table.types[i];
-		hlist_for_each_entry(seq, seq_node, seq_head, ns_list) {
+		hlist_for_each_entry(seq, seq_head, ns_list) {
 			if (seq->type == type) {
 				ret += nameseq_list(seq, buf + ret, len - ret,
 						    depth, type,
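
These hunks (and the matching one in node.c below) follow the tree-wide hlist iterator change that drops the separate struct hlist_node cursor: the macro now derives the entry from the node pointer itself. Illustrative before/after sketch (name_seq_demo is a hypothetical stand-in type):

#include <linux/types.h>
#include <linux/list.h>

struct name_seq_demo {
	u32 type;
	struct hlist_node ns_list;
};

/* Old form needed: struct hlist_node *pos;
 *	hlist_for_each_entry(ns, pos, head, ns_list) { ... }
 * New form, as adopted above, has no cursor argument:
 */
static struct name_seq_demo *demo_find(struct hlist_head *head, u32 type)
{
	struct name_seq_demo *ns;

	hlist_for_each_entry(ns, head, ns_list) {
		if (ns->type == type)
			return ns;
	}
	return NULL;
}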
diff --git a/net/tipc/node.c b/net/tipc/node.c
index d21db204e25a..6e6c434872e8 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -1,7 +1,7 @@
 /*
  * net/tipc/node.c: TIPC node management routines
  *
- * Copyright (c) 2000-2006, Ericsson AB
+ * Copyright (c) 2000-2006, 2012 Ericsson AB
  * Copyright (c) 2005-2006, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -69,12 +69,11 @@ static unsigned int tipc_hashfn(u32 addr)
 struct tipc_node *tipc_node_find(u32 addr)
 {
 	struct tipc_node *node;
-	struct hlist_node *pos;
 
 	if (unlikely(!in_own_cluster_exact(addr)))
 		return NULL;
 
-	hlist_for_each_entry(node, pos, &node_htable[tipc_hashfn(addr)], hash) {
+	hlist_for_each_entry(node, &node_htable[tipc_hashfn(addr)], hash) {
 		if (node->addr == addr)
 			return node;
 	}
@@ -263,12 +262,9 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 static void node_established_contact(struct tipc_node *n_ptr)
 {
 	tipc_k_signal((Handler)tipc_named_node_up, n_ptr->addr);
-
-	if (n_ptr->bclink.supportable) {
-		n_ptr->bclink.acked = tipc_bclink_get_last_sent();
-		tipc_bclink_add_node(n_ptr->addr);
-		n_ptr->bclink.supported = 1;
-	}
+	n_ptr->bclink.oos_state = 0;
+	n_ptr->bclink.acked = tipc_bclink_get_last_sent();
+	tipc_bclink_add_node(n_ptr->addr);
 }
 
 static void node_name_purge_complete(unsigned long node_addr)
@@ -294,7 +290,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 		tipc_addr_string_fill(addr_string, n_ptr->addr));
 
 	/* Flush broadcast link info associated with lost node */
-	if (n_ptr->bclink.supported) {
+	if (n_ptr->bclink.recv_permitted) {
 		while (n_ptr->bclink.deferred_head) {
 			struct sk_buff *buf = n_ptr->bclink.deferred_head;
 			n_ptr->bclink.deferred_head = buf->next;
@@ -310,7 +306,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 		tipc_bclink_remove_node(n_ptr->addr);
 		tipc_bclink_acknowledge(n_ptr, INVALID_LINK_SEQ);
 
-		n_ptr->bclink.supported = 0;
+		n_ptr->bclink.recv_permitted = false;
 	}
 
 	/* Abort link changeover */
diff --git a/net/tipc/node.h b/net/tipc/node.h
index cfcaf4d6e480..3c189b35b102 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -67,8 +67,6 @@
  * @permit_changeover: non-zero if node has redundant links to this system
  * @signature: node instance identifier
  * @bclink: broadcast-related info
- * @supportable: non-zero if node supports TIPC b'cast link capability
- * @supported: non-zero if node supports TIPC b'cast capability
  * @acked: sequence # of last outbound b'cast message acknowledged by node
  * @last_in: sequence # of last in-sequence b'cast message received from node
  * @last_sent: sequence # of last b'cast message sent by node
@@ -77,6 +75,7 @@
  * @deferred_head: oldest OOS b'cast message received from node
  * @deferred_tail: newest OOS b'cast message received from node
  * @defragm: list of partially reassembled b'cast message fragments from node
+ * @recv_permitted: true if node is allowed to receive b'cast messages
  */
 struct tipc_node {
 	u32 addr;
@@ -92,8 +91,6 @@ struct tipc_node {
 	int permit_changeover;
 	u32 signature;
 	struct {
-		u8 supportable;
-		u8 supported;
 		u32 acked;
 		u32 last_in;
 		u32 last_sent;
@@ -102,6 +99,7 @@ struct tipc_node {
 		struct sk_buff *deferred_head;
 		struct sk_buff *deferred_tail;
 		struct sk_buff *defragm;
+		bool recv_permitted;
 	} bclink;
 };
 
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 07c42fba672b..18098cac62f2 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -726,7 +726,7 @@ static void port_dispatcher_sigh(void *dummy)
 		if (unlikely(!cb))
 			goto reject;
 		if (unlikely(!connected)) {
-			if (tipc_connect2port(dref, &orig))
+			if (tipc_connect(dref, &orig))
 				goto reject;
 		} else if (peer_invalid)
 			goto reject;
@@ -1036,15 +1036,30 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
 	return res;
 }
 
-int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
+int tipc_connect(u32 ref, struct tipc_portid const *peer)
 {
 	struct tipc_port *p_ptr;
-	struct tipc_msg *msg;
-	int res = -EINVAL;
+	int res;
 
 	p_ptr = tipc_port_lock(ref);
 	if (!p_ptr)
 		return -EINVAL;
+	res = __tipc_connect(ref, p_ptr, peer);
+	tipc_port_unlock(p_ptr);
+	return res;
+}
+
+/*
+ * __tipc_connect - connect to a remote peer
+ *
+ * Port must be locked.
+ */
+int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
+		   struct tipc_portid const *peer)
+{
+	struct tipc_msg *msg;
+	int res = -EINVAL;
+
 	if (p_ptr->published || p_ptr->connected)
 		goto exit;
 	if (!peer->ref)
@@ -1067,17 +1082,16 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
 				  (net_ev_handler)port_handle_node_down);
 	res = 0;
 exit:
-	tipc_port_unlock(p_ptr);
 	p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
 	return res;
 }
 
-/**
- * tipc_disconnect_port - disconnect port from peer
+/*
+ * __tipc_disconnect - disconnect port from peer
  *
  * Port must be locked.
  */
-int tipc_disconnect_port(struct tipc_port *tp_ptr)
+int __tipc_disconnect(struct tipc_port *tp_ptr)
 {
 	int res;
 
@@ -1104,7 +1118,7 @@ int tipc_disconnect(u32 ref)
 	p_ptr = tipc_port_lock(ref);
 	if (!p_ptr)
 		return -EINVAL;
-	res = tipc_disconnect_port(p_ptr);
+	res = __tipc_disconnect(p_ptr);
 	tipc_port_unlock(p_ptr);
 	return res;
 }
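
The connect path is thus split into a locking wrapper and a lock-assuming worker, mirroring the existing tipc_disconnect()/__tipc_disconnect() pair; note the worker's exit path no longer drops the lock itself. A hypothetical caller sketch showing who benefits:

/* Sketch only: code already under tipc_port_lock() (e.g. a socket
 * dispatch path) can now connect without dropping and re-taking it.
 */
static int connect_already_locked(struct tipc_port *p_ptr, u32 ref,
				  struct tipc_portid const *peer)
{
	return __tipc_connect(ref, p_ptr, peer);	/* lock held by caller */
}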
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 4660e3065790..fb66e2e5f4d1 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -190,7 +190,7 @@ int tipc_publish(u32 portref, unsigned int scope,
 int tipc_withdraw(u32 portref, unsigned int scope,
 		  struct tipc_name_seq const *name_seq);
 
-int tipc_connect2port(u32 portref, struct tipc_portid const *port);
+int tipc_connect(u32 portref, struct tipc_portid const *port);
 
 int tipc_disconnect(u32 portref);
 
@@ -200,7 +200,9 @@ int tipc_shutdown(u32 ref);
 /*
  * The following routines require that the port be locked on entry
  */
-int tipc_disconnect_port(struct tipc_port *tp_ptr);
+int __tipc_disconnect(struct tipc_port *tp_ptr);
+int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
+		   struct tipc_portid const *peer);
 int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
 
 /*
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index fd5f042dbff4..515ce38e4f4c 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,8 +1,8 @@
 /*
  * net/tipc/socket.c: TIPC socket API
  *
- * Copyright (c) 2001-2007, Ericsson AB
- * Copyright (c) 2004-2008, 2010-2011, Wind River Systems
+ * Copyright (c) 2001-2007, 2012 Ericsson AB
+ * Copyright (c) 2004-2008, 2010-2012, Wind River Systems
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -43,7 +43,8 @@
 #define SS_LISTENING	-1	/* socket is listening */
 #define SS_READY	-2	/* socket is connectionless */
 
-#define OVERLOAD_LIMIT_BASE	5000
+#define CONN_OVERLOAD_LIMIT	((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \
+				SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
 #define CONN_TIMEOUT_DEFAULT	8000	/* default connect timeout = 8s */
 
 struct tipc_sock {
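
The fixed OVERLOAD_LIMIT_BASE heuristic gives way to a limit derived from the flow-control window and the maximum message size, presumably consulted on the receive path. Back-of-envelope, assuming the header values of this era (TIPC_FLOW_CONTROL_WIN = 512 and TIPC_MAX_USER_MSG_SIZE = 66000 are assumptions; check the headers):

/*
 * CONN_OVERLOAD_LIMIT = (512 * 2 + 1) * SKB_TRUESIZE(66000)
 *                     = 1025 buffers, each charged at payload size plus
 *                       sk_buff bookkeeping overhead
 *
 * i.e. a connected socket may queue roughly two full send windows of
 * maximum-size messages before the dispatcher signals overload.
 */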
@@ -62,6 +63,8 @@ struct tipc_sock {
 static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
 static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
 static void wakeupdispatch(struct tipc_port *tport);
+static void tipc_data_ready(struct sock *sk, int len);
+static void tipc_write_space(struct sock *sk);
 
 static const struct proto_ops packet_ops;
 static const struct proto_ops stream_ops;
@@ -71,8 +74,6 @@ static struct proto tipc_proto;
 
 static int sockets_enabled;
 
-static atomic_t tipc_queue_size = ATOMIC_INIT(0);
-
 /*
  * Revised TIPC socket locking policy:
  *
@@ -126,22 +127,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0);
 static void advance_rx_queue(struct sock *sk)
 {
 	kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
-	atomic_dec(&tipc_queue_size);
-}
-
-/**
- * discard_rx_queue - discard all buffers in socket receive queue
- *
- * Caller must hold socket lock
- */
-static void discard_rx_queue(struct sock *sk)
-{
-	struct sk_buff *buf;
-
-	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
-		atomic_dec(&tipc_queue_size);
-		kfree_skb(buf);
-	}
 }
 
 /**
@@ -153,10 +138,8 @@ static void reject_rx_queue(struct sock *sk)
 {
 	struct sk_buff *buf;
 
-	while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
+	while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
 		tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
-		atomic_dec(&tipc_queue_size);
-	}
 }
 
 /**
@@ -220,7 +203,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
 
 	sock_init_data(sock, sk);
 	sk->sk_backlog_rcv = backlog_rcv;
-	sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
+	sk->sk_data_ready = tipc_data_ready;
+	sk->sk_write_space = tipc_write_space;
 	tipc_sk(sk)->p = tp_ptr;
 	tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
 
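
The oversized static sk_rcvbuf is replaced by per-socket wakeup callbacks; their bodies fall outside the hunks shown here. A hedged sketch of the conventional shape of such a callback (the standard sock wakeup pattern, not a quote of this patch):

static void tipc_write_space(struct sock *sk)
{
	struct socket_wq *wq;

	rcu_read_lock();
	wq = rcu_dereference(sk->sk_wq);
	if (wq_has_sleeper(wq))
		wake_up_interruptible_sync_poll(&wq->wait,
				POLLOUT | POLLWRNORM | POLLWRBAND);
	rcu_read_unlock();
}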
@@ -276,7 +260,6 @@ static int release(struct socket *sock)
 		buf = __skb_dequeue(&sk->sk_receive_queue);
 		if (buf == NULL)
 			break;
-		atomic_dec(&tipc_queue_size);
 		if (TIPC_SKB_CB(buf)->handle != 0)
 			kfree_skb(buf);
 		else {
@@ -296,7 +279,7 @@ static int release(struct socket *sock)
296 res = tipc_deleteport(tport->ref); 279 res = tipc_deleteport(tport->ref);
297 280
298 /* Discard any remaining (connection-based) messages in receive queue */ 281 /* Discard any remaining (connection-based) messages in receive queue */
299 discard_rx_queue(sk); 282 __skb_queue_purge(&sk->sk_receive_queue);
300 283
301 /* Reject any messages that accumulated in backlog queue */ 284 /* Reject any messages that accumulated in backlog queue */
302 sock->state = SS_DISCONNECTING; 285 sock->state = SS_DISCONNECTING;
@@ -408,7 +391,7 @@ static int get_name(struct socket *sock, struct sockaddr *uaddr,
408 * socket state flags set 391 * socket state flags set
409 * ------------ --------- 392 * ------------ ---------
410 * unconnected no read flags 393 * unconnected no read flags
411 * no write flags 394 * POLLOUT if port is not congested
412 * 395 *
413 * connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue 396 * connecting POLLIN/POLLRDNORM if ACK/NACK in rx queue
414 * no write flags 397 * no write flags
@@ -435,9 +418,13 @@ static unsigned int poll(struct file *file, struct socket *sock,
435 struct sock *sk = sock->sk; 418 struct sock *sk = sock->sk;
436 u32 mask = 0; 419 u32 mask = 0;
437 420
438 poll_wait(file, sk_sleep(sk), wait); 421 sock_poll_wait(file, sk_sleep(sk), wait);
439 422
440 switch ((int)sock->state) { 423 switch ((int)sock->state) {
424 case SS_UNCONNECTED:
425 if (!tipc_sk_port(sk)->congested)
426 mask |= POLLOUT;
427 break;
441 case SS_READY: 428 case SS_READY:
442 case SS_CONNECTED: 429 case SS_CONNECTED:
443 if (!tipc_sk_port(sk)->congested) 430 if (!tipc_sk_port(sk)->congested)
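With the new SS_UNCONNECTED case, an as-yet-unconnected socket reports POLLOUT whenever its port is uncongested, so a sender can wait for writability before an implied-connect send. A minimal userspace sketch (error handling trimmed):

#include <poll.h>
#include <stdio.h>
#include <sys/socket.h>

int main(void)
{
	int sd = socket(AF_TIPC, SOCK_STREAM, 0);
	struct pollfd pfd = { .fd = sd, .events = POLLOUT };

	if (sd < 0)
		return 1;
	/* On an SS_UNCONNECTED socket, POLLOUT now means "port not congested" */
	if (poll(&pfd, 1, 1000) > 0 && (pfd.revents & POLLOUT))
		printf("socket is writable\n");
	return 0;
}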
@@ -516,8 +503,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
516 if (unlikely((m->msg_namelen < sizeof(*dest)) || 503 if (unlikely((m->msg_namelen < sizeof(*dest)) ||
517 (dest->family != AF_TIPC))) 504 (dest->family != AF_TIPC)))
518 return -EINVAL; 505 return -EINVAL;
519 if ((total_len > TIPC_MAX_USER_MSG_SIZE) || 506 if (total_len > TIPC_MAX_USER_MSG_SIZE)
520 (m->msg_iovlen > (unsigned int)INT_MAX))
521 return -EMSGSIZE; 507 return -EMSGSIZE;
522 508
523 if (iocb) 509 if (iocb)
@@ -625,8 +611,7 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
625 if (unlikely(dest)) 611 if (unlikely(dest))
626 return send_msg(iocb, sock, m, total_len); 612 return send_msg(iocb, sock, m, total_len);
627 613
628 if ((total_len > TIPC_MAX_USER_MSG_SIZE) || 614 if (total_len > TIPC_MAX_USER_MSG_SIZE)
629 (m->msg_iovlen > (unsigned int)INT_MAX))
630 return -EMSGSIZE; 615 return -EMSGSIZE;
631 616
632 if (iocb) 617 if (iocb)
@@ -711,8 +696,7 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
711 goto exit; 696 goto exit;
712 } 697 }
713 698
714 if ((total_len > (unsigned int)INT_MAX) || 699 if (total_len > (unsigned int)INT_MAX) {
715 (m->msg_iovlen > (unsigned int)INT_MAX)) {
716 res = -EMSGSIZE; 700 res = -EMSGSIZE;
717 goto exit; 701 goto exit;
718 } 702 }
@@ -775,16 +759,19 @@ exit:
775static int auto_connect(struct socket *sock, struct tipc_msg *msg) 759static int auto_connect(struct socket *sock, struct tipc_msg *msg)
776{ 760{
777 struct tipc_sock *tsock = tipc_sk(sock->sk); 761 struct tipc_sock *tsock = tipc_sk(sock->sk);
778 762 struct tipc_port *p_ptr;
779 if (msg_errcode(msg)) {
780 sock->state = SS_DISCONNECTING;
781 return -ECONNREFUSED;
782 }
783 763
784 tsock->peer_name.ref = msg_origport(msg); 764 tsock->peer_name.ref = msg_origport(msg);
785 tsock->peer_name.node = msg_orignode(msg); 765 tsock->peer_name.node = msg_orignode(msg);
786 tipc_connect2port(tsock->p->ref, &tsock->peer_name); 766 p_ptr = tipc_port_deref(tsock->p->ref);
787 tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); 767 if (!p_ptr)
768 return -EINVAL;
769
770 __tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name);
771
772 if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
773 return -EINVAL;
774 msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg));
788 sock->state = SS_CONNECTED; 775 sock->state = SS_CONNECTED;
789 return 0; 776 return 0;
790} 777}
@@ -803,6 +790,7 @@ static void set_orig_addr(struct msghdr *m, struct tipc_msg *msg)
803 if (addr) { 790 if (addr) {
804 addr->family = AF_TIPC; 791 addr->family = AF_TIPC;
805 addr->addrtype = TIPC_ADDR_ID; 792 addr->addrtype = TIPC_ADDR_ID;
793 memset(&addr->addr, 0, sizeof(addr->addr));
806 addr->addr.id.ref = msg_origport(msg); 794 addr->addr.id.ref = msg_origport(msg);
807 addr->addr.id.node = msg_orignode(msg); 795 addr->addr.id.node = msg_orignode(msg);
808 addr->addr.name.domain = 0; /* could leave uninitialized */ 796 addr->addr.name.domain = 0; /* could leave uninitialized */
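The added memset() follows the usual rule for structures copied back to userspace: zero the whole union before filling the members in use, so padding and the untouched union arms never leak stale kernel stack bytes. A minimal sketch of the pattern, with hypothetical field names:

#include <string.h>

struct addr_union {		/* hypothetical layout, for illustration only */
	unsigned int id_ref;
	unsigned int id_node;
	unsigned int name_domain;
};

static void fill_addr(struct addr_union *a, unsigned int ref, unsigned int node)
{
	memset(a, 0, sizeof(*a));	/* clear padding and unused members */
	a->id_ref = ref;		/* then set only what we use */
	a->id_node = node;
}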
@@ -917,6 +905,9 @@ static int recv_msg(struct kiocb *iocb, struct socket *sock,
917 goto exit; 905 goto exit;
918 } 906 }
919 907
908 /* will be updated in set_orig_addr() if needed */
909 m->msg_namelen = 0;
910
920 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); 911 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
921restart: 912restart:
922 913
@@ -943,13 +934,6 @@ restart:
943 sz = msg_data_sz(msg); 934 sz = msg_data_sz(msg);
944 err = msg_errcode(msg); 935 err = msg_errcode(msg);
945 936
946 /* Complete connection setup for an implied connect */
947 if (unlikely(sock->state == SS_CONNECTING)) {
948 res = auto_connect(sock, msg);
949 if (res)
950 goto exit;
951 }
952
953 /* Discard an empty non-errored message & try again */ 937 /* Discard an empty non-errored message & try again */
954 if ((!sz) && (!err)) { 938 if ((!sz) && (!err)) {
955 advance_rx_queue(sk); 939 advance_rx_queue(sk);
@@ -1033,6 +1017,9 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
1033 goto exit; 1017 goto exit;
1034 } 1018 }
1035 1019
1020 /* will be updated in set_orig_addr() if needed */
1021 m->msg_namelen = 0;
1022
1036 target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len); 1023 target = sock_rcvlowat(sk, flags & MSG_WAITALL, buf_len);
1037 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT); 1024 timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
1038 1025
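Zeroing m->msg_namelen up front means the caller sees a zero address length unless set_orig_addr() really filled one in. From userspace this surfaces through recvfrom()'s address-length result; a small sketch, assuming an already-bound AF_TIPC socket sd:

#include <stdio.h>
#include <sys/socket.h>
#include <linux/tipc.h>

ssize_t recv_with_origin(int sd, char *buf, size_t len)
{
	struct sockaddr_tipc src;
	socklen_t srclen = sizeof(src);
	ssize_t n = recvfrom(sd, buf, len, 0,
			     (struct sockaddr *)&src, &srclen);

	if (n >= 0 && srclen > 0)	/* kernel supplied an origin */
		printf("from port %u on node 0x%x\n",
		       src.addr.id.ref, src.addr.id.node);
	return n;
}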
@@ -1126,31 +1113,143 @@ exit:
1126} 1113}
1127 1114
1128/** 1115/**
1129 * rx_queue_full - determine if receive queue can accept another message 1116 * tipc_write_space - wake up thread if port congestion is released
1130 * @msg: message to be added to queue 1117 * @sk: socket
1131 * @queue_size: current size of queue 1118 */
1132 * @base: nominal maximum size of queue 1119static void tipc_write_space(struct sock *sk)
1120{
1121 struct socket_wq *wq;
1122
1123 rcu_read_lock();
1124 wq = rcu_dereference(sk->sk_wq);
1125 if (wq_has_sleeper(wq))
1126 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1127 POLLWRNORM | POLLWRBAND);
1128 rcu_read_unlock();
1129}
1130
1131/**
1132 * tipc_data_ready - wake up threads to indicate messages have been received
1133 * @sk: socket
1134 * @len: the length of messages
1135 */
1136static void tipc_data_ready(struct sock *sk, int len)
1137{
1138 struct socket_wq *wq;
1139
1140 rcu_read_lock();
1141 wq = rcu_dereference(sk->sk_wq);
1142 if (wq_has_sleeper(wq))
1143 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1144 POLLRDNORM | POLLRDBAND);
1145 rcu_read_unlock();
1146}
1147
1148/**
1149 * filter_connect - Handle all incoming messages for a connection-based socket
1150 * @tsock: TIPC socket
1151 * @msg: message
1133 * 1152 *
 1134 * Returns 1 if queue is unable to accept message, 0 otherwise 1153 * Returns a TIPC error status code; also sets the socket error
 1154 * status when an error is encountered
1135 */ 1155 */
1136static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base) 1156static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
1137{ 1157{
1138 u32 threshold; 1158 struct socket *sock = tsock->sk.sk_socket;
1139 u32 imp = msg_importance(msg); 1159 struct tipc_msg *msg = buf_msg(*buf);
1140 1160 struct sock *sk = &tsock->sk;
1141 if (imp == TIPC_LOW_IMPORTANCE) 1161 u32 retval = TIPC_ERR_NO_PORT;
1142 threshold = base; 1162 int res;
1143 else if (imp == TIPC_MEDIUM_IMPORTANCE)
1144 threshold = base * 2;
1145 else if (imp == TIPC_HIGH_IMPORTANCE)
1146 threshold = base * 100;
1147 else
1148 return 0;
1149 1163
1150 if (msg_connected(msg)) 1164 if (msg_mcast(msg))
1151 threshold *= 4; 1165 return retval;
1152 1166
1153 return queue_size >= threshold; 1167 switch ((int)sock->state) {
1168 case SS_CONNECTED:
1169 /* Accept only connection-based messages sent by peer */
1170 if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) {
1171 if (unlikely(msg_errcode(msg))) {
1172 sock->state = SS_DISCONNECTING;
1173 __tipc_disconnect(tsock->p);
1174 }
1175 retval = TIPC_OK;
1176 }
1177 break;
1178 case SS_CONNECTING:
1179 /* Accept only ACK or NACK message */
1180 if (unlikely(msg_errcode(msg))) {
1181 sock->state = SS_DISCONNECTING;
 1182 sk->sk_err = ECONNREFUSED;
1183 retval = TIPC_OK;
1184 break;
1185 }
1186
1187 if (unlikely(!msg_connected(msg)))
1188 break;
1189
1190 res = auto_connect(sock, msg);
1191 if (res) {
1192 sock->state = SS_DISCONNECTING;
 1193 sk->sk_err = -res;
1194 retval = TIPC_OK;
1195 break;
1196 }
1197
1198 /* If an incoming message is an 'ACK-', it should be
1199 * discarded here because it doesn't contain useful
 1200 * data. In addition, wake up the connect()
 1201 * routine if it is sleeping.
1202 */
1203 if (msg_data_sz(msg) == 0) {
1204 kfree_skb(*buf);
1205 *buf = NULL;
1206 if (waitqueue_active(sk_sleep(sk)))
1207 wake_up_interruptible(sk_sleep(sk));
1208 }
1209 retval = TIPC_OK;
1210 break;
1211 case SS_LISTENING:
1212 case SS_UNCONNECTED:
1213 /* Accept only SYN message */
1214 if (!msg_connected(msg) && !(msg_errcode(msg)))
1215 retval = TIPC_OK;
1216 break;
1217 case SS_DISCONNECTING:
1218 break;
1219 default:
1220 pr_err("Unknown socket state %u\n", sock->state);
1221 }
1222 return retval;
1223}
1224
1225/**
1226 * rcvbuf_limit - get proper overload limit of socket receive queue
1227 * @sk: socket
1228 * @buf: message
1229 *
 1230 * For all connection-oriented messages, irrespective of importance,
 1231 * the default overload value (i.e. 67 MB) is used as the limit.
 1232 *
 1233 * For all connectionless messages, the default queue limits are
 1234 * as below:
1235 *
1236 * TIPC_LOW_IMPORTANCE (5MB)
1237 * TIPC_MEDIUM_IMPORTANCE (10MB)
1238 * TIPC_HIGH_IMPORTANCE (20MB)
1239 * TIPC_CRITICAL_IMPORTANCE (40MB)
1240 *
1241 * Returns overload limit according to corresponding message importance
1242 */
1243static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1244{
1245 struct tipc_msg *msg = buf_msg(buf);
1246 unsigned int limit;
1247
1248 if (msg_connected(msg))
1249 limit = CONN_OVERLOAD_LIMIT;
1250 else
1251 limit = sk->sk_rcvbuf << (msg_importance(msg) + 5);
1252 return limit;
1154} 1253}
1155 1254
1156/** 1255/**
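The connectionless branch scales sk_rcvbuf by importance: the (importance + 5) shift multiplies the base by 32, 64, 128 and 256 for low through critical. Read backwards, the 5/10/20/40 MB defaults in the kernel-doc above imply an effective sk_rcvbuf of about 160 KiB; a runnable check under that assumption:

#include <stdio.h>

int main(void)
{
	/* Assumed base implied by the documented 5 MB low-importance limit */
	unsigned int sk_rcvbuf = (5u << 20) / 32;	/* 163840 bytes */
	const char *names[] = { "low", "medium", "high", "critical" };
	unsigned int imp;

	for (imp = 0; imp < 4; imp++)
		printf("%-8s: limit = %u bytes (%u MiB)\n",
		       names[imp], sk_rcvbuf << (imp + 5),
		       (sk_rcvbuf << (imp + 5)) >> 20);
	return 0;
}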
@@ -1169,7 +1268,8 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1169{ 1268{
1170 struct socket *sock = sk->sk_socket; 1269 struct socket *sock = sk->sk_socket;
1171 struct tipc_msg *msg = buf_msg(buf); 1270 struct tipc_msg *msg = buf_msg(buf);
1172 u32 recv_q_len; 1271 unsigned int limit = rcvbuf_limit(sk, buf);
1272 u32 res = TIPC_OK;
1173 1273
1174 /* Reject message if it is wrong sort of message for socket */ 1274 /* Reject message if it is wrong sort of message for socket */
1175 if (msg_type(msg) > TIPC_DIRECT_MSG) 1275 if (msg_type(msg) > TIPC_DIRECT_MSG)
@@ -1179,51 +1279,21 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1179 if (msg_connected(msg)) 1279 if (msg_connected(msg))
1180 return TIPC_ERR_NO_PORT; 1280 return TIPC_ERR_NO_PORT;
1181 } else { 1281 } else {
1182 if (msg_mcast(msg)) 1282 res = filter_connect(tipc_sk(sk), &buf);
1183 return TIPC_ERR_NO_PORT; 1283 if (res != TIPC_OK || buf == NULL)
1184 if (sock->state == SS_CONNECTED) { 1284 return res;
1185 if (!msg_connected(msg) ||
1186 !tipc_port_peer_msg(tipc_sk_port(sk), msg))
1187 return TIPC_ERR_NO_PORT;
1188 } else if (sock->state == SS_CONNECTING) {
1189 if (!msg_connected(msg) && (msg_errcode(msg) == 0))
1190 return TIPC_ERR_NO_PORT;
1191 } else if (sock->state == SS_LISTENING) {
1192 if (msg_connected(msg) || msg_errcode(msg))
1193 return TIPC_ERR_NO_PORT;
1194 } else if (sock->state == SS_DISCONNECTING) {
1195 return TIPC_ERR_NO_PORT;
1196 } else /* (sock->state == SS_UNCONNECTED) */ {
1197 if (msg_connected(msg) || msg_errcode(msg))
1198 return TIPC_ERR_NO_PORT;
1199 }
1200 } 1285 }
1201 1286
1202 /* Reject message if there isn't room to queue it */ 1287 /* Reject message if there isn't room to queue it */
1203 recv_q_len = (u32)atomic_read(&tipc_queue_size); 1288 if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1204 if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) { 1289 return TIPC_ERR_OVERLOAD;
1205 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1206 return TIPC_ERR_OVERLOAD;
1207 }
1208 recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1209 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1210 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1211 return TIPC_ERR_OVERLOAD;
1212 }
1213 1290
1214 /* Enqueue message (finally!) */ 1291 /* Enqueue message */
1215 TIPC_SKB_CB(buf)->handle = 0; 1292 TIPC_SKB_CB(buf)->handle = 0;
1216 atomic_inc(&tipc_queue_size);
1217 __skb_queue_tail(&sk->sk_receive_queue, buf); 1293 __skb_queue_tail(&sk->sk_receive_queue, buf);
1294 skb_set_owner_r(buf, sk);
1218 1295
1219 /* Initiate connection termination for an incoming 'FIN' */ 1296 sk->sk_data_ready(sk, 0);
1220 if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1221 sock->state = SS_DISCONNECTING;
1222 tipc_disconnect_port(tipc_sk_port(sk));
1223 }
1224
1225 if (waitqueue_active(sk_sleep(sk)))
1226 wake_up_interruptible(sk_sleep(sk));
1227 return TIPC_OK; 1297 return TIPC_OK;
1228} 1298}
1229 1299
@@ -1270,7 +1340,7 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1270 if (!sock_owned_by_user(sk)) { 1340 if (!sock_owned_by_user(sk)) {
1271 res = filter_rcv(sk, buf); 1341 res = filter_rcv(sk, buf);
1272 } else { 1342 } else {
1273 if (sk_add_backlog(sk, buf, sk->sk_rcvbuf)) 1343 if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
1274 res = TIPC_ERR_OVERLOAD; 1344 res = TIPC_ERR_OVERLOAD;
1275 else 1345 else
1276 res = TIPC_OK; 1346 res = TIPC_OK;
@@ -1290,8 +1360,7 @@ static void wakeupdispatch(struct tipc_port *tport)
1290{ 1360{
1291 struct sock *sk = (struct sock *)tport->usr_handle; 1361 struct sock *sk = (struct sock *)tport->usr_handle;
1292 1362
1293 if (waitqueue_active(sk_sleep(sk))) 1363 sk->sk_write_space(sk);
1294 wake_up_interruptible(sk_sleep(sk));
1295} 1364}
1296 1365
1297/** 1366/**
@@ -1309,8 +1378,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1309 struct sock *sk = sock->sk; 1378 struct sock *sk = sock->sk;
1310 struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest; 1379 struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1311 struct msghdr m = {NULL,}; 1380 struct msghdr m = {NULL,};
1312 struct sk_buff *buf;
1313 struct tipc_msg *msg;
1314 unsigned int timeout; 1381 unsigned int timeout;
1315 int res; 1382 int res;
1316 1383
@@ -1322,26 +1389,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1322 goto exit; 1389 goto exit;
1323 } 1390 }
1324 1391
1325 /* For now, TIPC does not support the non-blocking form of connect() */
1326 if (flags & O_NONBLOCK) {
1327 res = -EOPNOTSUPP;
1328 goto exit;
1329 }
1330
1331 /* Issue Posix-compliant error code if socket is in the wrong state */
1332 if (sock->state == SS_LISTENING) {
1333 res = -EOPNOTSUPP;
1334 goto exit;
1335 }
1336 if (sock->state == SS_CONNECTING) {
1337 res = -EALREADY;
1338 goto exit;
1339 }
1340 if (sock->state != SS_UNCONNECTED) {
1341 res = -EISCONN;
1342 goto exit;
1343 }
1344
1345 /* 1392 /*
1346 * Reject connection attempt using multicast address 1393 * Reject connection attempt using multicast address
1347 * 1394 *
@@ -1353,49 +1400,66 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1353 goto exit; 1400 goto exit;
1354 } 1401 }
1355 1402
1356 /* Reject any messages already in receive queue (very unlikely) */ 1403 timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1357 reject_rx_queue(sk);
1358 1404
1359 /* Send a 'SYN-' to destination */ 1405 switch (sock->state) {
1360 m.msg_name = dest; 1406 case SS_UNCONNECTED:
1361 m.msg_namelen = destlen; 1407 /* Send a 'SYN-' to destination */
1362 res = send_msg(NULL, sock, &m, 0); 1408 m.msg_name = dest;
1363 if (res < 0) 1409 m.msg_namelen = destlen;
1410
 1411 /* If connect() is non-blocking, set MSG_DONTWAIT so
 1412 * that send_msg() never blocks.
1413 */
1414 if (!timeout)
1415 m.msg_flags = MSG_DONTWAIT;
1416
1417 res = send_msg(NULL, sock, &m, 0);
1418 if ((res < 0) && (res != -EWOULDBLOCK))
1419 goto exit;
1420
1421 /* Just entered SS_CONNECTING state; the only
 1422 * difference is that the non-blocking case returns
 1423 * EINPROGRESS rather than EALREADY.
1424 */
1425 res = -EINPROGRESS;
1426 break;
1427 case SS_CONNECTING:
1428 res = -EALREADY;
1429 break;
1430 case SS_CONNECTED:
1431 res = -EISCONN;
1432 break;
1433 default:
1434 res = -EINVAL;
1364 goto exit; 1435 goto exit;
1436 }
1365 1437
1366 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ 1438 if (sock->state == SS_CONNECTING) {
1367 timeout = tipc_sk(sk)->conn_timeout; 1439 if (!timeout)
1368 release_sock(sk); 1440 goto exit;
1369 res = wait_event_interruptible_timeout(*sk_sleep(sk),
1370 (!skb_queue_empty(&sk->sk_receive_queue) ||
1371 (sock->state != SS_CONNECTING)),
1372 timeout ? (long)msecs_to_jiffies(timeout)
1373 : MAX_SCHEDULE_TIMEOUT);
1374 lock_sock(sk);
1375 1441
1376 if (res > 0) { 1442 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1377 buf = skb_peek(&sk->sk_receive_queue); 1443 release_sock(sk);
1378 if (buf != NULL) { 1444 res = wait_event_interruptible_timeout(*sk_sleep(sk),
1379 msg = buf_msg(buf); 1445 sock->state != SS_CONNECTING,
1380 res = auto_connect(sock, msg); 1446 timeout ? (long)msecs_to_jiffies(timeout)
1381 if (!res) { 1447 : MAX_SCHEDULE_TIMEOUT);
1382 if (!msg_data_sz(msg)) 1448 lock_sock(sk);
1383 advance_rx_queue(sk); 1449 if (res <= 0) {
1384 } 1450 if (res == 0)
1385 } else { 1451 res = -ETIMEDOUT;
1386 if (sock->state == SS_CONNECTED)
1387 res = -EISCONN;
1388 else 1452 else
1389 res = -ECONNREFUSED; 1453 ; /* leave "res" unchanged */
1454 goto exit;
1390 } 1455 }
1391 } else {
1392 if (res == 0)
1393 res = -ETIMEDOUT;
1394 else
1395 ; /* leave "res" unchanged */
1396 sock->state = SS_DISCONNECTING;
1397 } 1456 }
1398 1457
1458 if (unlikely(sock->state == SS_DISCONNECTING))
1459 res = sock_error(sk);
1460 else
1461 res = 0;
1462
1399exit: 1463exit:
1400 release_sock(sk); 1464 release_sock(sk);
1401 return res; 1465 return res;
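With O_NONBLOCK honoured, connect() sends the SYN with MSG_DONTWAIT and returns EINPROGRESS, so userspace can complete the handshake the usual BSD way: poll for writability, then read SO_ERROR. A hedged sketch, assuming dst already holds a valid TIPC destination address:

#include <errno.h>
#include <fcntl.h>
#include <poll.h>
#include <sys/socket.h>
#include <linux/tipc.h>

int tipc_connect_nb(int sd, const struct sockaddr_tipc *dst, int timeout_ms)
{
	struct pollfd pfd = { .fd = sd, .events = POLLOUT };
	int err = 0;
	socklen_t elen = sizeof(err);

	fcntl(sd, F_SETFL, fcntl(sd, F_GETFL, 0) | O_NONBLOCK);

	if (connect(sd, (const struct sockaddr *)dst, sizeof(*dst)) == 0)
		return 0;			/* connected immediately */
	if (errno != EINPROGRESS)
		return -1;

	/* Once the ACK flips the socket to SS_CONNECTED it reports POLLOUT */
	if (poll(&pfd, 1, timeout_ms) <= 0)
		return -1;			/* timeout or poll error */
	if (getsockopt(sd, SOL_SOCKET, SO_ERROR, &err, &elen) < 0 || err)
		return -1;			/* handshake was rejected */
	return 0;
}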
@@ -1436,8 +1500,13 @@ static int listen(struct socket *sock, int len)
1436 */ 1500 */
1437static int accept(struct socket *sock, struct socket *new_sock, int flags) 1501static int accept(struct socket *sock, struct socket *new_sock, int flags)
1438{ 1502{
1439 struct sock *sk = sock->sk; 1503 struct sock *new_sk, *sk = sock->sk;
1440 struct sk_buff *buf; 1504 struct sk_buff *buf;
1505 struct tipc_sock *new_tsock;
1506 struct tipc_port *new_tport;
1507 struct tipc_msg *msg;
1508 u32 new_ref;
1509
1441 int res; 1510 int res;
1442 1511
1443 lock_sock(sk); 1512 lock_sock(sk);
@@ -1463,48 +1532,52 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
1463 buf = skb_peek(&sk->sk_receive_queue); 1532 buf = skb_peek(&sk->sk_receive_queue);
1464 1533
1465 res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); 1534 res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
1466 if (!res) { 1535 if (res)
1467 struct sock *new_sk = new_sock->sk; 1536 goto exit;
1468 struct tipc_sock *new_tsock = tipc_sk(new_sk);
1469 struct tipc_port *new_tport = new_tsock->p;
1470 u32 new_ref = new_tport->ref;
1471 struct tipc_msg *msg = buf_msg(buf);
1472
1473 lock_sock(new_sk);
1474
1475 /*
1476 * Reject any stray messages received by new socket
1477 * before the socket lock was taken (very, very unlikely)
1478 */
1479 reject_rx_queue(new_sk);
1480
 1481 /* Connect new socket to its peer */
1482 new_tsock->peer_name.ref = msg_origport(msg);
1483 new_tsock->peer_name.node = msg_orignode(msg);
1484 tipc_connect2port(new_ref, &new_tsock->peer_name);
1485 new_sock->state = SS_CONNECTED;
1486
1487 tipc_set_portimportance(new_ref, msg_importance(msg));
1488 if (msg_named(msg)) {
1489 new_tport->conn_type = msg_nametype(msg);
1490 new_tport->conn_instance = msg_nameinst(msg);
1491 }
1492 1537
1493 /* 1538 new_sk = new_sock->sk;
1494 * Respond to 'SYN-' by discarding it & returning 'ACK'-. 1539 new_tsock = tipc_sk(new_sk);
1495 * Respond to 'SYN+' by queuing it on new socket. 1540 new_tport = new_tsock->p;
1496 */ 1541 new_ref = new_tport->ref;
1497 if (!msg_data_sz(msg)) { 1542 msg = buf_msg(buf);
1498 struct msghdr m = {NULL,};
1499 1543
1500 advance_rx_queue(sk); 1544 /* we lock on new_sk; but lockdep sees the lock on sk */
1501 send_packet(NULL, new_sock, &m, 0); 1545 lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1502 } else { 1546
1503 __skb_dequeue(&sk->sk_receive_queue); 1547 /*
1504 __skb_queue_head(&new_sk->sk_receive_queue, buf); 1548 * Reject any stray messages received by new socket
1505 } 1549 * before the socket lock was taken (very, very unlikely)
1506 release_sock(new_sk); 1550 */
1551 reject_rx_queue(new_sk);
1552
 1553 /* Connect new socket to its peer */
1554 new_tsock->peer_name.ref = msg_origport(msg);
1555 new_tsock->peer_name.node = msg_orignode(msg);
1556 tipc_connect(new_ref, &new_tsock->peer_name);
1557 new_sock->state = SS_CONNECTED;
1558
1559 tipc_set_portimportance(new_ref, msg_importance(msg));
1560 if (msg_named(msg)) {
1561 new_tport->conn_type = msg_nametype(msg);
1562 new_tport->conn_instance = msg_nameinst(msg);
1563 }
1564
1565 /*
1566 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1567 * Respond to 'SYN+' by queuing it on new socket.
1568 */
1569 if (!msg_data_sz(msg)) {
1570 struct msghdr m = {NULL,};
1571
1572 advance_rx_queue(sk);
1573 send_packet(NULL, new_sock, &m, 0);
1574 } else {
1575 __skb_dequeue(&sk->sk_receive_queue);
1576 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1577 skb_set_owner_r(buf, new_sk);
1507 } 1578 }
1579 release_sock(new_sk);
1580
1508exit: 1581exit:
1509 release_sock(sk); 1582 release_sock(sk);
1510 return res; 1583 return res;
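lock_sock_nested() is needed because the listener's socket lock is still held when the child's lock (the same lockdep class) is taken; SINGLE_DEPTH_NESTING tells lockdep this one-level parent/child nesting is intentional, not a recursive deadlock. The shape of the pattern, as a sketch:

/* Sketch: both locks belong to one lockdep class, so annotate the nesting */
static void accept_lock_pair(struct sock *listener, struct sock *child)
{
	lock_sock(listener);				/* first level */
	lock_sock_nested(child, SINGLE_DEPTH_NESTING);	/* annotated second */
	/* ... hand the queued 'SYN' to the child and connect it ... */
	release_sock(child);
	release_sock(listener);
}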
@@ -1539,7 +1612,6 @@ restart:
1539 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ 1612 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1540 buf = __skb_dequeue(&sk->sk_receive_queue); 1613 buf = __skb_dequeue(&sk->sk_receive_queue);
1541 if (buf) { 1614 if (buf) {
1542 atomic_dec(&tipc_queue_size);
1543 if (TIPC_SKB_CB(buf)->handle != 0) { 1615 if (TIPC_SKB_CB(buf)->handle != 0) {
1544 kfree_skb(buf); 1616 kfree_skb(buf);
1545 goto restart; 1617 goto restart;
@@ -1556,10 +1628,11 @@ restart:
1556 1628
1557 case SS_DISCONNECTING: 1629 case SS_DISCONNECTING:
1558 1630
1559 /* Discard any unreceived messages; wake up sleeping tasks */ 1631 /* Discard any unreceived messages */
1560 discard_rx_queue(sk); 1632 __skb_queue_purge(&sk->sk_receive_queue);
1561 if (waitqueue_active(sk_sleep(sk))) 1633
1562 wake_up_interruptible(sk_sleep(sk)); 1634 /* Wake up anyone sleeping in poll */
1635 sk->sk_state_change(sk);
1563 res = 0; 1636 res = 0;
1564 break; 1637 break;
1565 1638
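Calling sk->sk_state_change() instead of open-coding the wakeup routes the shutdown through the standard state-change callback, so any poll() sleeper returns and can then observe the disconnect. A userspace sketch of what such a sleeper might do on wakeup (the exact revents are version-dependent; a plain POLLIN followed by a zero-byte or errored recv() is the portable signal):

#include <poll.h>

/* Returns 1 if the wakeup looks like a disconnect, 0 if it is plain data */
int woke_for_disconnect(int sd)
{
	struct pollfd pfd = { .fd = sd, .events = POLLIN };

	if (poll(&pfd, 1, -1) <= 0)
		return 0;
	/* On plain POLLIN the caller should still recv() and check for EOF */
	return (pfd.revents & (POLLHUP | POLLERR)) ? 1 : 0;
}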
@@ -1677,7 +1750,7 @@ static int getsockopt(struct socket *sock,
1677 /* no need to set "res", since already 0 at this point */ 1750 /* no need to set "res", since already 0 at this point */
1678 break; 1751 break;
1679 case TIPC_NODE_RECVQ_DEPTH: 1752 case TIPC_NODE_RECVQ_DEPTH:
1680 value = (u32)atomic_read(&tipc_queue_size); 1753 value = 0; /* was tipc_queue_size, now obsolete */
1681 break; 1754 break;
1682 case TIPC_SOCK_RECVQ_DEPTH: 1755 case TIPC_SOCK_RECVQ_DEPTH:
1683 value = skb_queue_len(&sk->sk_receive_queue); 1756 value = skb_queue_len(&sk->sk_receive_queue);
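With the global counter gone, TIPC_NODE_RECVQ_DEPTH reports 0, while the per-socket depth is still live. Querying it from userspace, assuming the option names come from linux/tipc.h and SOL_TIPC from the system socket headers:

#include <stdio.h>
#include <sys/socket.h>
#include <linux/tipc.h>

int print_recvq_depth(int sd)
{
	unsigned int depth = 0;
	socklen_t len = sizeof(depth);

	if (getsockopt(sd, SOL_TIPC, TIPC_SOCK_RECVQ_DEPTH, &depth, &len) < 0)
		return -1;
	printf("receive queue depth: %u messages\n", depth);
	return 0;
}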
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 0f7d0d007e22..6b42d47029af 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -462,7 +462,7 @@ static void subscr_named_msg_event(void *usr_handle,
462 kfree(subscriber); 462 kfree(subscriber);
463 return; 463 return;
464 } 464 }
465 tipc_connect2port(subscriber->port_ref, orig); 465 tipc_connect(subscriber->port_ref, orig);
466 466
467 /* Lock server port (& save lock address for future use) */ 467 /* Lock server port (& save lock address for future use) */
468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; 468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;