aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/link.c
diff options
context:
space:
mode:
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--net/tipc/link.c119
1 files changed, 54 insertions, 65 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index fb1485dc6736..6c775a107a02 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -275,7 +275,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
275 link_init_max_pkt(l_ptr); 275 link_init_max_pkt(l_ptr);
276 276
277 l_ptr->next_out_no = 1; 277 l_ptr->next_out_no = 1;
278 INIT_LIST_HEAD(&l_ptr->waiting_ports); 278 __skb_queue_head_init(&l_ptr->waiting_sks);
279 279
280 link_reset_statistics(l_ptr); 280 link_reset_statistics(l_ptr);
281 281
@@ -322,66 +322,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
322} 322}
323 323
324/** 324/**
325 * link_schedule_port - schedule port for deferred sending 325 * link_schedule_user - schedule user for wakeup after congestion
326 * @l_ptr: pointer to link 326 * @link: congested link
327 * @origport: reference to sending port 327 * @oport: sending port
328 * @sz: amount of data to be sent 328 * @chain_sz: size of buffer chain that was attempted sent
329 * 329 * @imp: importance of message attempted sent
330 * Schedules port for renewed sending of messages after link congestion 330 * Create pseudo msg to send back to user when congestion abates
331 * has abated.
332 */ 331 */
333static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz) 332static bool link_schedule_user(struct tipc_link *link, u32 oport,
333 uint chain_sz, uint imp)
334{ 334{
335 struct tipc_port *p_ptr; 335 struct sk_buff *buf;
336 struct tipc_sock *tsk;
337 336
338 spin_lock_bh(&tipc_port_list_lock); 337 buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
339 p_ptr = tipc_port_lock(origport); 338 tipc_own_addr, oport, 0, 0);
340 if (p_ptr) { 339 if (!buf)
341 if (!list_empty(&p_ptr->wait_list)) 340 return false;
342 goto exit; 341 TIPC_SKB_CB(buf)->chain_sz = chain_sz;
343 tsk = tipc_port_to_sock(p_ptr); 342 TIPC_SKB_CB(buf)->chain_imp = imp;
344 tsk->link_cong = 1; 343 __skb_queue_tail(&link->waiting_sks, buf);
345 p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt); 344 link->stats.link_congs++;
346 list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports); 345 return true;
347 l_ptr->stats.link_congs++;
348exit:
349 tipc_port_unlock(p_ptr);
350 }
351 spin_unlock_bh(&tipc_port_list_lock);
352 return -ELINKCONG;
353} 346}
354 347
355void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all) 348/**
349 * link_prepare_wakeup - prepare users for wakeup after congestion
350 * @link: congested link
351 * Move a number of waiting users, as permitted by available space in
352 * the send queue, from link wait queue to node wait queue for wakeup
353 */
354static void link_prepare_wakeup(struct tipc_link *link)
356{ 355{
357 struct tipc_port *p_ptr; 356 struct sk_buff_head *wq = &link->waiting_sks;
358 struct tipc_sock *tsk; 357 struct sk_buff *buf;
359 struct tipc_port *temp_p_ptr; 358 uint pend_qsz = link->out_queue_size;
360 int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size; 359
361 360 for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
362 if (all) 361 if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
363 win = 100000;
364 if (win <= 0)
365 return;
366 if (!spin_trylock_bh(&tipc_port_list_lock))
367 return;
368 if (link_congested(l_ptr))
369 goto exit;
370 list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
371 wait_list) {
372 if (win <= 0)
373 break; 362 break;
374 tsk = tipc_port_to_sock(p_ptr); 363 pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
375 list_del_init(&p_ptr->wait_list); 364 __skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
376 spin_lock_bh(p_ptr->lock);
377 tsk->link_cong = 0;
378 tipc_sock_wakeup(tsk);
379 win -= p_ptr->waiting_pkts;
380 spin_unlock_bh(p_ptr->lock);
381 } 365 }
382
383exit:
384 spin_unlock_bh(&tipc_port_list_lock);
385} 366}
386 367
387/** 368/**
@@ -423,6 +404,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
423 u32 prev_state = l_ptr->state; 404 u32 prev_state = l_ptr->state;
424 u32 checkpoint = l_ptr->next_in_no; 405 u32 checkpoint = l_ptr->next_in_no;
425 int was_active_link = tipc_link_is_active(l_ptr); 406 int was_active_link = tipc_link_is_active(l_ptr);
407 struct tipc_node *owner = l_ptr->owner;
426 408
427 msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff)); 409 msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
428 410
@@ -450,9 +432,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
450 kfree_skb(l_ptr->proto_msg_queue); 432 kfree_skb(l_ptr->proto_msg_queue);
451 l_ptr->proto_msg_queue = NULL; 433 l_ptr->proto_msg_queue = NULL;
452 kfree_skb_list(l_ptr->oldest_deferred_in); 434 kfree_skb_list(l_ptr->oldest_deferred_in);
453 if (!list_empty(&l_ptr->waiting_ports)) 435 if (!skb_queue_empty(&l_ptr->waiting_sks)) {
454 tipc_link_wakeup_ports(l_ptr, 1); 436 skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
455 437 owner->action_flags |= TIPC_WAKEUP_USERS;
438 }
456 l_ptr->retransm_queue_head = 0; 439 l_ptr->retransm_queue_head = 0;
457 l_ptr->retransm_queue_size = 0; 440 l_ptr->retransm_queue_size = 0;
458 l_ptr->last_out = NULL; 441 l_ptr->last_out = NULL;
@@ -688,19 +671,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
688static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf) 671static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
689{ 672{
690 struct tipc_msg *msg = buf_msg(buf); 673 struct tipc_msg *msg = buf_msg(buf);
691 uint psz = msg_size(msg);
692 uint imp = tipc_msg_tot_importance(msg); 674 uint imp = tipc_msg_tot_importance(msg);
693 u32 oport = msg_tot_origport(msg); 675 u32 oport = msg_tot_origport(msg);
694 676
695 if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) { 677 if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
696 if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
697 link_schedule_port(link, oport, psz);
698 return -ELINKCONG;
699 }
700 } else {
701 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name); 678 pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
702 tipc_link_reset(link); 679 tipc_link_reset(link);
680 goto drop;
703 } 681 }
682 if (unlikely(msg_errcode(msg)))
683 goto drop;
684 if (unlikely(msg_reroute_cnt(msg)))
685 goto drop;
686 if (TIPC_SKB_CB(buf)->wakeup_pending)
687 return -ELINKCONG;
688 if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
689 return -ELINKCONG;
690drop:
704 kfree_skb_list(buf); 691 kfree_skb_list(buf);
705 return -EHOSTUNREACH; 692 return -EHOSTUNREACH;
706} 693}
@@ -1202,8 +1189,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
1202 if (unlikely(l_ptr->next_out)) 1189 if (unlikely(l_ptr->next_out))
1203 tipc_link_push_queue(l_ptr); 1190 tipc_link_push_queue(l_ptr);
1204 1191
1205 if (unlikely(!list_empty(&l_ptr->waiting_ports))) 1192 if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
1206 tipc_link_wakeup_ports(l_ptr, 0); 1193 link_prepare_wakeup(l_ptr);
1194 l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
1195 }
1207 1196
1208 /* Process the incoming packet */ 1197 /* Process the incoming packet */
1209 if (unlikely(!link_working_working(l_ptr))) { 1198 if (unlikely(!link_working_working(l_ptr))) {