aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c127
1 files changed, 60 insertions, 67 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index fb1485dc6736..1db162aa64a5 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -36,7 +36,6 @@
36 36
37#include "core.h" 37#include "core.h"
38#include "link.h" 38#include "link.h"
39#include "port.h"
40#include "socket.h" 39#include "socket.h"
41#include "name_distr.h" 40#include "name_distr.h"
42#include "discover.h" 41#include "discover.h"
@@ -275,7 +274,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
275 link_init_max_pkt(l_ptr); 274 link_init_max_pkt(l_ptr);
276 275
277 l_ptr->next_out_no = 1; 276 l_ptr->next_out_no = 1;
278 INIT_LIST_HEAD(&l_ptr->waiting_ports); 277 __skb_queue_head_init(&l_ptr->waiting_sks);
279 278
280 link_reset_statistics(l_ptr); 279 link_reset_statistics(l_ptr);
281 280
@@ -322,66 +321,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
322} 321}
323 322
324/** 323/**
325 * link_schedule_port - schedule port for deferred sending 324 * link_schedule_user - schedule user for wakeup after congestion
326 * @l_ptr: pointer to link 325 * @link: congested link
327 * @origport: reference to sending port 326 * @oport: sending port
328 * @sz: amount of data to be sent 327 * @chain_sz: size of buffer chain that was attempted sent
329 * 328 * @imp: importance of message attempted sent
330 * Schedules port for renewed sending of messages after link congestion 329 * Create pseudo msg to send back to user when congestion abates
331 * has abated.
332 */ 330 */
333static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) 331static bool link_schedule_user(struct tipc_link *link, u32 oport,
332 uint chain_sz, uint imp)
334{ 333{
335 struct tipc_port *p_ptr; 334 struct sk_buff *buf;
336 struct tipc_sock *tsk;
337 335
338 spin_lock_bh(&tipc_port_list_lock); 336 buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
339 p_ptr = tipc_port_lock(origport); 337 tipc_own_addr, oport, 0, 0);
340 if (p_ptr) { 338 if (!buf)
341 if (!list_empty(&p_ptr->wait_list)) 339 return false;
342 goto exit; 340 TIPC_SKB_CB(buf)->chain_sz = chain_sz;
343 tsk = tipc_port_to_sock(p_ptr); 341 TIPC_SKB_CB(buf)->chain_imp = imp;
344 tsk->link_cong = 1; 342 __skb_queue_tail(&link->waiting_sks, buf);
345 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); 343 link->stats.link_congs++;
346 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); 344 return true;
347 l_ptr->stats.link_congs++;
348exit:
349 tipc_port_unlock(p_ptr);
350 }
351 spin_unlock_bh(&tipc_port_list_lock);
352 return -ELINKCONG;
353} 345}
354 346
355void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) 347/**
348 * link_prepare_wakeup - prepare users for wakeup after congestion
349 * @link: congested link
350 * Move a number of waiting users, as permitted by available space in
351 * the send queue, from link wait queue to node wait queue for wakeup
352 */
353static void link_prepare_wakeup(struct tipc_link *link)
356{ 354{
357 struct tipc_port *p_ptr; 355 struct sk_buff_head *wq = &link->waiting_sks;
358 struct tipc_sock *tsk; 356 struct sk_buff *buf;
359 struct tipc_port *temp_p_ptr; 357 uint pend_qsz = link->out_queue_size;
360 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; 358
361 359 for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
362 if (all) 360 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
363 win = 100000;
364 if (win <= 0)
365 return;
366 if (!spin_trylock_bh(&tipc_port_list_lock))
367 return;
368 if (link_congested(l_ptr))
369 goto exit;
370 list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
371 wait_list) {
372 if (win <= 0)
373 break; 361 break;
374 tsk = tipc_port_to_sock(p_ptr); 362 pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
375 list_del_init(&p_ptr->wait_list); 363 __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
376 spin_lock_bh(p_ptr->lock);
377 tsk->link_cong = 0;
378 tipc_sock_wakeup(tsk);
379 win -= p_ptr->waiting_pkts;
380 spin_unlock_bh(p_ptr->lock);
381 } 364 }
382
383exit:
384 spin_unlock_bh(&tipc_port_list_lock);
385} 365}
386 366
387/** 367/**
@@ -423,6 +403,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
423 u32 prev_state = l_ptr->state; 403 u32 prev_state = l_ptr->state;
424 u32 checkpoint = l_ptr->next_in_no; 404 u32 checkpoint = l_ptr->next_in_no;
425 int was_active_link = tipc_link_is_active(l_ptr); 405 int was_active_link = tipc_link_is_active(l_ptr);
406 struct tipc_node *owner = l_ptr->owner;
426 407
427 msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); 408 msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
428 409
@@ -450,9 +431,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
450 kfree_skb(l_ptr->proto_msg_queue); 431 kfree_skb(l_ptr->proto_msg_queue);
451 l_ptr->proto_msg_queue = NULL; 432 l_ptr->proto_msg_queue = NULL;
452 kfree_skb_list(l_ptr->oldest_deferred_in); 433 kfree_skb_list(l_ptr->oldest_deferred_in);
453 if (!list_empty(&l_ptr->waiting_ports)) 434 if (!skb_queue_empty(&l_ptr->waiting_sks)) {
454 tipc_link_wakeup_ports(l_ptr, 1); 435 skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
455 436 owner->action_flags |= TIPC_WAKEUP_USERS;
437 }
456 l_ptr->retransm_queue_head = 0; 438 l_ptr->retransm_queue_head = 0;
457 l_ptr->retransm_queue_size = 0; 439 l_ptr->retransm_queue_size = 0;
458 l_ptr->last_out = NULL; 440 l_ptr->last_out = NULL;
@@ -688,19 +670,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
688static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) 670static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
689{ 671{
690 struct tipc_msg *msg = buf_msg(buf); 672 struct tipc_msg *msg = buf_msg(buf);
691 uint psz = msg_size(msg);
692 uint imp = tipc_msg_tot_importance(msg); 673 uint imp = tipc_msg_tot_importance(msg);
693 u32 oport = msg_tot_origport(msg); 674 u32 oport = msg_tot_origport(msg);
694 675
695 if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) { 676 if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
696 if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
697 link_schedule_port(link, oport, psz);
698 return -ELINKCONG;
699 }
700 } else {
701 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); 677 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
702 tipc_link_reset(link); 678 tipc_link_reset(link);
679 goto drop;
703 } 680 }
681 if (unlikely(msg_errcode(msg)))
682 goto drop;
683 if (unlikely(msg_reroute_cnt(msg)))
684 goto drop;
685 if (TIPC_SKB_CB(buf)->wakeup_pending)
686 return -ELINKCONG;
687 if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
688 return -ELINKCONG;
689drop:
704 kfree_skb_list(buf); 690 kfree_skb_list(buf);
705 return -EHOSTUNREACH; 691 return -EHOSTUNREACH;
706} 692}
@@ -1202,8 +1188,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1202 if (unlikely(l_ptr->next_out)) 1188 if (unlikely(l_ptr->next_out))
1203 tipc_link_push_queue(l_ptr); 1189 tipc_link_push_queue(l_ptr);
1204 1190
1205 if (unlikely(!list_empty(&l_ptr->waiting_ports))) 1191 if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
1206 tipc_link_wakeup_ports(l_ptr, 0); 1192 link_prepare_wakeup(l_ptr);
1193 l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
1194 }
1207 1195
1208 /* Process the incoming packet */ 1196 /* Process the incoming packet */
1209 if (unlikely(!link_working_working(l_ptr))) { 1197 if (unlikely(!link_working_working(l_ptr))) {
@@ -1936,7 +1924,12 @@ void tipc_link_bundle_rcv(struct sk_buff *buf)
1936 } 1924 }
1937 omsg = buf_msg(obuf); 1925 omsg = buf_msg(obuf);
1938 pos += align(msg_size(omsg)); 1926 pos += align(msg_size(omsg));
1939 if (msg_isdata(omsg) || (msg_user(omsg) == CONN_MANAGER)) { 1927 if (msg_isdata(omsg)) {
1928 if (unlikely(msg_type(omsg) == TIPC_MCAST_MSG))
1929 tipc_sk_mcast_rcv(obuf);
1930 else
1931 tipc_sk_rcv(obuf);
1932 } else if (msg_user(omsg) == CONN_MANAGER) {
1940 tipc_sk_rcv(obuf); 1933 tipc_sk_rcv(obuf);
1941 } else if (msg_user(omsg) == NAME_DISTRIBUTOR) { 1934 } else if (msg_user(omsg) == NAME_DISTRIBUTOR) {
1942 tipc_named_rcv(obuf); 1935 tipc_named_rcv(obuf);