aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc/msg.c
diff options
context:
space:
mode:
authorJon Maloy <jon.maloy@ericsson.com>2017-11-30 10:47:25 -0500
committerDavid S. Miller <davem@davemloft.net>2017-12-01 15:21:25 -0500
commit4c94cc2d3d57a2e843ab10887f67faa82c2337f9 (patch)
tree5a45310f3e6802cd7bc958806ea770c23cab66d5 /net/tipc/msg.c
parent201c78e05c5adaffa163b022c9b3a4d30debe100 (diff)
tipc: fall back to smaller MTU if allocation of local send skb fails
When sending node local messages the code is using an 'mtu' of 66060 bytes to avoid unnecessary fragmentation. During situations of low memory tipc_msg_build() may sometimes fail to allocate such large buffers, resulting in unnecessary send failures. This can easily be remedied by falling back to a smaller MTU, and then reassemble the buffer chain as if the message were arriving from a remote node. At the same time, we change the initial MTU setting of the broadcast link to a lower value, so that large messages always are fragmented into smaller buffers even when we run in single node mode. Apart from obtaining the same advantage as for the 'fallback' solution above, this turns out to give a significant performance improvement. This can probably be explained with the __pskb_copy() operation performed on the buffer for each recipient during reception. We found the optimal value for this, considering the most relevant skb pool, to be 3744 bytes. Acked-by: Ying Xue <ying.xue@ericsson.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/tipc/msg.c')
-rw-r--r--net/tipc/msg.c51
1 files changed, 44 insertions, 7 deletions
diff --git a/net/tipc/msg.c b/net/tipc/msg.c
index b0d07b35909d..55d8ba92291d 100644
--- a/net/tipc/msg.c
+++ b/net/tipc/msg.c
@@ -251,20 +251,23 @@ bool tipc_msg_validate(struct sk_buff **_skb)
251 * @pktmax: Max packet size that can be used 251 * @pktmax: Max packet size that can be used
252 * @list: Buffer or chain of buffers to be returned to caller 252 * @list: Buffer or chain of buffers to be returned to caller
253 * 253 *
254 * Note that the recursive call we are making here is safe, since it can
255 * logically go only one further level down.
256 *
254 * Returns message data size or errno: -ENOMEM, -EFAULT 257 * Returns message data size or errno: -ENOMEM, -EFAULT
255 */ 258 */
256int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, 259int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m, int offset,
257 int offset, int dsz, int pktmax, struct sk_buff_head *list) 260 int dsz, int pktmax, struct sk_buff_head *list)
258{ 261{
259 int mhsz = msg_hdr_sz(mhdr); 262 int mhsz = msg_hdr_sz(mhdr);
263 struct tipc_msg pkthdr;
260 int msz = mhsz + dsz; 264 int msz = mhsz + dsz;
261 int pktno = 1;
262 int pktsz;
263 int pktrem = pktmax; 265 int pktrem = pktmax;
264 int drem = dsz;
265 struct tipc_msg pkthdr;
266 struct sk_buff *skb; 266 struct sk_buff *skb;
267 int drem = dsz;
268 int pktno = 1;
267 char *pktpos; 269 char *pktpos;
270 int pktsz;
268 int rc; 271 int rc;
269 272
270 msg_set_size(mhdr, msz); 273 msg_set_size(mhdr, msz);
@@ -272,8 +275,18 @@ int tipc_msg_build(struct tipc_msg *mhdr, struct msghdr *m,
272 /* No fragmentation needed? */ 275 /* No fragmentation needed? */
273 if (likely(msz <= pktmax)) { 276 if (likely(msz <= pktmax)) {
274 skb = tipc_buf_acquire(msz, GFP_KERNEL); 277 skb = tipc_buf_acquire(msz, GFP_KERNEL);
275 if (unlikely(!skb)) 278
279 /* Fall back to smaller MTU if node local message */
280 if (unlikely(!skb)) {
281 if (pktmax != MAX_MSG_SIZE)
282 return -ENOMEM;
283 rc = tipc_msg_build(mhdr, m, offset, dsz, FB_MTU, list);
284 if (rc != dsz)
285 return rc;
286 if (tipc_msg_assemble(list))
287 return dsz;
276 return -ENOMEM; 288 return -ENOMEM;
289 }
277 skb_orphan(skb); 290 skb_orphan(skb);
278 __skb_queue_tail(list, skb); 291 __skb_queue_tail(list, skb);
279 skb_copy_to_linear_data(skb, mhdr, mhsz); 292 skb_copy_to_linear_data(skb, mhdr, mhsz);
@@ -589,6 +602,30 @@ bool tipc_msg_lookup_dest(struct net *net, struct sk_buff *skb, int *err)
589 return true; 602 return true;
590} 603}
591 604
605/* tipc_msg_assemble() - assemble chain of fragments into one message
606 */
607bool tipc_msg_assemble(struct sk_buff_head *list)
608{
609 struct sk_buff *skb, *tmp = NULL;
610
611 if (skb_queue_len(list) == 1)
612 return true;
613
614 while ((skb = __skb_dequeue(list))) {
615 skb->next = NULL;
616 if (tipc_buf_append(&tmp, &skb)) {
617 __skb_queue_tail(list, skb);
618 return true;
619 }
620 if (!tmp)
621 break;
622 }
623 __skb_queue_purge(list);
624 __skb_queue_head_init(list);
625 pr_warn("Failed do assemble buffer\n");
626 return false;
627}
628
592/* tipc_msg_reassemble() - clone a buffer chain of fragments and 629/* tipc_msg_reassemble() - clone a buffer chain of fragments and
593 * reassemble the clones into one message 630 * reassemble the clones into one message
594 */ 631 */