about summary refs log tree commit diff stats
diff options
context:
space:
mode:
author David S. Miller <davem@davemloft.net> 2013-11-07 18:30:35 -0500
committer David S. Miller <davem@davemloft.net> 2013-11-07 18:30:35 -0500
commit 95ed40196f965177ee0d044ab304e5cab3aee9c1 (patch)
tree 01dd481f7f1aca3d2151abc31b14a2d92ed3a7b8
parent b0db7b0c21a014d01be1018db68e78ebf7d4f0d7 (diff)
parent a715b49e79b0924863ff1424f9823cc1b5972322 (diff)
Merge branch 'tipc_fragmentation'
Erik Hugne says:

====================
tipc: message reassembly using fragment chain

We introduce a new reassembly algorithm that improves performance
and eliminates the risk of causing out-of-memory situations.

v3: - Use skb_try_coalesce, and revert to fraglist if this does not succeed.
    - Make sure reassembly list head is uncloned.

v2: - Rebased on Ying's indentation fix.
    - Node unlock call in msg_fragmenter case moved from patch #2 to #1.
      ('continue' with this lock held would cause spinlock recursion
      if only patch #1 is used)
====================

Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r-- net/tipc/bcast.c 16
-rw-r--r-- net/tipc/link.c 164
-rw-r--r-- net/tipc/link.h 20
-rw-r--r-- net/tipc/msg.h 12
-rw-r--r-- net/tipc/node.c 7
-rw-r--r-- net/tipc/node.h 6
6 files changed, 80 insertions, 145 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 716de1ac6cb5..0d4402587fdf 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -480,18 +480,24 @@ receive:
480 tipc_node_unlock(node); 480 tipc_node_unlock(node);
481 tipc_link_recv_bundle(buf); 481 tipc_link_recv_bundle(buf);
482 } else if (msg_user(msg) == MSG_FRAGMENTER) { 482 } else if (msg_user(msg) == MSG_FRAGMENTER) {
483 int ret = tipc_link_recv_fragment(&node->bclink.defragm, 483 int ret;
484 &buf, &msg); 484 ret = tipc_link_recv_fragment(&node->bclink.reasm_head,
485 if (ret < 0) 485 &node->bclink.reasm_tail,
486 &buf);
487 if (ret == LINK_REASM_ERROR)
486 goto unlock; 488 goto unlock;
487 spin_lock_bh(&bc_lock); 489 spin_lock_bh(&bc_lock);
488 bclink_accept_pkt(node, seqno); 490 bclink_accept_pkt(node, seqno);
489 bcl->stats.recv_fragments++; 491 bcl->stats.recv_fragments++;
490 if (ret > 0) 492 if (ret == LINK_REASM_COMPLETE) {
491 bcl->stats.recv_fragmented++; 493 bcl->stats.recv_fragmented++;
494 /* Point msg to inner header */
495 msg = buf_msg(buf);
496 spin_unlock_bh(&bc_lock);
497 goto receive;
498 }
492 spin_unlock_bh(&bc_lock); 499 spin_unlock_bh(&bc_lock);
493 tipc_node_unlock(node); 500 tipc_node_unlock(node);
494 tipc_net_route_msg(buf);
495 } else if (msg_user(msg) == NAME_DISTRIBUTOR) { 501 } else if (msg_user(msg) == NAME_DISTRIBUTOR) {
496 spin_lock_bh(&bc_lock); 502 spin_lock_bh(&bc_lock);
497 bclink_accept_pkt(node, seqno); 503 bclink_accept_pkt(node, seqno);
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 54163f91b8ae..cf465d66ccde 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -404,15 +404,9 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
404 */ 404 */
405void tipc_link_reset_fragments(struct tipc_link *l_ptr) 405void tipc_link_reset_fragments(struct tipc_link *l_ptr)
406{ 406{
407 struct sk_buff *buf = l_ptr->defragm_buf; 407 kfree_skb(l_ptr->reasm_head);
408 struct sk_buff *next; 408 l_ptr->reasm_head = NULL;
409 409 l_ptr->reasm_tail = NULL;
410 while (buf) {
411 next = buf->next;
412 kfree_skb(buf);
413 buf = next;
414 }
415 l_ptr->defragm_buf = NULL;
416} 410}
417 411
418/** 412/**
@@ -1649,15 +1643,18 @@ deliver:
1649 continue; 1643 continue;
1650 case MSG_FRAGMENTER: 1644 case MSG_FRAGMENTER:
1651 l_ptr->stats.recv_fragments++; 1645 l_ptr->stats.recv_fragments++;
1652 ret = tipc_link_recv_fragment(&l_ptr->defragm_buf, 1646 ret = tipc_link_recv_fragment(&l_ptr->reasm_head,
1653 &buf, &msg); 1647 &l_ptr->reasm_tail,
1654 if (ret == 1) { 1648 &buf);
1649 if (ret == LINK_REASM_COMPLETE) {
1655 l_ptr->stats.recv_fragmented++; 1650 l_ptr->stats.recv_fragmented++;
1651 msg = buf_msg(buf);
1656 goto deliver; 1652 goto deliver;
1657 } 1653 }
1658 if (ret == -1) 1654 if (ret == LINK_REASM_ERROR)
1659 l_ptr->next_in_no--; 1655 tipc_link_reset(l_ptr);
1660 break; 1656 tipc_node_unlock(n_ptr);
1657 continue;
1661 case CHANGEOVER_PROTOCOL: 1658 case CHANGEOVER_PROTOCOL:
1662 type = msg_type(msg); 1659 type = msg_type(msg);
1663 if (link_recv_changeover_msg(&l_ptr, &buf)) { 1660 if (link_recv_changeover_msg(&l_ptr, &buf)) {
@@ -2342,114 +2339,47 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
2342} 2339}
2343 2340
2344/* 2341/*
2345 * A pending message being re-assembled must store certain values
2346 * to handle subsequent fragments correctly. The following functions
2347 * help storing these values in unused, available fields in the
2348 * pending message. This makes dynamic memory allocation unnecessary.
2349 */
2350static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2351{
2352 msg_set_seqno(buf_msg(buf), seqno);
2353}
2354
2355static u32 get_fragm_size(struct sk_buff *buf)
2356{
2357 return msg_ack(buf_msg(buf));
2358}
2359
2360static void set_fragm_size(struct sk_buff *buf, u32 sz)
2361{
2362 msg_set_ack(buf_msg(buf), sz);
2363}
2364
2365static u32 get_expected_frags(struct sk_buff *buf)
2366{
2367 return msg_bcast_ack(buf_msg(buf));
2368}
2369
2370static void set_expected_frags(struct sk_buff *buf, u32 exp)
2371{
2372 msg_set_bcast_ack(buf_msg(buf), exp);
2373}
2374
2375/*
2376 * tipc_link_recv_fragment(): Called with node lock on. Returns 2342 * tipc_link_recv_fragment(): Called with node lock on. Returns
2377 * the reassembled buffer if message is complete. 2343 * the reassembled buffer if message is complete.
2378 */ 2344 */
2379int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, 2345int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
2380 struct tipc_msg **m) 2346 struct sk_buff **fbuf)
2381{ 2347{
2382 struct sk_buff *prev = NULL; 2348 struct sk_buff *frag = *fbuf;
2383 struct sk_buff *fbuf = *fb; 2349 struct tipc_msg *msg = buf_msg(frag);
2384 struct tipc_msg *fragm = buf_msg(fbuf); 2350 u32 fragid = msg_type(msg);
2385 struct sk_buff *pbuf = *pending; 2351 bool headstolen;
2386 u32 long_msg_seq_no = msg_long_msgno(fragm); 2352 int delta;
2387 2353
2388 *fb = NULL; 2354 skb_pull(frag, msg_hdr_sz(msg));
2389 2355 if (fragid == FIRST_FRAGMENT) {
2390 /* Is there an incomplete message waiting for this fragment? */ 2356 if (*head || skb_unclone(frag, GFP_ATOMIC))
2391 while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) || 2357 goto out_free;
2392 (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) { 2358 *head = frag;
2393 prev = pbuf; 2359 skb_frag_list_init(*head);
2394 pbuf = pbuf->next;
2395 }
2396
2397 if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2398 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2399 u32 msg_sz = msg_size(imsg);
2400 u32 fragm_sz = msg_data_sz(fragm);
2401 u32 exp_fragm_cnt;
2402 u32 max = TIPC_MAX_USER_MSG_SIZE + NAMED_H_SIZE;
2403
2404 if (msg_type(imsg) == TIPC_MCAST_MSG)
2405 max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2406 if (fragm_sz == 0 || msg_size(imsg) > max) {
2407 kfree_skb(fbuf);
2408 return 0;
2409 }
2410 exp_fragm_cnt = msg_sz / fragm_sz + !!(msg_sz % fragm_sz);
2411 pbuf = tipc_buf_acquire(msg_size(imsg));
2412 if (pbuf != NULL) {
2413 pbuf->next = *pending;
2414 *pending = pbuf;
2415 skb_copy_to_linear_data(pbuf, imsg,
2416 msg_data_sz(fragm));
2417 /* Prepare buffer for subsequent fragments. */
2418 set_long_msg_seqno(pbuf, long_msg_seq_no);
2419 set_fragm_size(pbuf, fragm_sz);
2420 set_expected_frags(pbuf, exp_fragm_cnt - 1);
2421 } else {
2422 pr_debug("Link unable to reassemble fragmented message\n");
2423 kfree_skb(fbuf);
2424 return -1;
2425 }
2426 kfree_skb(fbuf);
2427 return 0;
2428 } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2429 u32 dsz = msg_data_sz(fragm);
2430 u32 fsz = get_fragm_size(pbuf);
2431 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2432 u32 exp_frags = get_expected_frags(pbuf) - 1;
2433 skb_copy_to_linear_data_offset(pbuf, crs,
2434 msg_data(fragm), dsz);
2435 kfree_skb(fbuf);
2436
2437 /* Is message complete? */
2438 if (exp_frags == 0) {
2439 if (prev)
2440 prev->next = pbuf->next;
2441 else
2442 *pending = pbuf->next;
2443 msg_reset_reroute_cnt(buf_msg(pbuf));
2444 *fb = pbuf;
2445 *m = buf_msg(pbuf);
2446 return 1;
2447 }
2448 set_expected_frags(pbuf, exp_frags);
2449 return 0; 2360 return 0;
2361 } else if (skb_try_coalesce(*head, frag, &headstolen, &delta)) {
2362 kfree_skb_partial(frag, headstolen);
2363 } else {
2364 if (!*head)
2365 goto out_free;
2366 if (!skb_has_frag_list(*head))
2367 skb_shinfo(*head)->frag_list = frag;
2368 else
2369 (*tail)->next = frag;
2370 *tail = frag;
2371 (*head)->truesize += frag->truesize;
2372 }
2373 if (fragid == LAST_FRAGMENT) {
2374 *fbuf = *head;
2375 *tail = *head = NULL;
2376 return LINK_REASM_COMPLETE;
2450 } 2377 }
2451 kfree_skb(fbuf);
2452 return 0; 2378 return 0;
2379out_free:
2380 pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
2381 kfree_skb(*fbuf);
2382 return LINK_REASM_ERROR;
2453} 2383}
2454 2384
2455static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) 2385static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 55cf8554a08b..8a6c1026644d 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -41,6 +41,12 @@
41#include "node.h" 41#include "node.h"
42 42
43/* 43/*
44 * Link reassembly status codes
45 */
46#define LINK_REASM_ERROR -1
47#define LINK_REASM_COMPLETE 1
48
49/*
44 * Out-of-range value for link sequence numbers 50 * Out-of-range value for link sequence numbers
45 */ 51 */
46#define INVALID_LINK_SEQ 0x10000 52#define INVALID_LINK_SEQ 0x10000
@@ -134,7 +140,8 @@ struct tipc_stats {
134 * @next_out: ptr to first unsent outbound message in queue 140 * @next_out: ptr to first unsent outbound message in queue
135 * @waiting_ports: linked list of ports waiting for link congestion to abate 141 * @waiting_ports: linked list of ports waiting for link congestion to abate
136 * @long_msg_seq_no: next identifier to use for outbound fragmented messages 142 * @long_msg_seq_no: next identifier to use for outbound fragmented messages
137 * @defragm_buf: list of partially reassembled inbound message fragments 143 * @reasm_head: list head of partially reassembled inbound message fragments
144 * @reasm_tail: last fragment received
138 * @stats: collects statistics regarding link activity 145 * @stats: collects statistics regarding link activity
139 */ 146 */
140struct tipc_link { 147struct tipc_link {
@@ -196,9 +203,10 @@ struct tipc_link {
196 struct sk_buff *next_out; 203 struct sk_buff *next_out;
197 struct list_head waiting_ports; 204 struct list_head waiting_ports;
198 205
199 /* Fragmentation/defragmentation */ 206 /* Fragmentation/reassembly */
200 u32 long_msg_seq_no; 207 u32 long_msg_seq_no;
201 struct sk_buff *defragm_buf; 208 struct sk_buff *reasm_head;
209 struct sk_buff *reasm_tail;
202 210
203 /* Statistics */ 211 /* Statistics */
204 struct tipc_stats stats; 212 struct tipc_stats stats;
@@ -229,9 +237,9 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
229 struct iovec const *msg_sect, 237 struct iovec const *msg_sect,
230 unsigned int len, u32 destnode); 238 unsigned int len, u32 destnode);
231void tipc_link_recv_bundle(struct sk_buff *buf); 239void tipc_link_recv_bundle(struct sk_buff *buf);
232int tipc_link_recv_fragment(struct sk_buff **pending, 240int tipc_link_recv_fragment(struct sk_buff **reasm_head,
233 struct sk_buff **fb, 241 struct sk_buff **reasm_tail,
234 struct tipc_msg **msg); 242 struct sk_buff **fbuf);
235void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob, 243void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob,
236 u32 gap, u32 tolerance, u32 priority, 244 u32 gap, u32 tolerance, u32 priority,
237 u32 acked_mtu); 245 u32 acked_mtu);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 559b73a9bf35..76d1269b9443 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -554,12 +554,6 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
554 msg_set_bits(m, 4, 16, 0xffff, n); 554 msg_set_bits(m, 4, 16, 0xffff, n);
555} 555}
556 556
557
558static inline u32 msg_fragm_no(struct tipc_msg *m)
559{
560 return msg_bits(m, 4, 16, 0xffff);
561}
562
563static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n) 557static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
564{ 558{
565 msg_set_bits(m, 4, 16, 0xffff, n); 559 msg_set_bits(m, 4, 16, 0xffff, n);
@@ -576,12 +570,6 @@ static inline void msg_set_next_sent(struct tipc_msg *m, u32 n)
576 msg_set_bits(m, 4, 0, 0xffff, n); 570 msg_set_bits(m, 4, 0, 0xffff, n);
577} 571}
578 572
579
580static inline u32 msg_long_msgno(struct tipc_msg *m)
581{
582 return msg_bits(m, 4, 0, 0xffff);
583}
584
585static inline void msg_set_long_msgno(struct tipc_msg *m, u32 n) 573static inline void msg_set_long_msgno(struct tipc_msg *m, u32 n)
586{ 574{
587 msg_set_bits(m, 4, 0, 0xffff, n); 575 msg_set_bits(m, 4, 0, 0xffff, n);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6e6c434872e8..25100c0a6fe8 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -298,9 +298,10 @@ static void node_lost_contact(struct tipc_node *n_ptr)
298 } 298 }
299 n_ptr->bclink.deferred_size = 0; 299 n_ptr->bclink.deferred_size = 0;
300 300
301 if (n_ptr->bclink.defragm) { 301 if (n_ptr->bclink.reasm_head) {
302 kfree_skb(n_ptr->bclink.defragm); 302 kfree_skb(n_ptr->bclink.reasm_head);
303 n_ptr->bclink.defragm = NULL; 303 n_ptr->bclink.reasm_head = NULL;
304 n_ptr->bclink.reasm_tail = NULL;
304 } 305 }
305 306
306 tipc_bclink_remove_node(n_ptr->addr); 307 tipc_bclink_remove_node(n_ptr->addr);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 3c189b35b102..e5e96c04e167 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -74,7 +74,8 @@
74 * @deferred_size: number of OOS b'cast messages in deferred queue 74 * @deferred_size: number of OOS b'cast messages in deferred queue
75 * @deferred_head: oldest OOS b'cast message received from node 75 * @deferred_head: oldest OOS b'cast message received from node
76 * @deferred_tail: newest OOS b'cast message received from node 76 * @deferred_tail: newest OOS b'cast message received from node
77 * @defragm: list of partially reassembled b'cast message fragments from node 77 * @reasm_head: broadcast reassembly queue head from node
78 * @reasm_tail: last broadcast fragment received from node
78 * @recv_permitted: true if node is allowed to receive b'cast messages 79 * @recv_permitted: true if node is allowed to receive b'cast messages
79 */ 80 */
80struct tipc_node { 81struct tipc_node {
@@ -98,7 +99,8 @@ struct tipc_node {
98 u32 deferred_size; 99 u32 deferred_size;
99 struct sk_buff *deferred_head; 100 struct sk_buff *deferred_head;
100 struct sk_buff *deferred_tail; 101 struct sk_buff *deferred_tail;
101 struct sk_buff *defragm; 102 struct sk_buff *reasm_head;
103 struct sk_buff *reasm_tail;
102 bool recv_permitted; 104 bool recv_permitted;
103 } bclink; 105 } bclink;
104}; 106};