aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2013-02-18 12:22:17 -0500
committerDavid S. Miller <davem@davemloft.net>2013-02-18 12:22:17 -0500
commitc8c5b287152bfd10c27390d36df2a8131cf906f5 (patch)
treef27dd1d7a78f6ba8424a7a98338a5aba22ebb71f
parentefd9450e7e36717f24dff3bd584faa80a85231d6 (diff)
parent97f8b87e9108485a0b7070645662253561304458 (diff)
Merge branch 'tipc_net-next' of git://git.kernel.org/pub/scm/linux/kernel/git/paulg/linux
Paul Gortmaker says: ==================== Two relatively small cleanup patches here, plus a reimplementation of the patch Neil had questions about[1] in the last development cycle. Tested on today's net-next, between 32 and 64 bit x86 machines using the server/client in tipc-utils, as usual. [1] http://patchwork.ozlabs.org/patch/204507/ ==================== Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/socket.c103
1 files changed, 44 insertions, 59 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 9b4e4833a484..a9622b6cd916 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -43,7 +43,8 @@
43#define SS_LISTENING -1 /* socket is listening */ 43#define SS_LISTENING -1 /* socket is listening */
44#define SS_READY -2 /* socket is connectionless */ 44#define SS_READY -2 /* socket is connectionless */
45 45
46#define OVERLOAD_LIMIT_BASE 10000 46#define CONN_OVERLOAD_LIMIT ((TIPC_FLOW_CONTROL_WIN * 2 + 1) * \
47 SKB_TRUESIZE(TIPC_MAX_USER_MSG_SIZE))
47#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 48#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
48 49
49struct tipc_sock { 50struct tipc_sock {
@@ -129,19 +130,6 @@ static void advance_rx_queue(struct sock *sk)
129} 130}
130 131
131/** 132/**
132 * discard_rx_queue - discard all buffers in socket receive queue
133 *
134 * Caller must hold socket lock
135 */
136static void discard_rx_queue(struct sock *sk)
137{
138 struct sk_buff *buf;
139
140 while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
141 kfree_skb(buf);
142}
143
144/**
145 * reject_rx_queue - reject all buffers in socket receive queue 133 * reject_rx_queue - reject all buffers in socket receive queue
146 * 134 *
147 * Caller must hold socket lock 135 * Caller must hold socket lock
@@ -215,7 +203,6 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
215 203
216 sock_init_data(sock, sk); 204 sock_init_data(sock, sk);
217 sk->sk_backlog_rcv = backlog_rcv; 205 sk->sk_backlog_rcv = backlog_rcv;
218 sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
219 sk->sk_data_ready = tipc_data_ready; 206 sk->sk_data_ready = tipc_data_ready;
220 sk->sk_write_space = tipc_write_space; 207 sk->sk_write_space = tipc_write_space;
221 tipc_sk(sk)->p = tp_ptr; 208 tipc_sk(sk)->p = tp_ptr;
@@ -292,7 +279,7 @@ static int release(struct socket *sock)
292 res = tipc_deleteport(tport->ref); 279 res = tipc_deleteport(tport->ref);
293 280
294 /* Discard any remaining (connection-based) messages in receive queue */ 281 /* Discard any remaining (connection-based) messages in receive queue */
295 discard_rx_queue(sk); 282 __skb_queue_purge(&sk->sk_receive_queue);
296 283
297 /* Reject any messages that accumulated in backlog queue */ 284 /* Reject any messages that accumulated in backlog queue */
298 sock->state = SS_DISCONNECTING; 285 sock->state = SS_DISCONNECTING;
@@ -516,8 +503,7 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
516 if (unlikely((m->msg_namelen < sizeof(*dest)) || 503 if (unlikely((m->msg_namelen < sizeof(*dest)) ||
517 (dest->family != AF_TIPC))) 504 (dest->family != AF_TIPC)))
518 return -EINVAL; 505 return -EINVAL;
519 if ((total_len > TIPC_MAX_USER_MSG_SIZE) || 506 if (total_len > TIPC_MAX_USER_MSG_SIZE)
520 (m->msg_iovlen > (unsigned int)INT_MAX))
521 return -EMSGSIZE; 507 return -EMSGSIZE;
522 508
523 if (iocb) 509 if (iocb)
@@ -625,8 +611,7 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
625 if (unlikely(dest)) 611 if (unlikely(dest))
626 return send_msg(iocb, sock, m, total_len); 612 return send_msg(iocb, sock, m, total_len);
627 613
628 if ((total_len > TIPC_MAX_USER_MSG_SIZE) || 614 if (total_len > TIPC_MAX_USER_MSG_SIZE)
629 (m->msg_iovlen > (unsigned int)INT_MAX))
630 return -EMSGSIZE; 615 return -EMSGSIZE;
631 616
632 if (iocb) 617 if (iocb)
@@ -711,8 +696,7 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
711 goto exit; 696 goto exit;
712 } 697 }
713 698
714 if ((total_len > (unsigned int)INT_MAX) || 699 if (total_len > (unsigned int)INT_MAX) {
715 (m->msg_iovlen > (unsigned int)INT_MAX)) {
716 res = -EMSGSIZE; 700 res = -EMSGSIZE;
717 goto exit; 701 goto exit;
718 } 702 }
@@ -1155,34 +1139,6 @@ static void tipc_data_ready(struct sock *sk, int len)
1155} 1139}
1156 1140
1157/** 1141/**
1158 * rx_queue_full - determine if receive queue can accept another message
1159 * @msg: message to be added to queue
1160 * @queue_size: current size of queue
1161 * @base: nominal maximum size of queue
1162 *
1163 * Returns 1 if queue is unable to accept message, 0 otherwise
1164 */
1165static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1166{
1167 u32 threshold;
1168 u32 imp = msg_importance(msg);
1169
1170 if (imp == TIPC_LOW_IMPORTANCE)
1171 threshold = base;
1172 else if (imp == TIPC_MEDIUM_IMPORTANCE)
1173 threshold = base * 2;
1174 else if (imp == TIPC_HIGH_IMPORTANCE)
1175 threshold = base * 100;
1176 else
1177 return 0;
1178
1179 if (msg_connected(msg))
1180 threshold *= 4;
1181
1182 return queue_size >= threshold;
1183}
1184
1185/**
1186 * filter_connect - Handle all incoming messages for a connection-based socket 1142 * filter_connect - Handle all incoming messages for a connection-based socket
1187 * @tsock: TIPC socket 1143 * @tsock: TIPC socket
1188 * @msg: message 1144 * @msg: message
@@ -1260,6 +1216,36 @@ static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
1260} 1216}
1261 1217
1262/** 1218/**
1219 * rcvbuf_limit - get proper overload limit of socket receive queue
1220 * @sk: socket
1221 * @buf: message
1222 *
1223 * For all connection oriented messages, irrespective of importance,
1224 * the default overload value (i.e. 67MB) is set as limit.
1225 *
1226 * For all connectionless messages, by default new queue limits are
1227 * as belows:
1228 *
1229 * TIPC_LOW_IMPORTANCE (5MB)
1230 * TIPC_MEDIUM_IMPORTANCE (10MB)
1231 * TIPC_HIGH_IMPORTANCE (20MB)
1232 * TIPC_CRITICAL_IMPORTANCE (40MB)
1233 *
1234 * Returns overload limit according to corresponding message importance
1235 */
1236static unsigned int rcvbuf_limit(struct sock *sk, struct sk_buff *buf)
1237{
1238 struct tipc_msg *msg = buf_msg(buf);
1239 unsigned int limit;
1240
1241 if (msg_connected(msg))
1242 limit = CONN_OVERLOAD_LIMIT;
1243 else
1244 limit = sk->sk_rcvbuf << (msg_importance(msg) + 5);
1245 return limit;
1246}
1247
1248/**
1263 * filter_rcv - validate incoming message 1249 * filter_rcv - validate incoming message
1264 * @sk: socket 1250 * @sk: socket
1265 * @buf: message 1251 * @buf: message
@@ -1275,7 +1261,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1275{ 1261{
1276 struct socket *sock = sk->sk_socket; 1262 struct socket *sock = sk->sk_socket;
1277 struct tipc_msg *msg = buf_msg(buf); 1263 struct tipc_msg *msg = buf_msg(buf);
1278 u32 recv_q_len; 1264 unsigned int limit = rcvbuf_limit(sk, buf);
1279 u32 res = TIPC_OK; 1265 u32 res = TIPC_OK;
1280 1266
1281 /* Reject message if it is wrong sort of message for socket */ 1267 /* Reject message if it is wrong sort of message for socket */
@@ -1292,15 +1278,13 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1292 } 1278 }
1293 1279
1294 /* Reject message if there isn't room to queue it */ 1280 /* Reject message if there isn't room to queue it */
1295 recv_q_len = skb_queue_len(&sk->sk_receive_queue); 1281 if (sk_rmem_alloc_get(sk) + buf->truesize >= limit)
1296 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) { 1282 return TIPC_ERR_OVERLOAD;
1297 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1298 return TIPC_ERR_OVERLOAD;
1299 }
1300 1283
1301 /* Enqueue message (finally!) */ 1284 /* Enqueue message */
1302 TIPC_SKB_CB(buf)->handle = 0; 1285 TIPC_SKB_CB(buf)->handle = 0;
1303 __skb_queue_tail(&sk->sk_receive_queue, buf); 1286 __skb_queue_tail(&sk->sk_receive_queue, buf);
1287 skb_set_owner_r(buf, sk);
1304 1288
1305 sk->sk_data_ready(sk, 0); 1289 sk->sk_data_ready(sk, 0);
1306 return TIPC_OK; 1290 return TIPC_OK;
@@ -1349,7 +1333,7 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1349 if (!sock_owned_by_user(sk)) { 1333 if (!sock_owned_by_user(sk)) {
1350 res = filter_rcv(sk, buf); 1334 res = filter_rcv(sk, buf);
1351 } else { 1335 } else {
1352 if (sk_add_backlog(sk, buf, sk->sk_rcvbuf)) 1336 if (sk_add_backlog(sk, buf, rcvbuf_limit(sk, buf)))
1353 res = TIPC_ERR_OVERLOAD; 1337 res = TIPC_ERR_OVERLOAD;
1354 else 1338 else
1355 res = TIPC_OK; 1339 res = TIPC_OK;
@@ -1583,6 +1567,7 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
1583 } else { 1567 } else {
1584 __skb_dequeue(&sk->sk_receive_queue); 1568 __skb_dequeue(&sk->sk_receive_queue);
1585 __skb_queue_head(&new_sk->sk_receive_queue, buf); 1569 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1570 skb_set_owner_r(buf, new_sk);
1586 } 1571 }
1587 release_sock(new_sk); 1572 release_sock(new_sk);
1588 1573
@@ -1637,7 +1622,7 @@ restart:
1637 case SS_DISCONNECTING: 1622 case SS_DISCONNECTING:
1638 1623
1639 /* Discard any unreceived messages */ 1624 /* Discard any unreceived messages */
1640 discard_rx_queue(sk); 1625 __skb_queue_purge(&sk->sk_receive_queue);
1641 1626
1642 /* Wake up anyone sleeping in poll */ 1627 /* Wake up anyone sleeping in poll */
1643 sk->sk_state_change(sk); 1628 sk->sk_state_change(sk);