path: root/net/tipc/link.c
author	Jon Paul Maloy <jon.maloy@ericsson.com>	2014-08-22 18:09:07 -0400
committer	David S. Miller <davem@davemloft.net>	2014-08-23 14:18:33 -0400
commit	50100a5e39461b2a61d6040e73c384766c29975d (patch)
tree	7f632d0f22af7f38c282603e5c9f4de3139bce59 /net/tipc/link.c
parent	1dd0bd2b14032037d40a316dd52370f1713fa62b (diff)
tipc: use pseudo message to wake up sockets after link congestion
The current link implementation keeps a linked list of blocked ports/sockets that is populated when there is link congestion. The purpose of this is to let the link know which users to wake up when the congestion abates.

This adds unnecessary complexity to the data structure and the code, since it forces us to involve the link each time we want to delete a socket. It also forces us to grab the spinlock port_lock within the scope of node_lock. We want to get rid of this direct dependence, as well as the deadlock hazard resulting from the usage of port_lock.

In this commit, we instead let the link keep a list of "wakeup" pseudo messages for use in such situations. Those messages are sent to the pending sockets via the ordinary message reception path, and wake up the sockets' owners when they are received. This enables us to get rid of the 'waiting_ports' linked lists in struct tipc_port that manifest this direct reference. As a consequence, we can eliminate another BH entry into the socket, and hence the need to grab port_lock. This is a further step in our effort to remove port_lock altogether.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
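Before the diff itself, a rough sketch of the scheduling side of the new scheme may help. This is a minimal user-space model, not the kernel code: struct toy_link, struct wakeup_rec and toy_schedule_user() are invented for illustration, whereas the patch below builds a real SOCK_WAKEUP buffer with tipc_msg_create() and appends it to link->waiting_sks.

/*
 * Toy model of the "wakeup pseudo message" idea: when a send fails
 * because of link congestion, the link queues a small record that
 * remembers who was blocked and how much it wanted to send, instead
 * of linking the port structure itself into the link.
 */
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

struct wakeup_rec {		/* stands in for the SOCK_WAKEUP sk_buff */
	unsigned int oport;	/* originating socket to wake up later   */
	unsigned int chain_sz;	/* size of the rejected buffer chain     */
	unsigned int imp;	/* importance of the rejected message    */
	struct wakeup_rec *next;
};

struct toy_link {
	struct wakeup_rec *head, *tail;	/* models link->waiting_sks */
	unsigned long link_congs;	/* models stats.link_congs  */
};

/* Rough analogue of the new link_schedule_user(). */
static bool toy_schedule_user(struct toy_link *l, unsigned int oport,
			      unsigned int chain_sz, unsigned int imp)
{
	struct wakeup_rec *r = calloc(1, sizeof(*r));

	if (!r)
		return false;	/* caller drops the chain, -EHOSTUNREACH */
	r->oport = oport;
	r->chain_sz = chain_sz;
	r->imp = imp;
	if (l->tail)
		l->tail->next = r;
	else
		l->head = r;
	l->tail = r;
	l->link_congs++;
	return true;		/* caller reports -ELINKCONG to the socket */
}

int main(void)
{
	struct toy_link l = { 0 };

	if (toy_schedule_user(&l, 42, 3, 1))
		printf("port %u parked, %lu congestion events so far\n",
		       l.head->oport, l.link_congs);
	return 0;
}

The point of the design is visible even in the toy: the link only ever holds plain message buffers, so tearing down a socket no longer requires touching the link, and no port_lock is taken on this path.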
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--	net/tipc/link.c	119
1 file changed, 54 insertions(+), 65 deletions(-)
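The other half of the mechanism is the admission loop in the new link_prepare_wakeup() (second hunk of the diff below): pending wakeups are released strictly in FIFO order, and the loop stops at the first blocked sender whose importance class would exceed its send-queue limit. The stand-alone model below walks through that arithmetic with made-up queue limits and chain sizes; the real values live in link->queue_limit[] and TIPC_SKB_CB(buf)->chain_sz.

/* Stand-alone model of the admission loop in link_prepare_wakeup(). */
#include <stdio.h>

struct pending_wakeup {
	unsigned int chain_sz;	/* packets the blocked sender still wants to send */
	unsigned int imp;	/* importance level, indexes queue_limit[]        */
};

int main(void)
{
	/* Illustrative numbers only; the real limits are per-link in TIPC. */
	const unsigned int queue_limit[4] = { 50, 100, 150, 200 };
	struct pending_wakeup wq[] = {
		{ 30, 0 }, { 10, 1 }, { 40, 0 }, { 20, 3 },
	};
	unsigned int pend_qsz = 35;	/* models link->out_queue_size */
	unsigned int i, n = sizeof(wq) / sizeof(wq[0]);

	for (i = 0; i < n; i++) {
		/* Stop at the first wakeup whose importance class is already full. */
		if (pend_qsz >= queue_limit[wq[i].imp])
			break;
		pend_qsz += wq[i].chain_sz;
		printf("wake sender %u (imp %u), projected queue size now %u\n",
		       i, wq[i].imp, pend_qsz);
	}
	printf("%u of %u blocked senders moved to the node wakeup queue\n", i, n);
	return 0;
}

With these numbers the run wakes the first two senders and then stops, even though the fourth entry's importance class still has room: wakeups stay in arrival order by design, mirroring the break in the kernel loop.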
diff --git a/net/tipc/link.c b/net/tipc/link.c
index fb1485dc6736..6c775a107a02 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -275,7 +275,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
-	INIT_LIST_HEAD(&l_ptr->waiting_ports);
+	__skb_queue_head_init(&l_ptr->waiting_sks);
 
 	link_reset_statistics(l_ptr);
 
@@ -322,66 +322,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
 }
 
 /**
- * link_schedule_port - schedule port for deferred sending
- * @l_ptr: pointer to link
- * @origport: reference to sending port
- * @sz: amount of data to be sent
- *
- * Schedules port for renewed sending of messages after link congestion
- * has abated.
+ * link_schedule_user - schedule user for wakeup after congestion
+ * @link: congested link
+ * @oport: sending port
+ * @chain_sz: size of buffer chain that was attempted sent
+ * @imp: importance of message attempted sent
+ * Create pseudo msg to send back to user when congestion abates
  */
-static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
+static bool link_schedule_user(struct tipc_link *link, u32 oport,
+			       uint chain_sz, uint imp)
 {
-	struct tipc_port *p_ptr;
-	struct tipc_sock *tsk;
+	struct sk_buff *buf;
 
-	spin_lock_bh(&tipc_port_list_lock);
-	p_ptr = tipc_port_lock(origport);
-	if (p_ptr) {
-		if (!list_empty(&p_ptr->wait_list))
-			goto exit;
-		tsk = tipc_port_to_sock(p_ptr);
-		tsk->link_cong = 1;
-		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
-		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
-		l_ptr->stats.link_congs++;
-exit:
-		tipc_port_unlock(p_ptr);
-	}
-	spin_unlock_bh(&tipc_port_list_lock);
-	return -ELINKCONG;
+	buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
+			      tipc_own_addr, oport, 0, 0);
+	if (!buf)
+		return false;
+	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
+	TIPC_SKB_CB(buf)->chain_imp = imp;
+	__skb_queue_tail(&link->waiting_sks, buf);
+	link->stats.link_congs++;
+	return true;
 }
 
-void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
+/**
+ * link_prepare_wakeup - prepare users for wakeup after congestion
+ * @link: congested link
+ * Move a number of waiting users, as permitted by available space in
+ * the send queue, from link wait queue to node wait queue for wakeup
+ */
+static void link_prepare_wakeup(struct tipc_link *link)
 {
-	struct tipc_port *p_ptr;
-	struct tipc_sock *tsk;
-	struct tipc_port *temp_p_ptr;
-	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
-
-	if (all)
-		win = 100000;
-	if (win <= 0)
-		return;
-	if (!spin_trylock_bh(&tipc_port_list_lock))
-		return;
-	if (link_congested(l_ptr))
-		goto exit;
-	list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
-				 wait_list) {
-		if (win <= 0)
+	struct sk_buff_head *wq = &link->waiting_sks;
+	struct sk_buff *buf;
+	uint pend_qsz = link->out_queue_size;
+
+	for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
+		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
 			break;
-		tsk = tipc_port_to_sock(p_ptr);
-		list_del_init(&p_ptr->wait_list);
-		spin_lock_bh(p_ptr->lock);
-		tsk->link_cong = 0;
-		tipc_sock_wakeup(tsk);
-		win -= p_ptr->waiting_pkts;
-		spin_unlock_bh(p_ptr->lock);
+		pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
+		__skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
 	}
-
-exit:
-	spin_unlock_bh(&tipc_port_list_lock);
 }
 
 /**
@@ -423,6 +404,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	u32 prev_state = l_ptr->state;
 	u32 checkpoint = l_ptr->next_in_no;
 	int was_active_link = tipc_link_is_active(l_ptr);
+	struct tipc_node *owner = l_ptr->owner;
 
 	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 
@@ -450,9 +432,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	kfree_skb(l_ptr->proto_msg_queue);
 	l_ptr->proto_msg_queue = NULL;
 	kfree_skb_list(l_ptr->oldest_deferred_in);
-	if (!list_empty(&l_ptr->waiting_ports))
-		tipc_link_wakeup_ports(l_ptr, 1);
-
+	if (!skb_queue_empty(&l_ptr->waiting_sks)) {
+		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
+		owner->action_flags |= TIPC_WAKEUP_USERS;
+	}
 	l_ptr->retransm_queue_head = 0;
 	l_ptr->retransm_queue_size = 0;
 	l_ptr->last_out = NULL;
@@ -688,19 +671,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 {
 	struct tipc_msg *msg = buf_msg(buf);
-	uint psz = msg_size(msg);
 	uint imp = tipc_msg_tot_importance(msg);
 	u32 oport = msg_tot_origport(msg);
 
-	if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
-		if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
-			link_schedule_port(link, oport, psz);
-			return -ELINKCONG;
-		}
-	} else {
+	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
 		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
 		tipc_link_reset(link);
+		goto drop;
 	}
+	if (unlikely(msg_errcode(msg)))
+		goto drop;
+	if (unlikely(msg_reroute_cnt(msg)))
+		goto drop;
+	if (TIPC_SKB_CB(buf)->wakeup_pending)
+		return -ELINKCONG;
+	if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
+		return -ELINKCONG;
+drop:
 	kfree_skb_list(buf);
 	return -EHOSTUNREACH;
 }
@@ -1202,8 +1189,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (unlikely(l_ptr->next_out))
 			tipc_link_push_queue(l_ptr);
 
-		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
-			tipc_link_wakeup_ports(l_ptr, 0);
+		if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
+			link_prepare_wakeup(l_ptr);
+			l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
+		}
 
 		/* Process the incoming packet */
 		if (unlikely(!link_working_working(l_ptr))) {