aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2013-01-20 17:30:09 -0500
committerPaul Gortmaker <paul.gortmaker@windriver.com>2013-02-15 17:01:58 -0500
commitaba79f332f46ca8529f3e62a9fc2926c8fe75e44 (patch)
treebd22bd59d69f28e71f075912ec024082adf5a36a /net/tipc
parent57467e56293796f780e91a24600a732516f534ac (diff)
tipc: byte-based overload control on socket receive queue
Change overload control to be purely byte-based, using sk->sk_rmem_alloc as byte counter, and compare it to a calculated upper limit for the socket receive queue. For all connection messages, irrespective of message importance, the overload limit is set to a constant value (i.e, 67MB). This limit should normally never be reached because of the lower limit used by the flow control algorithm, and is there only as a last resort in case a faulty peer doesn't respect the send window limit. For datagram messages, message importance is taken into account when calculating the overload limit. The calculation is based on sk->sk_rcvbuf, and is hence configurable via the socket option SO_RCVBUF. Cc: Neil Horman <nhorman@tuxdriver.com> Signed-off-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/socket.c77
1 files changed, 39 insertions, 38 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index f6ceecd44749..cbe2f6ecf07a 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -43,7 +43,8 @@
43#define SS_LISTENING -1 /* socket is listening */ 43#define SS_LISTENING -1 /* socket is listening */
44#define SS_READY -2 /* socket is connectionless */ 44#define SS_READY -2 /* socket is connectionless */
45 45
46#define OVERLOAD_LIMIT_BASE 10000 46#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \
47 SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
47#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 48#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
48 49
49struct tipc_sock { 50struct tipc_sock {
@@ -202,7 +203,6 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
202 203
203 sock_init_data(sock, sk); 204 sock_init_data(sock, sk);
204 sk->sk_backlog_rcv = backlog_rcv; 205 sk->sk_backlog_rcv = backlog_rcv;
205 sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
206 sk->sk_data_ready = tipc_data_ready; 206 sk->sk_data_ready = tipc_data_ready;
207 sk->sk_write_space = tipc_write_space; 207 sk->sk_write_space = tipc_write_space;
208 tipc_sk(sk)->p = tp_ptr; 208 tipc_sk(sk)->p = tp_ptr;
@@ -1142,34 +1142,6 @@ static void tipc_data_ready(struct sock *sk, int len)
1142} 1142}
1143 1143
1144/** 1144/**
1145 * rx_queue_full - determine if receive queue can accept another message
1146 * @msg: message to be added to queue
1147 * @queue_size: current size of queue
1148 * @base: nominal maximum size of queue
1149 *
1150 * Returns 1 if queue is unable to accept message, 0 otherwise
1151 */
1152static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1153{
1154 u32 threshold;
1155 u32 imp = msg_importance(msg);
1156
1157 if (imp == TIPC_LOW_IMPORTANCE)
1158 threshold = base;
1159 else if (imp == TIPC_MEDIUM_IMPORTANCE)
1160 threshold = base * 2;
1161 else if (imp == TIPC_HIGH_IMPORTANCE)
1162 threshold = base * 100;
1163 else
1164 return 0;
1165
1166 if (msg_connected(msg))
1167 threshold *= 4;
1168
1169 return queue_size >= threshold;
1170}
1171
1172/**
1173 * filter_connect - Handle all incoming messages for a connection-based socket 1145 * filter_connect - Handle all incoming messages for a connection-based socket
1174 * @tsock: TIPC socket 1146 * @tsock: TIPC socket
1175 * @msg: message 1147 * @msg: message
@@ -1247,6 +1219,36 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
1247} 1219}
1248 1220
1249/** 1221/**
1222 * rcvbuf_limit - get proper overload limit of socket receive queue
1223 * @sk: socket
1224 * @buf: message
1225 *
1226 * For all connection oriented messages, irrespective of importance,
1227 * the default overload value (i.e. 67MB) is set as limit.
1228 *
1229 * For all connectionless messages, by default new queue limits are
1230 * as belows:
1231 *
1232 * TIPC_LOW_IMPORTANCE (5MB)
1233 * TIPC_MEDIUM_IMPORTANCE (10MB)
1234 * TIPC_HIGH_IMPORTANCE (20MB)
1235 * TIPC_CRITICAL_IMPORTANCE (40MB)
1236 *
1237 * Returns overload limit according to corresponding message importance
1238 */
1239static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1240{
1241 struct tipc_msg *msg = buf_msg(buf);
1242 unsigned int limit;
1243
1244 if (msg_connected(msg))
1245 limit = CONN_OVERLOAD_LIMIT;
1246 else
1247 limit = sk->sk_rcvbuf << (msg_importance(msg) + 5);
1248 return limit;
1249}
1250
1251/**
1250 * filter_rcv - validate incoming message 1252 * filter_rcv - validate incoming message
1251 * @sk: socket 1253 * @sk: socket
1252 * @buf: message 1254 * @buf: message
@@ -1262,7 +1264,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1262{ 1264{
1263 struct socket *sock = sk->sk_socket; 1265 struct socket *sock = sk->sk_socket;
1264 struct tipc_msg *msg = buf_msg(buf); 1266 struct tipc_msg *msg = buf_msg(buf);
1265 u32 recv_q_len; 1267 unsigned int limit = rcvbuf_limit(sk, buf);
1266 u32 res = TIPC_OK; 1268 u32 res = TIPC_OK;
1267 1269
1268 /* Reject message if it is wrong sort of message for socket */ 1270 /* Reject message if it is wrong sort of message for socket */
@@ -1279,15 +1281,13 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1279 } 1281 }
1280 1282
1281 /* Reject message if there isn't room to queue it */ 1283 /* Reject message if there isn't room to queue it */
1282 recv_q_len = skb_queue_len(&sk->sk_receive_queue); 1284 if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1283 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) { 1285 return TIPC_ERR_OVERLOAD;
1284 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1285 return TIPC_ERR_OVERLOAD;
1286 }
1287 1286
1288 /* Enqueue message (finally!) */ 1287 /* Enqueue message */
1289 TIPC_SKB_CB(buf)->handle = 0; 1288 TIPC_SKB_CB(buf)->handle = 0;
1290 __skb_queue_tail(&sk->sk_receive_queue, buf); 1289 __skb_queue_tail(&sk->sk_receive_queue, buf);
1290 skb_set_owner_r(buf, sk);
1291 1291
1292 sk->sk_data_ready(sk, 0); 1292 sk->sk_data_ready(sk, 0);
1293 return TIPC_OK; 1293 return TIPC_OK;
@@ -1336,7 +1336,7 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1336 if (!sock_owned_by_user(sk)) { 1336 if (!sock_owned_by_user(sk)) {
1337 res = filter_rcv(sk, buf); 1337 res = filter_rcv(sk, buf);
1338 } else { 1338 } else {
1339 if (sk_add_backlog(sk, buf, sk->sk_rcvbuf)) 1339 if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
1340 res = TIPC_ERR_OVERLOAD; 1340 res = TIPC_ERR_OVERLOAD;
1341 else 1341 else
1342 res = TIPC_OK; 1342 res = TIPC_OK;
@@ -1570,6 +1570,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
1570 } else { 1570 } else {
1571 __skb_dequeue(&sk->sk_receive_queue); 1571 __skb_dequeue(&sk->sk_receive_queue);
1572 __skb_queue_head(&new_sk->sk_receive_queue, buf); 1572 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1573 skb_set_owner_r(buf, new_sk);
1573 } 1574 }
1574 release_sock(new_sk); 1575 release_sock(new_sk);
1575 1576