aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorJon Paul Maloy <jon.maloy@ericsson.com>2014-08-22 18:09:07 -0400
committerDavid S. Miller <davem@davemloft.net>2014-08-23 14:18:33 -0400
commit50100a5e39461b2a61d6040e73c384766c29975d (patch)
tree7f632d0f22af7f38c282603e5c9f4de3139bce59 /net/tipc
parent1dd0bd2b14032037d40a316dd52370f1713fa62b (diff)
tipc: use pseudo message to wake up sockets after link congestion
The current link implementation keeps a linked list of blocked ports/ sockets that is populated when there is link congestion. The purpose of this is to let the link know which users to wake up when the congestion abates. This adds unnecessary complexity to the data structure and the code, since it forces us to involve the link each time we want to delete a socket. It also forces us to grab the spinlock port_lock within the scope of node_lock. We want to get rid of this direct dependence, as well as the deadlock hazard resulting from the usage of port_lock. In this commit, we instead let the link keep list of a "wakeup" pseudo messages for use in such situations. Those messages are sent to the pending sockets via the ordinary message reception path, and wake up the socket's owner when they are received. This enables us to get rid of the 'waiting_ports' linked lists in struct tipc_port that manifest this direct reference. As a consequence, we can eliminate another BH entry into the socket, and hence the need to grab port_lock. This is a further step in our effort to remove port_lock altogether. Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Reviewed-by: Erik Hugne <erik.hugne@ericsson.com> Reviewed-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/bcast.c7
-rw-r--r--net/tipc/core.h5
-rw-r--r--net/tipc/link.c119
-rw-r--r--net/tipc/link.h7
-rw-r--r--net/tipc/msg.c7
-rw-r--r--net/tipc/msg.h1
-rw-r--r--net/tipc/node.c12
-rw-r--r--net/tipc/node.h4
-rw-r--r--net/tipc/port.c2
-rw-r--r--net/tipc/port.h4
-rw-r--r--net/tipc/socket.c15
-rw-r--r--net/tipc/socket.h7
12 files changed, 99 insertions, 91 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index dd13bfa09333..9510fb2df566 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -300,8 +300,8 @@ void tipc_bclink_acknowledge(struct tipc_node *n_ptr, u32 acked)
300 tipc_link_push_queue(bcl); 300 tipc_link_push_queue(bcl);
301 bclink_set_last_sent(); 301 bclink_set_last_sent();
302 } 302 }
303 if (unlikely(released && !list_empty(&bcl->waiting_ports))) 303 if (unlikely(released && !skb_queue_empty(&bcl->waiting_sks)))
304 tipc_link_wakeup_ports(bcl, 0); 304 bclink->node.action_flags |= TIPC_WAKEUP_USERS;
305exit: 305exit:
306 tipc_bclink_unlock(); 306 tipc_bclink_unlock();
307} 307}
@@ -840,9 +840,10 @@ int tipc_bclink_init(void)
840 sprintf(bcbearer->media.name, "tipc-broadcast"); 840 sprintf(bcbearer->media.name, "tipc-broadcast");
841 841
842 spin_lock_init(&bclink->lock); 842 spin_lock_init(&bclink->lock);
843 INIT_LIST_HEAD(&bcl->waiting_ports); 843 __skb_queue_head_init(&bcl->waiting_sks);
844 bcl->next_out_no = 1; 844 bcl->next_out_no = 1;
845 spin_lock_init(&bclink->node.lock); 845 spin_lock_init(&bclink->node.lock);
846 __skb_queue_head_init(&bclink->node.waiting_sks);
846 bcl->owner = &bclink->node; 847 bcl->owner = &bclink->node;
847 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST; 848 bcl->max_pkt = MAX_PKT_DEFAULT_MCAST;
848 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT); 849 tipc_link_set_queue_limits(bcl, BCLINK_WIN_DEFAULT);
diff --git a/net/tipc/core.h b/net/tipc/core.h
index bb26ed1ee966..d2607a8e2b80 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -187,8 +187,11 @@ static inline void k_term_timer(struct timer_list *timer)
187 187
188struct tipc_skb_cb { 188struct tipc_skb_cb {
189 void *handle; 189 void *handle;
190 bool deferred;
191 struct sk_buff *tail; 190 struct sk_buff *tail;
191 bool deferred;
192 bool wakeup_pending;
193 u16 chain_sz;
194 u16 chain_imp;
192}; 195};
193 196
194#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0])) 197#define TIPC_SKB_CB(__skb) ((struct tipc_skb_cb *)&((__skb)->cb[0]))
diff --git a/net/tipc/link.c b/net/tipc/link.c
index fb1485dc6736..6c775a107a02 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -275,7 +275,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
275 link_init_max_pkt(l_ptr); 275 link_init_max_pkt(l_ptr);
276 276
277 l_ptr->next_out_no = 1; 277 l_ptr->next_out_no = 1;
278 INIT_LIST_HEAD(&l_ptr->waiting_ports); 278 __skb_queue_head_init(&l_ptr->waiting_sks);
279 279
280 link_reset_statistics(l_ptr); 280 link_reset_statistics(l_ptr);
281 281
@@ -322,66 +322,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
322} 322}
323 323
324/** 324/**
325 * link_schedule_port - schedule port for deferred sending 325 * link_schedule_user - schedule user for wakeup after congestion
326 * @l_ptr: pointer to link 326 * @link: congested link
327 * @origport: reference to sending port 327 * @oport: sending port
328 * @sz: amount of data to be sent 328 * @chain_sz: size of buffer chain that was attempted sent
329 * 329 * @imp: importance of message attempted sent
330 * Schedules port for renewed sending of messages after link congestion 330 * Create pseudo msg to send back to user when congestion abates
331 * has abated.
332 */ 331 */
333static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) 332static bool link_schedule_user(struct tipc_link *link, u32 oport,
333 uint chain_sz, uint imp)
334{ 334{
335 struct tipc_port *p_ptr; 335 struct sk_buff *buf;
336 struct tipc_sock *tsk;
337 336
338 spin_lock_bh(&tipc_port_list_lock); 337 buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
339 p_ptr = tipc_port_lock(origport); 338 tipc_own_addr, oport, 0, 0);
340 if (p_ptr) { 339 if (!buf)
341 if (!list_empty(&p_ptr->wait_list)) 340 return false;
342 goto exit; 341 TIPC_SKB_CB(buf)->chain_sz = chain_sz;
343 tsk = tipc_port_to_sock(p_ptr); 342 TIPC_SKB_CB(buf)->chain_imp = imp;
344 tsk->link_cong = 1; 343 __skb_queue_tail(&link->waiting_sks, buf);
345 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); 344 link->stats.link_congs++;
346 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); 345 return true;
347 l_ptr->stats.link_congs++;
348exit:
349 tipc_port_unlock(p_ptr);
350 }
351 spin_unlock_bh(&tipc_port_list_lock);
352 return -ELINKCONG;
353} 346}
354 347
355void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) 348/**
349 * link_prepare_wakeup - prepare users for wakeup after congestion
350 * @link: congested link
351 * Move a number of waiting users, as permitted by available space in
352 * the send queue, from link wait queue to node wait queue for wakeup
353 */
354static void link_prepare_wakeup(struct tipc_link *link)
356{ 355{
357 struct tipc_port *p_ptr; 356 struct sk_buff_head *wq = &link->waiting_sks;
358 struct tipc_sock *tsk; 357 struct sk_buff *buf;
359 struct tipc_port *temp_p_ptr; 358 uint pend_qsz = link->out_queue_size;
360 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; 359
361 360 for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
362 if (all) 361 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
363 win = 100000;
364 if (win <= 0)
365 return;
366 if (!spin_trylock_bh(&tipc_port_list_lock))
367 return;
368 if (link_congested(l_ptr))
369 goto exit;
370 list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
371 wait_list) {
372 if (win <= 0)
373 break; 362 break;
374 tsk = tipc_port_to_sock(p_ptr); 363 pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
375 list_del_init(&p_ptr->wait_list); 364 __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
376 spin_lock_bh(p_ptr->lock);
377 tsk->link_cong = 0;
378 tipc_sock_wakeup(tsk);
379 win -= p_ptr->waiting_pkts;
380 spin_unlock_bh(p_ptr->lock);
381 } 365 }
382
383exit:
384 spin_unlock_bh(&tipc_port_list_lock);
385} 366}
386 367
387/** 368/**
@@ -423,6 +404,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
423 u32 prev_state = l_ptr->state; 404 u32 prev_state = l_ptr->state;
424 u32 checkpoint = l_ptr->next_in_no; 405 u32 checkpoint = l_ptr->next_in_no;
425 int was_active_link = tipc_link_is_active(l_ptr); 406 int was_active_link = tipc_link_is_active(l_ptr);
407 struct tipc_node *owner = l_ptr->owner;
426 408
427 msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); 409 msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
428 410
@@ -450,9 +432,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
450 kfree_skb(l_ptr->proto_msg_queue); 432 kfree_skb(l_ptr->proto_msg_queue);
451 l_ptr->proto_msg_queue = NULL; 433 l_ptr->proto_msg_queue = NULL;
452 kfree_skb_list(l_ptr->oldest_deferred_in); 434 kfree_skb_list(l_ptr->oldest_deferred_in);
453 if (!list_empty(&l_ptr->waiting_ports)) 435 if (!skb_queue_empty(&l_ptr->waiting_sks)) {
454 tipc_link_wakeup_ports(l_ptr, 1); 436 skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
455 437 owner->action_flags |= TIPC_WAKEUP_USERS;
438 }
456 l_ptr->retransm_queue_head = 0; 439 l_ptr->retransm_queue_head = 0;
457 l_ptr->retransm_queue_size = 0; 440 l_ptr->retransm_queue_size = 0;
458 l_ptr->last_out = NULL; 441 l_ptr->last_out = NULL;
@@ -688,19 +671,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
688static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) 671static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
689{ 672{
690 struct tipc_msg *msg = buf_msg(buf); 673 struct tipc_msg *msg = buf_msg(buf);
691 uint psz = msg_size(msg);
692 uint imp = tipc_msg_tot_importance(msg); 674 uint imp = tipc_msg_tot_importance(msg);
693 u32 oport = msg_tot_origport(msg); 675 u32 oport = msg_tot_origport(msg);
694 676
695 if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) { 677 if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
696 if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
697 link_schedule_port(link, oport, psz);
698 return -ELINKCONG;
699 }
700 } else {
701 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); 678 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
702 tipc_link_reset(link); 679 tipc_link_reset(link);
680 goto drop;
703 } 681 }
682 if (unlikely(msg_errcode(msg)))
683 goto drop;
684 if (unlikely(msg_reroute_cnt(msg)))
685 goto drop;
686 if (TIPC_SKB_CB(buf)->wakeup_pending)
687 return -ELINKCONG;
688 if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
689 return -ELINKCONG;
690drop:
704 kfree_skb_list(buf); 691 kfree_skb_list(buf);
705 return -EHOSTUNREACH; 692 return -EHOSTUNREACH;
706} 693}
@@ -1202,8 +1189,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1202 if (unlikely(l_ptr->next_out)) 1189 if (unlikely(l_ptr->next_out))
1203 tipc_link_push_queue(l_ptr); 1190 tipc_link_push_queue(l_ptr);
1204 1191
1205 if (unlikely(!list_empty(&l_ptr->waiting_ports))) 1192 if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
1206 tipc_link_wakeup_ports(l_ptr, 0); 1193 link_prepare_wakeup(l_ptr);
1194 l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
1195 }
1207 1196
1208 /* Process the incoming packet */ 1197 /* Process the incoming packet */
1209 if (unlikely(!link_working_working(l_ptr))) { 1198 if (unlikely(!link_working_working(l_ptr))) {
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 782983ccd323..b567a3427fda 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -1,7 +1,7 @@
1/* 1/*
2 * net/tipc/link.h: Include file for TIPC link code 2 * net/tipc/link.h: Include file for TIPC link code
3 * 3 *
4 * Copyright (c) 1995-2006, 2013, Ericsson AB 4 * Copyright (c) 1995-2006, 2013-2014, Ericsson AB
5 * Copyright (c) 2004-2005, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2005, 2010-2011, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
@@ -133,7 +133,7 @@ struct tipc_stats {
133 * @retransm_queue_size: number of messages to retransmit 133 * @retransm_queue_size: number of messages to retransmit
134 * @retransm_queue_head: sequence number of first message to retransmit 134 * @retransm_queue_head: sequence number of first message to retransmit
135 * @next_out: ptr to first unsent outbound message in queue 135 * @next_out: ptr to first unsent outbound message in queue
136 * @waiting_ports: linked list of ports waiting for link congestion to abate 136 * @waiting_sks: linked list of sockets waiting for link congestion to abate
137 * @long_msg_seq_no: next identifier to use for outbound fragmented messages 137 * @long_msg_seq_no: next identifier to use for outbound fragmented messages
138 * @reasm_buf: head of partially reassembled inbound message fragments 138 * @reasm_buf: head of partially reassembled inbound message fragments
139 * @stats: collects statistics regarding link activity 139 * @stats: collects statistics regarding link activity
@@ -194,7 +194,7 @@ struct tipc_link {
194 u32 retransm_queue_size; 194 u32 retransm_queue_size;
195 u32 retransm_queue_head; 195 u32 retransm_queue_head;
196 struct sk_buff *next_out; 196 struct sk_buff *next_out;
197 struct list_head waiting_ports; 197 struct sk_buff_head waiting_sks;
198 198
199 /* Fragmentation/reassembly */ 199 /* Fragmentation/reassembly */
200 u32 long_msg_seq_no; 200 u32 long_msg_seq_no;
@@ -235,7 +235,6 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
235void tipc_link_push_queue(struct tipc_link *l_ptr); 235void tipc_link_push_queue(struct tipc_link *l_ptr);
236u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail, 236u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
237 struct sk_buff *buf); 237 struct sk_buff *buf);
238void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all);
239void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window); 238void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
240void tipc_link_retransmit(struct tipc_link *l_ptr, 239void tipc_link_retransmit(struct tipc_link *l_ptr,
241 struct sk_buff *start, u32 retransmits); 240 struct sk_buff *start, u32 retransmits);
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index fdb92e247050..74745a47d72a 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -182,7 +182,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
182 struct sk_buff *buf, *prev; 182 struct sk_buff *buf, *prev;
183 char *pktpos; 183 char *pktpos;
184 int rc; 184 int rc;
185 185 uint chain_sz = 0;
186 msg_set_size(mhdr, msz); 186 msg_set_size(mhdr, msz);
187 187
188 /* No fragmentation needed? */ 188 /* No fragmentation needed? */
@@ -193,6 +193,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
193 return -ENOMEM; 193 return -ENOMEM;
194 skb_copy_to_linear_data(buf, mhdr, mhsz); 194 skb_copy_to_linear_data(buf, mhdr, mhsz);
195 pktpos = buf->data + mhsz; 195 pktpos = buf->data + mhsz;
196 TIPC_SKB_CB(buf)->chain_sz = 1;
196 if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz)) 197 if (!dsz || !memcpy_fromiovecend(pktpos, iov, offset, dsz))
197 return dsz; 198 return dsz;
198 rc = -EFAULT; 199 rc = -EFAULT;
@@ -209,6 +210,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
209 *chain = buf = tipc_buf_acquire(pktmax); 210 *chain = buf = tipc_buf_acquire(pktmax);
210 if (!buf) 211 if (!buf)
211 return -ENOMEM; 212 return -ENOMEM;
213 chain_sz = 1;
212 pktpos = buf->data; 214 pktpos = buf->data;
213 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE); 215 skb_copy_to_linear_data(buf, &pkthdr, INT_H_SIZE);
214 pktpos += INT_H_SIZE; 216 pktpos += INT_H_SIZE;
@@ -242,6 +244,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
242 rc = -ENOMEM; 244 rc = -ENOMEM;
243 goto error; 245 goto error;
244 } 246 }
247 chain_sz++;
245 prev->next = buf; 248 prev->next = buf;
246 msg_set_type(&pkthdr, FRAGMENT); 249 msg_set_type(&pkthdr, FRAGMENT);
247 msg_set_size(&pkthdr, pktsz); 250 msg_set_size(&pkthdr, pktsz);
@@ -251,7 +254,7 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct iovec const *iov,
251 pktrem = pktsz - INT_H_SIZE; 254 pktrem = pktsz - INT_H_SIZE;
252 255
253 } while (1); 256 } while (1);
254 257 TIPC_SKB_CB(*chain)->chain_sz = chain_sz;
255 msg_set_type(buf_msg(buf), LAST_FRAGMENT); 258 msg_set_type(buf_msg(buf), LAST_FRAGMENT);
256 return dsz; 259 return dsz;
257error: 260error:
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 3045b2cfbff8..0ea7b695ac4d 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -442,6 +442,7 @@ static inline struct tipc_msg *msg_get_wrapped(struct tipc_msg *m)
442#define NAME_DISTRIBUTOR 11 442#define NAME_DISTRIBUTOR 11
443#define MSG_FRAGMENTER 12 443#define MSG_FRAGMENTER 12
444#define LINK_CONFIG 13 444#define LINK_CONFIG 13
445#define SOCK_WAKEUP 14 /* pseudo user */
445 446
446/* 447/*
447 * Connection management protocol message types 448 * Connection management protocol message types
diff --git a/net/tipc/node.c b/net/tipc/node.c
index f7069299943f..6ea2c15cfc88 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -38,6 +38,7 @@
38#include "config.h" 38#include "config.h"
39#include "node.h" 39#include "node.h"
40#include "name_distr.h" 40#include "name_distr.h"
41#include "socket.h"
41 42
42#define NODE_HTABLE_SIZE 512 43#define NODE_HTABLE_SIZE 512
43 44
@@ -100,6 +101,7 @@ struct tipc_node *tipc_node_create(u32 addr)
100 INIT_HLIST_NODE(&n_ptr->hash); 101 INIT_HLIST_NODE(&n_ptr->hash);
101 INIT_LIST_HEAD(&n_ptr->list); 102 INIT_LIST_HEAD(&n_ptr->list);
102 INIT_LIST_HEAD(&n_ptr->nsub); 103 INIT_LIST_HEAD(&n_ptr->nsub);
104 __skb_queue_head_init(&n_ptr->waiting_sks);
103 105
104 hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]); 106 hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
105 107
@@ -474,6 +476,7 @@ int tipc_node_get_linkname(u32 bearer_id, u32 addr, char *linkname, size_t len)
474void tipc_node_unlock(struct tipc_node *node) 476void tipc_node_unlock(struct tipc_node *node)
475{ 477{
476 LIST_HEAD(nsub_list); 478 LIST_HEAD(nsub_list);
479 struct sk_buff_head waiting_sks;
477 u32 addr = 0; 480 u32 addr = 0;
478 481
479 if (likely(!node->action_flags)) { 482 if (likely(!node->action_flags)) {
@@ -481,6 +484,11 @@ void tipc_node_unlock(struct tipc_node *node)
481 return; 484 return;
482 } 485 }
483 486
487 __skb_queue_head_init(&waiting_sks);
488 if (node->action_flags & TIPC_WAKEUP_USERS) {
489 skb_queue_splice_init(&node->waiting_sks, &waiting_sks);
490 node->action_flags &= ~TIPC_WAKEUP_USERS;
491 }
484 if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) { 492 if (node->action_flags & TIPC_NOTIFY_NODE_DOWN) {
485 list_replace_init(&node->nsub, &nsub_list); 493 list_replace_init(&node->nsub, &nsub_list);
486 node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN; 494 node->action_flags &= ~TIPC_NOTIFY_NODE_DOWN;
@@ -491,8 +499,12 @@ void tipc_node_unlock(struct tipc_node *node)
491 } 499 }
492 spin_unlock_bh(&node->lock); 500 spin_unlock_bh(&node->lock);
493 501
502 while (!skb_queue_empty(&waiting_sks))
503 tipc_sk_rcv(__skb_dequeue(&waiting_sks));
504
494 if (!list_empty(&nsub_list)) 505 if (!list_empty(&nsub_list))
495 tipc_nodesub_notify(&nsub_list); 506 tipc_nodesub_notify(&nsub_list);
507
496 if (addr) 508 if (addr)
497 tipc_named_node_up(addr); 509 tipc_named_node_up(addr);
498} 510}
diff --git a/net/tipc/node.h b/net/tipc/node.h
index b61716a8218e..2ebf9e8b50fd 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -58,7 +58,8 @@ enum {
58 TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1), 58 TIPC_WAIT_PEER_LINKS_DOWN = (1 << 1),
59 TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2), 59 TIPC_WAIT_OWN_LINKS_DOWN = (1 << 2),
60 TIPC_NOTIFY_NODE_DOWN = (1 << 3), 60 TIPC_NOTIFY_NODE_DOWN = (1 << 3),
61 TIPC_NOTIFY_NODE_UP = (1 << 4) 61 TIPC_NOTIFY_NODE_UP = (1 << 4),
62 TIPC_WAKEUP_USERS = (1 << 5)
62}; 63};
63 64
64/** 65/**
@@ -115,6 +116,7 @@ struct tipc_node {
115 int working_links; 116 int working_links;
116 u32 signature; 117 u32 signature;
117 struct list_head nsub; 118 struct list_head nsub;
119 struct sk_buff_head waiting_sks;
118 struct rcu_head rcu; 120 struct rcu_head rcu;
119}; 121};
120 122
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 7e096a5e7701..b58a777a4399 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -92,7 +92,6 @@ u32 tipc_port_init(struct tipc_port *p_ptr,
92 92
93 p_ptr->max_pkt = MAX_PKT_DEFAULT; 93 p_ptr->max_pkt = MAX_PKT_DEFAULT;
94 p_ptr->ref = ref; 94 p_ptr->ref = ref;
95 INIT_LIST_HEAD(&p_ptr->wait_list);
96 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list); 95 INIT_LIST_HEAD(&p_ptr->subscription.nodesub_list);
97 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref); 96 k_init_timer(&p_ptr->timer, (Handler)port_timeout, ref);
98 INIT_LIST_HEAD(&p_ptr->publications); 97 INIT_LIST_HEAD(&p_ptr->publications);
@@ -134,7 +133,6 @@ void tipc_port_destroy(struct tipc_port *p_ptr)
134 } 133 }
135 spin_lock_bh(&tipc_port_list_lock); 134 spin_lock_bh(&tipc_port_list_lock);
136 list_del(&p_ptr->port_list); 135 list_del(&p_ptr->port_list);
137 list_del(&p_ptr->wait_list);
138 spin_unlock_bh(&tipc_port_list_lock); 136 spin_unlock_bh(&tipc_port_list_lock);
139 k_term_timer(&p_ptr->timer); 137 k_term_timer(&p_ptr->timer);
140} 138}
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 3087da39ee47..6cdc7de8c9b8 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -58,8 +58,6 @@
58 * @ref: unique reference to port in TIPC object registry 58 * @ref: unique reference to port in TIPC object registry
59 * @phdr: preformatted message header used when sending messages 59 * @phdr: preformatted message header used when sending messages
60 * @port_list: adjacent ports in TIPC's global list of ports 60 * @port_list: adjacent ports in TIPC's global list of ports
61 * @wait_list: adjacent ports in list of ports waiting on link congestion
62 * @waiting_pkts:
63 * @publications: list of publications for port 61 * @publications: list of publications for port
64 * @pub_count: total # of publications port has made during its lifetime 62 * @pub_count: total # of publications port has made during its lifetime
65 * @probing_state: 63 * @probing_state:
@@ -77,8 +75,6 @@ struct tipc_port {
77 u32 ref; 75 u32 ref;
78 struct tipc_msg phdr; 76 struct tipc_msg phdr;
79 struct list_head port_list; 77 struct list_head port_list;
80 struct list_head wait_list;
81 u32 waiting_pkts;
82 struct list_head publications; 78 struct list_head publications;
83 u32 pub_count; 79 u32 pub_count;
84 u32 probing_state; 80 u32 probing_state;
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index ff8c8118d56e..a8be4d2001f7 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -579,6 +579,7 @@ new_mtu:
579 goto new_mtu; 579 goto new_mtu;
580 if (rc != -ELINKCONG) 580 if (rc != -ELINKCONG)
581 break; 581 break;
582 tipc_sk(sk)->link_cong = 1;
582 rc = tipc_wait_for_sndmsg(sock, &timeo); 583 rc = tipc_wait_for_sndmsg(sock, &timeo);
583 if (rc) 584 if (rc)
584 kfree_skb_list(buf); 585 kfree_skb_list(buf);
@@ -651,7 +652,7 @@ static int tipc_sk_proto_rcv(struct tipc_sock *tsk, u32 *dnode,
651 conn_cong = tipc_sk_conn_cong(tsk); 652 conn_cong = tipc_sk_conn_cong(tsk);
652 tsk->sent_unacked -= msg_msgcnt(msg); 653 tsk->sent_unacked -= msg_msgcnt(msg);
653 if (conn_cong) 654 if (conn_cong)
654 tipc_sock_wakeup(tsk); 655 tsk->sk.sk_write_space(&tsk->sk);
655 } else if (msg_type(msg) == CONN_PROBE) { 656 } else if (msg_type(msg) == CONN_PROBE) {
656 if (!tipc_msg_reverse(buf, dnode, TIPC_OK)) 657 if (!tipc_msg_reverse(buf, dnode, TIPC_OK))
657 return TIPC_OK; 658 return TIPC_OK;
@@ -826,6 +827,7 @@ new_mtu:
826 goto exit; 827 goto exit;
827 828
828 do { 829 do {
830 TIPC_SKB_CB(buf)->wakeup_pending = tsk->link_cong;
829 rc = tipc_link_xmit(buf, dnode, tsk->port.ref); 831 rc = tipc_link_xmit(buf, dnode, tsk->port.ref);
830 if (likely(rc >= 0)) { 832 if (likely(rc >= 0)) {
831 if (sock->state != SS_READY) 833 if (sock->state != SS_READY)
@@ -835,10 +837,9 @@ new_mtu:
835 } 837 }
836 if (rc == -EMSGSIZE) 838 if (rc == -EMSGSIZE)
837 goto new_mtu; 839 goto new_mtu;
838
839 if (rc != -ELINKCONG) 840 if (rc != -ELINKCONG)
840 break; 841 break;
841 842 tsk->link_cong = 1;
842 rc = tipc_wait_for_sndmsg(sock, &timeo); 843 rc = tipc_wait_for_sndmsg(sock, &timeo);
843 if (rc) 844 if (rc)
844 kfree_skb_list(buf); 845 kfree_skb_list(buf);
@@ -953,6 +954,7 @@ next:
953 } 954 }
954 if (rc != -ELINKCONG) 955 if (rc != -ELINKCONG)
955 break; 956 break;
957 tsk->link_cong = 1;
956 } 958 }
957 rc = tipc_wait_for_sndpkt(sock, &timeo); 959 rc = tipc_wait_for_sndpkt(sock, &timeo);
958 if (rc) 960 if (rc)
@@ -1518,6 +1520,13 @@ static int filter_rcv(struct sock *sk, struct sk_buff *buf)
1518 if (unlikely(msg_user(msg) == CONN_MANAGER)) 1520 if (unlikely(msg_user(msg) == CONN_MANAGER))
1519 return tipc_sk_proto_rcv(tsk, &onode, buf); 1521 return tipc_sk_proto_rcv(tsk, &onode, buf);
1520 1522
1523 if (unlikely(msg_user(msg) == SOCK_WAKEUP)) {
1524 kfree_skb(buf);
1525 tsk->link_cong = 0;
1526 sk->sk_write_space(sk);
1527 return TIPC_OK;
1528 }
1529
1521 /* Reject message if it is wrong sort of message for socket */ 1530 /* Reject message if it is wrong sort of message for socket */
1522 if (msg_type(msg) > TIPC_DIRECT_MSG) 1531 if (msg_type(msg) > TIPC_DIRECT_MSG)
1523 return -TIPC_ERR_NO_PORT; 1532 return -TIPC_ERR_NO_PORT;
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 43b75b3ceced..1405633362f5 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -58,7 +58,7 @@ struct tipc_sock {
58 struct tipc_port port; 58 struct tipc_port port;
59 unsigned int conn_timeout; 59 unsigned int conn_timeout;
60 atomic_t dupl_rcvcnt; 60 atomic_t dupl_rcvcnt;
61 int link_cong; 61 bool link_cong;
62 uint sent_unacked; 62 uint sent_unacked;
63 uint rcv_unacked; 63 uint rcv_unacked;
64}; 64};
@@ -73,11 +73,6 @@ static inline struct tipc_sock *tipc_port_to_sock(const struct tipc_port *port)
73 return container_of(port, struct tipc_sock, port); 73 return container_of(port, struct tipc_sock, port);
74} 74}
75 75
76static inline void tipc_sock_wakeup(struct tipc_sock *tsk)
77{
78 tsk->sk.sk_write_space(&tsk->sk);
79}
80
81static inline int tipc_sk_conn_cong(struct tipc_sock *tsk) 76static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
82{ 77{
83 return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN; 78 return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;