aboutsummaryrefslogtreecommitdiffstats
path: root/net/tipc
diff options
context:
space:
mode:
authorYing Xue <ying.xue@windriver.com>2012-08-20 23:16:57 -0400
committerPaul Gortmaker <paul.gortmaker@windriver.com>2012-11-21 14:54:31 -0500
commitf288bef46443eb3a0b212c1c57b222c0497e06f6 (patch)
tree399a156cc2466cfe5526176bac8a0f7e8723a9e8 /net/tipc
parentde4594a51c904ddcd6c3a6cdd100f7c1d94d3239 (diff)
tipc: fix race/inefficiencies in poll/wait behaviour
When an application blocks at poll/select on a TIPC socket while requesting a specific event mask, both the filter_rcv() and wakeupdispatch() case will wake it up unconditionally whenever the state changes (i.e an incoming message arrives, or congestion has subsided). No mask is used. To avoid this, we populate sk->sk_data_ready and sk->sk_write_space with tipc_data_ready and tipc_write_space respectively, which makes tipc more in alignment with the rest of the networking code. These pass the exact set of possible events to the waker in fs/select.c hence avoiding waking up blocked processes unnecessarily. In doing so, we uncover another issue -- that there needs to be a memory barrier in these poll/receive callbacks, otherwise we are subject to the same race as documented above wq_has_sleeper() [in commit a57de0b4 "net: adding memory barrier to the poll and receive callbacks"]. So we need to replace poll_wait() with sock_poll_wait() and use rcu protection for the sk->sk_wq pointer in these two new functions. Signed-off-by: Ying Xue <ying.xue@windriver.com> Signed-off-by: Paul Gortmaker <paul.gortmaker@windriver.com>
Diffstat (limited to 'net/tipc')
-rw-r--r--net/tipc/socket.c45
1 files changed, 40 insertions, 5 deletions
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index fd5f042dbff4..b5fc8ed1d1fe 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -62,6 +62,8 @@ struct tipc_sock {
62static int backlog_rcv(struct sock *sk, struct sk_buff *skb); 62static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
63static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); 63static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
64static void wakeupdispatch(struct tipc_port *tport); 64static void wakeupdispatch(struct tipc_port *tport);
65static void tipc_data_ready(struct sock *sk, int len);
66static void tipc_write_space(struct sock *sk);
65 67
66static const struct proto_ops packet_ops; 68static const struct proto_ops packet_ops;
67static const struct proto_ops stream_ops; 69static const struct proto_ops stream_ops;
@@ -221,6 +223,8 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol,
221 sock_init_data(sock, sk); 223 sock_init_data(sock, sk);
222 sk->sk_backlog_rcv = backlog_rcv; 224 sk->sk_backlog_rcv = backlog_rcv;
223 sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2; 225 sk->sk_rcvbuf = TIPC_FLOW_CONTROL_WIN * 2 * TIPC_MAX_USER_MSG_SIZE * 2;
226 sk->sk_data_ready = tipc_data_ready;
227 sk->sk_write_space = tipc_write_space;
224 tipc_sk(sk)->p = tp_ptr; 228 tipc_sk(sk)->p = tp_ptr;
225 tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT; 229 tipc_sk(sk)->conn_timeout = CONN_TIMEOUT_DEFAULT;
226 230
@@ -435,7 +439,7 @@ static unsigned int poll(struct file *file, struct socket *sock,
435 struct sock *sk = sock->sk; 439 struct sock *sk = sock->sk;
436 u32 mask = 0; 440 u32 mask = 0;
437 441
438 poll_wait(file, sk_sleep(sk), wait); 442 sock_poll_wait(file, sk_sleep(sk), wait);
439 443
440 switch ((int)sock->state) { 444 switch ((int)sock->state) {
441 case SS_READY: 445 case SS_READY:
@@ -1126,6 +1130,39 @@ exit:
1126} 1130}
1127 1131
1128/** 1132/**
1133 * tipc_write_space - wake up thread if port congestion is released
1134 * @sk: socket
1135 */
1136static void tipc_write_space(struct sock *sk)
1137{
1138 struct socket_wq *wq;
1139
1140 rcu_read_lock();
1141 wq = rcu_dereference(sk->sk_wq);
1142 if (wq_has_sleeper(wq))
1143 wake_up_interruptible_sync_poll(&wq->wait, POLLOUT |
1144 POLLWRNORM | POLLWRBAND);
1145 rcu_read_unlock();
1146}
1147
1148/**
1149 * tipc_data_ready - wake up threads to indicate messages have been received
1150 * @sk: socket
1151 * @len: the length of messages
1152 */
1153static void tipc_data_ready(struct sock *sk, int len)
1154{
1155 struct socket_wq *wq;
1156
1157 rcu_read_lock();
1158 wq = rcu_dereference(sk->sk_wq);
1159 if (wq_has_sleeper(wq))
1160 wake_up_interruptible_sync_poll(&wq->wait, POLLIN |
1161 POLLRDNORM | POLLRDBAND);
1162 rcu_read_unlock();
1163}
1164
1165/**
1129 * rx_queue_full - determine if receive queue can accept another message 1166 * rx_queue_full - determine if receive queue can accept another message
1130 * @msg: message to be added to queue 1167 * @msg: message to be added to queue
1131 * @queue_size: current size of queue 1168 * @queue_size: current size of queue
@@ -1222,8 +1259,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1222 tipc_disconnect_port(tipc_sk_port(sk)); 1259 tipc_disconnect_port(tipc_sk_port(sk));
1223 } 1260 }
1224 1261
1225 if (waitqueue_active(sk_sleep(sk))) 1262 sk->sk_data_ready(sk, 0);
1226 wake_up_interruptible(sk_sleep(sk));
1227 return TIPC_OK; 1263 return TIPC_OK;
1228} 1264}
1229 1265
@@ -1290,8 +1326,7 @@ static void wakeupdispatch(struct tipc_port *tport)
1290{ 1326{
1291 struct sock *sk = (struct sock *)tport->usr_handle; 1327 struct sock *sk = (struct sock *)tport->usr_handle;
1292 1328
1293 if (waitqueue_active(sk_sleep(sk))) 1329 sk->sk_write_space(sk);
1294 wake_up_interruptible(sk_sleep(sk));
1295} 1330}
1296 1331
1297/** 1332/**