author     Jon Paul Maloy <jon.maloy@ericsson.com>    2016-05-02 11:58:47 -0400
committer  David S. Miller <davem@davemloft.net>      2016-05-03 15:51:16 -0400
commit     10724cc7bb7832b482df049c20fd824d928c5eaa (patch)
tree       f8ce3ae55f12a36bc4361d9ee515b3f301051c65
parent     60020e1857042387cdcd4cd6680a9e5496213379 (diff)
tipc: redesign connection-level flow control
There are two flow control mechanisms in TIPC; one at link level that handles network congestion, burst control, and retransmission, and one at connection level whose only remaining task is to prevent overflow in the receiving socket buffer. In TIPC, the latter task has to be solved end-to-end because messages cannot be thrown away once they have been accepted and delivered upwards from the link layer, i.e., we can never permit the receive buffer to overflow.

Currently, this algorithm is message based. A counter in the receiving socket keeps track of the number of consumed messages, and sends a dedicated acknowledge message back to the sender for every 256 consumed messages. A counter at the sending end keeps track of the sent, not yet acknowledged messages, and blocks the sender if this number ever reaches 512 unacknowledged messages. When the missing acknowledge arrives, the socket is then woken up for renewed transmission. This works well for keeping the message flow running, as it almost never happens that a sender socket is blocked this way.

A problem with the current mechanism is that it is potentially very memory consuming. Since we don't distinguish between small and large messages, we have to dimension the socket receive buffer according to a worst case of both. I.e., the window size must be chosen large enough to sustain a reasonable throughput even for the smallest messages, while we must still consider a scenario where all messages are of maximum size. Hence, the current fixed window size of 512 messages and a maximum message size of 66k result in a receive buffer of 66 MB when truesize(66k) = 131k is taken into account. It is possible to do much better.

This commit introduces an algorithm where we instead use 1024-byte blocks as the base unit. This unit, always rounded upwards from the actual message size, is used when we advertise windows as well as when we count and acknowledge transmitted data. The advertised window is based on the configured receive buffer size in such a way that even the worst-case truesize/msgsize ratio is always covered. Since the smallest possible message size (from a flow control viewpoint) is now 1024 bytes, we can safely assume this ratio to be less than four, which is the value we are now using.

This way, we have been able to reduce the default receive buffer size from 66 MB to 2 MB with maintained performance.

In order to keep this solution backwards compatible, we introduce a new capability bit in the discovery protocol, and use it throughout the message sending/reception path to always select the right unit.

Acked-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
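[Editor's note] As a quick sanity check on the arithmetic above, here is a minimal user-space sketch of the block-based accounting. The constant and helper names mirror the patch (FLOWCTL_BLK_SZ, tsk_adv_blocks(), tsk_inc()); the main() driver and the sample message sizes are illustrative assumptions, not kernel code.

/* Minimal sketch of the block-based flow control arithmetic described
 * in the commit message. Sample message sizes are assumptions.
 */
#include <stdio.h>

#define FLOWCTL_BLK_SZ 1024
#define RCVBUF_DEF     (FLOWCTL_BLK_SZ * 1024 * 2)   /* 2 MB default rcvbuf */

/* Advertised window in blocks, assuming the worst-case
 * truesize/msgsize ratio of 4 for messages >= 1024 bytes.
 */
static unsigned int adv_blocks(int buflen)
{
	return buflen / FLOWCTL_BLK_SZ / 4;
}

/* Cost of one message in blocks, always rounded upwards */
static unsigned int msg_blocks(int msglen)
{
	return msglen / FLOWCTL_BLK_SZ + 1;
}

int main(void)
{
	unsigned int win = adv_blocks(RCVBUF_DEF);   /* 512 blocks */
	unsigned int used = 0;
	int sizes[] = { 100, 1500, 66000 };          /* assumed sample messages */

	for (int i = 0; i < 3; i++)
		used += msg_blocks(sizes[i]);

	printf("advertised window: %u blocks\n", win);
	printf("blocks charged for 3 messages: %u\n", used);
	printf("sender blocked: %s\n", used >= win ? "yes" : "no");
	return 0;
}

With the 2 MB default buffer the advertised window works out to 512 blocks, i.e. the same window the old message-based scheme granted only to minimum-size messages.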
-rw-r--r--  net/tipc/core.c     8
-rw-r--r--  net/tipc/msg.h     14
-rw-r--r--  net/tipc/node.h     5
-rw-r--r--  net/tipc/socket.c 140
-rw-r--r--  net/tipc/socket.h  17
5 files changed, 122 insertions, 62 deletions
diff --git a/net/tipc/core.c b/net/tipc/core.c
index e2bdb07a49a2..fe1b062c4f18 100644
--- a/net/tipc/core.c
+++ b/net/tipc/core.c
@@ -112,11 +112,9 @@ static int __init tipc_init(void)
 
 	pr_info("Activated (version " TIPC_MOD_VER ")\n");
 
-	sysctl_tipc_rmem[0] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
-			      TIPC_LOW_IMPORTANCE;
-	sysctl_tipc_rmem[1] = TIPC_CONN_OVERLOAD_LIMIT >> 4 <<
-			      TIPC_CRITICAL_IMPORTANCE;
-	sysctl_tipc_rmem[2] = TIPC_CONN_OVERLOAD_LIMIT;
+	sysctl_tipc_rmem[0] = RCVBUF_MIN;
+	sysctl_tipc_rmem[1] = RCVBUF_DEF;
+	sysctl_tipc_rmem[2] = RCVBUF_MAX;
 
 	err = tipc_netlink_start();
 	if (err)
diff --git a/net/tipc/msg.h b/net/tipc/msg.h
index 58bf51541813..024da8af91f0 100644
--- a/net/tipc/msg.h
+++ b/net/tipc/msg.h
@@ -743,16 +743,26 @@ static inline void msg_set_msgcnt(struct tipc_msg *m, u16 n)
 	msg_set_bits(m, 9, 16, 0xffff, n);
 }
 
-static inline u32 msg_bcast_tag(struct tipc_msg *m)
+static inline u32 msg_conn_ack(struct tipc_msg *m)
 {
 	return msg_bits(m, 9, 16, 0xffff);
 }
 
-static inline void msg_set_bcast_tag(struct tipc_msg *m, u32 n)
+static inline void msg_set_conn_ack(struct tipc_msg *m, u32 n)
 {
 	msg_set_bits(m, 9, 16, 0xffff, n);
 }
 
+static inline u32 msg_adv_win(struct tipc_msg *m)
+{
+	return msg_bits(m, 9, 0, 0xffff);
+}
+
+static inline void msg_set_adv_win(struct tipc_msg *m, u32 n)
+{
+	msg_set_bits(m, 9, 0, 0xffff, n);
+}
+
 static inline u32 msg_max_pkt(struct tipc_msg *m)
 {
 	return msg_bits(m, 9, 16, 0xffff) * 4;
diff --git a/net/tipc/node.h b/net/tipc/node.h
index 18237684ffc4..8264b3d97dc4 100644
--- a/net/tipc/node.h
+++ b/net/tipc/node.h
@@ -45,10 +45,11 @@
 /* Optional capabilities supported by this code version
  */
 enum {
-	TIPC_BCAST_SYNCH = (1 << 1)
+	TIPC_BCAST_SYNCH   = (1 << 1),
+	TIPC_BLOCK_FLOWCTL = (2 << 1)
 };
 
-#define TIPC_NODE_CAPABILITIES TIPC_BCAST_SYNCH
+#define TIPC_NODE_CAPABILITIES (TIPC_BCAST_SYNCH | TIPC_BLOCK_FLOWCTL)
 #define INVALID_BEARER_ID -1
 
 void tipc_node_stop(struct net *net);
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 94bd28639855..12628890c219 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -96,9 +96,11 @@ struct tipc_sock {
 	uint conn_timeout;
 	atomic_t dupl_rcvcnt;
 	bool link_cong;
-	uint sent_unacked;
-	uint rcv_unacked;
+	u16 snt_unacked;
+	u16 snd_win;
 	u16 peer_caps;
+	u16 rcv_unacked;
+	u16 rcv_win;
 	struct sockaddr_tipc remote;
 	struct rhash_head node;
 	struct rcu_head rcu;
@@ -228,9 +230,29 @@ static struct tipc_sock *tipc_sk(const struct sock *sk)
 	return container_of(sk, struct tipc_sock, sk);
 }
 
-static int tsk_conn_cong(struct tipc_sock *tsk)
+static bool tsk_conn_cong(struct tipc_sock *tsk)
 {
-	return tsk->sent_unacked >= TIPC_FLOWCTRL_WIN;
+	return tsk->snt_unacked >= tsk->snd_win;
+}
+
+/* tsk_blocks(): translate a buffer size in bytes to number of
+ * advertisable blocks, taking into account the ratio truesize(len)/len
+ * We can trust that this ratio is always < 4 for len >= FLOWCTL_BLK_SZ
+ */
+static u16 tsk_adv_blocks(int len)
+{
+	return len / FLOWCTL_BLK_SZ / 4;
+}
+
+/* tsk_inc(): increment counter for sent or received data
+ * - If block based flow control is not supported by peer we
+ *   fall back to message based ditto, incrementing the counter
+ */
+static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
+{
+	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+		return ((msglen / FLOWCTL_BLK_SZ) + 1);
+	return 1;
 }
 
 /**
@@ -378,9 +400,12 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
 	sk->sk_write_space = tipc_write_space;
 	sk->sk_destruct = tipc_sock_destruct;
 	tsk->conn_timeout = CONN_TIMEOUT_DEFAULT;
-	tsk->sent_unacked = 0;
 	atomic_set(&tsk->dupl_rcvcnt, 0);
 
+	/* Start out with safe limits until we receive an advertised window */
+	tsk->snd_win = tsk_adv_blocks(RCVBUF_MIN);
+	tsk->rcv_win = tsk->snd_win;
+
 	if (sock->state == SS_READY) {
 		tsk_set_unreturnable(tsk, true);
 		if (sock->type == SOCK_DGRAM)
@@ -776,7 +801,7 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
 	struct sock *sk = &tsk->sk;
 	struct tipc_msg *hdr = buf_msg(skb);
 	int mtyp = msg_type(hdr);
-	int conn_cong;
+	bool conn_cong;
 
 	/* Ignore if connection cannot be validated: */
 	if (!tsk_peer_msg(tsk, hdr))
@@ -790,7 +815,9 @@ static void tipc_sk_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb)
 		return;
 	} else if (mtyp == CONN_ACK) {
 		conn_cong = tsk_conn_cong(tsk);
-		tsk->sent_unacked -= msg_msgcnt(hdr);
+		tsk->snt_unacked -= msg_conn_ack(hdr);
+		if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+			tsk->snd_win = msg_adv_win(hdr);
 		if (conn_cong)
 			sk->sk_write_space(sk);
 	} else if (mtyp != CONN_PROBE_REPLY) {
@@ -1021,12 +1048,14 @@ static int __tipc_send_stream(struct socket *sock, struct msghdr *m, size_t dsz)
 	u32 dnode;
 	uint mtu, send, sent = 0;
 	struct iov_iter save;
+	int hlen = MIN_H_SIZE;
 
 	/* Handle implied connection establishment */
 	if (unlikely(dest)) {
 		rc = __tipc_sendmsg(sock, m, dsz);
+		hlen = msg_hdr_sz(mhdr);
 		if (dsz && (dsz == rc))
-			tsk->sent_unacked = 1;
+			tsk->snt_unacked = tsk_inc(tsk, dsz + hlen);
 		return rc;
 	}
 	if (dsz > (uint)INT_MAX)
@@ -1055,7 +1084,7 @@ next:
 		if (likely(!tsk_conn_cong(tsk))) {
 			rc = tipc_node_xmit(net, &pktchain, dnode, portid);
 			if (likely(!rc)) {
-				tsk->sent_unacked++;
+				tsk->snt_unacked += tsk_inc(tsk, send + hlen);
 				sent += send;
 				if (sent == dsz)
 					return dsz;
@@ -1120,6 +1149,12 @@ static void tipc_sk_finish_conn(struct tipc_sock *tsk, u32 peer_port,
 	tipc_node_add_conn(net, peer_node, tsk->portid, peer_port);
 	tsk->max_pkt = tipc_node_get_mtu(net, peer_node, tsk->portid);
 	tsk->peer_caps = tipc_node_get_capabilities(net, peer_node);
+	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL)
+		return;
+
+	/* Fall back to message based flow control */
+	tsk->rcv_win = FLOWCTL_MSG_WIN;
+	tsk->snd_win = FLOWCTL_MSG_WIN;
 }
 
 /**
@@ -1216,7 +1251,7 @@ static int tipc_sk_anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
 	return 0;
 }
 
-static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
+static void tipc_sk_send_ack(struct tipc_sock *tsk)
 {
 	struct net *net = sock_net(&tsk->sk);
 	struct sk_buff *skb = NULL;
@@ -1232,7 +1267,14 @@ static void tipc_sk_send_ack(struct tipc_sock *tsk, uint ack)
 	if (!skb)
 		return;
 	msg = buf_msg(skb);
-	msg_set_msgcnt(msg, ack);
+	msg_set_conn_ack(msg, tsk->rcv_unacked);
+	tsk->rcv_unacked = 0;
+
+	/* Adjust to and advertize the correct window limit */
+	if (tsk->peer_caps & TIPC_BLOCK_FLOWCTL) {
+		tsk->rcv_win = tsk_adv_blocks(tsk->sk.sk_rcvbuf);
+		msg_set_adv_win(msg, tsk->rcv_win);
+	}
 	tipc_node_xmit_skb(net, skb, dnode, msg_link_selector(msg));
 }
 
@@ -1290,7 +1332,7 @@ static int tipc_recvmsg(struct socket *sock, struct msghdr *m, size_t buf_len,
 	long timeo;
 	unsigned int sz;
 	u32 err;
-	int res;
+	int res, hlen;
 
 	/* Catch invalid receive requests */
 	if (unlikely(!buf_len))
@@ -1315,6 +1357,7 @@ restart:
 	buf = skb_peek(&sk->sk_receive_queue);
 	msg = buf_msg(buf);
 	sz = msg_data_sz(msg);
+	hlen = msg_hdr_sz(msg);
 	err = msg_errcode(msg);
 
 	/* Discard an empty non-errored message & try again */
@@ -1337,7 +1380,7 @@ restart:
 		sz = buf_len;
 		m->msg_flags |= MSG_TRUNC;
 	}
-	res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg), m, sz);
+	res = skb_copy_datagram_msg(buf, hlen, m, sz);
 	if (res)
 		goto exit;
 	res = sz;
@@ -1349,15 +1392,15 @@ restart:
 		res = -ECONNRESET;
 	}
 
-	/* Consume received message (optional) */
-	if (likely(!(flags & MSG_PEEK))) {
-		if ((sock->state != SS_READY) &&
-		    (++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
-			tipc_sk_send_ack(tsk, tsk->rcv_unacked);
-			tsk->rcv_unacked = 0;
-		}
-		tsk_advance_rx_queue(sk);
+	if (unlikely(flags & MSG_PEEK))
+		goto exit;
+
+	if (likely(sock->state != SS_READY)) {
+		tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+		if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+			tipc_sk_send_ack(tsk);
 	}
+	tsk_advance_rx_queue(sk);
 exit:
 	release_sock(sk);
 	return res;
@@ -1386,7 +1429,7 @@ static int tipc_recv_stream(struct socket *sock, struct msghdr *m,
 	int sz_to_copy, target, needed;
 	int sz_copied = 0;
 	u32 err;
-	int res = 0;
+	int res = 0, hlen;
 
 	/* Catch invalid receive attempts */
 	if (unlikely(!buf_len))
@@ -1412,6 +1455,7 @@ restart:
 	buf = skb_peek(&sk->sk_receive_queue);
 	msg = buf_msg(buf);
 	sz = msg_data_sz(msg);
+	hlen = msg_hdr_sz(msg);
 	err = msg_errcode(msg);
 
 	/* Discard an empty non-errored message & try again */
@@ -1436,8 +1480,7 @@ restart:
 		needed = (buf_len - sz_copied);
 		sz_to_copy = (sz <= needed) ? sz : needed;
 
-		res = skb_copy_datagram_msg(buf, msg_hdr_sz(msg) + offset,
-					    m, sz_to_copy);
+		res = skb_copy_datagram_msg(buf, hlen + offset, m, sz_to_copy);
 		if (res)
 			goto exit;
 
@@ -1459,20 +1502,18 @@ restart:
 		res = -ECONNRESET;
 	}
 
-	/* Consume received message (optional) */
-	if (likely(!(flags & MSG_PEEK))) {
-		if (unlikely(++tsk->rcv_unacked >= TIPC_CONNACK_INTV)) {
-			tipc_sk_send_ack(tsk, tsk->rcv_unacked);
-			tsk->rcv_unacked = 0;
-		}
-		tsk_advance_rx_queue(sk);
-	}
+	if (unlikely(flags & MSG_PEEK))
+		goto exit;
+
+	tsk->rcv_unacked += tsk_inc(tsk, hlen + sz);
+	if (unlikely(tsk->rcv_unacked >= (tsk->rcv_win / 4)))
+		tipc_sk_send_ack(tsk);
+	tsk_advance_rx_queue(sk);
 
 	/* Loop around if more data is required */
 	if ((sz_copied < buf_len) &&	/* didn't get all requested data */
 	    (!skb_queue_empty(&sk->sk_receive_queue) ||
 	    (sz_copied < target)) &&	/* and more is ready or required */
-	    (!(flags & MSG_PEEK)) &&	/* and aren't just peeking at data */
 	    (!err))			/* and haven't reached a FIN */
 		goto restart;
 
@@ -1604,30 +1645,33 @@ static bool filter_connect(struct tipc_sock *tsk, struct sk_buff *skb)
 /**
  * rcvbuf_limit - get proper overload limit of socket receive queue
  * @sk: socket
- * @buf: message
+ * @skb: message
  *
- * For all connection oriented messages, irrespective of importance,
- * the default overload value (i.e. 67MB) is set as limit.
+ * For connection oriented messages, irrespective of importance,
+ * default queue limit is 2 MB.
  *
- * For all connectionless messages, by default new queue limits are
- * as belows:
+ * For connectionless messages, queue limits are based on message
+ * importance as follows:
  *
- * TIPC_LOW_IMPORTANCE       (4 MB)
- * TIPC_MEDIUM_IMPORTANCE    (8 MB)
- * TIPC_HIGH_IMPORTANCE      (16 MB)
- * TIPC_CRITICAL_IMPORTANCE  (32 MB)
+ * TIPC_LOW_IMPORTANCE       (2 MB)
+ * TIPC_MEDIUM_IMPORTANCE    (4 MB)
+ * TIPC_HIGH_IMPORTANCE      (8 MB)
+ * TIPC_CRITICAL_IMPORTANCE  (16 MB)
  *
 * Returns overload limit according to corresponding message importance
  */
-static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
+static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *skb)
 {
-	struct tipc_msg *msg = buf_msg(buf);
+	struct tipc_sock *tsk = tipc_sk(sk);
+	struct tipc_msg *hdr = buf_msg(skb);
+
+	if (unlikely(!msg_connected(hdr)))
+		return sk->sk_rcvbuf << msg_importance(hdr);
 
-	if (msg_connected(msg))
-		return sysctl_tipc_rmem[2];
+	if (likely(tsk->peer_caps & TIPC_BLOCK_FLOWCTL))
+		return sk->sk_rcvbuf;
 
-	return sk->sk_rcvbuf >> TIPC_CRITICAL_IMPORTANCE <<
-	       msg_importance(msg);
+	return FLOWCTL_MSG_LIM;
 }
 
 /**
diff --git a/net/tipc/socket.h b/net/tipc/socket.h
index 4241f22069dc..06fb5944cf76 100644
--- a/net/tipc/socket.h
+++ b/net/tipc/socket.h
@@ -1,6 +1,6 @@
 /* net/tipc/socket.h: Include file for TIPC socket code
  *
- * Copyright (c) 2014-2015, Ericsson AB
+ * Copyright (c) 2014-2016, Ericsson AB
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -38,10 +38,17 @@
 #include <net/sock.h>
 #include <net/genetlink.h>
 
-#define TIPC_CONNACK_INTV 256
-#define TIPC_FLOWCTRL_WIN (TIPC_CONNACK_INTV * 2)
-#define TIPC_CONN_OVERLOAD_LIMIT ((TIPC_FLOWCTRL_WIN * 2 + 1) * \
-				  SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
+/* Compatibility values for deprecated message based flow control */
+#define FLOWCTL_MSG_WIN 512
+#define FLOWCTL_MSG_LIM ((FLOWCTL_MSG_WIN * 2 + 1) * SKB_TRUESIZE(MAX_MSG_SIZE))
+
+#define FLOWCTL_BLK_SZ 1024
+
+/* Socket receive buffer sizes */
+#define RCVBUF_MIN  (FLOWCTL_BLK_SZ * 512)
+#define RCVBUF_DEF  (FLOWCTL_BLK_SZ * 1024 * 2)
+#define RCVBUF_MAX  (FLOWCTL_BLK_SZ * 1024 * 16)
+
 int tipc_socket_init(void);
 void tipc_socket_stop(void);
 void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq);
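[Editor's note] A hypothetical compile-time cross-check of the windows these constants imply, assuming the /4 worst-case truesize ratio used by tsk_adv_blocks() in socket.c; this snippet is illustrative and not part of the patch.

/* Advertised windows implied by the new receive buffer constants */
#define FLOWCTL_BLK_SZ 1024
#define RCVBUF_MIN (FLOWCTL_BLK_SZ * 512)
#define RCVBUF_DEF (FLOWCTL_BLK_SZ * 1024 * 2)
#define RCVBUF_MAX (FLOWCTL_BLK_SZ * 1024 * 16)

_Static_assert(RCVBUF_MIN / FLOWCTL_BLK_SZ / 4 == 128,  "min window: 128 blocks");
_Static_assert(RCVBUF_DEF / FLOWCTL_BLK_SZ / 4 == 512,  "default window: 512 blocks");
_Static_assert(RCVBUF_MAX / FLOWCTL_BLK_SZ / 4 == 4096, "max window: 4096 blocks");

Peers that do not advertise TIPC_BLOCK_FLOWCTL keep the legacy 512-message window (FLOWCTL_MSG_WIN).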