aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorErik Hugne <erik.hugne@ericsson.com>2013-11-06 03:28:06 -0500
committerDavid S. Miller <davem@davemloft.net>2013-11-07 18:30:11 -0500
commit40ba3cdf542a469aaa9083fa041656e59b109b90 (patch)
treebd7e7af25968b0646155e67b58ab0b03ce3d484d
parent528f6f4bf372511ebf6004eed822de8f749b5930 (diff)
tipc: message reassembly using fragment chain
When the first fragment of a long data message is received on a link, a reassembly buffer large enough to hold the data from this and all subsequent fragments of the message is allocated. The payload of each new fragment is copied into this buffer upon arrival. When the last fragment is received, the reassembled message is delivered upwards to the port/socket layer. Not only is this an inefficient approach, but it may also cause bursts of reassembly failures in low memory situations, since we may fail to allocate the necessary large buffer in the first place. Furthermore, after 100 subsequent such failures the link will be reset, something that in reality aggravates the situation. To remedy this problem, this patch introduces a different approach. Instead of allocating a big reassembly buffer, we now append the arriving fragments to a reassembly chain on the link, and deliver the whole chain up to the socket layer once the last fragment has been received. This is safe because the retransmission layer of a TIPC link always delivers packets in strict uninterrupted order, to the reassembly layer as to all other upper layers. Hence there can never be more than one fragment chain pending reassembly at any given time in a link, and we can trust (but still verify) that the fragments will be chained up in the correct order. Signed-off-by: Erik Hugne <erik.hugne@ericsson.com> Reviewed-by: Paul Gortmaker <paul.gortmaker@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/bcast.c12
-rw-r--r--net/tipc/link.c159
-rw-r--r--net/tipc/link.h20
-rw-r--r--net/tipc/msg.h12
-rw-r--r--net/tipc/node.c7
-rw-r--r--net/tipc/node.h6
6 files changed, 74 insertions, 142 deletions
diff --git a/net/tipc/bcast.c b/net/tipc/bcast.c
index 766a6eb4a88f..0d4402587fdf 100644
--- a/net/tipc/bcast.c
+++ b/net/tipc/bcast.c
@@ -480,15 +480,19 @@ receive:
480 tipc_node_unlock(node); 480 tipc_node_unlock(node);
481 tipc_link_recv_bundle(buf); 481 tipc_link_recv_bundle(buf);
482 } else if (msg_user(msg) == MSG_FRAGMENTER) { 482 } else if (msg_user(msg) == MSG_FRAGMENTER) {
483 int ret = tipc_link_recv_fragment(&node->bclink.defragm, 483 int ret;
484 &buf, &msg); 484 ret = tipc_link_recv_fragment(&node->bclink.reasm_head,
485 if (ret < 0) 485 &node->bclink.reasm_tail,
486 &buf);
487 if (ret == LINK_REASM_ERROR)
486 goto unlock; 488 goto unlock;
487 spin_lock_bh(&bc_lock); 489 spin_lock_bh(&bc_lock);
488 bclink_accept_pkt(node, seqno); 490 bclink_accept_pkt(node, seqno);
489 bcl->stats.recv_fragments++; 491 bcl->stats.recv_fragments++;
490 if (ret > 0) { 492 if (ret == LINK_REASM_COMPLETE) {
491 bcl->stats.recv_fragmented++; 493 bcl->stats.recv_fragmented++;
494 /* Point msg to inner header */
495 msg = buf_msg(buf);
492 spin_unlock_bh(&bc_lock); 496 spin_unlock_bh(&bc_lock);
493 goto receive; 497 goto receive;
494 } 498 }
diff --git a/net/tipc/link.c b/net/tipc/link.c
index ada8cadf5af8..a63646e6c2cf 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -404,15 +404,9 @@ static void link_release_outqueue(struct tipc_link *l_ptr)
404 */ 404 */
405void tipc_link_reset_fragments(struct tipc_link *l_ptr) 405void tipc_link_reset_fragments(struct tipc_link *l_ptr)
406{ 406{
407 struct sk_buff *buf = l_ptr->defragm_buf; 407 kfree_skb(l_ptr->reasm_head);
408 struct sk_buff *next; 408 l_ptr->reasm_head = NULL;
409 409 l_ptr->reasm_tail = NULL;
410 while (buf) {
411 next = buf->next;
412 kfree_skb(buf);
413 buf = next;
414 }
415 l_ptr->defragm_buf = NULL;
416} 410}
417 411
418/** 412/**
@@ -1649,13 +1643,15 @@ deliver:
1649 continue; 1643 continue;
1650 case MSG_FRAGMENTER: 1644 case MSG_FRAGMENTER:
1651 l_ptr->stats.recv_fragments++; 1645 l_ptr->stats.recv_fragments++;
1652 ret = tipc_link_recv_fragment(&l_ptr->defragm_buf, 1646 ret = tipc_link_recv_fragment(&l_ptr->reasm_head,
1653 &buf, &msg); 1647 &l_ptr->reasm_tail,
1654 if (ret == 1) { 1648 &buf);
1649 if (ret == LINK_REASM_COMPLETE) {
1655 l_ptr->stats.recv_fragmented++; 1650 l_ptr->stats.recv_fragmented++;
1651 msg = buf_msg(buf);
1656 goto deliver; 1652 goto deliver;
1657 } 1653 }
1658 if (ret == -1) 1654 if (ret == LINK_REASM_ERROR)
1659 l_ptr->next_in_no--; 1655 l_ptr->next_in_no--;
1660 tipc_node_unlock(n_ptr); 1656 tipc_node_unlock(n_ptr);
1661 continue; 1657 continue;
@@ -2343,114 +2339,47 @@ static int link_send_long_buf(struct tipc_link *l_ptr, struct sk_buff *buf)
2343} 2339}
2344 2340
2345/* 2341/*
2346 * A pending message being re-assembled must store certain values
2347 * to handle subsequent fragments correctly. The following functions
2348 * help storing these values in unused, available fields in the
2349 * pending message. This makes dynamic memory allocation unnecessary.
2350 */
2351static void set_long_msg_seqno(struct sk_buff *buf, u32 seqno)
2352{
2353 msg_set_seqno(buf_msg(buf), seqno);
2354}
2355
2356static u32 get_fragm_size(struct sk_buff *buf)
2357{
2358 return msg_ack(buf_msg(buf));
2359}
2360
2361static void set_fragm_size(struct sk_buff *buf, u32 sz)
2362{
2363 msg_set_ack(buf_msg(buf), sz);
2364}
2365
2366static u32 get_expected_frags(struct sk_buff *buf)
2367{
2368 return msg_bcast_ack(buf_msg(buf));
2369}
2370
2371static void set_expected_frags(struct sk_buff *buf, u32 exp)
2372{
2373 msg_set_bcast_ack(buf_msg(buf), exp);
2374}
2375
2376/*
2377 * tipc_link_recv_fragment(): Called with node lock on. Returns 2342 * tipc_link_recv_fragment(): Called with node lock on. Returns
2378 * the reassembled buffer if message is complete. 2343 * the reassembled buffer if message is complete.
2379 */ 2344 */
2380int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb, 2345int tipc_link_recv_fragment(struct sk_buff **head, struct sk_buff **tail,
2381 struct tipc_msg **m) 2346 struct sk_buff **fbuf)
2382{ 2347{
2383 struct sk_buff *prev = NULL; 2348 struct sk_buff *frag = *fbuf;
2384 struct sk_buff *fbuf = *fb; 2349 struct tipc_msg *msg = buf_msg(frag);
2385 struct tipc_msg *fragm = buf_msg(fbuf); 2350 u32 fragid = msg_type(msg);
2386 struct sk_buff *pbuf = *pending; 2351 bool headstolen;
2387 u32 long_msg_seq_no = msg_long_msgno(fragm); 2352 int delta;
2388 2353
2389 *fb = NULL; 2354 skb_pull(frag, msg_hdr_sz(msg));
2390 2355 if (fragid == FIRST_FRAGMENT) {
2391 /* Is there an incomplete message waiting for this fragment? */ 2356 if (*head || skb_unclone(frag, GFP_ATOMIC))
2392 while (pbuf && ((buf_seqno(pbuf) != long_msg_seq_no) || 2357 goto out_free;
2393 (msg_orignode(fragm) != msg_orignode(buf_msg(pbuf))))) { 2358 *head = frag;
2394 prev = pbuf; 2359 skb_frag_list_init(*head);
2395 pbuf = pbuf->next;
2396 }
2397
2398 if (!pbuf && (msg_type(fragm) == FIRST_FRAGMENT)) {
2399 struct tipc_msg *imsg = (struct tipc_msg *)msg_data(fragm);
2400 u32 msg_sz = msg_size(imsg);
2401 u32 fragm_sz = msg_data_sz(fragm);
2402 u32 exp_fragm_cnt;
2403 u32 max = TIPC_MAX_USER_MSG_SIZE + NAMED_H_SIZE;
2404
2405 if (msg_type(imsg) == TIPC_MCAST_MSG)
2406 max = TIPC_MAX_USER_MSG_SIZE + MCAST_H_SIZE;
2407 if (fragm_sz == 0 || msg_size(imsg) > max) {
2408 kfree_skb(fbuf);
2409 return 0;
2410 }
2411 exp_fragm_cnt = msg_sz / fragm_sz + !!(msg_sz % fragm_sz);
2412 pbuf = tipc_buf_acquire(msg_size(imsg));
2413 if (pbuf != NULL) {
2414 pbuf->next = *pending;
2415 *pending = pbuf;
2416 skb_copy_to_linear_data(pbuf, imsg,
2417 msg_data_sz(fragm));
2418 /* Prepare buffer for subsequent fragments. */
2419 set_long_msg_seqno(pbuf, long_msg_seq_no);
2420 set_fragm_size(pbuf, fragm_sz);
2421 set_expected_frags(pbuf, exp_fragm_cnt - 1);
2422 } else {
2423 pr_debug("Link unable to reassemble fragmented message\n");
2424 kfree_skb(fbuf);
2425 return -1;
2426 }
2427 kfree_skb(fbuf);
2428 return 0;
2429 } else if (pbuf && (msg_type(fragm) != FIRST_FRAGMENT)) {
2430 u32 dsz = msg_data_sz(fragm);
2431 u32 fsz = get_fragm_size(pbuf);
2432 u32 crs = ((msg_fragm_no(fragm) - 1) * fsz);
2433 u32 exp_frags = get_expected_frags(pbuf) - 1;
2434 skb_copy_to_linear_data_offset(pbuf, crs,
2435 msg_data(fragm), dsz);
2436 kfree_skb(fbuf);
2437
2438 /* Is message complete? */
2439 if (exp_frags == 0) {
2440 if (prev)
2441 prev->next = pbuf->next;
2442 else
2443 *pending = pbuf->next;
2444 msg_reset_reroute_cnt(buf_msg(pbuf));
2445 *fb = pbuf;
2446 *m = buf_msg(pbuf);
2447 return 1;
2448 }
2449 set_expected_frags(pbuf, exp_frags);
2450 return 0; 2360 return 0;
2361 } else if (skb_try_coalesce(*head, frag, &headstolen, &delta)) {
2362 kfree_skb_partial(frag, headstolen);
2363 } else {
2364 if (!*head)
2365 goto out_free;
2366 if (!skb_has_frag_list(*head))
2367 skb_shinfo(*head)->frag_list = frag;
2368 else
2369 (*tail)->next = frag;
2370 *tail = frag;
2371 (*head)->truesize += frag->truesize;
2372 }
2373 if (fragid == LAST_FRAGMENT) {
2374 *fbuf = *head;
2375 *tail = *head = NULL;
2376 return LINK_REASM_COMPLETE;
2451 } 2377 }
2452 kfree_skb(fbuf);
2453 return 0; 2378 return 0;
2379out_free:
2380 pr_warn_ratelimited("Link unable to reassemble fragmented message\n");
2381 kfree_skb(*fbuf);
2382 return LINK_REASM_ERROR;
2454} 2383}
2455 2384
2456static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) 2385static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
diff --git a/net/tipc/link.h b/net/tipc/link.h
index 55cf8554a08b..8a6c1026644d 100644
--- a/net/tipc/link.h
+++ b/net/tipc/link.h
@@ -41,6 +41,12 @@
41#include "node.h" 41#include "node.h"
42 42
43/* 43/*
44 * Link reassembly status codes
45 */
46#define LINK_REASM_ERROR -1
47#define LINK_REASM_COMPLETE 1
48
49/*
44 * Out-of-range value for link sequence numbers 50 * Out-of-range value for link sequence numbers
45 */ 51 */
46#define INVALID_LINK_SEQ 0x10000 52#define INVALID_LINK_SEQ 0x10000
@@ -134,7 +140,8 @@ struct tipc_stats {
134 * @next_out: ptr to first unsent outbound message in queue 140 * @next_out: ptr to first unsent outbound message in queue
135 * @waiting_ports: linked list of ports waiting for link congestion to abate 141 * @waiting_ports: linked list of ports waiting for link congestion to abate
136 * @long_msg_seq_no: next identifier to use for outbound fragmented messages 142 * @long_msg_seq_no: next identifier to use for outbound fragmented messages
137 * @defragm_buf: list of partially reassembled inbound message fragments 143 * @reasm_head: list head of partially reassembled inbound message fragments
144 * @reasm_tail: last fragment received
138 * @stats: collects statistics regarding link activity 145 * @stats: collects statistics regarding link activity
139 */ 146 */
140struct tipc_link { 147struct tipc_link {
@@ -196,9 +203,10 @@ struct tipc_link {
196 struct sk_buff *next_out; 203 struct sk_buff *next_out;
197 struct list_head waiting_ports; 204 struct list_head waiting_ports;
198 205
199 /* Fragmentation/defragmentation */ 206 /* Fragmentation/reassembly */
200 u32 long_msg_seq_no; 207 u32 long_msg_seq_no;
201 struct sk_buff *defragm_buf; 208 struct sk_buff *reasm_head;
209 struct sk_buff *reasm_tail;
202 210
203 /* Statistics */ 211 /* Statistics */
204 struct tipc_stats stats; 212 struct tipc_stats stats;
@@ -229,9 +237,9 @@ int tipc_link_send_sections_fast(struct tipc_port *sender,
229 struct iovec const *msg_sect, 237 struct iovec const *msg_sect,
230 unsigned int len, u32 destnode); 238 unsigned int len, u32 destnode);
231void tipc_link_recv_bundle(struct sk_buff *buf); 239void tipc_link_recv_bundle(struct sk_buff *buf);
232int tipc_link_recv_fragment(struct sk_buff **pending, 240int tipc_link_recv_fragment(struct sk_buff **reasm_head,
233 struct sk_buff **fb, 241 struct sk_buff **reasm_tail,
234 struct tipc_msg **msg); 242 struct sk_buff **fbuf);
235void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob, 243void tipc_link_send_proto_msg(struct tipc_link *l_ptr, u32 msg_typ, int prob,
236 u32 gap, u32 tolerance, u32 priority, 244 u32 gap, u32 tolerance, u32 priority,
237 u32 acked_mtu); 245 u32 acked_mtu);
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 559b73a9bf35..76d1269b9443 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -554,12 +554,6 @@ static inline void msg_set_last_bcast(struct tipc_msg *m, u32 n)
554 msg_set_bits(m, 4, 16, 0xffff, n); 554 msg_set_bits(m, 4, 16, 0xffff, n);
555} 555}
556 556
557
558static inline u32 msg_fragm_no(struct tipc_msg *m)
559{
560 return msg_bits(m, 4, 16, 0xffff);
561}
562
563static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n) 557static inline void msg_set_fragm_no(struct tipc_msg *m, u32 n)
564{ 558{
565 msg_set_bits(m, 4, 16, 0xffff, n); 559 msg_set_bits(m, 4, 16, 0xffff, n);
@@ -576,12 +570,6 @@ static inline void msg_set_next_sent(struct tipc_msg *m, u32 n)
576 msg_set_bits(m, 4, 0, 0xffff, n); 570 msg_set_bits(m, 4, 0, 0xffff, n);
577} 571}
578 572
579
580static inline u32 msg_long_msgno(struct tipc_msg *m)
581{
582 return msg_bits(m, 4, 0, 0xffff);
583}
584
585static inline void msg_set_long_msgno(struct tipc_msg *m, u32 n) 573static inline void msg_set_long_msgno(struct tipc_msg *m, u32 n)
586{ 574{
587 msg_set_bits(m, 4, 0, 0xffff, n); 575 msg_set_bits(m, 4, 0, 0xffff, n);
diff --git a/net/tipc/node.c b/net/tipc/node.c
index 6e6c434872e8..25100c0a6fe8 100644
--- a/net/tipc/node.c
+++ b/net/tipc/node.c
@@ -298,9 +298,10 @@ static void node_lost_contact(struct tipc_node *n_ptr)
298 } 298 }
299 n_ptr->bclink.deferred_size = 0; 299 n_ptr->bclink.deferred_size = 0;
300 300
301 if (n_ptr->bclink.defragm) { 301 if (n_ptr->bclink.reasm_head) {
302 kfree_skb(n_ptr->bclink.defragm); 302 kfree_skb(n_ptr->bclink.reasm_head);
303 n_ptr->bclink.defragm = NULL; 303 n_ptr->bclink.reasm_head = NULL;
304 n_ptr->bclink.reasm_tail = NULL;
304 } 305 }
305 306
306 tipc_bclink_remove_node(n_ptr->addr); 307 tipc_bclink_remove_node(n_ptr->addr);
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 3c189b35b102..e5e96c04e167 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -74,7 +74,8 @@
74 * @deferred_size: number of OOS b'cast messages in deferred queue 74 * @deferred_size: number of OOS b'cast messages in deferred queue
75 * @deferred_head: oldest OOS b'cast message received from node 75 * @deferred_head: oldest OOS b'cast message received from node
76 * @deferred_tail: newest OOS b'cast message received from node 76 * @deferred_tail: newest OOS b'cast message received from node
77 * @defragm: list of partially reassembled b'cast message fragments from node 77 * @reasm_head: broadcast reassembly queue head from node
78 * @reasm_tail: last broadcast fragment received from node
78 * @recv_permitted: true if node is allowed to receive b'cast messages 79 * @recv_permitted: true if node is allowed to receive b'cast messages
79 */ 80 */
80struct tipc_node { 81struct tipc_node {
@@ -98,7 +99,8 @@ struct tipc_node {
98 u32 deferred_size; 99 u32 deferred_size;
99 struct sk_buff *deferred_head; 100 struct sk_buff *deferred_head;
100 struct sk_buff *deferred_tail; 101 struct sk_buff *deferred_tail;
101 struct sk_buff *defragm; 102 struct sk_buff *reasm_head;
103 struct sk_buff *reasm_tail;
102 bool recv_permitted; 104 bool recv_permitted;
103 } bclink; 105 } bclink;
104}; 106};