aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2012-11-27 06:15:27 -0500
committerPaul Gortmaker <paul.gortmaker@windriver.com>2012-12-07 14:19:52 -0500
commit9da3d475874f4da49057767913af95ce01063ba3 (patch)
tree36397ebac269552b2c1223fb64740a4aab49474a
parentc008413850d1d48cc02c940280bf2dcf76160f4c (diff)
tipc: eliminate aggregate sk_receive_queue limit
As a complement to the per-socket sk_recv_queue limit, TIPC keeps a global atomic counter for the sum of sk_recv_queue sizes across all tipc sockets. When incremented, the counter is compared to an upper threshold value, and if this is reached, the message is rejected with error code TIPC_OVERLOAD. This check was originally meant to protect the node against buffer exhaustion and general CPU overload. However, all experience indicates that the feature is not only redundant on Linux, but even harmful. Users run into the limit very often, causing disturbances for their applications, while removing it seems to have no negative effects at all. We have also seen that overall performance is boosted significantly when this bottleneck is removed. Furthermore, we don't see any other network protocols maintaining such a mechanism, which strengthens our conviction that this control can be eliminated. As a result, the atomic variable tipc_queue_size is now unused and so it can be deleted. There is a getsockopt call that used to allow reading it; we retain that but just return zero for maximum compatibility. Signed-off-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Jon Maloy <jon.maloy@ericsson.com> Cc: Neil Horman <nhorman@tuxdriver.com> [PG: phase out tipc_queue_size as pointed out by Neil Horman] Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
-rw-r--r--net/tipc/socket.c23
1 file changed, 4 insertions(+), 19 deletions(-)
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1a720c86e80a..848be692cb45 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -2,7 +2,7 @@
2 * net/tipc/socket.c: TIPC socket API 2 * net/tipc/socket.c: TIPC socket API
3 * 3 *
4 * Copyright (c) 2001-2007, Ericsson AB 4 * Copyright (c) 2001-2007, Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2012, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -73,8 +73,6 @@ static struct proto tipc_proto;
73 73
74static int sockets_enabled; 74static int sockets_enabled;
75 75
76static atomic_t tipc_queue_size = ATOMIC_INIT(0);
77
78/* 76/*
79 * Revised TIPC socket locking policy: 77 * Revised TIPC socket locking policy:
80 * 78 *
@@ -128,7 +126,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0);
128static void advance_rx_queue(struct sock *sk) 126static void advance_rx_queue(struct sock *sk)
129{ 127{
130 kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); 128 kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
131 atomic_dec(&tipc_queue_size);
132} 129}
133 130
134/** 131/**
@@ -140,10 +137,8 @@ static void discard_rx_queue(struct sock *sk)
140{ 137{
141 struct sk_buff *buf; 138 struct sk_buff *buf;
142 139
143 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { 140 while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
144 atomic_dec(&tipc_queue_size);
145 kfree_skb(buf); 141 kfree_skb(buf);
146 }
147} 142}
148 143
149/** 144/**
@@ -155,10 +150,8 @@ static void reject_rx_queue(struct sock *sk)
155{ 150{
156 struct sk_buff *buf; 151 struct sk_buff *buf;
157 152
158 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { 153 while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
159 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 154 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
160 atomic_dec(&tipc_queue_size);
161 }
162} 155}
163 156
164/** 157/**
@@ -280,7 +273,6 @@ static int release(struct socket *sock)
280 buf = __skb_dequeue(&sk->sk_receive_queue); 273 buf = __skb_dequeue(&sk->sk_receive_queue);
281 if (buf == NULL) 274 if (buf == NULL)
282 break; 275 break;
283 atomic_dec(&tipc_queue_size);
284 if (TIPC_SKB_CB(buf)->handle != 0) 276 if (TIPC_SKB_CB(buf)->handle != 0)
285 kfree_skb(buf); 277 kfree_skb(buf);
286 else { 278 else {
@@ -1241,11 +1233,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1241 } 1233 }
1242 1234
1243 /* Reject message if there isn't room to queue it */ 1235 /* Reject message if there isn't room to queue it */
1244 recv_q_len = (u32)atomic_read(&tipc_queue_size);
1245 if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
1246 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1247 return TIPC_ERR_OVERLOAD;
1248 }
1249 recv_q_len = skb_queue_len(&sk->sk_receive_queue); 1236 recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1250 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) { 1237 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1251 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2)) 1238 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
@@ -1254,7 +1241,6 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1254 1241
1255 /* Enqueue message (finally!) */ 1242 /* Enqueue message (finally!) */
1256 TIPC_SKB_CB(buf)->handle = 0; 1243 TIPC_SKB_CB(buf)->handle = 0;
1257 atomic_inc(&tipc_queue_size);
1258 __skb_queue_tail(&sk->sk_receive_queue, buf); 1244 __skb_queue_tail(&sk->sk_receive_queue, buf);
1259 1245
1260 /* Initiate connection termination for an incoming 'FIN' */ 1246 /* Initiate connection termination for an incoming 'FIN' */
@@ -1578,7 +1564,6 @@ restart:
1578 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ 1564 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1579 buf = __skb_dequeue(&sk->sk_receive_queue); 1565 buf = __skb_dequeue(&sk->sk_receive_queue);
1580 if (buf) { 1566 if (buf) {
1581 atomic_dec(&tipc_queue_size);
1582 if (TIPC_SKB_CB(buf)->handle != 0) { 1567 if (TIPC_SKB_CB(buf)->handle != 0) {
1583 kfree_skb(buf); 1568 kfree_skb(buf);
1584 goto restart; 1569 goto restart;
@@ -1717,7 +1702,7 @@ static int getsockopt(struct socket *sock,
1717 /* no need to set "res", since already 0 at this point */ 1702 /* no need to set "res", since already 0 at this point */
1718 break; 1703 break;
1719 case TIPC_NODE_RECVQ_DEPTH: 1704 case TIPC_NODE_RECVQ_DEPTH:
1720 value = (u32)atomic_read(&tipc_queue_size); 1705 value = 0; /* was tipc_queue_size, now obsolete */
1721 break; 1706 break;
1722 case TIPC_SOCK_RECVQ_DEPTH: 1707 case TIPC_SOCK_RECVQ_DEPTH:
1723 value = skb_queue_len(&sk->sk_receive_queue); 1708 value = skb_queue_len(&sk->sk_receive_queue);