aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAllan Stephens <allan.stephens@windriver.com>2008-04-15 03:22:02 -0400
committerDavid S. Miller <davem@davemloft.net>2008-04-15 03:22:02 -0400
commit0c3141e910eaaa0b617e2f26c69b266d1cd1f035 (patch)
tree53b9f635b8dc2bf82d49e2d29f6e07677fa4811e
parentb89741a0cc162511b4341c07e17e1bd4c8b4621d (diff)
[TIPC]: Overhaul of socket locking logic
This patch modifies TIPC's socket code to follow the same approach used by other protocols. This change eliminates the need for a mutex in the TIPC-specific portion of the socket protocol data structure -- in its place, the standard Linux socket backlog queue and associated locking routines are utilized. These changes fix a long-standing receive queue bug on SMP systems, and also enable individual read and write threads to utilize a socket without unnecessarily interfering with each other. Signed-off-by: Allan Stephens <allan.stephens@windriver.com> Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--include/net/tipc/tipc_port.h6
-rw-r--r--net/tipc/port.c31
-rw-r--r--net/tipc/socket.c1002
3 files changed, 608 insertions, 431 deletions
diff --git a/include/net/tipc/tipc_port.h b/include/net/tipc/tipc_port.h
index c9b36b77a0b9..11105bcc4457 100644
--- a/include/net/tipc/tipc_port.h
+++ b/include/net/tipc/tipc_port.h
@@ -96,6 +96,12 @@ struct tipc_port *tipc_get_port(const u32 ref);
96 96
97void *tipc_get_handle(const u32 ref); 97void *tipc_get_handle(const u32 ref);
98 98
99/*
100 * The following routines require that the port be locked on entry
101 */
102
103int tipc_disconnect_port(struct tipc_port *tp_ptr);
104
99 105
100#endif 106#endif
101 107
diff --git a/net/tipc/port.c b/net/tipc/port.c
index e2646a96935d..2f5806410c64 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -1240,6 +1240,28 @@ exit:
1240 return res; 1240 return res;
1241} 1241}
1242 1242
1243/**
1244 * tipc_disconnect_port - disconnect port from peer
1245 *
1246 * Port must be locked.
1247 */
1248
1249int tipc_disconnect_port(struct tipc_port *tp_ptr)
1250{
1251 int res;
1252
1253 if (tp_ptr->connected) {
1254 tp_ptr->connected = 0;
1255 /* let timer expire on it's own to avoid deadlock! */
1256 tipc_nodesub_unsubscribe(
1257 &((struct port *)tp_ptr)->subscription);
1258 res = TIPC_OK;
1259 } else {
1260 res = -ENOTCONN;
1261 }
1262 return res;
1263}
1264
1243/* 1265/*
1244 * tipc_disconnect(): Disconnect port form peer. 1266 * tipc_disconnect(): Disconnect port form peer.
1245 * This is a node local operation. 1267 * This is a node local operation.
@@ -1248,17 +1270,12 @@ exit:
1248int tipc_disconnect(u32 ref) 1270int tipc_disconnect(u32 ref)
1249{ 1271{
1250 struct port *p_ptr; 1272 struct port *p_ptr;
1251 int res = -ENOTCONN; 1273 int res;
1252 1274
1253 p_ptr = tipc_port_lock(ref); 1275 p_ptr = tipc_port_lock(ref);
1254 if (!p_ptr) 1276 if (!p_ptr)
1255 return -EINVAL; 1277 return -EINVAL;
1256 if (p_ptr->publ.connected) { 1278 res = tipc_disconnect_port((struct tipc_port *)p_ptr);
1257 p_ptr->publ.connected = 0;
1258 /* let timer expire on it's own to avoid deadlock! */
1259 tipc_nodesub_unsubscribe(&p_ptr->subscription);
1260 res = TIPC_OK;
1261 }
1262 tipc_port_unlock(p_ptr); 1279 tipc_port_unlock(p_ptr);
1263 return res; 1280 return res;
1264} 1281}
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index df1960384364..05853159536a 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -43,7 +43,6 @@
43#include <linux/slab.h> 43#include <linux/slab.h>
44#include <linux/poll.h> 44#include <linux/poll.h>
45#include <linux/fcntl.h> 45#include <linux/fcntl.h>
46#include <linux/mutex.h>
47#include <asm/string.h> 46#include <asm/string.h>
48#include <asm/atomic.h> 47#include <asm/atomic.h>
49#include <net/sock.h> 48#include <net/sock.h>
@@ -64,11 +63,12 @@
64struct tipc_sock { 63struct tipc_sock {
65 struct sock sk; 64 struct sock sk;
66 struct tipc_port *p; 65 struct tipc_port *p;
67 struct mutex lock;
68}; 66};
69 67
70#define tipc_sk(sk) ((struct tipc_sock*)sk) 68#define tipc_sk(sk) ((struct tipc_sock *)(sk))
69#define tipc_sk_port(sk) ((struct tipc_port *)(tipc_sk(sk)->p))
71 70
71static int backlog_rcv(struct sock *sk, struct sk_buff *skb);
72static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf); 72static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf);
73static void wakeupdispatch(struct tipc_port *tport); 73static void wakeupdispatch(struct tipc_port *tport);
74 74
@@ -82,55 +82,115 @@ static int sockets_enabled = 0;
82 82
83static atomic_t tipc_queue_size = ATOMIC_INIT(0); 83static atomic_t tipc_queue_size = ATOMIC_INIT(0);
84 84
85
86/* 85/*
87 * sock_lock(): Lock a port/socket pair. lock_sock() can 86 * Revised TIPC socket locking policy:
88 * not be used here, since the same lock must protect ports 87 *
89 * with non-socket interfaces. 88 * Most socket operations take the standard socket lock when they start
90 * See net.c for description of locking policy. 89 * and hold it until they finish (or until they need to sleep). Acquiring
90 * this lock grants the owner exclusive access to the fields of the socket
91 * data structures, with the exception of the backlog queue. A few socket
92 * operations can be done without taking the socket lock because they only
93 * read socket information that never changes during the life of the socket.
94 *
95 * Socket operations may acquire the lock for the associated TIPC port if they
96 * need to perform an operation on the port. If any routine needs to acquire
97 * both the socket lock and the port lock it must take the socket lock first
98 * to avoid the risk of deadlock.
99 *
100 * The dispatcher handling incoming messages cannot grab the socket lock in
101 * the standard fashion, since invoked it runs at the BH level and cannot block.
102 * Instead, it checks to see if the socket lock is currently owned by someone,
103 * and either handles the message itself or adds it to the socket's backlog
104 * queue; in the latter case the queued message is processed once the process
105 * owning the socket lock releases it.
106 *
107 * NOTE: Releasing the socket lock while an operation is sleeping overcomes
108 * the problem of a blocked socket operation preventing any other operations
109 * from occurring. However, applications must be careful if they have
110 * multiple threads trying to send (or receive) on the same socket, as these
111 * operations might interfere with each other. For example, doing a connect
112 * and a receive at the same time might allow the receive to consume the
113 * ACK message meant for the connect. While additional work could be done
114 * to try and overcome this, it doesn't seem to be worthwhile at the present.
115 *
116 * NOTE: Releasing the socket lock while an operation is sleeping also ensures
117 * that another operation that must be performed in a non-blocking manner is
118 * not delayed for very long because the lock has already been taken.
119 *
120 * NOTE: This code assumes that certain fields of a port/socket pair are
121 * constant over its lifetime; such fields can be examined without taking
122 * the socket lock and/or port lock, and do not need to be re-read even
123 * after resuming processing after waiting. These fields include:
124 * - socket type
125 * - pointer to socket sk structure (aka tipc_sock structure)
126 * - pointer to port structure
127 * - port reference
128 */
129
130/**
131 * advance_rx_queue - discard first buffer in socket receive queue
132 *
133 * Caller must hold socket lock
91 */ 134 */
92static void sock_lock(struct tipc_sock* tsock) 135
136static void advance_rx_queue(struct sock *sk)
93{ 137{
94 spin_lock_bh(tsock->p->lock); 138 buf_discard(__skb_dequeue(&sk->sk_receive_queue));
139 atomic_dec(&tipc_queue_size);
95} 140}
96 141
97/* 142/**
98 * sock_unlock(): Unlock a port/socket pair 143 * discard_rx_queue - discard all buffers in socket receive queue
144 *
145 * Caller must hold socket lock
99 */ 146 */
100static void sock_unlock(struct tipc_sock* tsock) 147
148static void discard_rx_queue(struct sock *sk)
101{ 149{
102 spin_unlock_bh(tsock->p->lock); 150 struct sk_buff *buf;
151
152 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
153 atomic_dec(&tipc_queue_size);
154 buf_discard(buf);
155 }
103} 156}
104 157
105/** 158/**
106 * advance_queue - discard first buffer in queue 159 * reject_rx_queue - reject all buffers in socket receive queue
107 * @tsock: TIPC socket 160 *
161 * Caller must hold socket lock
108 */ 162 */
109 163
110static void advance_queue(struct tipc_sock *tsock) 164static void reject_rx_queue(struct sock *sk)
111{ 165{
112 sock_lock(tsock); 166 struct sk_buff *buf;
113 buf_discard(skb_dequeue(&tsock->sk.sk_receive_queue)); 167
114 sock_unlock(tsock); 168 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) {
115 atomic_dec(&tipc_queue_size); 169 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
170 atomic_dec(&tipc_queue_size);
171 }
116} 172}
117 173
118/** 174/**
119 * tipc_create - create a TIPC socket 175 * tipc_create - create a TIPC socket
176 * @net: network namespace (must be default network)
120 * @sock: pre-allocated socket structure 177 * @sock: pre-allocated socket structure
121 * @protocol: protocol indicator (must be 0) 178 * @protocol: protocol indicator (must be 0)
122 * 179 *
123 * This routine creates and attaches a 'struct sock' to the 'struct socket', 180 * This routine creates additional data structures used by the TIPC socket,
124 * then create and attaches a TIPC port to the 'struct sock' part. 181 * initializes them, and links them together.
125 * 182 *
126 * Returns 0 on success, errno otherwise 183 * Returns 0 on success, errno otherwise
127 */ 184 */
185
128static int tipc_create(struct net *net, struct socket *sock, int protocol) 186static int tipc_create(struct net *net, struct socket *sock, int protocol)
129{ 187{
130 struct tipc_sock *tsock; 188 const struct proto_ops *ops;
131 struct tipc_port *port; 189 socket_state state;
132 struct sock *sk; 190 struct sock *sk;
133 u32 ref; 191 u32 portref;
192
193 /* Validate arguments */
134 194
135 if (net != &init_net) 195 if (net != &init_net)
136 return -EAFNOSUPPORT; 196 return -EAFNOSUPPORT;
@@ -138,53 +198,56 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol)
138 if (unlikely(protocol != 0)) 198 if (unlikely(protocol != 0))
139 return -EPROTONOSUPPORT; 199 return -EPROTONOSUPPORT;
140 200
141 ref = tipc_createport_raw(NULL, &dispatch, &wakeupdispatch, TIPC_LOW_IMPORTANCE);
142 if (unlikely(!ref))
143 return -ENOMEM;
144
145 sock->state = SS_UNCONNECTED;
146
147 switch (sock->type) { 201 switch (sock->type) {
148 case SOCK_STREAM: 202 case SOCK_STREAM:
149 sock->ops = &stream_ops; 203 ops = &stream_ops;
204 state = SS_UNCONNECTED;
150 break; 205 break;
151 case SOCK_SEQPACKET: 206 case SOCK_SEQPACKET:
152 sock->ops = &packet_ops; 207 ops = &packet_ops;
208 state = SS_UNCONNECTED;
153 break; 209 break;
154 case SOCK_DGRAM: 210 case SOCK_DGRAM:
155 tipc_set_portunreliable(ref, 1);
156 /* fall through */
157 case SOCK_RDM: 211 case SOCK_RDM:
158 tipc_set_portunreturnable(ref, 1); 212 ops = &msg_ops;
159 sock->ops = &msg_ops; 213 state = SS_READY;
160 sock->state = SS_READY;
161 break; 214 break;
162 default: 215 default:
163 tipc_deleteport(ref);
164 return -EPROTOTYPE; 216 return -EPROTOTYPE;
165 } 217 }
166 218
219 /* Allocate socket's protocol area */
220
167 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto); 221 sk = sk_alloc(net, AF_TIPC, GFP_KERNEL, &tipc_proto);
168 if (!sk) { 222 if (sk == NULL)
169 tipc_deleteport(ref);
170 return -ENOMEM; 223 return -ENOMEM;
171 }
172 224
173 sock_init_data(sock, sk); 225 /* Allocate TIPC port for socket to use */
174 sk->sk_rcvtimeo = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
175 226
176 tsock = tipc_sk(sk); 227 portref = tipc_createport_raw(sk, &dispatch, &wakeupdispatch,
177 port = tipc_get_port(ref); 228 TIPC_LOW_IMPORTANCE);
229 if (unlikely(portref == 0)) {
230 sk_free(sk);
231 return -ENOMEM;
232 }
178 233
179 tsock->p = port; 234 /* Finish initializing socket data structures */
180 port->usr_handle = tsock;
181 235
182 mutex_init(&tsock->lock); 236 sock->ops = ops;
237 sock->state = state;
183 238
184 dbg("sock_create: %x\n",tsock); 239 sock_init_data(sock, sk);
240 sk->sk_rcvtimeo = msecs_to_jiffies(CONN_TIMEOUT_DEFAULT);
241 sk->sk_backlog_rcv = backlog_rcv;
242 tipc_sk(sk)->p = tipc_get_port(portref);
185 243
186 atomic_inc(&tipc_user_count); 244 if (sock->state == SS_READY) {
245 tipc_set_portunreturnable(portref, 1);
246 if (sock->type == SOCK_DGRAM)
247 tipc_set_portunreliable(portref, 1);
248 }
187 249
250 atomic_inc(&tipc_user_count);
188 return 0; 251 return 0;
189} 252}
190 253
@@ -207,52 +270,62 @@ static int tipc_create(struct net *net, struct socket *sock, int protocol)
207 270
208static int release(struct socket *sock) 271static int release(struct socket *sock)
209{ 272{
210 struct tipc_sock *tsock = tipc_sk(sock->sk);
211 struct sock *sk = sock->sk; 273 struct sock *sk = sock->sk;
212 int res = TIPC_OK; 274 struct tipc_port *tport;
213 struct sk_buff *buf; 275 struct sk_buff *buf;
276 int res;
214 277
215 dbg("sock_delete: %x\n",tsock); 278 /*
216 if (!tsock) 279 * Exit if socket isn't fully initialized (occurs when a failed accept()
217 return 0; 280 * releases a pre-allocated child socket that was never used)
218 mutex_lock(&tsock->lock); 281 */
219 if (!sock->sk) { 282
220 mutex_unlock(&tsock->lock); 283 if (sk == NULL)
221 return 0; 284 return 0;
222 }
223 285
224 /* Reject unreceived messages, unless no longer connected */ 286 tport = tipc_sk_port(sk);
287 lock_sock(sk);
288
289 /*
290 * Reject all unreceived messages, except on an active connection
291 * (which disconnects locally & sends a 'FIN+' to peer)
292 */
225 293
226 while (sock->state != SS_DISCONNECTING) { 294 while (sock->state != SS_DISCONNECTING) {
227 sock_lock(tsock); 295 buf = __skb_dequeue(&sk->sk_receive_queue);
228 buf = skb_dequeue(&sk->sk_receive_queue); 296 if (buf == NULL)
229 if (!buf)
230 tsock->p->usr_handle = NULL;
231 sock_unlock(tsock);
232 if (!buf)
233 break; 297 break;
298 atomic_dec(&tipc_queue_size);
234 if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf))) 299 if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf)))
235 buf_discard(buf); 300 buf_discard(buf);
236 else 301 else {
302 if ((sock->state == SS_CONNECTING) ||
303 (sock->state == SS_CONNECTED)) {
304 sock->state = SS_DISCONNECTING;
305 tipc_disconnect(tport->ref);
306 }
237 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 307 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
238 atomic_dec(&tipc_queue_size); 308 }
239 } 309 }
240 310
241 /* Delete TIPC port */ 311 /*
312 * Delete TIPC port; this ensures no more messages are queued
313 * (also disconnects an active connection & sends a 'FIN-' to peer)
314 */
242 315
243 res = tipc_deleteport(tsock->p->ref); 316 res = tipc_deleteport(tport->ref);
244 sock->sk = NULL;
245 317
246 /* Discard any remaining messages */ 318 /* Discard any remaining (connection-based) messages in receive queue */
247 319
248 while ((buf = skb_dequeue(&sk->sk_receive_queue))) { 320 discard_rx_queue(sk);
249 buf_discard(buf);
250 atomic_dec(&tipc_queue_size);
251 }
252 321
253 mutex_unlock(&tsock->lock); 322 /* Reject any messages that accumulated in backlog queue */
323
324 sock->state = SS_DISCONNECTING;
325 release_sock(sk);
254 326
255 sock_put(sk); 327 sock_put(sk);
328 sock->sk = NULL;
256 329
257 atomic_dec(&tipc_user_count); 330 atomic_dec(&tipc_user_count);
258 return res; 331 return res;
@@ -269,47 +342,32 @@ static int release(struct socket *sock)
269 * (i.e. a socket address length of 0) unbinds all names from the socket. 342 * (i.e. a socket address length of 0) unbinds all names from the socket.
270 * 343 *
271 * Returns 0 on success, errno otherwise 344 * Returns 0 on success, errno otherwise
345 *
346 * NOTE: This routine doesn't need to take the socket lock since it doesn't
347 * access any non-constant socket information.
272 */ 348 */
273 349
274static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len) 350static int bind(struct socket *sock, struct sockaddr *uaddr, int uaddr_len)
275{ 351{
276 struct tipc_sock *tsock = tipc_sk(sock->sk);
277 struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr; 352 struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
278 int res; 353 u32 portref = tipc_sk_port(sock->sk)->ref;
279 354
280 if (mutex_lock_interruptible(&tsock->lock)) 355 if (unlikely(!uaddr_len))
281 return -ERESTARTSYS; 356 return tipc_withdraw(portref, 0, NULL);
282 357
283 if (unlikely(!uaddr_len)) { 358 if (uaddr_len < sizeof(struct sockaddr_tipc))
284 res = tipc_withdraw(tsock->p->ref, 0, NULL); 359 return -EINVAL;
285 goto exit; 360 if (addr->family != AF_TIPC)
286 } 361 return -EAFNOSUPPORT;
287
288 if (uaddr_len < sizeof(struct sockaddr_tipc)) {
289 res = -EINVAL;
290 goto exit;
291 }
292 362
293 if (addr->family != AF_TIPC) {
294 res = -EAFNOSUPPORT;
295 goto exit;
296 }
297 if (addr->addrtype == TIPC_ADDR_NAME) 363 if (addr->addrtype == TIPC_ADDR_NAME)
298 addr->addr.nameseq.upper = addr->addr.nameseq.lower; 364 addr->addr.nameseq.upper = addr->addr.nameseq.lower;
299 else if (addr->addrtype != TIPC_ADDR_NAMESEQ) { 365 else if (addr->addrtype != TIPC_ADDR_NAMESEQ)
300 res = -EAFNOSUPPORT; 366 return -EAFNOSUPPORT;
301 goto exit;
302 }
303 367
304 if (addr->scope > 0) 368 return (addr->scope > 0) ?
305 res = tipc_publish(tsock->p->ref, addr->scope, 369 tipc_publish(portref, addr->scope, &addr->addr.nameseq) :
306 &addr->addr.nameseq); 370 tipc_withdraw(portref, -addr->scope, &addr->addr.nameseq);
307 else
308 res = tipc_withdraw(tsock->p->ref, -addr->scope,
309 &addr->addr.nameseq);
310exit:
311 mutex_unlock(&tsock->lock);
312 return res;
313} 371}
314 372
315/** 373/**
@@ -320,30 +378,33 @@ exit:
320 * @peer: 0 to obtain socket name, 1 to obtain peer socket name 378 * @peer: 0 to obtain socket name, 1 to obtain peer socket name
321 * 379 *
322 * Returns 0 on success, errno otherwise 380 * Returns 0 on success, errno otherwise
381 *
382 * NOTE: This routine doesn't need to take the socket lock since it doesn't
383 * access any non-constant socket information.
323 */ 384 */
324 385
325static int get_name(struct socket *sock, struct sockaddr *uaddr, 386static int get_name(struct socket *sock, struct sockaddr *uaddr,
326 int *uaddr_len, int peer) 387 int *uaddr_len, int peer)
327{ 388{
328 struct tipc_sock *tsock = tipc_sk(sock->sk);
329 struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr; 389 struct sockaddr_tipc *addr = (struct sockaddr_tipc *)uaddr;
390 u32 portref = tipc_sk_port(sock->sk)->ref;
330 u32 res; 391 u32 res;
331 392
332 if (mutex_lock_interruptible(&tsock->lock)) 393 if (peer) {
333 return -ERESTARTSYS; 394 res = tipc_peer(portref, &addr->addr.id);
395 if (res)
396 return res;
397 } else {
398 tipc_ownidentity(portref, &addr->addr.id);
399 }
334 400
335 *uaddr_len = sizeof(*addr); 401 *uaddr_len = sizeof(*addr);
336 addr->addrtype = TIPC_ADDR_ID; 402 addr->addrtype = TIPC_ADDR_ID;
337 addr->family = AF_TIPC; 403 addr->family = AF_TIPC;
338 addr->scope = 0; 404 addr->scope = 0;
339 if (peer)
340 res = tipc_peer(tsock->p->ref, &addr->addr.id);
341 else
342 res = tipc_ownidentity(tsock->p->ref, &addr->addr.id);
343 addr->addr.name.domain = 0; 405 addr->addr.name.domain = 0;
344 406
345 mutex_unlock(&tsock->lock); 407 return 0;
346 return res;
347} 408}
348 409
349/** 410/**
@@ -414,7 +475,6 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
414 return 0; 475 return 0;
415 if (likely(dest->addr.name.name.type == TIPC_TOP_SRV)) 476 if (likely(dest->addr.name.name.type == TIPC_TOP_SRV))
416 return 0; 477 return 0;
417
418 if (likely(dest->addr.name.name.type != TIPC_CFG_SRV)) 478 if (likely(dest->addr.name.name.type != TIPC_CFG_SRV))
419 return -EACCES; 479 return -EACCES;
420 480
@@ -428,7 +488,7 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
428 488
429/** 489/**
430 * send_msg - send message in connectionless manner 490 * send_msg - send message in connectionless manner
431 * @iocb: (unused) 491 * @iocb: if NULL, indicates that socket lock is already held
432 * @sock: socket structure 492 * @sock: socket structure
433 * @m: message to send 493 * @m: message to send
434 * @total_len: length of message 494 * @total_len: length of message
@@ -444,9 +504,9 @@ static int dest_name_check(struct sockaddr_tipc *dest, struct msghdr *m)
444static int send_msg(struct kiocb *iocb, struct socket *sock, 504static int send_msg(struct kiocb *iocb, struct socket *sock,
445 struct msghdr *m, size_t total_len) 505 struct msghdr *m, size_t total_len)
446{ 506{
447 struct tipc_sock *tsock = tipc_sk(sock->sk); 507 struct sock *sk = sock->sk;
508 struct tipc_port *tport = tipc_sk_port(sk);
448 struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name; 509 struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
449 struct sk_buff *buf;
450 int needs_conn; 510 int needs_conn;
451 int res = -EINVAL; 511 int res = -EINVAL;
452 512
@@ -456,48 +516,46 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
456 (dest->family != AF_TIPC))) 516 (dest->family != AF_TIPC)))
457 return -EINVAL; 517 return -EINVAL;
458 518
519 if (iocb)
520 lock_sock(sk);
521
459 needs_conn = (sock->state != SS_READY); 522 needs_conn = (sock->state != SS_READY);
460 if (unlikely(needs_conn)) { 523 if (unlikely(needs_conn)) {
461 if (sock->state == SS_LISTENING) 524 if (sock->state == SS_LISTENING) {
462 return -EPIPE; 525 res = -EPIPE;
463 if (sock->state != SS_UNCONNECTED) 526 goto exit;
464 return -EISCONN; 527 }
465 if ((tsock->p->published) || 528 if (sock->state != SS_UNCONNECTED) {
466 ((sock->type == SOCK_STREAM) && (total_len != 0))) 529 res = -EISCONN;
467 return -EOPNOTSUPP; 530 goto exit;
531 }
532 if ((tport->published) ||
533 ((sock->type == SOCK_STREAM) && (total_len != 0))) {
534 res = -EOPNOTSUPP;
535 goto exit;
536 }
468 if (dest->addrtype == TIPC_ADDR_NAME) { 537 if (dest->addrtype == TIPC_ADDR_NAME) {
469 tsock->p->conn_type = dest->addr.name.name.type; 538 tport->conn_type = dest->addr.name.name.type;
470 tsock->p->conn_instance = dest->addr.name.name.instance; 539 tport->conn_instance = dest->addr.name.name.instance;
471 } 540 }
472 }
473
474 if (mutex_lock_interruptible(&tsock->lock))
475 return -ERESTARTSYS;
476
477 if (needs_conn) {
478 541
479 /* Abort any pending connection attempts (very unlikely) */ 542 /* Abort any pending connection attempts (very unlikely) */
480 543
481 while ((buf = skb_dequeue(&sock->sk->sk_receive_queue))) { 544 reject_rx_queue(sk);
482 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
483 atomic_dec(&tipc_queue_size);
484 }
485
486 sock->state = SS_CONNECTING;
487 } 545 }
488 546
489 do { 547 do {
490 if (dest->addrtype == TIPC_ADDR_NAME) { 548 if (dest->addrtype == TIPC_ADDR_NAME) {
491 if ((res = dest_name_check(dest, m))) 549 if ((res = dest_name_check(dest, m)))
492 goto exit; 550 break;
493 res = tipc_send2name(tsock->p->ref, 551 res = tipc_send2name(tport->ref,
494 &dest->addr.name.name, 552 &dest->addr.name.name,
495 dest->addr.name.domain, 553 dest->addr.name.domain,
496 m->msg_iovlen, 554 m->msg_iovlen,
497 m->msg_iov); 555 m->msg_iov);
498 } 556 }
499 else if (dest->addrtype == TIPC_ADDR_ID) { 557 else if (dest->addrtype == TIPC_ADDR_ID) {
500 res = tipc_send2port(tsock->p->ref, 558 res = tipc_send2port(tport->ref,
501 &dest->addr.id, 559 &dest->addr.id,
502 m->msg_iovlen, 560 m->msg_iovlen,
503 m->msg_iov); 561 m->msg_iov);
@@ -505,36 +563,43 @@ static int send_msg(struct kiocb *iocb, struct socket *sock,
505 else if (dest->addrtype == TIPC_ADDR_MCAST) { 563 else if (dest->addrtype == TIPC_ADDR_MCAST) {
506 if (needs_conn) { 564 if (needs_conn) {
507 res = -EOPNOTSUPP; 565 res = -EOPNOTSUPP;
508 goto exit; 566 break;
509 } 567 }
510 if ((res = dest_name_check(dest, m))) 568 if ((res = dest_name_check(dest, m)))
511 goto exit; 569 break;
512 res = tipc_multicast(tsock->p->ref, 570 res = tipc_multicast(tport->ref,
513 &dest->addr.nameseq, 571 &dest->addr.nameseq,
514 0, 572 0,
515 m->msg_iovlen, 573 m->msg_iovlen,
516 m->msg_iov); 574 m->msg_iov);
517 } 575 }
518 if (likely(res != -ELINKCONG)) { 576 if (likely(res != -ELINKCONG)) {
519exit: 577 if (needs_conn && (res >= 0)) {
520 mutex_unlock(&tsock->lock); 578 sock->state = SS_CONNECTING;
521 return res; 579 }
580 break;
522 } 581 }
523 if (m->msg_flags & MSG_DONTWAIT) { 582 if (m->msg_flags & MSG_DONTWAIT) {
524 res = -EWOULDBLOCK; 583 res = -EWOULDBLOCK;
525 goto exit; 584 break;
526 }
527 if (wait_event_interruptible(*sock->sk->sk_sleep,
528 !tsock->p->congested)) {
529 res = -ERESTARTSYS;
530 goto exit;
531 } 585 }
586 release_sock(sk);
587 res = wait_event_interruptible(*sk->sk_sleep,
588 !tport->congested);
589 lock_sock(sk);
590 if (res)
591 break;
532 } while (1); 592 } while (1);
593
594exit:
595 if (iocb)
596 release_sock(sk);
597 return res;
533} 598}
534 599
535/** 600/**
536 * send_packet - send a connection-oriented message 601 * send_packet - send a connection-oriented message
537 * @iocb: (unused) 602 * @iocb: if NULL, indicates that socket lock is already held
538 * @sock: socket structure 603 * @sock: socket structure
539 * @m: message to send 604 * @m: message to send
540 * @total_len: length of message 605 * @total_len: length of message
@@ -547,7 +612,8 @@ exit:
547static int send_packet(struct kiocb *iocb, struct socket *sock, 612static int send_packet(struct kiocb *iocb, struct socket *sock,
548 struct msghdr *m, size_t total_len) 613 struct msghdr *m, size_t total_len)
549{ 614{
550 struct tipc_sock *tsock = tipc_sk(sock->sk); 615 struct sock *sk = sock->sk;
616 struct tipc_port *tport = tipc_sk_port(sk);
551 struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name; 617 struct sockaddr_tipc *dest = (struct sockaddr_tipc *)m->msg_name;
552 int res; 618 int res;
553 619
@@ -556,9 +622,8 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
556 if (unlikely(dest)) 622 if (unlikely(dest))
557 return send_msg(iocb, sock, m, total_len); 623 return send_msg(iocb, sock, m, total_len);
558 624
559 if (mutex_lock_interruptible(&tsock->lock)) { 625 if (iocb)
560 return -ERESTARTSYS; 626 lock_sock(sk);
561 }
562 627
563 do { 628 do {
564 if (unlikely(sock->state != SS_CONNECTED)) { 629 if (unlikely(sock->state != SS_CONNECTED)) {
@@ -566,25 +631,28 @@ static int send_packet(struct kiocb *iocb, struct socket *sock,
566 res = -EPIPE; 631 res = -EPIPE;
567 else 632 else
568 res = -ENOTCONN; 633 res = -ENOTCONN;
569 goto exit; 634 break;
570 } 635 }
571 636
572 res = tipc_send(tsock->p->ref, m->msg_iovlen, m->msg_iov); 637 res = tipc_send(tport->ref, m->msg_iovlen, m->msg_iov);
573 if (likely(res != -ELINKCONG)) { 638 if (likely(res != -ELINKCONG)) {
574exit: 639 break;
575 mutex_unlock(&tsock->lock);
576 return res;
577 } 640 }
578 if (m->msg_flags & MSG_DONTWAIT) { 641 if (m->msg_flags & MSG_DONTWAIT) {
579 res = -EWOULDBLOCK; 642 res = -EWOULDBLOCK;
580 goto exit; 643 break;
581 }
582 if (wait_event_interruptible(*sock->sk->sk_sleep,
583 !tsock->p->congested)) {
584 res = -ERESTARTSYS;
585 goto exit;
586 } 644 }
645 release_sock(sk);
646 res = wait_event_interruptible(*sk->sk_sleep,
647 (!tport->congested || !tport->connected));
648 lock_sock(sk);
649 if (res)
650 break;
587 } while (1); 651 } while (1);
652
653 if (iocb)
654 release_sock(sk);
655 return res;
588} 656}
589 657
590/** 658/**
@@ -600,11 +668,11 @@ exit:
600 * or errno if no data sent 668 * or errno if no data sent
601 */ 669 */
602 670
603
604static int send_stream(struct kiocb *iocb, struct socket *sock, 671static int send_stream(struct kiocb *iocb, struct socket *sock,
605 struct msghdr *m, size_t total_len) 672 struct msghdr *m, size_t total_len)
606{ 673{
607 struct tipc_port *tport; 674 struct sock *sk = sock->sk;
675 struct tipc_port *tport = tipc_sk_port(sk);
608 struct msghdr my_msg; 676 struct msghdr my_msg;
609 struct iovec my_iov; 677 struct iovec my_iov;
610 struct iovec *curr_iov; 678 struct iovec *curr_iov;
@@ -616,19 +684,27 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
616 int bytes_sent; 684 int bytes_sent;
617 int res; 685 int res;
618 686
687 lock_sock(sk);
688
619 /* Handle special cases where there is no connection */ 689 /* Handle special cases where there is no connection */
620 690
621 if (unlikely(sock->state != SS_CONNECTED)) { 691 if (unlikely(sock->state != SS_CONNECTED)) {
622 if (sock->state == SS_UNCONNECTED) 692 if (sock->state == SS_UNCONNECTED) {
623 return send_packet(iocb, sock, m, total_len); 693 res = send_packet(NULL, sock, m, total_len);
624 else if (sock->state == SS_DISCONNECTING) 694 goto exit;
625 return -EPIPE; 695 } else if (sock->state == SS_DISCONNECTING) {
626 else 696 res = -EPIPE;
627 return -ENOTCONN; 697 goto exit;
698 } else {
699 res = -ENOTCONN;
700 goto exit;
701 }
628 } 702 }
629 703
630 if (unlikely(m->msg_name)) 704 if (unlikely(m->msg_name)) {
631 return -EISCONN; 705 res = -EISCONN;
706 goto exit;
707 }
632 708
633 /* 709 /*
634 * Send each iovec entry using one or more messages 710 * Send each iovec entry using one or more messages
@@ -646,7 +722,6 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
646 my_msg.msg_name = NULL; 722 my_msg.msg_name = NULL;
647 bytes_sent = 0; 723 bytes_sent = 0;
648 724
649 tport = tipc_sk(sock->sk)->p;
650 hdr_size = msg_hdr_sz(&tport->phdr); 725 hdr_size = msg_hdr_sz(&tport->phdr);
651 726
652 while (curr_iovlen--) { 727 while (curr_iovlen--) {
@@ -661,10 +736,10 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
661 bytes_to_send = curr_left; 736 bytes_to_send = curr_left;
662 my_iov.iov_base = curr_start; 737 my_iov.iov_base = curr_start;
663 my_iov.iov_len = bytes_to_send; 738 my_iov.iov_len = bytes_to_send;
664 if ((res = send_packet(iocb, sock, &my_msg, 0)) < 0) { 739 if ((res = send_packet(NULL, sock, &my_msg, 0)) < 0) {
665 if (bytes_sent != 0) 740 if (bytes_sent)
666 res = bytes_sent; 741 res = bytes_sent;
667 return res; 742 goto exit;
668 } 743 }
669 curr_left -= bytes_to_send; 744 curr_left -= bytes_to_send;
670 curr_start += bytes_to_send; 745 curr_start += bytes_to_send;
@@ -673,22 +748,23 @@ static int send_stream(struct kiocb *iocb, struct socket *sock,
673 748
674 curr_iov++; 749 curr_iov++;
675 } 750 }
676 751 res = bytes_sent;
677 return bytes_sent; 752exit:
753 release_sock(sk);
754 return res;
678} 755}
679 756
680/** 757/**
681 * auto_connect - complete connection setup to a remote port 758 * auto_connect - complete connection setup to a remote port
682 * @sock: socket structure 759 * @sock: socket structure
683 * @tsock: TIPC-specific socket structure
684 * @msg: peer's response message 760 * @msg: peer's response message
685 * 761 *
686 * Returns 0 on success, errno otherwise 762 * Returns 0 on success, errno otherwise
687 */ 763 */
688 764
689static int auto_connect(struct socket *sock, struct tipc_sock *tsock, 765static int auto_connect(struct socket *sock, struct tipc_msg *msg)
690 struct tipc_msg *msg)
691{ 766{
767 struct tipc_port *tport = tipc_sk_port(sock->sk);
692 struct tipc_portid peer; 768 struct tipc_portid peer;
693 769
694 if (msg_errcode(msg)) { 770 if (msg_errcode(msg)) {
@@ -698,8 +774,8 @@ static int auto_connect(struct socket *sock, struct tipc_sock *tsock,
698 774
699 peer.ref = msg_origport(msg); 775 peer.ref = msg_origport(msg);
700 peer.node = msg_orignode(msg); 776 peer.node = msg_orignode(msg);
701 tipc_connect2port(tsock->p->ref, &peer); 777 tipc_connect2port(tport->ref, &peer);
702 tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); 778 tipc_set_portimportance(tport->ref, msg_importance(msg));
703 sock->state = SS_CONNECTED; 779 sock->state = SS_CONNECTED;
704 return 0; 780 return 0;
705} 781}
@@ -812,62 +888,54 @@ static int anc_data_recv(struct msghdr *m, struct tipc_msg *msg,
812static int recv_msg(struct kiocb *iocb, struct socket *sock, 888static int recv_msg(struct kiocb *iocb, struct socket *sock,
813 struct msghdr *m, size_t buf_len, int flags) 889 struct msghdr *m, size_t buf_len, int flags)
814{ 890{
815 struct tipc_sock *tsock = tipc_sk(sock->sk); 891 struct sock *sk = sock->sk;
892 struct tipc_port *tport = tipc_sk_port(sk);
816 struct sk_buff *buf; 893 struct sk_buff *buf;
817 struct tipc_msg *msg; 894 struct tipc_msg *msg;
818 unsigned int q_len;
819 unsigned int sz; 895 unsigned int sz;
820 u32 err; 896 u32 err;
821 int res; 897 int res;
822 898
823 /* Currently doesn't support receiving into multiple iovec entries */ 899 /* Catch invalid receive requests */
824 900
825 if (m->msg_iovlen != 1) 901 if (m->msg_iovlen != 1)
826 return -EOPNOTSUPP; 902 return -EOPNOTSUPP; /* Don't do multiple iovec entries yet */
827
828 /* Catch invalid receive attempts */
829 903
830 if (unlikely(!buf_len)) 904 if (unlikely(!buf_len))
831 return -EINVAL; 905 return -EINVAL;
832 906
833 if (sock->type == SOCK_SEQPACKET) { 907 lock_sock(sk);
834 if (unlikely(sock->state == SS_UNCONNECTED))
835 return -ENOTCONN;
836 if (unlikely((sock->state == SS_DISCONNECTING) &&
837 (skb_queue_len(&sock->sk->sk_receive_queue) == 0)))
838 return -ENOTCONN;
839 }
840 908
841 /* Look for a message in receive queue; wait if necessary */ 909 if (unlikely(sock->state == SS_UNCONNECTED)) {
842 910 res = -ENOTCONN;
843 if (unlikely(mutex_lock_interruptible(&tsock->lock)))
844 return -ERESTARTSYS;
845
846restart:
847 if (unlikely((skb_queue_len(&sock->sk->sk_receive_queue) == 0) &&
848 (flags & MSG_DONTWAIT))) {
849 res = -EWOULDBLOCK;
850 goto exit; 911 goto exit;
851 } 912 }
852 913
853 if ((res = wait_event_interruptible( 914restart:
854 *sock->sk->sk_sleep,
855 ((q_len = skb_queue_len(&sock->sk->sk_receive_queue)) ||
856 (sock->state == SS_DISCONNECTING))) )) {
857 goto exit;
858 }
859 915
860 /* Catch attempt to receive on an already terminated connection */ 916 /* Look for a message in receive queue; wait if necessary */
861 /* [THIS CHECK MAY OVERLAP WITH AN EARLIER CHECK] */
862 917
863 if (!q_len) { 918 while (skb_queue_empty(&sk->sk_receive_queue)) {
864 res = -ENOTCONN; 919 if (sock->state == SS_DISCONNECTING) {
865 goto exit; 920 res = -ENOTCONN;
921 goto exit;
922 }
923 if (flags & MSG_DONTWAIT) {
924 res = -EWOULDBLOCK;
925 goto exit;
926 }
927 release_sock(sk);
928 res = wait_event_interruptible(*sk->sk_sleep,
929 (!skb_queue_empty(&sk->sk_receive_queue) ||
930 (sock->state == SS_DISCONNECTING)));
931 lock_sock(sk);
932 if (res)
933 goto exit;
866 } 934 }
867 935
868 /* Get access to first message in receive queue */ 936 /* Look at first message in receive queue */
869 937
870 buf = skb_peek(&sock->sk->sk_receive_queue); 938 buf = skb_peek(&sk->sk_receive_queue);
871 msg = buf_msg(buf); 939 msg = buf_msg(buf);
872 sz = msg_data_sz(msg); 940 sz = msg_data_sz(msg);
873 err = msg_errcode(msg); 941 err = msg_errcode(msg);
@@ -875,14 +943,15 @@ restart:
875 /* Complete connection setup for an implied connect */ 943 /* Complete connection setup for an implied connect */
876 944
877 if (unlikely(sock->state == SS_CONNECTING)) { 945 if (unlikely(sock->state == SS_CONNECTING)) {
878 if ((res = auto_connect(sock, tsock, msg))) 946 res = auto_connect(sock, msg);
947 if (res)
879 goto exit; 948 goto exit;
880 } 949 }
881 950
882 /* Discard an empty non-errored message & try again */ 951 /* Discard an empty non-errored message & try again */
883 952
884 if ((!sz) && (!err)) { 953 if ((!sz) && (!err)) {
885 advance_queue(tsock); 954 advance_rx_queue(sk);
886 goto restart; 955 goto restart;
887 } 956 }
888 957
@@ -892,7 +961,8 @@ restart:
892 961
893 /* Capture ancillary data (optional) */ 962 /* Capture ancillary data (optional) */
894 963
895 if ((res = anc_data_recv(m, msg, tsock->p))) 964 res = anc_data_recv(m, msg, tport);
965 if (res)
896 goto exit; 966 goto exit;
897 967
898 /* Capture message data (if valid) & compute return value (always) */ 968 /* Capture message data (if valid) & compute return value (always) */
@@ -920,12 +990,12 @@ restart:
920 990
921 if (likely(!(flags & MSG_PEEK))) { 991 if (likely(!(flags & MSG_PEEK))) {
922 if ((sock->state != SS_READY) && 992 if ((sock->state != SS_READY) &&
923 (++tsock->p->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) 993 (++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
924 tipc_acknowledge(tsock->p->ref, tsock->p->conn_unacked); 994 tipc_acknowledge(tport->ref, tport->conn_unacked);
925 advance_queue(tsock); 995 advance_rx_queue(sk);
926 } 996 }
927exit: 997exit:
928 mutex_unlock(&tsock->lock); 998 release_sock(sk);
929 return res; 999 return res;
930} 1000}
931 1001
@@ -945,10 +1015,10 @@ exit:
945static int recv_stream(struct kiocb *iocb, struct socket *sock, 1015static int recv_stream(struct kiocb *iocb, struct socket *sock,
946 struct msghdr *m, size_t buf_len, int flags) 1016 struct msghdr *m, size_t buf_len, int flags)
947{ 1017{
948 struct tipc_sock *tsock = tipc_sk(sock->sk); 1018 struct sock *sk = sock->sk;
1019 struct tipc_port *tport = tipc_sk_port(sk);
949 struct sk_buff *buf; 1020 struct sk_buff *buf;
950 struct tipc_msg *msg; 1021 struct tipc_msg *msg;
951 unsigned int q_len;
952 unsigned int sz; 1022 unsigned int sz;
953 int sz_to_copy; 1023 int sz_to_copy;
954 int sz_copied = 0; 1024 int sz_copied = 0;
@@ -956,54 +1026,49 @@ static int recv_stream(struct kiocb *iocb, struct socket *sock,
956 char __user *crs = m->msg_iov->iov_base; 1026 char __user *crs = m->msg_iov->iov_base;
957 unsigned char *buf_crs; 1027 unsigned char *buf_crs;
958 u32 err; 1028 u32 err;
959 int res; 1029 int res = 0;
960 1030
961 /* Currently doesn't support receiving into multiple iovec entries */ 1031 /* Catch invalid receive attempts */
962 1032
963 if (m->msg_iovlen != 1) 1033 if (m->msg_iovlen != 1)
964 return -EOPNOTSUPP; 1034 return -EOPNOTSUPP; /* Don't do multiple iovec entries yet */
965
966 /* Catch invalid receive attempts */
967 1035
968 if (unlikely(!buf_len)) 1036 if (unlikely(!buf_len))
969 return -EINVAL; 1037 return -EINVAL;
970 1038
971 if (unlikely(sock->state == SS_DISCONNECTING)) { 1039 lock_sock(sk);
972 if (skb_queue_len(&sock->sk->sk_receive_queue) == 0)
973 return -ENOTCONN;
974 } else if (unlikely(sock->state != SS_CONNECTED))
975 return -ENOTCONN;
976
977 /* Look for a message in receive queue; wait if necessary */
978
979 if (unlikely(mutex_lock_interruptible(&tsock->lock)))
980 return -ERESTARTSYS;
981 1040
982restart: 1041 if (unlikely((sock->state == SS_UNCONNECTED) ||
983 if (unlikely((skb_queue_len(&sock->sk->sk_receive_queue) == 0) && 1042 (sock->state == SS_CONNECTING))) {
984 (flags & MSG_DONTWAIT))) { 1043 res = -ENOTCONN;
985 res = -EWOULDBLOCK;
986 goto exit; 1044 goto exit;
987 } 1045 }
988 1046
989 if ((res = wait_event_interruptible( 1047restart:
990 *sock->sk->sk_sleep,
991 ((q_len = skb_queue_len(&sock->sk->sk_receive_queue)) ||
992 (sock->state == SS_DISCONNECTING))) )) {
993 goto exit;
994 }
995 1048
996 /* Catch attempt to receive on an already terminated connection */ 1049 /* Look for a message in receive queue; wait if necessary */
997 /* [THIS CHECK MAY OVERLAP WITH AN EARLIER CHECK] */
998 1050
999 if (!q_len) { 1051 while (skb_queue_empty(&sk->sk_receive_queue)) {
1000 res = -ENOTCONN; 1052 if (sock->state == SS_DISCONNECTING) {
1001 goto exit; 1053 res = -ENOTCONN;
1054 goto exit;
1055 }
1056 if (flags & MSG_DONTWAIT) {
1057 res = -EWOULDBLOCK;
1058 goto exit;
1059 }
1060 release_sock(sk);
1061 res = wait_event_interruptible(*sk->sk_sleep,
1062 (!skb_queue_empty(&sk->sk_receive_queue) ||
1063 (sock->state == SS_DISCONNECTING)));
1064 lock_sock(sk);
1065 if (res)
1066 goto exit;
1002 } 1067 }
1003 1068
1004 /* Get access to first message in receive queue */ 1069 /* Look at first message in receive queue */
1005 1070
1006 buf = skb_peek(&sock->sk->sk_receive_queue); 1071 buf = skb_peek(&sk->sk_receive_queue);
1007 msg = buf_msg(buf); 1072 msg = buf_msg(buf);
1008 sz = msg_data_sz(msg); 1073 sz = msg_data_sz(msg);
1009 err = msg_errcode(msg); 1074 err = msg_errcode(msg);
@@ -1011,7 +1076,7 @@ restart:
1011 /* Discard an empty non-errored message & try again */ 1076 /* Discard an empty non-errored message & try again */
1012 1077
1013 if ((!sz) && (!err)) { 1078 if ((!sz) && (!err)) {
1014 advance_queue(tsock); 1079 advance_rx_queue(sk);
1015 goto restart; 1080 goto restart;
1016 } 1081 }
1017 1082
@@ -1019,7 +1084,8 @@ restart:
1019 1084
1020 if (sz_copied == 0) { 1085 if (sz_copied == 0) {
1021 set_orig_addr(m, msg); 1086 set_orig_addr(m, msg);
1022 if ((res = anc_data_recv(m, msg, tsock->p))) 1087 res = anc_data_recv(m, msg, tport);
1088 if (res)
1023 goto exit; 1089 goto exit;
1024 } 1090 }
1025 1091
@@ -1057,9 +1123,9 @@ restart:
1057 /* Consume received message (optional) */ 1123 /* Consume received message (optional) */
1058 1124
1059 if (likely(!(flags & MSG_PEEK))) { 1125 if (likely(!(flags & MSG_PEEK))) {
1060 if (unlikely(++tsock->p->conn_unacked >= TIPC_FLOW_CONTROL_WIN)) 1126 if (unlikely(++tport->conn_unacked >= TIPC_FLOW_CONTROL_WIN))
1061 tipc_acknowledge(tsock->p->ref, tsock->p->conn_unacked); 1127 tipc_acknowledge(tport->ref, tport->conn_unacked);
1062 advance_queue(tsock); 1128 advance_rx_queue(sk);
1063 } 1129 }
1064 1130
1065 /* Loop around if more data is required */ 1131 /* Loop around if more data is required */
@@ -1074,7 +1140,7 @@ restart:
1074 goto restart; 1140 goto restart;
1075 1141
1076exit: 1142exit:
1077 mutex_unlock(&tsock->lock); 1143 release_sock(sk);
1078 return sz_copied ? sz_copied : res; 1144 return sz_copied ? sz_copied : res;
1079} 1145}
1080 1146
@@ -1108,37 +1174,24 @@ static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1108} 1174}
1109 1175
1110/** 1176/**
1111 * async_disconnect - wrapper function used to disconnect port 1177 * filter_rcv - validate incoming message
1112 * @portref: TIPC port reference (passed as pointer-sized value) 1178 * @sk: socket
1113 */
1114
1115static void async_disconnect(unsigned long portref)
1116{
1117 tipc_disconnect((u32)portref);
1118}
1119
1120/**
1121 * dispatch - handle arriving message
1122 * @tport: TIPC port that received message
1123 * @buf: message 1179 * @buf: message
1124 * 1180 *
1125 * Called with port locked. Must not take socket lock to avoid deadlock risk. 1181 * Enqueues message on receive queue if acceptable; optionally handles
1182 * disconnect indication for a connected socket.
1183 *
1184 * Called with socket lock already taken; port lock may also be taken.
1126 * 1185 *
1127 * Returns TIPC error status code (TIPC_OK if message is not to be rejected) 1186 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1128 */ 1187 */
1129 1188
1130static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf) 1189static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1131{ 1190{
1191 struct socket *sock = sk->sk_socket;
1132 struct tipc_msg *msg = buf_msg(buf); 1192 struct tipc_msg *msg = buf_msg(buf);
1133 struct tipc_sock *tsock = (struct tipc_sock *)tport->usr_handle;
1134 struct socket *sock;
1135 u32 recv_q_len; 1193 u32 recv_q_len;
1136 1194
1137 /* Reject message if socket is closing */
1138
1139 if (!tsock)
1140 return TIPC_ERR_NO_PORT;
1141
1142 /* Reject message if it is wrong sort of message for socket */ 1195 /* Reject message if it is wrong sort of message for socket */
1143 1196
1144 /* 1197 /*
@@ -1146,7 +1199,7 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1146 * "NO PORT" ISN'T REALLY THE RIGHT ERROR CODE, AND THERE MAY 1199 * "NO PORT" ISN'T REALLY THE RIGHT ERROR CODE, AND THERE MAY
1147 * BE SECURITY IMPLICATIONS INHERENT IN REJECTING INVALID TRAFFIC 1200 * BE SECURITY IMPLICATIONS INHERENT IN REJECTING INVALID TRAFFIC
1148 */ 1201 */
1149 sock = tsock->sk.sk_socket; 1202
1150 if (sock->state == SS_READY) { 1203 if (sock->state == SS_READY) {
1151 if (msg_connected(msg)) { 1204 if (msg_connected(msg)) {
1152 msg_dbg(msg, "dispatch filter 1\n"); 1205 msg_dbg(msg, "dispatch filter 1\n");
@@ -1194,45 +1247,98 @@ static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1194 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE)) 1247 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1195 return TIPC_ERR_OVERLOAD; 1248 return TIPC_ERR_OVERLOAD;
1196 } 1249 }
1197 recv_q_len = skb_queue_len(&tsock->sk.sk_receive_queue); 1250 recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1198 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) { 1251 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1199 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2)) 1252 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
1200 return TIPC_ERR_OVERLOAD; 1253 return TIPC_ERR_OVERLOAD;
1201 } 1254 }
1202 1255
1256 /* Enqueue message (finally!) */
1257
1258 msg_dbg(msg, "<DISP<: ");
1259 TIPC_SKB_CB(buf)->handle = msg_data(msg);
1260 atomic_inc(&tipc_queue_size);
1261 __skb_queue_tail(&sk->sk_receive_queue, buf);
1262
1203 /* Initiate connection termination for an incoming 'FIN' */ 1263 /* Initiate connection termination for an incoming 'FIN' */
1204 1264
1205 if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) { 1265 if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1206 sock->state = SS_DISCONNECTING; 1266 sock->state = SS_DISCONNECTING;
1207 /* Note: Use signal since port lock is already taken! */ 1267 tipc_disconnect_port(tipc_sk_port(sk));
1208 tipc_k_signal((Handler)async_disconnect, tport->ref);
1209 } 1268 }
1210 1269
1211 /* Enqueue message (finally!) */ 1270 if (waitqueue_active(sk->sk_sleep))
1271 wake_up_interruptible(sk->sk_sleep);
1272 return TIPC_OK;
1273}
1212 1274
1213 msg_dbg(msg,"<DISP<: "); 1275/**
1214 TIPC_SKB_CB(buf)->handle = msg_data(msg); 1276 * backlog_rcv - handle incoming message from backlog queue
1215 atomic_inc(&tipc_queue_size); 1277 * @sk: socket
1216 skb_queue_tail(&sock->sk->sk_receive_queue, buf); 1278 * @buf: message
1279 *
1280 * Caller must hold socket lock, but not port lock.
1281 *
1282 * Returns 0
1283 */
1217 1284
1218 if (waitqueue_active(sock->sk->sk_sleep)) 1285static int backlog_rcv(struct sock *sk, struct sk_buff *buf)
1219 wake_up_interruptible(sock->sk->sk_sleep); 1286{
1220 return TIPC_OK; 1287 u32 res;
1288
1289 res = filter_rcv(sk, buf);
1290 if (res)
1291 tipc_reject_msg(buf, res);
1292 return 0;
1293}
1294
1295/**
1296 * dispatch - handle incoming message
1297 * @tport: TIPC port that received message
1298 * @buf: message
1299 *
1300 * Called with port lock already taken.
1301 *
1302 * Returns TIPC error status code (TIPC_OK if message is not to be rejected)
1303 */
1304
1305static u32 dispatch(struct tipc_port *tport, struct sk_buff *buf)
1306{
1307 struct sock *sk = (struct sock *)tport->usr_handle;
1308 u32 res;
1309
1310 /*
1311 * Process message if socket is unlocked; otherwise add to backlog queue
1312 *
1313 * This code is based on sk_receive_skb(), but must be distinct from it
1314 * since a TIPC-specific filter/reject mechanism is utilized
1315 */
1316
1317 bh_lock_sock(sk);
1318 if (!sock_owned_by_user(sk)) {
1319 res = filter_rcv(sk, buf);
1320 } else {
1321 sk_add_backlog(sk, buf);
1322 res = TIPC_OK;
1323 }
1324 bh_unlock_sock(sk);
1325
1326 return res;
1221} 1327}
1222 1328
1223/** 1329/**
1224 * wakeupdispatch - wake up port after congestion 1330 * wakeupdispatch - wake up port after congestion
1225 * @tport: port to wakeup 1331 * @tport: port to wakeup
1226 * 1332 *
1227 * Called with port lock on. 1333 * Called with port lock already taken.
1228 */ 1334 */
1229 1335
1230static void wakeupdispatch(struct tipc_port *tport) 1336static void wakeupdispatch(struct tipc_port *tport)
1231{ 1337{
1232 struct tipc_sock *tsock = (struct tipc_sock *)tport->usr_handle; 1338 struct sock *sk = (struct sock *)tport->usr_handle;
1233 1339
1234 if (waitqueue_active(tsock->sk.sk_sleep)) 1340 if (waitqueue_active(sk->sk_sleep))
1235 wake_up_interruptible(tsock->sk.sk_sleep); 1341 wake_up_interruptible(sk->sk_sleep);
1236} 1342}
1237 1343
1238/** 1344/**
@@ -1240,7 +1346,7 @@ static void wakeupdispatch(struct tipc_port *tport)
1240 * @sock: socket structure 1346 * @sock: socket structure
1241 * @dest: socket address for destination port 1347 * @dest: socket address for destination port
1242 * @destlen: size of socket address data structure 1348 * @destlen: size of socket address data structure
1243 * @flags: (unused) 1349 * @flags: file-related flags associated with socket
1244 * 1350 *
1245 * Returns 0 on success, errno otherwise 1351 * Returns 0 on success, errno otherwise
1246 */ 1352 */
@@ -1248,31 +1354,43 @@ static void wakeupdispatch(struct tipc_port *tport)
1248static int connect(struct socket *sock, struct sockaddr *dest, int destlen, 1354static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1249 int flags) 1355 int flags)
1250{ 1356{
1251 struct tipc_sock *tsock = tipc_sk(sock->sk); 1357 struct sock *sk = sock->sk;
1252 struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest; 1358 struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1253 struct msghdr m = {NULL,}; 1359 struct msghdr m = {NULL,};
1254 struct sk_buff *buf; 1360 struct sk_buff *buf;
1255 struct tipc_msg *msg; 1361 struct tipc_msg *msg;
1256 int res; 1362 int res;
1257 1363
1364 lock_sock(sk);
1365
1258 /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */ 1366 /* For now, TIPC does not allow use of connect() with DGRAM/RDM types */
1259 1367
1260 if (sock->state == SS_READY) 1368 if (sock->state == SS_READY) {
1261 return -EOPNOTSUPP; 1369 res = -EOPNOTSUPP;
1370 goto exit;
1371 }
1262 1372
1263 /* For now, TIPC does not support the non-blocking form of connect() */ 1373 /* For now, TIPC does not support the non-blocking form of connect() */
1264 1374
1265 if (flags & O_NONBLOCK) 1375 if (flags & O_NONBLOCK) {
1266 return -EWOULDBLOCK; 1376 res = -EWOULDBLOCK;
1377 goto exit;
1378 }
1267 1379
1268 /* Issue Posix-compliant error code if socket is in the wrong state */ 1380 /* Issue Posix-compliant error code if socket is in the wrong state */
1269 1381
1270 if (sock->state == SS_LISTENING) 1382 if (sock->state == SS_LISTENING) {
1271 return -EOPNOTSUPP; 1383 res = -EOPNOTSUPP;
1272 if (sock->state == SS_CONNECTING) 1384 goto exit;
1273 return -EALREADY; 1385 }
1274 if (sock->state != SS_UNCONNECTED) 1386 if (sock->state == SS_CONNECTING) {
1275 return -EISCONN; 1387 res = -EALREADY;
1388 goto exit;
1389 }
1390 if (sock->state != SS_UNCONNECTED) {
1391 res = -EISCONN;
1392 goto exit;
1393 }
1276 1394
1277 /* 1395 /*
1278 * Reject connection attempt using multicast address 1396 * Reject connection attempt using multicast address
@@ -1281,8 +1399,14 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1281 * so there's no need to do it here 1399 * so there's no need to do it here
1282 */ 1400 */
1283 1401
1284 if (dst->addrtype == TIPC_ADDR_MCAST) 1402 if (dst->addrtype == TIPC_ADDR_MCAST) {
1285 return -EINVAL; 1403 res = -EINVAL;
1404 goto exit;
1405 }
1406
1407 /* Reject any messages already in receive queue (very unlikely) */
1408
1409 reject_rx_queue(sk);
1286 1410
1287 /* Send a 'SYN-' to destination */ 1411 /* Send a 'SYN-' to destination */
1288 1412
@@ -1290,25 +1414,33 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1290 m.msg_namelen = destlen; 1414 m.msg_namelen = destlen;
1291 res = send_msg(NULL, sock, &m, 0); 1415 res = send_msg(NULL, sock, &m, 0);
1292 if (res < 0) { 1416 if (res < 0) {
1293 sock->state = SS_DISCONNECTING; 1417 goto exit;
1294 return res;
1295 } 1418 }
1296 1419
1297 if (mutex_lock_interruptible(&tsock->lock)) 1420 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1298 return -ERESTARTSYS;
1299 1421
1300 /* Wait for destination's 'ACK' response */ 1422 release_sock(sk);
1423 res = wait_event_interruptible_timeout(*sk->sk_sleep,
1424 (!skb_queue_empty(&sk->sk_receive_queue) ||
1425 (sock->state != SS_CONNECTING)),
1426 sk->sk_rcvtimeo);
1427 lock_sock(sk);
1301 1428
1302 res = wait_event_interruptible_timeout(*sock->sk->sk_sleep,
1303 skb_queue_len(&sock->sk->sk_receive_queue),
1304 sock->sk->sk_rcvtimeo);
1305 buf = skb_peek(&sock->sk->sk_receive_queue);
1306 if (res > 0) { 1429 if (res > 0) {
1307 msg = buf_msg(buf); 1430 buf = skb_peek(&sk->sk_receive_queue);
1308 res = auto_connect(sock, tsock, msg); 1431 if (buf != NULL) {
1309 if (!res) { 1432 msg = buf_msg(buf);
1310 if (!msg_data_sz(msg)) 1433 res = auto_connect(sock, msg);
1311 advance_queue(tsock); 1434 if (!res) {
1435 if (!msg_data_sz(msg))
1436 advance_rx_queue(sk);
1437 }
1438 } else {
1439 if (sock->state == SS_CONNECTED) {
1440 res = -EISCONN;
1441 } else {
1442 res = -ECONNREFUSED;
1443 }
1312 } 1444 }
1313 } else { 1445 } else {
1314 if (res == 0) 1446 if (res == 0)
@@ -1318,7 +1450,8 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1318 sock->state = SS_DISCONNECTING; 1450 sock->state = SS_DISCONNECTING;
1319 } 1451 }
1320 1452
1321 mutex_unlock(&tsock->lock); 1453exit:
1454 release_sock(sk);
1322 return res; 1455 return res;
1323} 1456}
1324 1457
@@ -1332,14 +1465,22 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1332 1465
1333static int listen(struct socket *sock, int len) 1466static int listen(struct socket *sock, int len)
1334{ 1467{
1335 /* REQUIRES SOCKET LOCKING OF SOME SORT? */ 1468 struct sock *sk = sock->sk;
1469 int res;
1470
1471 lock_sock(sk);
1336 1472
1337 if (sock->state == SS_READY) 1473 if (sock->state == SS_READY)
1338 return -EOPNOTSUPP; 1474 res = -EOPNOTSUPP;
1339 if (sock->state != SS_UNCONNECTED) 1475 else if (sock->state != SS_UNCONNECTED)
1340 return -EINVAL; 1476 res = -EINVAL;
1341 sock->state = SS_LISTENING; 1477 else {
1342 return 0; 1478 sock->state = SS_LISTENING;
1479 res = 0;
1480 }
1481
1482 release_sock(sk);
1483 return res;
1343} 1484}
1344 1485
1345/** 1486/**
@@ -1351,50 +1492,69 @@ static int listen(struct socket *sock, int len)
1351 * Returns 0 on success, errno otherwise 1492 * Returns 0 on success, errno otherwise
1352 */ 1493 */
1353 1494
1354static int accept(struct socket *sock, struct socket *newsock, int flags) 1495static int accept(struct socket *sock, struct socket *new_sock, int flags)
1355{ 1496{
1356 struct tipc_sock *tsock = tipc_sk(sock->sk); 1497 struct sock *sk = sock->sk;
1357 struct sk_buff *buf; 1498 struct sk_buff *buf;
1358 int res = -EFAULT; 1499 int res;
1359
1360 if (sock->state == SS_READY)
1361 return -EOPNOTSUPP;
1362 if (sock->state != SS_LISTENING)
1363 return -EINVAL;
1364
1365 if (unlikely((skb_queue_len(&sock->sk->sk_receive_queue) == 0) &&
1366 (flags & O_NONBLOCK)))
1367 return -EWOULDBLOCK;
1368 1500
1369 if (mutex_lock_interruptible(&tsock->lock)) 1501 lock_sock(sk);
1370 return -ERESTARTSYS;
1371 1502
1372 if (wait_event_interruptible(*sock->sk->sk_sleep, 1503 if (sock->state == SS_READY) {
1373 skb_queue_len(&sock->sk->sk_receive_queue))) { 1504 res = -EOPNOTSUPP;
1374 res = -ERESTARTSYS; 1505 goto exit;
1506 }
1507 if (sock->state != SS_LISTENING) {
1508 res = -EINVAL;
1375 goto exit; 1509 goto exit;
1376 } 1510 }
1377 buf = skb_peek(&sock->sk->sk_receive_queue);
1378 1511
1379 res = tipc_create(sock_net(sock->sk), newsock, 0); 1512 while (skb_queue_empty(&sk->sk_receive_queue)) {
1513 if (flags & O_NONBLOCK) {
1514 res = -EWOULDBLOCK;
1515 goto exit;
1516 }
1517 release_sock(sk);
1518 res = wait_event_interruptible(*sk->sk_sleep,
1519 (!skb_queue_empty(&sk->sk_receive_queue)));
1520 lock_sock(sk);
1521 if (res)
1522 goto exit;
1523 }
1524
1525 buf = skb_peek(&sk->sk_receive_queue);
1526
1527 res = tipc_create(sock_net(sock->sk), new_sock, 0);
1380 if (!res) { 1528 if (!res) {
1381 struct tipc_sock *new_tsock = tipc_sk(newsock->sk); 1529 struct sock *new_sk = new_sock->sk;
1530 struct tipc_port *new_tport = tipc_sk_port(new_sk);
1531 u32 new_ref = new_tport->ref;
1382 struct tipc_portid id; 1532 struct tipc_portid id;
1383 struct tipc_msg *msg = buf_msg(buf); 1533 struct tipc_msg *msg = buf_msg(buf);
1384 u32 new_ref = new_tsock->p->ref; 1534
1535 lock_sock(new_sk);
1536
1537 /*
1538 * Reject any stray messages received by new socket
1539 * before the socket lock was taken (very, very unlikely)
1540 */
1541
1542 reject_rx_queue(new_sk);
1543
1544 /* Connect new socket to it's peer */
1385 1545
1386 id.ref = msg_origport(msg); 1546 id.ref = msg_origport(msg);
1387 id.node = msg_orignode(msg); 1547 id.node = msg_orignode(msg);
1388 tipc_connect2port(new_ref, &id); 1548 tipc_connect2port(new_ref, &id);
1389 newsock->state = SS_CONNECTED; 1549 new_sock->state = SS_CONNECTED;
1390 1550
1391 tipc_set_portimportance(new_ref, msg_importance(msg)); 1551 tipc_set_portimportance(new_ref, msg_importance(msg));
1392 if (msg_named(msg)) { 1552 if (msg_named(msg)) {
1393 new_tsock->p->conn_type = msg_nametype(msg); 1553 new_tport->conn_type = msg_nametype(msg);
1394 new_tsock->p->conn_instance = msg_nameinst(msg); 1554 new_tport->conn_instance = msg_nameinst(msg);
1395 } 1555 }
1396 1556
1397 /* 1557 /*
1398 * Respond to 'SYN-' by discarding it & returning 'ACK'-. 1558 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1399 * Respond to 'SYN+' by queuing it on new socket. 1559 * Respond to 'SYN+' by queuing it on new socket.
1400 */ 1560 */
@@ -1403,17 +1563,16 @@ static int accept(struct socket *sock, struct socket *newsock, int flags)
1403 if (!msg_data_sz(msg)) { 1563 if (!msg_data_sz(msg)) {
1404 struct msghdr m = {NULL,}; 1564 struct msghdr m = {NULL,};
1405 1565
1406 send_packet(NULL, newsock, &m, 0); 1566 advance_rx_queue(sk);
1407 advance_queue(tsock); 1567 send_packet(NULL, new_sock, &m, 0);
1408 } else { 1568 } else {
1409 sock_lock(tsock); 1569 __skb_dequeue(&sk->sk_receive_queue);
1410 skb_dequeue(&sock->sk->sk_receive_queue); 1570 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1411 sock_unlock(tsock);
1412 skb_queue_head(&newsock->sk->sk_receive_queue, buf);
1413 } 1571 }
1572 release_sock(new_sk);
1414 } 1573 }
1415exit: 1574exit:
1416 mutex_unlock(&tsock->lock); 1575 release_sock(sk);
1417 return res; 1576 return res;
1418} 1577}
1419 1578
@@ -1429,54 +1588,46 @@ exit:
1429 1588
1430static int shutdown(struct socket *sock, int how) 1589static int shutdown(struct socket *sock, int how)
1431{ 1590{
1432 struct tipc_sock* tsock = tipc_sk(sock->sk); 1591 struct sock *sk = sock->sk;
1592 struct tipc_port *tport = tipc_sk_port(sk);
1433 struct sk_buff *buf; 1593 struct sk_buff *buf;
1434 int res; 1594 int res;
1435 1595
1436 if (how != SHUT_RDWR) 1596 if (how != SHUT_RDWR)
1437 return -EINVAL; 1597 return -EINVAL;
1438 1598
1439 if (mutex_lock_interruptible(&tsock->lock)) 1599 lock_sock(sk);
1440 return -ERESTARTSYS;
1441
1442 sock_lock(tsock);
1443 1600
1444 switch (sock->state) { 1601 switch (sock->state) {
1602 case SS_CONNECTING:
1445 case SS_CONNECTED: 1603 case SS_CONNECTED:
1446 1604
1447 /* Send 'FIN+' or 'FIN-' message to peer */ 1605 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1448
1449 sock_unlock(tsock);
1450restart: 1606restart:
1451 if ((buf = skb_dequeue(&sock->sk->sk_receive_queue))) { 1607 buf = __skb_dequeue(&sk->sk_receive_queue);
1608 if (buf) {
1452 atomic_dec(&tipc_queue_size); 1609 atomic_dec(&tipc_queue_size);
1453 if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf))) { 1610 if (TIPC_SKB_CB(buf)->handle != msg_data(buf_msg(buf))) {
1454 buf_discard(buf); 1611 buf_discard(buf);
1455 goto restart; 1612 goto restart;
1456 } 1613 }
1614 tipc_disconnect(tport->ref);
1457 tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN); 1615 tipc_reject_msg(buf, TIPC_CONN_SHUTDOWN);
1616 } else {
1617 tipc_shutdown(tport->ref);
1458 } 1618 }
1459 else { 1619
1460 tipc_shutdown(tsock->p->ref); 1620 sock->state = SS_DISCONNECTING;
1461 }
1462 sock_lock(tsock);
1463 1621
1464 /* fall through */ 1622 /* fall through */
1465 1623
1466 case SS_DISCONNECTING: 1624 case SS_DISCONNECTING:
1467 1625
1468 /* Discard any unreceived messages */ 1626 /* Discard any unreceived messages; wake up sleeping tasks */
1469
1470 while ((buf = skb_dequeue(&sock->sk->sk_receive_queue))) {
1471 atomic_dec(&tipc_queue_size);
1472 buf_discard(buf);
1473 }
1474 tsock->p->conn_unacked = 0;
1475 1627
1476 /* fall through */ 1628 discard_rx_queue(sk);
1477 1629 if (waitqueue_active(sk->sk_sleep))
1478 case SS_CONNECTING: 1630 wake_up_interruptible(sk->sk_sleep);
1479 sock->state = SS_DISCONNECTING;
1480 res = 0; 1631 res = 0;
1481 break; 1632 break;
1482 1633
@@ -1484,9 +1635,7 @@ restart:
1484 res = -ENOTCONN; 1635 res = -ENOTCONN;
1485 } 1636 }
1486 1637
1487 sock_unlock(tsock); 1638 release_sock(sk);
1488
1489 mutex_unlock(&tsock->lock);
1490 return res; 1639 return res;
1491} 1640}
1492 1641
@@ -1507,7 +1656,8 @@ restart:
1507static int setsockopt(struct socket *sock, 1656static int setsockopt(struct socket *sock,
1508 int lvl, int opt, char __user *ov, int ol) 1657 int lvl, int opt, char __user *ov, int ol)
1509{ 1658{
1510 struct tipc_sock *tsock = tipc_sk(sock->sk); 1659 struct sock *sk = sock->sk;
1660 struct tipc_port *tport = tipc_sk_port(sk);
1511 u32 value; 1661 u32 value;
1512 int res; 1662 int res;
1513 1663
@@ -1520,30 +1670,31 @@ static int setsockopt(struct socket *sock,
1520 if ((res = get_user(value, (u32 __user *)ov))) 1670 if ((res = get_user(value, (u32 __user *)ov)))
1521 return res; 1671 return res;
1522 1672
1523 if (mutex_lock_interruptible(&tsock->lock)) 1673 lock_sock(sk);
1524 return -ERESTARTSYS;
1525 1674
1526 switch (opt) { 1675 switch (opt) {
1527 case TIPC_IMPORTANCE: 1676 case TIPC_IMPORTANCE:
1528 res = tipc_set_portimportance(tsock->p->ref, value); 1677 res = tipc_set_portimportance(tport->ref, value);
1529 break; 1678 break;
1530 case TIPC_SRC_DROPPABLE: 1679 case TIPC_SRC_DROPPABLE:
1531 if (sock->type != SOCK_STREAM) 1680 if (sock->type != SOCK_STREAM)
1532 res = tipc_set_portunreliable(tsock->p->ref, value); 1681 res = tipc_set_portunreliable(tport->ref, value);
1533 else 1682 else
1534 res = -ENOPROTOOPT; 1683 res = -ENOPROTOOPT;
1535 break; 1684 break;
1536 case TIPC_DEST_DROPPABLE: 1685 case TIPC_DEST_DROPPABLE:
1537 res = tipc_set_portunreturnable(tsock->p->ref, value); 1686 res = tipc_set_portunreturnable(tport->ref, value);
1538 break; 1687 break;
1539 case TIPC_CONN_TIMEOUT: 1688 case TIPC_CONN_TIMEOUT:
1540 sock->sk->sk_rcvtimeo = msecs_to_jiffies(value); 1689 sk->sk_rcvtimeo = msecs_to_jiffies(value);
1690 /* no need to set "res", since already 0 at this point */
1541 break; 1691 break;
1542 default: 1692 default:
1543 res = -EINVAL; 1693 res = -EINVAL;
1544 } 1694 }
1545 1695
1546 mutex_unlock(&tsock->lock); 1696 release_sock(sk);
1697
1547 return res; 1698 return res;
1548} 1699}
1549 1700
@@ -1564,7 +1715,8 @@ static int setsockopt(struct socket *sock,
1564static int getsockopt(struct socket *sock, 1715static int getsockopt(struct socket *sock,
1565 int lvl, int opt, char __user *ov, int __user *ol) 1716 int lvl, int opt, char __user *ov, int __user *ol)
1566{ 1717{
1567 struct tipc_sock *tsock = tipc_sk(sock->sk); 1718 struct sock *sk = sock->sk;
1719 struct tipc_port *tport = tipc_sk_port(sk);
1568 int len; 1720 int len;
1569 u32 value; 1721 u32 value;
1570 int res; 1722 int res;
@@ -1576,26 +1728,28 @@ static int getsockopt(struct socket *sock,
1576 if ((res = get_user(len, ol))) 1728 if ((res = get_user(len, ol)))
1577 return res; 1729 return res;
1578 1730
1579 if (mutex_lock_interruptible(&tsock->lock)) 1731 lock_sock(sk);
1580 return -ERESTARTSYS;
1581 1732
1582 switch (opt) { 1733 switch (opt) {
1583 case TIPC_IMPORTANCE: 1734 case TIPC_IMPORTANCE:
1584 res = tipc_portimportance(tsock->p->ref, &value); 1735 res = tipc_portimportance(tport->ref, &value);
1585 break; 1736 break;
1586 case TIPC_SRC_DROPPABLE: 1737 case TIPC_SRC_DROPPABLE:
1587 res = tipc_portunreliable(tsock->p->ref, &value); 1738 res = tipc_portunreliable(tport->ref, &value);
1588 break; 1739 break;
1589 case TIPC_DEST_DROPPABLE: 1740 case TIPC_DEST_DROPPABLE:
1590 res = tipc_portunreturnable(tsock->p->ref, &value); 1741 res = tipc_portunreturnable(tport->ref, &value);
1591 break; 1742 break;
1592 case TIPC_CONN_TIMEOUT: 1743 case TIPC_CONN_TIMEOUT:
1593 value = jiffies_to_msecs(sock->sk->sk_rcvtimeo); 1744 value = jiffies_to_msecs(sk->sk_rcvtimeo);
1745 /* no need to set "res", since already 0 at this point */
1594 break; 1746 break;
1595 default: 1747 default:
1596 res = -EINVAL; 1748 res = -EINVAL;
1597 } 1749 }
1598 1750
1751 release_sock(sk);
1752
1599 if (res) { 1753 if (res) {
1600 /* "get" failed */ 1754 /* "get" failed */
1601 } 1755 }
@@ -1609,7 +1763,6 @@ static int getsockopt(struct socket *sock,
1609 res = put_user(sizeof(value), ol); 1763 res = put_user(sizeof(value), ol);
1610 } 1764 }
1611 1765
1612 mutex_unlock(&tsock->lock);
1613 return res; 1766 return res;
1614} 1767}
1615 1768
@@ -1722,6 +1875,7 @@ int tipc_socket_init(void)
1722/** 1875/**
1723 * tipc_socket_stop - stop TIPC socket interface 1876 * tipc_socket_stop - stop TIPC socket interface
1724 */ 1877 */
1878
1725void tipc_socket_stop(void) 1879void tipc_socket_stop(void)
1726{ 1880{
1727 if (!sockets_enabled) 1881 if (!sockets_enabled)