path: root/net/tipc/link.c
author		David S. Miller <davem@davemloft.net>	2014-08-23 14:18:41 -0400
committer	David S. Miller <davem@davemloft.net>	2014-08-23 14:18:41 -0400
commit		5aa8dbbd5f9ae6ec6f5ab88596a29a5b5d4caf31 (patch)
tree		dd7368248aaf26d2476a54a345ede9f0db44eeb9 /net/tipc/link.c
parent		f9474ddfaa009ead12bba44fa8fd49dc4536a124 (diff)
parent		301bae56f21295a4ba71367818d80735687f11ac (diff)
Merge branch 'tipc-next'
Jon Maloy says:

====================
tipc: Merge port and socket layer code

After the removal of the TIPC native interface, there is no reason to keep
a distinction between a "generic" port layer and a "specific" socket layer
in the code. Throughout the last months, we have posted several series that
aimed at facilitating removal of the port layer, and in particular the
port_lock spinlock, which in reality duplicates the role normally kept by
lock_sock()/bh_lock_sock().

In this series, we finalize this work by making a significant number of
changes to the link, node, port and socket code, all with the aim of
reducing dependencies between the layers. In the final commits, we then
remove the port spinlock, port.c and port.h altogether.

After this series, we have a socket layer with only a few dependencies on
the rest of the stack, so it should be possible to continue cleaning up its
code without significantly affecting other code.
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/link.c')
-rw-r--r--	net/tipc/link.c	120
1 file changed, 54 insertions(+), 66 deletions(-)
diff --git a/net/tipc/link.c b/net/tipc/link.c
index fb1485dc6736..65410e18b8a6 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -36,7 +36,6 @@
 
 #include "core.h"
 #include "link.h"
-#include "port.h"
 #include "socket.h"
 #include "name_distr.h"
 #include "discover.h"
@@ -275,7 +274,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 	link_init_max_pkt(l_ptr);
 
 	l_ptr->next_out_no = 1;
-	INIT_LIST_HEAD(&l_ptr->waiting_ports);
+	__skb_queue_head_init(&l_ptr->waiting_sks);
 
 	link_reset_statistics(l_ptr);
 
@@ -322,66 +321,47 @@ void tipc_link_delete_list(unsigned int bearer_id, bool shutting_down)
 }
 
 /**
- * link_schedule_port - schedule port for deferred sending
- * @l_ptr: pointer to link
- * @origport: reference to sending port
- * @sz: amount of data to be sent
- *
- * Schedules port for renewed sending of messages after link congestion
- * has abated.
+ * link_schedule_user - schedule user for wakeup after congestion
+ * @link: congested link
+ * @oport: sending port
+ * @chain_sz: size of buffer chain that was attempted sent
+ * @imp: importance of message attempted sent
+ * Create pseudo msg to send back to user when congestion abates
  */
-static int link_schedule_port(struct tipc_link *l_ptr, u32 origport, u32 sz)
+static bool link_schedule_user(struct tipc_link *link, u32 oport,
+			       uint chain_sz, uint imp)
 {
-	struct tipc_port *p_ptr;
-	struct tipc_sock *tsk;
+	struct sk_buff *buf;
 
-	spin_lock_bh(&tipc_port_list_lock);
-	p_ptr = tipc_port_lock(origport);
-	if (p_ptr) {
-		if (!list_empty(&p_ptr->wait_list))
-			goto exit;
-		tsk = tipc_port_to_sock(p_ptr);
-		tsk->link_cong = 1;
-		p_ptr->waiting_pkts = 1 + ((sz - 1) / l_ptr->max_pkt);
-		list_add_tail(&p_ptr->wait_list, &l_ptr->waiting_ports);
-		l_ptr->stats.link_congs++;
-exit:
-		tipc_port_unlock(p_ptr);
-	}
-	spin_unlock_bh(&tipc_port_list_lock);
-	return -ELINKCONG;
+	buf = tipc_msg_create(SOCK_WAKEUP, 0, INT_H_SIZE, 0, tipc_own_addr,
+			      tipc_own_addr, oport, 0, 0);
+	if (!buf)
+		return false;
+	TIPC_SKB_CB(buf)->chain_sz = chain_sz;
+	TIPC_SKB_CB(buf)->chain_imp = imp;
+	__skb_queue_tail(&link->waiting_sks, buf);
+	link->stats.link_congs++;
+	return true;
 }
 
-void tipc_link_wakeup_ports(struct tipc_link *l_ptr, int all)
+/**
+ * link_prepare_wakeup - prepare users for wakeup after congestion
+ * @link: congested link
+ * Move a number of waiting users, as permitted by available space in
+ * the send queue, from link wait queue to node wait queue for wakeup
+ */
+static void link_prepare_wakeup(struct tipc_link *link)
 {
-	struct tipc_port *p_ptr;
-	struct tipc_sock *tsk;
-	struct tipc_port *temp_p_ptr;
-	int win = l_ptr->queue_limit[0] - l_ptr->out_queue_size;
-
-	if (all)
-		win = 100000;
-	if (win <= 0)
-		return;
-	if (!spin_trylock_bh(&tipc_port_list_lock))
-		return;
-	if (link_congested(l_ptr))
-		goto exit;
-	list_for_each_entry_safe(p_ptr, temp_p_ptr, &l_ptr->waiting_ports,
-				 wait_list) {
-		if (win <= 0)
+	struct sk_buff_head *wq = &link->waiting_sks;
+	struct sk_buff *buf;
+	uint pend_qsz = link->out_queue_size;
+
+	for (buf = skb_peek(wq); buf; buf = skb_peek(wq)) {
+		if (pend_qsz >= link->queue_limit[TIPC_SKB_CB(buf)->chain_imp])
 			break;
-		tsk = tipc_port_to_sock(p_ptr);
-		list_del_init(&p_ptr->wait_list);
-		spin_lock_bh(p_ptr->lock);
-		tsk->link_cong = 0;
-		tipc_sock_wakeup(tsk);
-		win -= p_ptr->waiting_pkts;
-		spin_unlock_bh(p_ptr->lock);
+		pend_qsz += TIPC_SKB_CB(buf)->chain_sz;
+		__skb_queue_tail(&link->owner->waiting_sks, __skb_dequeue(wq));
 	}
-
-exit:
-	spin_unlock_bh(&tipc_port_list_lock);
 }
 
 /**
@@ -423,6 +403,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	u32 prev_state = l_ptr->state;
 	u32 checkpoint = l_ptr->next_in_no;
 	int was_active_link = tipc_link_is_active(l_ptr);
+	struct tipc_node *owner = l_ptr->owner;
 
 	msg_set_session(l_ptr->pmsg, ((msg_session(l_ptr->pmsg) + 1) & 0xffff));
 
@@ -450,9 +431,10 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	kfree_skb(l_ptr->proto_msg_queue);
 	l_ptr->proto_msg_queue = NULL;
 	kfree_skb_list(l_ptr->oldest_deferred_in);
-	if (!list_empty(&l_ptr->waiting_ports))
-		tipc_link_wakeup_ports(l_ptr, 1);
-
+	if (!skb_queue_empty(&l_ptr->waiting_sks)) {
+		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
+		owner->action_flags |= TIPC_WAKEUP_USERS;
+	}
 	l_ptr->retransm_queue_head = 0;
 	l_ptr->retransm_queue_size = 0;
 	l_ptr->last_out = NULL;
@@ -688,19 +670,23 @@ static void link_state_event(struct tipc_link *l_ptr, unsigned int event)
 static int tipc_link_cong(struct tipc_link *link, struct sk_buff *buf)
 {
 	struct tipc_msg *msg = buf_msg(buf);
-	uint psz = msg_size(msg);
 	uint imp = tipc_msg_tot_importance(msg);
 	u32 oport = msg_tot_origport(msg);
 
-	if (likely(imp <= TIPC_CRITICAL_IMPORTANCE)) {
-		if (!msg_errcode(msg) && !msg_reroute_cnt(msg)) {
-			link_schedule_port(link, oport, psz);
-			return -ELINKCONG;
-		}
-	} else {
+	if (unlikely(imp > TIPC_CRITICAL_IMPORTANCE)) {
 		pr_warn("%s<%s>, send queue full", link_rst_msg, link->name);
 		tipc_link_reset(link);
+		goto drop;
 	}
+	if (unlikely(msg_errcode(msg)))
+		goto drop;
+	if (unlikely(msg_reroute_cnt(msg)))
+		goto drop;
+	if (TIPC_SKB_CB(buf)->wakeup_pending)
+		return -ELINKCONG;
+	if (link_schedule_user(link, oport, TIPC_SKB_CB(buf)->chain_sz, imp))
+		return -ELINKCONG;
+drop:
 	kfree_skb_list(buf);
 	return -EHOSTUNREACH;
 }
@@ -1202,8 +1188,10 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 		if (unlikely(l_ptr->next_out))
 			tipc_link_push_queue(l_ptr);
 
-		if (unlikely(!list_empty(&l_ptr->waiting_ports)))
-			tipc_link_wakeup_ports(l_ptr, 0);
+		if (released && !skb_queue_empty(&l_ptr->waiting_sks)) {
+			link_prepare_wakeup(l_ptr);
+			l_ptr->owner->action_flags |= TIPC_WAKEUP_USERS;
+		}
 
 		/* Process the incoming packet */
 		if (unlikely(!link_working_working(l_ptr))) {
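
The heart of this change, in link_schedule_user() and link_prepare_wakeup() above, is a deferred-wakeup scheme: a congested link queues one SOCK_WAKEUP pseudo message per blocked sender, recording the size and importance of the rejected buffer chain, and once the send queue drains it moves as many of those entries as the per-importance queue limits allow onto the owning node's waiting_sks queue and sets TIPC_WAKEUP_USERS. The stand-alone C sketch below models only that queueing algorithm in user space; the types and helpers (wakeup_msg, msg_queue, schedule_user, prepare_wakeup) are invented for illustration and are not the kernel API.

/*
 * Simplified user-space model of the congestion-wakeup scheme in this
 * patch (names mirror the diff; this is NOT the kernel code).  On
 * congestion a pseudo wakeup entry is queued per sender, recording the
 * size and importance of the chain that could not be sent.  When the
 * send queue drains, entries are moved to the node-level wakeup queue
 * as long as the per-importance queue limit still has room.
 */
#include <stdio.h>
#include <stdlib.h>

struct wakeup_msg {                 /* models the SOCK_WAKEUP pseudo msg */
	unsigned int oport;         /* originating port                   */
	unsigned int chain_sz;      /* buffers in the rejected chain      */
	unsigned int chain_imp;     /* message importance (0..3)          */
	struct wakeup_msg *next;
};

struct msg_queue {
	struct wakeup_msg *head, *tail;
};

static void enqueue(struct msg_queue *q, struct wakeup_msg *m)
{
	m->next = NULL;
	if (q->tail)
		q->tail->next = m;
	else
		q->head = m;
	q->tail = m;
}

static struct wakeup_msg *dequeue(struct msg_queue *q)
{
	struct wakeup_msg *m = q->head;

	if (m) {
		q->head = m->next;
		if (!q->head)
			q->tail = NULL;
	}
	return m;
}

/* Counterpart of link_schedule_user(): remember who to wake up later. */
static void schedule_user(struct msg_queue *waiting, unsigned int oport,
			  unsigned int chain_sz, unsigned int chain_imp)
{
	struct wakeup_msg *m = malloc(sizeof(*m));

	if (!m)
		exit(EXIT_FAILURE);
	m->oport = oport;
	m->chain_sz = chain_sz;
	m->chain_imp = chain_imp;
	enqueue(waiting, m);
}

/*
 * Counterpart of link_prepare_wakeup(): move users from the link wait
 * queue to the node wait queue while the pending queue size stays below
 * the limit for that user's importance level.
 */
static void prepare_wakeup(struct msg_queue *link_wq, struct msg_queue *node_wq,
			   unsigned int out_queue_size,
			   const unsigned int queue_limit[4])
{
	unsigned int pend_qsz = out_queue_size;
	struct wakeup_msg *m;

	while ((m = link_wq->head) != NULL) {
		if (pend_qsz >= queue_limit[m->chain_imp])
			break;
		pend_qsz += m->chain_sz;
		enqueue(node_wq, dequeue(link_wq));
	}
}

int main(void)
{
	struct msg_queue link_wq = { 0 }, node_wq = { 0 };
	const unsigned int limits[4] = { 50, 100, 150, 200 };
	struct wakeup_msg *m;

	schedule_user(&link_wq, 1001, 30, 0);   /* low importance, big chain */
	schedule_user(&link_wq, 1002, 10, 3);   /* critical, small chain     */

	prepare_wakeup(&link_wq, &node_wq, 40, limits);

	while ((m = dequeue(&node_wq)) != NULL) {
		printf("wake up port %u (imp %u)\n", m->oport, m->chain_imp);
		free(m);
	}
	return 0;
}

Run as-is, the sketch wakes both queued senders, since the pending queue size (40) stays below each entry's importance limit; raising out_queue_size to 60 would park both of them, because the loop stops at the first sender whose limit is already exceeded, mirroring the strict FIFO behaviour of link_prepare_wakeup().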