author		Jon Paul Maloy <jon.maloy@ericsson.com>		2015-07-16 16:54:31 -0400
committer	David S. Miller <davem@davemloft.net>		2015-07-20 23:41:16 -0400
commit		d999297c3dbbe7fdd832f7fa4ec84301e170b3e6 (patch)
tree		b476768ed4799eb009a19f7ff348b3ebd54212fa /net/tipc
parent		1a20cc254e60e79929ef7edb5cf784df86b46e42 (diff)
tipc: reduce locking scope during packet reception
We convert packet/message reception according to the same principle we have
been using for message sending and timeout handling:

We move the function tipc_rcv() to node.c, hence handling the initial packet
reception at the link aggregation level. The function grabs the node lock,
selects the receiving link, and accesses it via a new call tipc_link_rcv().
This function appends buffers to the input queue for delivery upwards, but
it may also append outgoing packets to the xmit queue, just as we do during
regular message sending. The latter will happen when buffers are forwarded
from the link backlog, or when retransmission is requested.

Upon return of this function, and after having released the node lock,
tipc_rcv() delivers/transmits the contents of those queues, but it may also
perform actions such as link activation or reset, as indicated by the return
flags from the link.

This reduces the number of cpu cycles spent inside the node spinlock, and
reduces contention on that lock.

Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
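In outline, the new receive path looks like this (a condensed sketch of the
tipc_rcv()/tipc_link_rcv() interplay introduced by this patch, using only
names that appear in the hunks below; message validation, broadcast
handling and the unlock/discard error paths are omitted here):

	void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
	{
		struct sk_buff_head xmitq;	/* outgoing pkts, filled under the node lock */
		struct tipc_node *n;
		struct tipc_link *l;
		int rc;

		__skb_queue_head_init(&xmitq);

		n = tipc_node_find(net, msg_prevnode(buf_msg(skb)));
		tipc_node_lock(n);
		l = n->links[b->identity].link;
		rc = tipc_link_rcv(l, skb, &xmitq);	/* may fill inputq and xmitq */
		if (rc & TIPC_LINK_UP_EVT)
			tipc_link_activate(l);
		if (rc & TIPC_LINK_DOWN_EVT)
			tipc_link_reset(l);
		tipc_node_unlock(n);

		/* Deliver upwards and transmit outside the node spinlock */
		tipc_sk_rcv(net, &n->links[b->identity].inputq);
		tipc_bearer_xmit(net, b->identity, &xmitq, &n->links[b->identity].maddr);
		tipc_node_put(n);
	}

The full version, including the unlock/discard paths, is in the
net/tipc/node.c hunk near the end of this patch.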
Diffstat (limited to 'net/tipc')
-rw-r--r--	net/tipc/bcast.c	23
-rw-r--r--	net/tipc/bcast.h	1
-rw-r--r--	net/tipc/core.h		5
-rw-r--r--	net/tipc/link.c		673
-rw-r--r--	net/tipc/link.h		6
-rw-r--r--	net/tipc/msg.h		50
-rw-r--r--	net/tipc/node.c		105
-rw-r--r--	net/tipc/node.h		4
8 files changed, 478 insertions, 389 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index aab4e8dd7b32..8b010c976b2f 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -316,6 +316,29 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr,
 	}
 }
 
+void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *hdr)
+{
+	u16 last = msg_last_bcast(hdr);
+	int mtyp = msg_type(hdr);
+
+	if (unlikely(msg_user(hdr) != LINK_PROTOCOL))
+		return;
+	if (mtyp == STATE_MSG) {
+		tipc_bclink_update_link_state(n, last);
+		return;
+	}
+	/* Compatibility: older nodes don't know BCAST_PROTOCOL synchronization,
+	 * and transfer synch info in LINK_PROTOCOL messages.
+	 */
+	if (tipc_node_is_up(n))
+		return;
+	if ((mtyp != RESET_MSG) && (mtyp != ACTIVATE_MSG))
+		return;
+	n->bclink.last_sent = last;
+	n->bclink.last_in = last;
+	n->bclink.oos_state = 0;
+}
+
 /**
  * bclink_peek_nack - monitor retransmission requests sent by other nodes
  *
diff --git a/net/tipc/bcast.h b/net/tipc/bcast.h
index 3c290a48f720..d74c69bcf60b 100644
--- a/net/tipc/bcast.h
+++ b/net/tipc/bcast.h
@@ -133,5 +133,6 @@ void tipc_bclink_wakeup_users(struct net *net);
 int tipc_nl_add_bc_link(struct net *net, struct tipc_nl_msg *msg);
 int tipc_nl_bc_link_set(struct net *net, struct nlattr *attrs[]);
 void tipc_bclink_input(struct net *net);
+void tipc_bclink_sync_state(struct tipc_node *n, struct tipc_msg *msg);
 
 #endif
diff --git a/net/tipc/core.h b/net/tipc/core.h
index 0fcf133d5cb7..f4ed67778c54 100644
--- a/net/tipc/core.h
+++ b/net/tipc/core.h
@@ -129,6 +129,11 @@ static inline int less(u16 left, u16 right)
 	return less_eq(left, right) && (mod(right) != mod(left));
 }
 
+static inline int in_range(u16 val, u16 min, u16 max)
+{
+	return !less(val, min) && !more(val, max);
+}
+
 #ifdef CONFIG_SYSCTL
 int tipc_register_sysctl(void);
 void tipc_unregister_sysctl(void);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index eaccf4552d15..55b675d20de8 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -77,6 +77,10 @@ static const struct nla_policy tipc_nl_prop_policy[TIPC_NLA_PROP_MAX + 1] = {
 };
 
 /*
+ * Interval between NACKs when packets arrive out of order
+ */
+#define TIPC_NACK_INTV (TIPC_MIN_LINK_WIN * 2)
+/*
  * Out-of-range value for link session numbers
  */
 #define WILDCARD_SESSION 0x10000
@@ -123,22 +127,19 @@ static int link_establishing(struct tipc_link *l)
 	return l->state == TIPC_LINK_ESTABLISHING;
 }
 
-static void link_handle_out_of_seq_msg(struct tipc_link *link,
-				       struct sk_buff *skb);
-static void tipc_link_proto_rcv(struct tipc_link *link,
-				struct sk_buff *skb);
-static void link_state_event(struct tipc_link *l_ptr, u32 event);
+static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
+			       struct sk_buff_head *xmitq);
 static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
 				      u16 rcvgap, int tolerance, int priority,
 				      struct sk_buff_head *xmitq);
 static void link_reset_statistics(struct tipc_link *l_ptr);
 static void link_print(struct tipc_link *l_ptr, const char *str);
-static void tipc_link_sync_xmit(struct tipc_link *l);
+static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
+					   struct sk_buff_head *xmitq);
 static void tipc_link_sync_rcv(struct tipc_node *n, struct sk_buff *buf);
 static void tipc_link_input(struct tipc_link *l, struct sk_buff *skb);
 static bool tipc_data_input(struct tipc_link *l, struct sk_buff *skb);
 static bool tipc_link_failover_rcv(struct tipc_link *l, struct sk_buff **skb);
-static void link_activate(struct tipc_link *link);
 
 /*
  * Simple link routines
@@ -283,6 +284,26 @@ void tipc_link_delete_list(struct net *net, unsigned int bearer_id)
 	rcu_read_unlock();
 }
 
+/* tipc_link_build_bcast_sync_msg() - synchronize broadcast link endpoints.
+ *
+ * Give a newly added peer node the sequence number where it should
+ * start receiving and acking broadcast packets.
+ */
+static void tipc_link_build_bcast_sync_msg(struct tipc_link *l,
+					   struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb;
+	struct sk_buff_head list;
+
+	skb = tipc_msg_create(BCAST_PROTOCOL, STATE_MSG, INT_H_SIZE,
+			      0, l->addr, link_own_addr(l), 0, 0, 0);
+	if (!skb)
+		return;
+	__skb_queue_head_init(&list);
+	__skb_queue_tail(&list, skb);
+	tipc_link_xmit(l, &list, xmitq);
+}
+
 /**
  * tipc_link_fsm_evt - link finite state machine
  * @l: pointer to link
@@ -295,12 +316,13 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
 	int mtyp = 0, rc = 0;
 	struct tipc_link *pl;
 	enum {
 		LINK_RESET = 1,
 		LINK_ACTIVATE = (1 << 1),
 		SND_PROBE = (1 << 2),
 		SND_STATE = (1 << 3),
 		SND_RESET = (1 << 4),
-		SND_ACTIVATE = (1 << 5)
+		SND_ACTIVATE = (1 << 5),
+		SND_BCAST_SYNC = (1 << 6)
 	} actions = 0;
 
 	if (l->exec_mode == TIPC_LINK_BLOCKED)
@@ -352,8 +374,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
 		if (pl && link_probing(pl))
 			break;
 		actions |= LINK_ACTIVATE;
-		if (l->owner->working_links == 1)
-			tipc_link_sync_xmit(l);
+		if (!l->owner->working_links)
+			actions |= SND_BCAST_SYNC;
 		break;
 	case PEER_RESET_EVT:
 		l->state = TIPC_LINK_ESTABLISHING;
@@ -374,8 +396,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
 		if (pl && link_probing(pl))
 			break;
 		actions |= LINK_ACTIVATE;
-		if (l->owner->working_links == 1)
-			tipc_link_sync_xmit(l);
+		if (!l->owner->working_links)
+			actions |= SND_BCAST_SYNC;
 		break;
 	case PEER_RESET_EVT:
 		break;
@@ -408,6 +430,8 @@ static int tipc_link_fsm_evt(struct tipc_link *l, int evt,
 	if (actions & (SND_PROBE | SND_STATE | SND_RESET | SND_ACTIVATE))
 		tipc_link_build_proto_msg(l, mtyp, actions & SND_PROBE,
 					  0, 0, 0, xmitq);
+	if (actions & SND_BCAST_SYNC)
+		tipc_link_build_bcast_sync_msg(l, xmitq);
 	return rc;
 }
 
@@ -605,12 +629,14 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	l_ptr->reasm_buf = NULL;
 	l_ptr->rcv_unacked = 0;
 	l_ptr->snd_nxt = 1;
+	l_ptr->rcv_nxt = 1;
 	l_ptr->silent_intv_cnt = 0;
+	l_ptr->stats.recv_info = 0;
 	l_ptr->stale_count = 0;
 	link_reset_statistics(l_ptr);
 }
 
-static void link_activate(struct tipc_link *link)
+void tipc_link_activate(struct tipc_link *link)
 {
 	struct tipc_node *node = link->owner;
 
@@ -624,36 +650,6 @@ static void link_activate(struct tipc_link *link)
 }
 
-/**
- * link_state_event - link finite state machine
- * @l_ptr: pointer to link
- * @event: state machine event to process
- */
-static void link_state_event(struct tipc_link *l, unsigned int evt)
-{
-	int rc;
-	struct sk_buff_head xmitq;
-	struct sk_buff *skb;
-
-	if (l->exec_mode == TIPC_LINK_BLOCKED)
-		return;
-
-	__skb_queue_head_init(&xmitq);
-
-	rc = tipc_link_fsm_evt(l, evt, &xmitq);
-
-	if (rc & TIPC_LINK_UP_EVT)
-		link_activate(l);
-
-	if (rc & TIPC_LINK_DOWN_EVT)
-		tipc_link_reset(l);
-
-	skb = __skb_dequeue(&xmitq);
-	if (!skb)
-		return;
-	tipc_bearer_send(l->owner->net, l->bearer_id, skb, &l->media_addr);
-}
-
 /**
  * __tipc_link_xmit(): same as tipc_link_xmit, but destlink is known & locked
  * @link: link to use
  * @list: chain of buffers containing message
@@ -808,30 +804,6 @@ static int __tipc_link_xmit_skb(struct tipc_link *link, struct sk_buff *skb)
 }
 
 /*
- * tipc_link_sync_xmit - synchronize broadcast link endpoints.
- *
- * Give a newly added peer node the sequence number where it should
- * start receiving and acking broadcast packets.
- *
- * Called with node locked
- */
-static void tipc_link_sync_xmit(struct tipc_link *link)
-{
-	struct sk_buff *skb;
-	struct tipc_msg *msg;
-
-	skb = tipc_buf_acquire(INT_H_SIZE);
-	if (!skb)
-		return;
-
-	msg = buf_msg(skb);
-	tipc_msg_init(link_own_addr(link), msg, BCAST_PROTOCOL, STATE_MSG,
-		      INT_H_SIZE, link->addr);
-	msg_set_last_bcast(msg, link->owner->bclink.acked);
-	__tipc_link_xmit_skb(link, skb);
-}
-
-/*
  * tipc_link_sync_rcv - synchronize broadcast link endpoints.
  * Receive the sequence number where we should start receiving and
  * acking broadcast packets from a newly added peer node, and open
@@ -881,6 +853,34 @@ void tipc_link_push_packets(struct tipc_link *link)
 	link->snd_nxt = seqno;
 }
 
+void tipc_link_advance_backlog(struct tipc_link *l, struct sk_buff_head *xmitq)
+{
+	struct sk_buff *skb, *_skb;
+	struct tipc_msg *hdr;
+	u16 seqno = l->snd_nxt;
+	u16 ack = l->rcv_nxt - 1;
+
+	while (skb_queue_len(&l->transmq) < l->window) {
+		skb = skb_peek(&l->backlogq);
+		if (!skb)
+			break;
+		_skb = skb_clone(skb, GFP_ATOMIC);
+		if (!_skb)
+			break;
+		__skb_dequeue(&l->backlogq);
+		hdr = buf_msg(skb);
+		l->backlog[msg_importance(hdr)].len--;
+		__skb_queue_tail(&l->transmq, skb);
+		__skb_queue_tail(xmitq, _skb);
+		msg_set_ack(hdr, ack);
+		msg_set_seqno(hdr, seqno);
+		msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+		l->rcv_unacked = 0;
+		seqno++;
+	}
+	l->snd_nxt = seqno;
+}
+
 void tipc_link_reset_all(struct tipc_node *node)
 {
 	char addr_string[16];
@@ -978,6 +978,41 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 	}
 }
 
+static int tipc_link_retransm(struct tipc_link *l, int retransm,
+			      struct sk_buff_head *xmitq)
+{
+	struct sk_buff *_skb, *skb = skb_peek(&l->transmq);
+	struct tipc_msg *hdr;
+
+	if (!skb)
+		return 0;
+
+	/* Detect repeated retransmit failures on same packet */
+	if (likely(l->last_retransm != buf_seqno(skb))) {
+		l->last_retransm = buf_seqno(skb);
+		l->stale_count = 1;
+	} else if (++l->stale_count > 100) {
+		link_retransmit_failure(l, skb);
+		return TIPC_LINK_DOWN_EVT;
+	}
+	skb_queue_walk(&l->transmq, skb) {
+		if (!retransm)
+			return 0;
+		hdr = buf_msg(skb);
+		_skb = __pskb_copy(skb, MIN_H_SIZE, GFP_ATOMIC);
+		if (!_skb)
+			return 0;
+		hdr = buf_msg(_skb);
+		msg_set_ack(hdr, l->rcv_nxt - 1);
+		msg_set_bcast_ack(hdr, l->owner->bclink.last_in);
+		_skb->priority = TC_PRIO_CONTROL;
+		__skb_queue_tail(xmitq, _skb);
+		retransm--;
+		l->stats.retransmitted++;
+	}
+	return 0;
+}
+
 /* link_synch(): check if all packets arrived before the synch
  * point have been consumed
  * Returns true if the parallel links are synched, otherwise false
@@ -1004,155 +1039,6 @@ synched:
 	return true;
 }
 
-static void link_retrieve_defq(struct tipc_link *link,
-			       struct sk_buff_head *list)
-{
-	u16 seq_no;
-
-	if (skb_queue_empty(&link->deferdq))
-		return;
-
-	seq_no = buf_seqno(skb_peek(&link->deferdq));
-	if (seq_no == link->rcv_nxt)
-		skb_queue_splice_tail_init(&link->deferdq, list);
-}
-
-/**
- * tipc_rcv - process TIPC packets/messages arriving from off-node
- * @net: the applicable net namespace
- * @skb: TIPC packet
- * @b_ptr: pointer to bearer message arrived on
- *
- * Invoked with no locks held. Bearer pointer must point to a valid bearer
- * structure (i.e. cannot be NULL), but bearer can be inactive.
- */
-void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b_ptr)
-{
-	struct tipc_net *tn = net_generic(net, tipc_net_id);
-	struct sk_buff_head head;
-	struct tipc_node *n_ptr;
-	struct tipc_link *l_ptr;
-	struct sk_buff *skb1, *tmp;
-	struct tipc_msg *msg;
-	u16 seq_no;
-	u16 ackd;
-	u32 released;
-
-	skb2list(skb, &head);
-
-	while ((skb = __skb_dequeue(&head))) {
-		/* Ensure message is well-formed */
-		if (unlikely(!tipc_msg_validate(skb)))
-			goto discard;
-
-		/* Handle arrival of a non-unicast link message */
-		msg = buf_msg(skb);
-		if (unlikely(msg_non_seq(msg))) {
-			if (msg_user(msg) == LINK_CONFIG)
-				tipc_disc_rcv(net, skb, b_ptr);
-			else
-				tipc_bclink_rcv(net, skb);
-			continue;
-		}
-
-		/* Discard unicast link messages destined for another node */
-		if (unlikely(!msg_short(msg) &&
-			     (msg_destnode(msg) != tn->own_addr)))
-			goto discard;
-
-		/* Locate neighboring node that sent message */
-		n_ptr = tipc_node_find(net, msg_prevnode(msg));
-		if (unlikely(!n_ptr))
-			goto discard;
-
-		tipc_node_lock(n_ptr);
-		/* Locate unicast link endpoint that should handle message */
-		l_ptr = n_ptr->links[b_ptr->identity].link;
-		if (unlikely(!l_ptr))
-			goto unlock;
-
-		/* Is reception of this pkt permitted at the moment ? */
-		if (!tipc_node_filter_skb(n_ptr, msg))
-			goto unlock;
-
-		/* Validate message sequence number info */
-		seq_no = msg_seqno(msg);
-		ackd = msg_ack(msg);
-
-		/* Release acked messages */
-		if (unlikely(n_ptr->bclink.acked != msg_bcast_ack(msg)))
-			tipc_bclink_acknowledge(n_ptr, msg_bcast_ack(msg));
-
-		released = 0;
-		skb_queue_walk_safe(&l_ptr->transmq, skb1, tmp) {
-			if (more(buf_seqno(skb1), ackd))
-				break;
-			__skb_unlink(skb1, &l_ptr->transmq);
-			kfree_skb(skb1);
-			released = 1;
-		}
-
-		/* Try sending any messages link endpoint has pending */
-		if (unlikely(skb_queue_len(&l_ptr->backlogq)))
-			tipc_link_push_packets(l_ptr);
-
-		if (released && !skb_queue_empty(&l_ptr->wakeupq))
-			link_prepare_wakeup(l_ptr);
-
-		/* Process the incoming packet */
-		if (unlikely(!link_working(l_ptr))) {
-			if (msg_user(msg) == LINK_PROTOCOL) {
-				tipc_link_proto_rcv(l_ptr, skb);
-				link_retrieve_defq(l_ptr, &head);
-				skb = NULL;
-				goto unlock;
-			}
-
-			/* Traffic message. Conditionally activate link */
-			link_state_event(l_ptr, TRAFFIC_EVT);
-
-			if (link_working(l_ptr)) {
-				/* Re-insert buffer in front of queue */
-				__skb_queue_head(&head, skb);
-				skb = NULL;
-				goto unlock;
-			}
-			goto unlock;
-		}
-
-		/* Link is now in state TIPC_LINK_WORKING */
-		if (unlikely(seq_no != l_ptr->rcv_nxt)) {
-			link_handle_out_of_seq_msg(l_ptr, skb);
-			link_retrieve_defq(l_ptr, &head);
-			skb = NULL;
-			goto unlock;
-		}
-		l_ptr->silent_intv_cnt = 0;
-
-		/* Synchronize with parallel link if applicable */
-		if (unlikely((l_ptr->exec_mode == TIPC_LINK_TUNNEL) &&
-			     !msg_dup(msg))) {
-			if (!link_synch(l_ptr))
-				goto unlock;
-		}
-		l_ptr->rcv_nxt++;
-		if (unlikely(!skb_queue_empty(&l_ptr->deferdq)))
-			link_retrieve_defq(l_ptr, &head);
-		if (unlikely(++l_ptr->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
-			l_ptr->stats.sent_acks++;
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
-		}
-		tipc_link_input(l_ptr, skb);
-		skb = NULL;
-unlock:
-		tipc_node_unlock(n_ptr);
-		tipc_node_put(n_ptr);
-discard:
-		if (unlikely(skb))
-			kfree_skb(skb);
-	}
-}
-
 /* tipc_data_input - deliver data and name distr msgs to upper layer
  *
  * Consumes buffer if message is of right type
@@ -1206,9 +1092,6 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
 	struct sk_buff *iskb;
 	int pos = 0;
 
-	if (likely(tipc_data_input(link, skb)))
-		return;
-
 	switch (msg_user(msg)) {
 	case TUNNEL_PROTOCOL:
 		if (msg_dup(msg)) {
@@ -1247,6 +1130,110 @@ static void tipc_link_input(struct tipc_link *link, struct sk_buff *skb)
 	};
 }
 
+static bool tipc_link_release_pkts(struct tipc_link *l, u16 acked)
+{
+	bool released = false;
+	struct sk_buff *skb, *tmp;
+
+	skb_queue_walk_safe(&l->transmq, skb, tmp) {
+		if (more(buf_seqno(skb), acked))
+			break;
+		__skb_unlink(skb, &l->transmq);
+		kfree_skb(skb);
+		released = true;
+	}
+	return released;
+}
+
+/* tipc_link_rcv - process TIPC packets/messages arriving from off-node
+ * @link: the link that should handle the message
+ * @skb: TIPC packet
+ * @xmitq: queue to place packets to be sent after this call
+ */
+int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
+		  struct sk_buff_head *xmitq)
+{
+	struct sk_buff_head *arrvq = &l->deferdq;
+	struct sk_buff *tmp;
+	struct tipc_msg *hdr;
+	u16 seqno, rcv_nxt;
+	int rc = 0;
+
+	if (unlikely(!__tipc_skb_queue_sorted(arrvq, skb))) {
+		if (!(skb_queue_len(arrvq) % TIPC_NACK_INTV))
+			tipc_link_build_proto_msg(l, STATE_MSG, 0,
+						  0, 0, 0, xmitq);
+		return rc;
+	}
+
+	skb_queue_walk_safe(arrvq, skb, tmp) {
+		hdr = buf_msg(skb);
+
+		/* Verify and update link state */
+		if (unlikely(msg_user(hdr) == LINK_PROTOCOL)) {
+			__skb_dequeue(arrvq);
+			rc |= tipc_link_proto_rcv(l, skb, xmitq);
+			continue;
+		}
+
+		if (unlikely(!link_working(l))) {
+			rc |= tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
+			if (!link_working(l)) {
+				kfree_skb(__skb_dequeue(arrvq));
+				return rc;
+			}
+		}
+
+		l->silent_intv_cnt = 0;
+
+		/* Forward queues and wake up waiting users */
+		if (likely(tipc_link_release_pkts(l, msg_ack(hdr)))) {
+			tipc_link_advance_backlog(l, xmitq);
+			if (unlikely(!skb_queue_empty(&l->wakeupq)))
+				link_prepare_wakeup(l);
+		}
+
+		/* Defer reception if there is a gap in the sequence */
+		seqno = msg_seqno(hdr);
+		rcv_nxt = l->rcv_nxt;
+		if (unlikely(less(rcv_nxt, seqno))) {
+			l->stats.deferred_recv++;
+			return rc;
+		}
+
+		__skb_dequeue(arrvq);
+
+		/* Drop if packet already received */
+		if (unlikely(more(rcv_nxt, seqno))) {
+			l->stats.duplicates++;
+			kfree_skb(skb);
+			return rc;
+		}
+
+		/* Synchronize with parallel link if applicable */
+		if (unlikely(l->exec_mode == TIPC_LINK_TUNNEL))
+			if (!msg_dup(hdr) && !link_synch(l)) {
+				kfree_skb(skb);
+				return rc;
+			}
+
+		/* Packet can be delivered */
+		l->rcv_nxt++;
+		l->stats.recv_info++;
+		if (unlikely(!tipc_data_input(l, skb)))
+			tipc_link_input(l, skb);
+
+		/* Ack at regular intervals */
+		if (unlikely(++l->rcv_unacked >= TIPC_MIN_LINK_WIN)) {
+			l->rcv_unacked = 0;
+			l->stats.sent_acks++;
+			tipc_link_build_proto_msg(l, STATE_MSG,
+						  0, 0, 0, 0, xmitq);
+		}
+	}
+	return rc;
+}
+
 /**
  * tipc_link_defer_pkt - Add out-of-sequence message to deferred reception queue
  *
@@ -1287,41 +1274,6 @@ u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
 }
 
 /*
- * link_handle_out_of_seq_msg - handle arrival of out-of-sequence packet
- */
-static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
-				       struct sk_buff *buf)
-{
-	u32 seq_no = buf_seqno(buf);
-
-	if (likely(msg_user(buf_msg(buf)) == LINK_PROTOCOL)) {
-		tipc_link_proto_rcv(l_ptr, buf);
-		return;
-	}
-
-	/* Record OOS packet arrival */
-	l_ptr->silent_intv_cnt = 0;
-
-	/*
-	 * Discard packet if a duplicate; otherwise add it to deferred queue
-	 * and notify peer of gap as per protocol specification
-	 */
-	if (less(seq_no, l_ptr->rcv_nxt)) {
-		l_ptr->stats.duplicates++;
-		kfree_skb(buf);
-		return;
-	}
-
-	if (tipc_link_defer_pkt(&l_ptr->deferdq, buf)) {
-		l_ptr->stats.deferred_recv++;
-		if ((skb_queue_len(&l_ptr->deferdq) % TIPC_MIN_LINK_WIN) == 1)
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0);
-	} else {
-		l_ptr->stats.duplicates++;
-	}
-}
-
-/*
  * Send protocol message to the other endpoint.
  */
 void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
@@ -1341,119 +1293,6 @@ void tipc_link_proto_xmit(struct tipc_link *l, u32 msg_typ, int probe_msg,
 	kfree_skb(skb);
 }
 
-/*
- * Receive protocol message :
- * Note that network plane id propagates through the network, and may
- * change at any time. The node with lowest address rules
- */
-static void tipc_link_proto_rcv(struct tipc_link *l_ptr,
-				struct sk_buff *buf)
-{
-	u32 rec_gap = 0;
-	u32 msg_tol;
-	struct tipc_msg *msg = buf_msg(buf);
-
-	if (l_ptr->exec_mode == TIPC_LINK_BLOCKED)
-		goto exit;
-
-	if (l_ptr->net_plane != msg_net_plane(msg))
-		if (link_own_addr(l_ptr) > msg_prevnode(msg))
-			l_ptr->net_plane = msg_net_plane(msg);
-
-	switch (msg_type(msg)) {
-
-	case RESET_MSG:
-		if (!link_probing(l_ptr) &&
-		    (l_ptr->peer_session != WILDCARD_SESSION)) {
-			if (less_eq(msg_session(msg), l_ptr->peer_session))
-				break; /* duplicate or old reset: ignore */
-		}
-		link_state_event(l_ptr, RESET_MSG);
-
-		/* fall thru' */
-	case ACTIVATE_MSG:
-		/* Update link settings according other endpoint's values */
-		strcpy((strrchr(l_ptr->name, ':') + 1), (char *)msg_data(msg));
-
-		msg_tol = msg_link_tolerance(msg);
-		if (msg_tol > l_ptr->tolerance)
-			l_ptr->tolerance = msg_tol;
-
-		if (msg_linkprio(msg) > l_ptr->priority)
-			l_ptr->priority = msg_linkprio(msg);
-
-		if (l_ptr->mtu > msg_max_pkt(msg))
-			l_ptr->mtu = msg_max_pkt(msg);
-
-		/* Synchronize broadcast link info, if not done previously */
-		if (!tipc_node_is_up(l_ptr->owner)) {
-			l_ptr->owner->bclink.last_sent =
-				l_ptr->owner->bclink.last_in =
-				msg_last_bcast(msg);
-			l_ptr->owner->bclink.oos_state = 0;
-		}
-
-		l_ptr->peer_session = msg_session(msg);
-		l_ptr->peer_bearer_id = msg_bearer_id(msg);
-
-		if (!msg_peer_is_up(msg))
-			tipc_node_fsm_evt(l_ptr->owner, PEER_LOST_CONTACT_EVT);
-		if (msg_type(msg) == ACTIVATE_MSG)
-			link_state_event(l_ptr, ACTIVATE_MSG);
-		break;
-	case STATE_MSG:
-
-		msg_tol = msg_link_tolerance(msg);
-		if (msg_tol)
-			l_ptr->tolerance = msg_tol;
-
-		if (msg_linkprio(msg) &&
-		    (msg_linkprio(msg) != l_ptr->priority)) {
-			pr_info("%s<%s>, priority change %u->%u\n",
-				link_rst_msg, l_ptr->name,
-				l_ptr->priority, msg_linkprio(msg));
-			l_ptr->priority = msg_linkprio(msg);
-			tipc_link_reset(l_ptr);
-			break;
-		}
-
-		/* Record reception; force mismatch at next timeout: */
-		l_ptr->silent_intv_cnt = 0;
-
-		link_state_event(l_ptr, TRAFFIC_EVT);
-		l_ptr->stats.recv_states++;
-		if (link_resetting(l_ptr))
-			break;
-
-		if (less_eq(l_ptr->rcv_nxt, msg_next_sent(msg)))
-			rec_gap = mod(msg_next_sent(msg) - l_ptr->rcv_nxt);
-
-		if (msg_probe(msg))
-			l_ptr->stats.recv_probes++;
-
-		/* Protocol message before retransmits, reduce loss risk */
-		if (l_ptr->owner->bclink.recv_permitted)
-			tipc_bclink_update_link_state(l_ptr->owner,
-						      msg_last_bcast(msg));
-
-		if (rec_gap || (msg_probe(msg)))
-			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0,
-					     rec_gap, 0, 0);
-
-		if (msg_seq_gap(msg)) {
-			l_ptr->stats.recv_nacks++;
-			tipc_link_retransmit(l_ptr, skb_peek(&l_ptr->transmq),
-					     msg_seq_gap(msg));
-		}
-		if (tipc_link_is_up(l_ptr))
-			tipc_node_fsm_evt(l_ptr->owner,
-					  PEER_ESTABL_CONTACT_EVT);
-		break;
-	}
-exit:
-	kfree_skb(buf);
-}
-
 /* tipc_link_build_proto_msg: prepare link protocol message for transmission
  */
 static void tipc_link_build_proto_msg(struct tipc_link *l, int mtyp, bool probe,
@@ -1727,6 +1566,96 @@ exit:
 	return *skb;
 }
 
+/* tipc_link_proto_rcv(): receive link level protocol message :
+ * Note that network plane id propagates through the network, and may
+ * change at any time. The node with lowest numerical id determines
+ * network plane
+ */
+static int tipc_link_proto_rcv(struct tipc_link *l, struct sk_buff *skb,
+			       struct sk_buff_head *xmitq)
+{
+	struct tipc_msg *hdr = buf_msg(skb);
+	u16 rcvgap = 0;
+	u16 nacked_gap = msg_seq_gap(hdr);
+	u16 peers_snd_nxt = msg_next_sent(hdr);
+	u16 peers_tol = msg_link_tolerance(hdr);
+	u16 peers_prio = msg_linkprio(hdr);
+	char *if_name;
+	int rc = 0;
+
+	if (l->exec_mode == TIPC_LINK_BLOCKED)
+		goto exit;
+
+	if (link_own_addr(l) > msg_prevnode(hdr))
+		l->net_plane = msg_net_plane(hdr);
+
+	switch (msg_type(hdr)) {
+	case RESET_MSG:
+
+		/* Ignore duplicate RESET with old session number */
+		if ((less_eq(msg_session(hdr), l->peer_session)) &&
+		    (l->peer_session != WILDCARD_SESSION))
+			break;
+		/* fall thru' */
+	case ACTIVATE_MSG:
+
+		/* Complete own link name with peer's interface name */
+		if_name = strrchr(l->name, ':') + 1;
+		if (sizeof(l->name) - (if_name - l->name) <= TIPC_MAX_IF_NAME)
+			break;
+		if (msg_data_sz(hdr) < TIPC_MAX_IF_NAME)
+			break;
+		strncpy(if_name, msg_data(hdr), TIPC_MAX_IF_NAME);
+
+		/* Update own tolerance if peer indicates a non-zero value */
+		if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
+			l->tolerance = peers_tol;
+
+		/* Update own priority if peer's priority is higher */
+		if (in_range(peers_prio, l->priority + 1, TIPC_MAX_LINK_PRI))
+			l->priority = peers_prio;
+
+		l->peer_session = msg_session(hdr);
+		l->peer_bearer_id = msg_bearer_id(hdr);
+		rc = tipc_link_fsm_evt(l, msg_type(hdr), xmitq);
+		if (l->mtu > msg_max_pkt(hdr))
+			l->mtu = msg_max_pkt(hdr);
+		break;
+	case STATE_MSG:
+		/* Update own tolerance if peer indicates a non-zero value */
+		if (in_range(peers_tol, TIPC_MIN_LINK_TOL, TIPC_MAX_LINK_TOL))
+			l->tolerance = peers_tol;
+
+		l->silent_intv_cnt = 0;
+		l->stats.recv_states++;
+		if (msg_probe(hdr))
+			l->stats.recv_probes++;
+		rc = tipc_link_fsm_evt(l, TRAFFIC_EVT, xmitq);
+		if (!tipc_link_is_up(l))
+			break;
+
+		/* Has peer sent packets we haven't received yet ? */
+		if (more(peers_snd_nxt, l->rcv_nxt))
+			rcvgap = peers_snd_nxt - l->rcv_nxt;
+		if (rcvgap || (msg_probe(hdr)))
+			tipc_link_build_proto_msg(l, STATE_MSG, 0, rcvgap,
+						  0, l->mtu, xmitq);
+		tipc_link_release_pkts(l, msg_ack(hdr));
+
+		/* If NACK, retransmit will now start at right position */
+		if (nacked_gap) {
+			rc |= tipc_link_retransm(l, nacked_gap, xmitq);
+			l->stats.recv_nacks++;
+		}
+		tipc_link_advance_backlog(l, xmitq);
+		if (unlikely(!skb_queue_empty(&l->wakeupq)))
+			link_prepare_wakeup(l);
+	}
+exit:
+	kfree_skb(skb);
+	return rc;
+}
+
 void tipc_link_set_queue_limits(struct tipc_link *l, u32 win)
 {
 	int max_bulk = TIPC_MAX_PUBLICATIONS / (l->mtu / ITEM_SIZE);
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 0cf7d2b11803..37cfd7d7bf7d 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -58,7 +58,7 @@ enum {
 	TIPC_LINK_TUNNEL
 };
 
-/* Events occurring at packet reception or at timeout
+/* Events returned from link at packet reception or at timeout
  */
 enum {
 	TIPC_LINK_UP_EVT = 1,
@@ -223,6 +223,7 @@ void tipc_link_purge_queues(struct tipc_link *l_ptr);
 void tipc_link_purge_backlog(struct tipc_link *l);
 void tipc_link_reset_all(struct tipc_node *node);
 void tipc_link_reset(struct tipc_link *l_ptr);
+void tipc_link_activate(struct tipc_link *link);
 int __tipc_link_xmit(struct net *net, struct tipc_link *link,
 		     struct sk_buff_head *list);
 int tipc_link_xmit(struct tipc_link *link, struct sk_buff_head *list,
@@ -244,7 +245,8 @@ int tipc_nl_link_reset_stats(struct sk_buff *skb, struct genl_info *info);
 int tipc_nl_parse_link_prop(struct nlattr *prop, struct nlattr *props[]);
 void link_prepare_wakeup(struct tipc_link *l);
 int tipc_link_timeout(struct tipc_link *l, struct sk_buff_head *xmitq);
-
+int tipc_link_rcv(struct tipc_link *l, struct sk_buff *skb,
+		  struct sk_buff_head *xmitq);
 static inline u32 link_own_addr(struct tipc_link *l)
 {
 	return msg_prevnode(l->pmsg);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 4dc66d9f69cc..2f1563b47e24 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -38,6 +38,7 @@
 #define _TIPC_MSG_H
 
 #include <linux/tipc.h>
+#include "core.h"
 
 /*
  * Constants and routines used to read and write TIPC payload message headers
@@ -658,12 +659,12 @@ static inline void msg_set_link_selector(struct tipc_msg *m, u32 n)
 /*
  * Word 5
  */
-static inline u32 msg_session(struct tipc_msg *m)
+static inline u16 msg_session(struct tipc_msg *m)
 {
 	return msg_bits(m, 5, 16, 0xffff);
 }
 
-static inline void msg_set_session(struct tipc_msg *m, u32 n)
+static inline void msg_set_session(struct tipc_msg *m, u16 n)
 {
 	msg_set_bits(m, 5, 16, 0xffff, n);
 }
@@ -766,10 +767,19 @@ static inline void msg_set_link_tolerance(struct tipc_msg *m, u32 n)
 	msg_set_bits(m, 9, 0, 0xffff, n);
 }
 
-static inline bool msg_peer_is_up(struct tipc_msg *m)
+static inline bool msg_is_traffic(struct tipc_msg *m)
 {
-	if (likely(msg_user(m) != LINK_PROTOCOL) || (msg_type(m) == STATE_MSG))
+	if (likely(msg_user(m) != LINK_PROTOCOL))
 		return true;
+	if ((msg_type(m) == RESET_MSG) || (msg_type(m) == ACTIVATE_MSG))
+		return false;
+	return true;
+}
+
+static inline bool msg_peer_is_up(struct tipc_msg *m)
+{
+	if (likely(msg_is_traffic(m)))
+		return false;
 	return msg_redundant_link(m);
 }
 
@@ -886,4 +896,36 @@ static inline bool tipc_skb_queue_tail(struct sk_buff_head *list,
 	return rv;
 }
 
+/* __tipc_skb_queue_sorted(): sort pkt into list according to sequence number
+ * @list: list to be appended to
+ * @skb: buffer to add
+ * Returns true if queue should be treated further, otherwise false
+ */
+static inline bool __tipc_skb_queue_sorted(struct sk_buff_head *list,
+					   struct sk_buff *skb)
+{
+	struct sk_buff *_skb, *tmp;
+	struct tipc_msg *hdr = buf_msg(skb);
+	u16 seqno = msg_seqno(hdr);
+
+	if (skb_queue_empty(list) || (msg_user(hdr) == LINK_PROTOCOL)) {
+		__skb_queue_head(list, skb);
+		return true;
+	}
+	if (likely(less(seqno, buf_seqno(skb_peek(list))))) {
+		__skb_queue_head(list, skb);
+		return true;
+	}
+	if (!more(seqno, buf_seqno(skb_peek_tail(list)))) {
+		skb_queue_walk_safe(list, _skb, tmp) {
+			if (likely(less(seqno, buf_seqno(_skb)))) {
+				__skb_queue_before(list, _skb, skb);
+				return true;
+			}
+		}
+	}
+	__skb_queue_tail(list, skb);
+	return false;
+}
+
 #endif
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 9dbbb5de287b..e92f84afbf95 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -40,11 +40,13 @@
 #include "name_distr.h"
 #include "socket.h"
 #include "bcast.h"
+#include "discover.h"
 
 static void node_lost_contact(struct tipc_node *n_ptr);
 static void node_established_contact(struct tipc_node *n_ptr);
 static void tipc_node_delete(struct tipc_node *node);
 static void tipc_node_timeout(unsigned long data);
+static void tipc_node_fsm_evt(struct tipc_node *n, int evt);
 
 struct tipc_sock_conn {
 	u32 port;
@@ -141,7 +143,7 @@ struct tipc_node *tipc_node_create(struct net *net, u32 addr)
 		break;
 	}
 	list_add_tail_rcu(&n_ptr->list, &temp_node->list);
-	n_ptr->state = SELF_DOWN_PEER_DOWN;
+	n_ptr->state = SELF_DOWN_PEER_LEAVING;
 	n_ptr->signature = INVALID_NODE_SIG;
 	n_ptr->active_links[0] = INVALID_BEARER_ID;
 	n_ptr->active_links[1] = INVALID_BEARER_ID;
@@ -424,7 +426,7 @@ void tipc_node_detach_link(struct tipc_node *n_ptr, struct tipc_link *l_ptr)
 /* tipc_node_fsm_evt - node finite state machine
  * Determines when contact is allowed with peer node
  */
-void tipc_node_fsm_evt(struct tipc_node *n, int evt)
+static void tipc_node_fsm_evt(struct tipc_node *n, int evt)
 {
 	int state = n->state;
 
@@ -523,23 +525,36 @@ void tipc_node_fsm_evt(struct tipc_node *n, int evt)
 	n->state = state;
 }
 
-bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr)
+bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_link *l,
+			  struct tipc_msg *hdr)
 {
 	int state = n->state;
 
 	if (likely(state == SELF_UP_PEER_UP))
 		return true;
+
 	if (state == SELF_DOWN_PEER_DOWN)
 		return true;
-	if (state == SELF_UP_PEER_COMING)
+
+	if (state == SELF_UP_PEER_COMING) {
+		/* If not traffic msg, peer may still be ESTABLISHING */
+		if (tipc_link_is_up(l) && msg_is_traffic(hdr))
+			tipc_node_fsm_evt(n, PEER_ESTABL_CONTACT_EVT);
 		return true;
+	}
+
 	if (state == SELF_COMING_PEER_UP)
 		return true;
+
 	if (state == SELF_LEAVING_PEER_DOWN)
 		return false;
-	if (state == SELF_DOWN_PEER_LEAVING)
-		if (!msg_peer_is_up(hdr))
-			return true;
+
+	if (state == SELF_DOWN_PEER_LEAVING) {
+		if (msg_peer_is_up(hdr))
+			return false;
+		tipc_node_fsm_evt(n, PEER_LOST_CONTACT_EVT);
+		return true;
+	}
 	return false;
 }
 
@@ -819,6 +834,82 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dnode,
 	return 0;
 }
 
+/**
+ * tipc_rcv - process TIPC packets/messages arriving from off-node
+ * @net: the applicable net namespace
+ * @skb: TIPC packet
+ * @bearer: pointer to bearer message arrived on
+ *
+ * Invoked with no locks held. Bearer pointer must point to a valid bearer
+ * structure (i.e. cannot be NULL), but bearer can be inactive.
+ */
+void tipc_rcv(struct net *net, struct sk_buff *skb, struct tipc_bearer *b)
+{
+	struct sk_buff_head xmitq;
+	struct tipc_node *n;
+	struct tipc_link *l;
+	struct tipc_msg *hdr;
+	struct tipc_media_addr *maddr;
+	int bearer_id = b->identity;
+	int rc = 0;
+
+	__skb_queue_head_init(&xmitq);
+
+	/* Ensure message is well-formed */
+	if (unlikely(!tipc_msg_validate(skb)))
+		goto discard;
+
+	/* Handle arrival of a non-unicast link packet */
+	hdr = buf_msg(skb);
+	if (unlikely(msg_non_seq(hdr))) {
+		if (msg_user(hdr) == LINK_CONFIG)
+			tipc_disc_rcv(net, skb, b);
+		else
+			tipc_bclink_rcv(net, skb);
+		return;
+	}
+
+	/* Locate neighboring node that sent packet */
+	n = tipc_node_find(net, msg_prevnode(hdr));
+	if (unlikely(!n))
+		goto discard;
+	tipc_node_lock(n);
+
+	/* Locate link endpoint that should handle packet */
+	l = n->links[bearer_id].link;
+	if (unlikely(!l))
+		goto unlock;
+
+	/* Is reception of this packet permitted at the moment ? */
+	if (unlikely(n->state != SELF_UP_PEER_UP))
+		if (!tipc_node_filter_skb(n, l, hdr))
+			goto unlock;
+
+	if (unlikely(msg_user(hdr) == LINK_PROTOCOL))
+		tipc_bclink_sync_state(n, hdr);
+
+	/* Release acked broadcast messages */
+	if (unlikely(n->bclink.acked != msg_bcast_ack(hdr)))
+		tipc_bclink_acknowledge(n, msg_bcast_ack(hdr));
+
+	/* Check protocol and update link state */
+	rc = tipc_link_rcv(l, skb, &xmitq);
+
+	if (unlikely(rc & TIPC_LINK_UP_EVT))
+		tipc_link_activate(l);
+	if (unlikely(rc & TIPC_LINK_DOWN_EVT))
+		tipc_link_reset(l);
+	skb = NULL;
+unlock:
+	tipc_node_unlock(n);
+	tipc_sk_rcv(net, &n->links[bearer_id].inputq);
+	maddr = &n->links[bearer_id].maddr;
+	tipc_bearer_xmit(net, bearer_id, &xmitq, maddr);
+	tipc_node_put(n);
+discard:
+	kfree_skb(skb);
+}
+
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb)
 {
 	int err;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 270256e09ee5..5e7016802077 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -185,7 +185,6 @@ int tipc_node_xmit_skb(struct net *net, struct sk_buff *skb, u32 dest,
 		       u32 selector);
 int tipc_node_add_conn(struct net *net, u32 dnode, u32 port, u32 peer_port);
 void tipc_node_remove_conn(struct net *net, u32 dnode, u32 port);
-
 int tipc_nl_node_dump(struct sk_buff *skb, struct netlink_callback *cb);
 
191static inline void tipc_node_lock(struct tipc_node *node) 190static inline void tipc_node_lock(struct tipc_node *node)
@@ -193,9 +192,6 @@ static inline void tipc_node_lock(struct tipc_node *node)
 	spin_lock_bh(&node->lock);
 }
 
-void tipc_node_fsm_evt(struct tipc_node *n, int evt);
-bool tipc_node_filter_skb(struct tipc_node *n, struct tipc_msg *hdr);
-
 static inline struct tipc_link *node_active_link(struct tipc_node *n, int sel)
 {
 	int bearer_id = n->active_links[sel & 1];