author	Ying Xue <ying.xue@windriver.com>	2014-11-25 22:41:53 -0500
committer	David S. Miller <davem@davemloft.net>	2014-11-26 12:30:17 -0500
commit	bc6fecd4098df2d21b056486e5b418c84be95032 (patch)
tree	4b37da9bbd5fda66bba92558de5e6c9dfe97b7f6 /net/tipc
parent	58dc55f25631178ee74cd27185956a8f7dcb3e32 (diff)
tipc: use generic SKB list APIs to manage deferred queue of link
Use the standard SKB list APIs associated with struct sk_buff_head to manage
the link's deferred queue, simplifying the relevant code.

Signed-off-by: Ying Xue <ying.xue@windriver.com>
Reviewed-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc')
-rw-r--r--	net/tipc/bcast.c	20
-rw-r--r--	net/tipc/link.c 	74
-rw-r--r--	net/tipc/link.h 	11
-rw-r--r--	net/tipc/node.c 	4
-rw-r--r--	net/tipc/node.h 	7
5 files changed, 47 insertions, 69 deletions
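
For readers who want a quick primer before the diff: the sk_buff_head helpers this patch switches to follow the pattern sketched below. This is an illustrative sketch, not code from the patch; seqno_of() is a hypothetical stand-in for TIPC's buf_seqno(), a plain '<' stands in for TIPC's wrap-aware less(), and the unlocked __skb_* variants assume the caller already holds the relevant link/node lock, as TIPC does.

/* Sketch of the sk_buff_head lifecycle used for a deferred queue. */
#include <linux/skbuff.h>

static u32 seqno_of(struct sk_buff *skb)
{
	/* hypothetical helper; TIPC reads this from the message header */
	return *(u32 *)skb->cb;
}

/* Initialize once, e.g. when the link or node is created. */
static void dq_init(struct sk_buff_head *dq)
{
	__skb_queue_head_init(dq);
}

/* Insert an out-of-sequence buffer in ascending seqno order. */
static void dq_insert(struct sk_buff_head *dq, struct sk_buff *skb)
{
	struct sk_buff *pos;

	if (skb_queue_empty(dq) ||
	    seqno_of(skb_peek_tail(dq)) < seqno_of(skb)) {
		__skb_queue_tail(dq, skb);	/* empty queue or in-order tail */
		return;
	}
	skb_queue_walk(dq, pos) {		/* find first entry with a larger seqno */
		if (seqno_of(pos) == seqno_of(skb)) {
			kfree_skb(skb);		/* duplicate: drop it */
			return;
		}
		if (seqno_of(pos) > seqno_of(skb)) {
			__skb_queue_before(dq, pos, skb);
			return;
		}
	}
}

/* Pull the head once it is in sequence; returns NULL if the queue is empty. */
static struct sk_buff *dq_next(struct sk_buff_head *dq)
{
	return __skb_dequeue(dq);
}

/* Drop everything, e.g. on link reset or lost contact. */
static void dq_reset(struct sk_buff_head *dq)
{
	__skb_queue_purge(dq);
}
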
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 4a1a3c8627d0..7b238b1f339b 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -352,6 +352,8 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 	buf = tipc_buf_acquire(INT_H_SIZE);
 	if (buf) {
 		struct tipc_msg *msg = buf_msg(buf);
+		struct sk_buff *skb = skb_peek(&n_ptr->bclink.deferred_queue);
+		u32 to = skb ? buf_seqno(skb) - 1 : n_ptr->bclink.last_sent;
 
 		tipc_msg_init(msg, BCAST_PROTOCOL, STATE_MSG,
 			      INT_H_SIZE, n_ptr->addr);
@@ -359,9 +361,7 @@ void tipc_bclink_update_link_state(struct tipc_node *n_ptr, u32 last_sent)
 		msg_set_mc_netid(msg, tipc_net_id);
 		msg_set_bcast_ack(msg, n_ptr->bclink.last_in);
 		msg_set_bcgap_after(msg, n_ptr->bclink.last_in);
-		msg_set_bcgap_to(msg, n_ptr->bclink.deferred_head
-				 ? buf_seqno(n_ptr->bclink.deferred_head) - 1
-				 : n_ptr->bclink.last_sent);
+		msg_set_bcgap_to(msg, to);
 
 		tipc_bclink_lock();
 		tipc_bearer_send(MAX_BEARERS, buf, NULL);
@@ -574,31 +574,26 @@ receive:
 		if (node->bclink.last_in == node->bclink.last_sent)
 			goto unlock;
 
-		if (!node->bclink.deferred_head) {
+		if (skb_queue_empty(&node->bclink.deferred_queue)) {
 			node->bclink.oos_state = 1;
 			goto unlock;
 		}
 
-		msg = buf_msg(node->bclink.deferred_head);
+		msg = buf_msg(skb_peek(&node->bclink.deferred_queue));
 		seqno = msg_seqno(msg);
 		next_in = mod(next_in + 1);
 		if (seqno != next_in)
 			goto unlock;
 
 		/* Take in-sequence message from deferred queue & deliver it */
-		buf = node->bclink.deferred_head;
-		node->bclink.deferred_head = buf->next;
-		buf->next = NULL;
-		node->bclink.deferred_size--;
+		buf = __skb_dequeue(&node->bclink.deferred_queue);
 		goto receive;
 	}
 
 	/* Handle out-of-sequence broadcast message */
 	if (less(next_in, seqno)) {
-		deferred = tipc_link_defer_pkt(&node->bclink.deferred_head,
-					       &node->bclink.deferred_tail,
+		deferred = tipc_link_defer_pkt(&node->bclink.deferred_queue,
 					       buf);
-		node->bclink.deferred_size += deferred;
 		bclink_update_last_sent(node, seqno);
 		buf = NULL;
 	}
@@ -954,6 +949,7 @@ int tipc_bclink_init(void)
 
 	spin_lock_init(&bclink->lock);
 	__skb_queue_head_init(&bcl->outqueue);
+	__skb_queue_head_init(&bcl->deferred_queue);
 	__skb_queue_head_init(&bcl->waiting_sks);
 	bcl->next_out_no = 1;
 	spin_lock_init(&bclink->node.lock);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 9e94bf935e48..d9c2310e417d 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -292,6 +292,7 @@ struct tipc_link *tipc_link_create(struct tipc_node *n_ptr,
 
 	l_ptr->next_out_no = 1;
 	__skb_queue_head_init(&l_ptr->outqueue);
+	__skb_queue_head_init(&l_ptr->deferred_queue);
 	__skb_queue_head_init(&l_ptr->waiting_sks);
 
 	link_reset_statistics(l_ptr);
@@ -398,7 +399,7 @@ void tipc_link_reset_fragments(struct tipc_link *l_ptr)
  */
 void tipc_link_purge_queues(struct tipc_link *l_ptr)
 {
-	kfree_skb_list(l_ptr->oldest_deferred_in);
+	__skb_queue_purge(&l_ptr->deferred_queue);
 	__skb_queue_purge(&l_ptr->outqueue);
 	tipc_link_reset_fragments(l_ptr);
 }
@@ -433,7 +434,7 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 
 	/* Clean up all queues: */
 	__skb_queue_purge(&l_ptr->outqueue);
-	kfree_skb_list(l_ptr->oldest_deferred_in);
+	__skb_queue_purge(&l_ptr->deferred_queue);
 	if (!skb_queue_empty(&l_ptr->waiting_sks)) {
 		skb_queue_splice_init(&l_ptr->waiting_sks, &owner->waiting_sks);
 		owner->action_flags |= TIPC_WAKEUP_USERS;
@@ -442,9 +443,6 @@ void tipc_link_reset(struct tipc_link *l_ptr)
 	l_ptr->unacked_window = 0;
 	l_ptr->checkpoint = 1;
 	l_ptr->next_out_no = 1;
-	l_ptr->deferred_inqueue_sz = 0;
-	l_ptr->oldest_deferred_in = NULL;
-	l_ptr->newest_deferred_in = NULL;
 	l_ptr->fsm_msg_cnt = 0;
 	l_ptr->stale_count = 0;
 	link_reset_statistics(l_ptr);
@@ -974,19 +972,23 @@ void tipc_link_retransmit(struct tipc_link *l_ptr, struct sk_buff *skb,
 static struct sk_buff *link_insert_deferred_queue(struct tipc_link *l_ptr,
 						  struct sk_buff *buf)
 {
+	struct sk_buff_head head;
+	struct sk_buff *skb = NULL;
 	u32 seq_no;
 
-	if (l_ptr->oldest_deferred_in == NULL)
+	if (skb_queue_empty(&l_ptr->deferred_queue))
 		return buf;
 
-	seq_no = buf_seqno(l_ptr->oldest_deferred_in);
+	seq_no = buf_seqno(skb_peek(&l_ptr->deferred_queue));
 	if (seq_no == mod(l_ptr->next_in_no)) {
-		l_ptr->newest_deferred_in->next = buf;
-		buf = l_ptr->oldest_deferred_in;
-		l_ptr->oldest_deferred_in = NULL;
-		l_ptr->deferred_inqueue_sz = 0;
+		__skb_queue_head_init(&head);
+		skb_queue_splice_tail_init(&l_ptr->deferred_queue, &head);
+		skb = head.next;
+		skb->prev = NULL;
+		head.prev->next = buf;
+		head.prev->prev = NULL;
 	}
-	return buf;
+	return skb;
 }
 
 /**
@@ -1170,7 +1172,7 @@ void tipc_rcv(struct sk_buff *head, struct tipc_bearer *b_ptr)
 			continue;
 		}
 		l_ptr->next_in_no++;
-		if (unlikely(l_ptr->oldest_deferred_in))
+		if (unlikely(!skb_queue_empty(&l_ptr->deferred_queue)))
 			head = link_insert_deferred_queue(l_ptr, head);
 
 		if (unlikely(++l_ptr->unacked_window >= TIPC_MIN_LINK_WIN)) {
@@ -1273,48 +1275,37 @@ static int tipc_link_input(struct tipc_link *l, struct sk_buff *buf)
  *
  * Returns increase in queue length (i.e. 0 or 1)
  */
-u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
-			struct sk_buff *buf)
+u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *skb)
 {
-	struct sk_buff *queue_buf;
-	struct sk_buff **prev;
-	u32 seq_no = buf_seqno(buf);
-
-	buf->next = NULL;
+	struct sk_buff *skb1;
+	u32 seq_no = buf_seqno(skb);
 
 	/* Empty queue ? */
-	if (*head == NULL) {
-		*head = *tail = buf;
+	if (skb_queue_empty(list)) {
+		__skb_queue_tail(list, skb);
 		return 1;
 	}
 
 	/* Last ? */
-	if (less(buf_seqno(*tail), seq_no)) {
-		(*tail)->next = buf;
-		*tail = buf;
+	if (less(buf_seqno(skb_peek_tail(list)), seq_no)) {
+		__skb_queue_tail(list, skb);
 		return 1;
 	}
 
 	/* Locate insertion point in queue, then insert; discard if duplicate */
-	prev = head;
-	queue_buf = *head;
-	for (;;) {
-		u32 curr_seqno = buf_seqno(queue_buf);
+	skb_queue_walk(list, skb1) {
+		u32 curr_seqno = buf_seqno(skb1);
 
 		if (seq_no == curr_seqno) {
-			kfree_skb(buf);
+			kfree_skb(skb);
 			return 0;
 		}
 
 		if (less(seq_no, curr_seqno))
 			break;
-
-		prev = &queue_buf->next;
-		queue_buf = queue_buf->next;
 	}
 
-	buf->next = queue_buf;
-	*prev = buf;
+	__skb_queue_before(list, skb1, skb);
 	return 1;
 }
 
@@ -1344,15 +1335,14 @@ static void link_handle_out_of_seq_msg(struct tipc_link *l_ptr,
 		return;
 	}
 
-	if (tipc_link_defer_pkt(&l_ptr->oldest_deferred_in,
-				&l_ptr->newest_deferred_in, buf)) {
-		l_ptr->deferred_inqueue_sz++;
+	if (tipc_link_defer_pkt(&l_ptr->deferred_queue, buf)) {
 		l_ptr->stats.deferred_recv++;
 		TIPC_SKB_CB(buf)->deferred = true;
-		if ((l_ptr->deferred_inqueue_sz % 16) == 1)
+		if ((skb_queue_len(&l_ptr->deferred_queue) % 16) == 1)
 			tipc_link_proto_xmit(l_ptr, STATE_MSG, 0, 0, 0, 0, 0);
-	} else
+	} else {
 		l_ptr->stats.duplicates++;
+	}
 }
 
 /*
@@ -1388,8 +1378,8 @@ void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int probe_msg,
 	if (l_ptr->next_out)
 		next_sent = buf_seqno(l_ptr->next_out);
 	msg_set_next_sent(msg, next_sent);
-	if (l_ptr->oldest_deferred_in) {
-		u32 rec = buf_seqno(l_ptr->oldest_deferred_in);
+	if (!skb_queue_empty(&l_ptr->deferred_queue)) {
+		u32 rec = buf_seqno(skb_peek(&l_ptr->deferred_queue));
 		gap = mod(rec - mod(l_ptr->next_in_no));
 	}
 	msg_set_seq_gap(msg, gap);
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 96f1e1bf0798..de7b8833641a 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -124,9 +124,7 @@ struct tipc_stats {
  * @last_retransmitted: sequence number of most recently retransmitted message
  * @stale_count: # of identical retransmit requests made by peer
  * @next_in_no: next sequence number to expect for inbound messages
- * @deferred_inqueue_sz: # of messages in inbound message queue
- * @oldest_deferred_in: ptr to first inbound message in queue
- * @newest_deferred_in: ptr to last inbound message in queue
+ * @deferred_queue: deferred queue saved OOS b'cast message received from node
  * @unacked_window: # of inbound messages rx'd without ack'ing back to peer
  * @next_out: ptr to first unsent outbound message in queue
  * @waiting_sks: linked list of sockets waiting for link congestion to abate
@@ -178,9 +176,7 @@ struct tipc_link {
 
 	/* Reception */
 	u32 next_in_no;
-	u32 deferred_inqueue_sz;
-	struct sk_buff *oldest_deferred_in;
-	struct sk_buff *newest_deferred_in;
+	struct sk_buff_head deferred_queue;
 	u32 unacked_window;
 
 	/* Congestion handling */
@@ -224,8 +220,7 @@ void tipc_link_bundle_rcv(struct sk_buff *buf);
 void tipc_link_proto_xmit(struct tipc_link *l_ptr, u32 msg_typ, int prob,
 			  u32 gap, u32 tolerance, u32 priority, u32 acked_mtu);
 void tipc_link_push_packets(struct tipc_link *l_ptr);
-u32 tipc_link_defer_pkt(struct sk_buff **head, struct sk_buff **tail,
-			struct sk_buff *buf);
+u32 tipc_link_defer_pkt(struct sk_buff_head *list, struct sk_buff *buf);
 void tipc_link_set_queue_limits(struct tipc_link *l_ptr, u32 window);
 void tipc_link_retransmit(struct tipc_link *l_ptr,
 			  struct sk_buff *start, u32 retransmits);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 17b8092f9c40..69b96be09a86 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -116,6 +116,7 @@ struct tipc_node *tipc_node_create(u32 addr)
 	INIT_LIST_HEAD(&n_ptr->publ_list);
 	INIT_LIST_HEAD(&n_ptr->conn_sks);
 	__skb_queue_head_init(&n_ptr->waiting_sks);
+	__skb_queue_head_init(&n_ptr->bclink.deferred_queue);
 
 	hlist_add_head_rcu(&n_ptr->hash, &node_htable[tipc_hashfn(addr)]);
 
@@ -381,8 +382,7 @@ static void node_lost_contact(struct tipc_node *n_ptr)
 
 	/* Flush broadcast link info associated with lost node */
 	if (n_ptr->bclink.recv_permitted) {
-		kfree_skb_list(n_ptr->bclink.deferred_head);
-		n_ptr->bclink.deferred_size = 0;
+		__skb_queue_purge(&n_ptr->bclink.deferred_queue);
 
 		if (n_ptr->bclink.reasm_buf) {
 			kfree_skb(n_ptr->bclink.reasm_buf);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index f1994511f033..cbe0e950f1cc 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -71,9 +71,7 @@ enum {
  * @last_in: sequence # of last in-sequence b'cast message received from node
  * @last_sent: sequence # of last b'cast message sent by node
  * @oos_state: state tracker for handling OOS b'cast messages
- * @deferred_size: number of OOS b'cast messages in deferred queue
- * @deferred_head: oldest OOS b'cast message received from node
- * @deferred_tail: newest OOS b'cast message received from node
+ * @deferred_queue: deferred queue saved OOS b'cast message received from node
  * @reasm_buf: broadcast reassembly queue head from node
  * @recv_permitted: true if node is allowed to receive b'cast messages
  */
@@ -83,8 +81,7 @@ struct tipc_node_bclink {
 	u32 last_sent;
 	u32 oos_state;
 	u32 deferred_size;
-	struct sk_buff *deferred_head;
-	struct sk_buff *deferred_tail;
+	struct sk_buff_head deferred_queue;
 	struct sk_buff *reasm_buf;
 	bool recv_permitted;
 };