aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorDavid S. Miller <davem@davemloft.net>2012-12-08 20:25:45 -0500
committerDavid S. Miller <davem@davemloft.net>2012-12-08 20:25:45 -0500
commitba501666fa8d44344a1f7e5158ba973491a70671 (patch)
treece71348bab0fe881627d6ed8e43c6e137d3c3a8c
parentc772dde343917b0961f75227cfb8c2b2f2b31d24 (diff)
parent0fef8f205f6f4cdff1869e54e44f317a79902785 (diff)
Merge branch 'tipc_net-next_v2' of git://git.kernel.org/pub/scm/linux/kernel/git/paulg/linux
Paul Gortmaker says: ==================== Changes since v1: -get rid of essentially unused variable spotted by Neil Horman (patch #2) -drop patch #3; defer it for 3.9 content, so Neil, Jon and Ying can discuss its specifics at their leisure while net-next is closed. (It had no direct dependencies to the rest of the series, and was just an optimization) -fix indentation of accept() code directly in place vs. forking it out to a separate function (was patch #10, now patch #9). Rebuilt and re-ran tests just to ensure nothing odd happened. Original v1 text follows, updated pull information follows that. --------- Here is another batch of TIPC changes. The most interesting thing is probably the non-blocking socket connect - I'm told there were several users looking forward to seeing this. Also there were some resource limitation changes that had the right intent back in 2005, but were now apparently causing needless limitations to people's real use cases; those have been relaxed/removed. There is a lockdep splat fix, but no need for a stable backport, since it is virtually impossible to trigger in mainline; you have to essentially modify code to force the probabilities in your favour to see it. The rest can largely be categorized as general cleanup of things seen in the process of getting the above changes done. Tested between 64 and 32 bit nodes with the test suite. I've also compile tested all the individual commits on the chain. I'd originally figured on this queue not being ready for 3.8, but the extended stabilization window of 3.7 has changed that. On the other hand, this can still be 3.9 material, if that simply works better for folks - no problem for me to defer it to 2013. If anyone spots any problems then I'll definitely defer it, rather than rush a last minute respin. =================== Signed-off-by: David S. Miller <davem@davemloft.net>
-rw-r--r--net/tipc/link.c44
-rw-r--r--net/tipc/port.c32
-rw-r--r--net/tipc/port.h6
-rw-r--r--net/tipc/socket.c353
-rw-r--r--net/tipc/subscr.c2
5 files changed, 225 insertions, 212 deletions
diff --git a/net/tipc/link.c b/net/tipc/link.c
index 87bf5aad704b..daa6080a2a0c 100644
--- a/net/tipc/link.c
+++ b/net/tipc/link.c
@@ -97,7 +97,6 @@ static int link_send_sections_long(struct tipc_port *sender,
97 struct iovec const *msg_sect, 97 struct iovec const *msg_sect,
98 u32 num_sect, unsigned int total_len, 98 u32 num_sect, unsigned int total_len,
99 u32 destnode); 99 u32 destnode);
100static void link_check_defragm_bufs(struct tipc_link *l_ptr);
101static void link_state_event(struct tipc_link *l_ptr, u32 event); 100static void link_state_event(struct tipc_link *l_ptr, u32 event);
102static void link_reset_statistics(struct tipc_link *l_ptr); 101static void link_reset_statistics(struct tipc_link *l_ptr);
103static void link_print(struct tipc_link *l_ptr, const char *str); 102static void link_print(struct tipc_link *l_ptr, const char *str);
@@ -271,7 +270,6 @@ static void link_timeout(struct tipc_link *l_ptr)
271 } 270 }
272 271
273 /* do all other link processing performed on a periodic basis */ 272 /* do all other link processing performed on a periodic basis */
274 link_check_defragm_bufs(l_ptr);
275 273
276 link_state_event(l_ptr, TIMEOUT_EVT); 274 link_state_event(l_ptr, TIMEOUT_EVT);
277 275
@@ -2497,16 +2495,6 @@ static void set_expected_frags(struct sk_buff *buf, u32 exp)
2497 msg_set_bcast_ack(buf_msg(buf), exp); 2495 msg_set_bcast_ack(buf_msg(buf), exp);
2498} 2496}
2499 2497
2500static u32 get_timer_cnt(struct sk_buff *buf)
2501{
2502 return msg_reroute_cnt(buf_msg(buf));
2503}
2504
2505static void incr_timer_cnt(struct sk_buff *buf)
2506{
2507 msg_incr_reroute_cnt(buf_msg(buf));
2508}
2509
2510/* 2498/*
2511 * tipc_link_recv_fragment(): Called with node lock on. Returns 2499 * tipc_link_recv_fragment(): Called with node lock on. Returns
2512 * the reassembled buffer if message is complete. 2500 * the reassembled buffer if message is complete.
@@ -2585,38 +2573,6 @@ int tipc_link_recv_fragment(struct sk_buff **pending, struct sk_buff **fb,
2585 return 0; 2573 return 0;
2586} 2574}
2587 2575
2588/**
2589 * link_check_defragm_bufs - flush stale incoming message fragments
2590 * @l_ptr: pointer to link
2591 */
2592static void link_check_defragm_bufs(struct tipc_link *l_ptr)
2593{
2594 struct sk_buff *prev = NULL;
2595 struct sk_buff *next = NULL;
2596 struct sk_buff *buf = l_ptr->defragm_buf;
2597
2598 if (!buf)
2599 return;
2600 if (!link_working_working(l_ptr))
2601 return;
2602 while (buf) {
2603 u32 cnt = get_timer_cnt(buf);
2604
2605 next = buf->next;
2606 if (cnt < 4) {
2607 incr_timer_cnt(buf);
2608 prev = buf;
2609 } else {
2610 if (prev)
2611 prev->next = buf->next;
2612 else
2613 l_ptr->defragm_buf = buf->next;
2614 kfree_skb(buf);
2615 }
2616 buf = next;
2617 }
2618}
2619
2620static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance) 2576static void link_set_supervision_props(struct tipc_link *l_ptr, u32 tolerance)
2621{ 2577{
2622 if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL)) 2578 if ((tolerance < TIPC_MIN_LINK_TOL) || (tolerance > TIPC_MAX_LINK_TOL))
diff --git a/net/tipc/port.c b/net/tipc/port.c
index 07c42fba672b..18098cac62f2 100644
--- a/net/tipc/port.c
+++ b/net/tipc/port.c
@@ -726,7 +726,7 @@ static void port_dispatcher_sigh(void *dummy)
726 if (unlikely(!cb)) 726 if (unlikely(!cb))
727 goto reject; 727 goto reject;
728 if (unlikely(!connected)) { 728 if (unlikely(!connected)) {
729 if (tipc_connect2port(dref, &orig)) 729 if (tipc_connect(dref, &orig))
730 goto reject; 730 goto reject;
731 } else if (peer_invalid) 731 } else if (peer_invalid)
732 goto reject; 732 goto reject;
@@ -1036,15 +1036,30 @@ int tipc_withdraw(u32 ref, unsigned int scope, struct tipc_name_seq const *seq)
1036 return res; 1036 return res;
1037} 1037}
1038 1038
1039int tipc_connect2port(u32 ref, struct tipc_portid const *peer) 1039int tipc_connect(u32 ref, struct tipc_portid const *peer)
1040{ 1040{
1041 struct tipc_port *p_ptr; 1041 struct tipc_port *p_ptr;
1042 struct tipc_msg *msg; 1042 int res;
1043 int res = -EINVAL;
1044 1043
1045 p_ptr = tipc_port_lock(ref); 1044 p_ptr = tipc_port_lock(ref);
1046 if (!p_ptr) 1045 if (!p_ptr)
1047 return -EINVAL; 1046 return -EINVAL;
1047 res = __tipc_connect(ref, p_ptr, peer);
1048 tipc_port_unlock(p_ptr);
1049 return res;
1050}
1051
1052/*
1053 * __tipc_connect - connect to a remote peer
1054 *
1055 * Port must be locked.
1056 */
1057int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
1058 struct tipc_portid const *peer)
1059{
1060 struct tipc_msg *msg;
1061 int res = -EINVAL;
1062
1048 if (p_ptr->published || p_ptr->connected) 1063 if (p_ptr->published || p_ptr->connected)
1049 goto exit; 1064 goto exit;
1050 if (!peer->ref) 1065 if (!peer->ref)
@@ -1067,17 +1082,16 @@ int tipc_connect2port(u32 ref, struct tipc_portid const *peer)
1067 (net_ev_handler)port_handle_node_down); 1082 (net_ev_handler)port_handle_node_down);
1068 res = 0; 1083 res = 0;
1069exit: 1084exit:
1070 tipc_port_unlock(p_ptr);
1071 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref); 1085 p_ptr->max_pkt = tipc_link_get_max_pkt(peer->node, ref);
1072 return res; 1086 return res;
1073} 1087}
1074 1088
1075/** 1089/*
1076 * tipc_disconnect_port - disconnect port from peer 1090 * __tipc_disconnect - disconnect port from peer
1077 * 1091 *
1078 * Port must be locked. 1092 * Port must be locked.
1079 */ 1093 */
1080int tipc_disconnect_port(struct tipc_port *tp_ptr) 1094int __tipc_disconnect(struct tipc_port *tp_ptr)
1081{ 1095{
1082 int res; 1096 int res;
1083 1097
@@ -1104,7 +1118,7 @@ int tipc_disconnect(u32 ref)
1104 p_ptr = tipc_port_lock(ref); 1118 p_ptr = tipc_port_lock(ref);
1105 if (!p_ptr) 1119 if (!p_ptr)
1106 return -EINVAL; 1120 return -EINVAL;
1107 res = tipc_disconnect_port(p_ptr); 1121 res = __tipc_disconnect(p_ptr);
1108 tipc_port_unlock(p_ptr); 1122 tipc_port_unlock(p_ptr);
1109 return res; 1123 return res;
1110} 1124}
diff --git a/net/tipc/port.h b/net/tipc/port.h
index 4660e3065790..fb66e2e5f4d1 100644
--- a/net/tipc/port.h
+++ b/net/tipc/port.h
@@ -190,7 +190,7 @@ int tipc_publish(u32 portref, unsigned int scope,
190int tipc_withdraw(u32 portref, unsigned int scope, 190int tipc_withdraw(u32 portref, unsigned int scope,
191 struct tipc_name_seq const *name_seq); 191 struct tipc_name_seq const *name_seq);
192 192
193int tipc_connect2port(u32 portref, struct tipc_portid const *port); 193int tipc_connect(u32 portref, struct tipc_portid const *port);
194 194
195int tipc_disconnect(u32 portref); 195int tipc_disconnect(u32 portref);
196 196
@@ -200,7 +200,9 @@ int tipc_shutdown(u32 ref);
200/* 200/*
201 * The following routines require that the port be locked on entry 201 * The following routines require that the port be locked on entry
202 */ 202 */
203int tipc_disconnect_port(struct tipc_port *tp_ptr); 203int __tipc_disconnect(struct tipc_port *tp_ptr);
204int __tipc_connect(u32 ref, struct tipc_port *p_ptr,
205 struct tipc_portid const *peer);
204int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg); 206int tipc_port_peer_msg(struct tipc_port *p_ptr, struct tipc_msg *msg);
205 207
206/* 208/*
diff --git a/net/tipc/socket.c b/net/tipc/socket.c
index 1a720c86e80a..9b4e4833a484 100644
--- a/net/tipc/socket.c
+++ b/net/tipc/socket.c
@@ -1,8 +1,8 @@
1/* 1/*
2 * net/tipc/socket.c: TIPC socket API 2 * net/tipc/socket.c: TIPC socket API
3 * 3 *
4 * Copyright (c) 2001-2007, Ericsson AB 4 * Copyright (c) 2001-2007, 2012 Ericsson AB
5 * Copyright (c) 2004-2008, 2010-2011, Wind River Systems 5 * Copyright (c) 2004-2008, 2010-2012, Wind River Systems
6 * All rights reserved. 6 * All rights reserved.
7 * 7 *
8 * Redistribution and use in source and binary forms, with or without 8 * Redistribution and use in source and binary forms, with or without
@@ -43,7 +43,7 @@
43#define SS_LISTENING -1 /* socket is listening */ 43#define SS_LISTENING -1 /* socket is listening */
44#define SS_READY -2 /* socket is connectionless */ 44#define SS_READY -2 /* socket is connectionless */
45 45
46#define OVERLOAD_LIMIT_BASE 5000 46#define OVERLOAD_LIMIT_BASE 10000
47#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */ 47#define CONN_TIMEOUT_DEFAULT 8000 /* default connect timeout = 8s */
48 48
49struct tipc_sock { 49struct tipc_sock {
@@ -73,8 +73,6 @@ static struct proto tipc_proto;
73 73
74static int sockets_enabled; 74static int sockets_enabled;
75 75
76static atomic_t tipc_queue_size = ATOMIC_INIT(0);
77
78/* 76/*
79 * Revised TIPC socket locking policy: 77 * Revised TIPC socket locking policy:
80 * 78 *
@@ -128,7 +126,6 @@ static atomic_t tipc_queue_size = ATOMIC_INIT(0);
128static void advance_rx_queue(struct sock *sk) 126static void advance_rx_queue(struct sock *sk)
129{ 127{
130 kfree_skb(__skb_dequeue(&sk->sk_receive_queue)); 128 kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
131 atomic_dec(&tipc_queue_size);
132} 129}
133 130
134/** 131/**
@@ -140,10 +137,8 @@ static void discard_rx_queue(struct sock *sk)
140{ 137{
141 struct sk_buff *buf; 138 struct sk_buff *buf;
142 139
143 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { 140 while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
144 atomic_dec(&tipc_queue_size);
145 kfree_skb(buf); 141 kfree_skb(buf);
146 }
147} 142}
148 143
149/** 144/**
@@ -155,10 +150,8 @@ static void reject_rx_queue(struct sock *sk)
155{ 150{
156 struct sk_buff *buf; 151 struct sk_buff *buf;
157 152
158 while ((buf = __skb_dequeue(&sk->sk_receive_queue))) { 153 while ((buf = __skb_dequeue(&sk->sk_receive_queue)))
159 tipc_reject_msg(buf, TIPC_ERR_NO_PORT); 154 tipc_reject_msg(buf, TIPC_ERR_NO_PORT);
160 atomic_dec(&tipc_queue_size);
161 }
162} 155}
163 156
164/** 157/**
@@ -280,7 +273,6 @@ static int release(struct socket *sock)
280 buf = __skb_dequeue(&sk->sk_receive_queue); 273 buf = __skb_dequeue(&sk->sk_receive_queue);
281 if (buf == NULL) 274 if (buf == NULL)
282 break; 275 break;
283 atomic_dec(&tipc_queue_size);
284 if (TIPC_SKB_CB(buf)->handle != 0) 276 if (TIPC_SKB_CB(buf)->handle != 0)
285 kfree_skb(buf); 277 kfree_skb(buf);
286 else { 278 else {
@@ -783,16 +775,19 @@ exit:
783static int auto_connect(struct socket *sock, struct tipc_msg *msg) 775static int auto_connect(struct socket *sock, struct tipc_msg *msg)
784{ 776{
785 struct tipc_sock *tsock = tipc_sk(sock->sk); 777 struct tipc_sock *tsock = tipc_sk(sock->sk);
786 778 struct tipc_port *p_ptr;
787 if (msg_errcode(msg)) {
788 sock->state = SS_DISCONNECTING;
789 return -ECONNREFUSED;
790 }
791 779
792 tsock->peer_name.ref = msg_origport(msg); 780 tsock->peer_name.ref = msg_origport(msg);
793 tsock->peer_name.node = msg_orignode(msg); 781 tsock->peer_name.node = msg_orignode(msg);
794 tipc_connect2port(tsock->p->ref, &tsock->peer_name); 782 p_ptr = tipc_port_deref(tsock->p->ref);
795 tipc_set_portimportance(tsock->p->ref, msg_importance(msg)); 783 if (!p_ptr)
784 return -EINVAL;
785
786 __tipc_connect(tsock->p->ref, p_ptr, &tsock->peer_name);
787
788 if (msg_importance(msg) > TIPC_CRITICAL_IMPORTANCE)
789 return -EINVAL;
790 msg_set_importance(&p_ptr->phdr, (u32)msg_importance(msg));
796 sock->state = SS_CONNECTED; 791 sock->state = SS_CONNECTED;
797 return 0; 792 return 0;
798} 793}
@@ -951,13 +946,6 @@ restart:
951 sz = msg_data_sz(msg); 946 sz = msg_data_sz(msg);
952 err = msg_errcode(msg); 947 err = msg_errcode(msg);
953 948
954 /* Complete connection setup for an implied connect */
955 if (unlikely(sock->state == SS_CONNECTING)) {
956 res = auto_connect(sock, msg);
957 if (res)
958 goto exit;
959 }
960
961 /* Discard an empty non-errored message & try again */ 949 /* Discard an empty non-errored message & try again */
962 if ((!sz) && (!err)) { 950 if ((!sz) && (!err)) {
963 advance_rx_queue(sk); 951 advance_rx_queue(sk);
@@ -1195,6 +1183,83 @@ static int rx_queue_full(struct tipc_msg *msg, u32 queue_size, u32 base)
1195} 1183}
1196 1184
1197/** 1185/**
1186 * filter_connect - Handle all incoming messages for a connection-based socket
1187 * @tsock: TIPC socket
1188 * @msg: message
1189 *
1190 * Returns TIPC error status code and socket error status code
1191 * once it encounters some errors
1192 */
1193static u32 filter_connect(struct tipc_sock *tsock, struct sk_buff **buf)
1194{
1195 struct socket *sock = tsock->sk.sk_socket;
1196 struct tipc_msg *msg = buf_msg(*buf);
1197 struct sock *sk = &tsock->sk;
1198 u32 retval = TIPC_ERR_NO_PORT;
1199 int res;
1200
1201 if (msg_mcast(msg))
1202 return retval;
1203
1204 switch ((int)sock->state) {
1205 case SS_CONNECTED:
1206 /* Accept only connection-based messages sent by peer */
1207 if (msg_connected(msg) && tipc_port_peer_msg(tsock->p, msg)) {
1208 if (unlikely(msg_errcode(msg))) {
1209 sock->state = SS_DISCONNECTING;
1210 __tipc_disconnect(tsock->p);
1211 }
1212 retval = TIPC_OK;
1213 }
1214 break;
1215 case SS_CONNECTING:
1216 /* Accept only ACK or NACK message */
1217 if (unlikely(msg_errcode(msg))) {
1218 sock->state = SS_DISCONNECTING;
1219 sk->sk_err = -ECONNREFUSED;
1220 retval = TIPC_OK;
1221 break;
1222 }
1223
1224 if (unlikely(!msg_connected(msg)))
1225 break;
1226
1227 res = auto_connect(sock, msg);
1228 if (res) {
1229 sock->state = SS_DISCONNECTING;
1230 sk->sk_err = res;
1231 retval = TIPC_OK;
1232 break;
1233 }
1234
1235 /* If an incoming message is an 'ACK-', it should be
1236 * discarded here because it doesn't contain useful
1237 * data. In addition, we should try to wake up
1238 * connect() routine if sleeping.
1239 */
1240 if (msg_data_sz(msg) == 0) {
1241 kfree_skb(*buf);
1242 *buf = NULL;
1243 if (waitqueue_active(sk_sleep(sk)))
1244 wake_up_interruptible(sk_sleep(sk));
1245 }
1246 retval = TIPC_OK;
1247 break;
1248 case SS_LISTENING:
1249 case SS_UNCONNECTED:
1250 /* Accept only SYN message */
1251 if (!msg_connected(msg) && !(msg_errcode(msg)))
1252 retval = TIPC_OK;
1253 break;
1254 case SS_DISCONNECTING:
1255 break;
1256 default:
1257 pr_err("Unknown socket state %u\n", sock->state);
1258 }
1259 return retval;
1260}
1261
1262/**
1198 * filter_rcv - validate incoming message 1263 * filter_rcv - validate incoming message
1199 * @sk: socket 1264 * @sk: socket
1200 * @buf: message 1265 * @buf: message
@@ -1211,6 +1276,7 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1211 struct socket *sock = sk->sk_socket; 1276 struct socket *sock = sk->sk_socket;
1212 struct tipc_msg *msg = buf_msg(buf); 1277 struct tipc_msg *msg = buf_msg(buf);
1213 u32 recv_q_len; 1278 u32 recv_q_len;
1279 u32 res = TIPC_OK;
1214 1280
1215 /* Reject message if it is wrong sort of message for socket */ 1281 /* Reject message if it is wrong sort of message for socket */
1216 if (msg_type(msg) > TIPC_DIRECT_MSG) 1282 if (msg_type(msg) > TIPC_DIRECT_MSG)
@@ -1220,32 +1286,12 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1220 if (msg_connected(msg)) 1286 if (msg_connected(msg))
1221 return TIPC_ERR_NO_PORT; 1287 return TIPC_ERR_NO_PORT;
1222 } else { 1288 } else {
1223 if (msg_mcast(msg)) 1289 res = filter_connect(tipc_sk(sk), &buf);
1224 return TIPC_ERR_NO_PORT; 1290 if (res != TIPC_OK || buf == NULL)
1225 if (sock->state == SS_CONNECTED) { 1291 return res;
1226 if (!msg_connected(msg) ||
1227 !tipc_port_peer_msg(tipc_sk_port(sk), msg))
1228 return TIPC_ERR_NO_PORT;
1229 } else if (sock->state == SS_CONNECTING) {
1230 if (!msg_connected(msg) && (msg_errcode(msg) == 0))
1231 return TIPC_ERR_NO_PORT;
1232 } else if (sock->state == SS_LISTENING) {
1233 if (msg_connected(msg) || msg_errcode(msg))
1234 return TIPC_ERR_NO_PORT;
1235 } else if (sock->state == SS_DISCONNECTING) {
1236 return TIPC_ERR_NO_PORT;
1237 } else /* (sock->state == SS_UNCONNECTED) */ {
1238 if (msg_connected(msg) || msg_errcode(msg))
1239 return TIPC_ERR_NO_PORT;
1240 }
1241 } 1292 }
1242 1293
1243 /* Reject message if there isn't room to queue it */ 1294 /* Reject message if there isn't room to queue it */
1244 recv_q_len = (u32)atomic_read(&tipc_queue_size);
1245 if (unlikely(recv_q_len >= OVERLOAD_LIMIT_BASE)) {
1246 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE))
1247 return TIPC_ERR_OVERLOAD;
1248 }
1249 recv_q_len = skb_queue_len(&sk->sk_receive_queue); 1295 recv_q_len = skb_queue_len(&sk->sk_receive_queue);
1250 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) { 1296 if (unlikely(recv_q_len >= (OVERLOAD_LIMIT_BASE / 2))) {
1251 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2)) 1297 if (rx_queue_full(msg, recv_q_len, OVERLOAD_LIMIT_BASE / 2))
@@ -1254,15 +1300,8 @@ static u32 filter_rcv(struct sock *sk, struct sk_buff *buf)
1254 1300
1255 /* Enqueue message (finally!) */ 1301 /* Enqueue message (finally!) */
1256 TIPC_SKB_CB(buf)->handle = 0; 1302 TIPC_SKB_CB(buf)->handle = 0;
1257 atomic_inc(&tipc_queue_size);
1258 __skb_queue_tail(&sk->sk_receive_queue, buf); 1303 __skb_queue_tail(&sk->sk_receive_queue, buf);
1259 1304
1260 /* Initiate connection termination for an incoming 'FIN' */
1261 if (unlikely(msg_errcode(msg) && (sock->state == SS_CONNECTED))) {
1262 sock->state = SS_DISCONNECTING;
1263 tipc_disconnect_port(tipc_sk_port(sk));
1264 }
1265
1266 sk->sk_data_ready(sk, 0); 1305 sk->sk_data_ready(sk, 0);
1267 return TIPC_OK; 1306 return TIPC_OK;
1268} 1307}
@@ -1348,8 +1387,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1348 struct sock *sk = sock->sk; 1387 struct sock *sk = sock->sk;
1349 struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest; 1388 struct sockaddr_tipc *dst = (struct sockaddr_tipc *)dest;
1350 struct msghdr m = {NULL,}; 1389 struct msghdr m = {NULL,};
1351 struct sk_buff *buf;
1352 struct tipc_msg *msg;
1353 unsigned int timeout; 1390 unsigned int timeout;
1354 int res; 1391 int res;
1355 1392
@@ -1361,26 +1398,6 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1361 goto exit; 1398 goto exit;
1362 } 1399 }
1363 1400
1364 /* For now, TIPC does not support the non-blocking form of connect() */
1365 if (flags & O_NONBLOCK) {
1366 res = -EOPNOTSUPP;
1367 goto exit;
1368 }
1369
1370 /* Issue Posix-compliant error code if socket is in the wrong state */
1371 if (sock->state == SS_LISTENING) {
1372 res = -EOPNOTSUPP;
1373 goto exit;
1374 }
1375 if (sock->state == SS_CONNECTING) {
1376 res = -EALREADY;
1377 goto exit;
1378 }
1379 if (sock->state != SS_UNCONNECTED) {
1380 res = -EISCONN;
1381 goto exit;
1382 }
1383
1384 /* 1401 /*
1385 * Reject connection attempt using multicast address 1402 * Reject connection attempt using multicast address
1386 * 1403 *
@@ -1392,49 +1409,66 @@ static int connect(struct socket *sock, struct sockaddr *dest, int destlen,
1392 goto exit; 1409 goto exit;
1393 } 1410 }
1394 1411
1395 /* Reject any messages already in receive queue (very unlikely) */ 1412 timeout = (flags & O_NONBLOCK) ? 0 : tipc_sk(sk)->conn_timeout;
1396 reject_rx_queue(sk); 1413
1414 switch (sock->state) {
1415 case SS_UNCONNECTED:
1416 /* Send a 'SYN-' to destination */
1417 m.msg_name = dest;
1418 m.msg_namelen = destlen;
1419
1420 /* If connect is in non-blocking case, set MSG_DONTWAIT to
1421 * indicate send_msg() is never blocked.
1422 */
1423 if (!timeout)
1424 m.msg_flags = MSG_DONTWAIT;
1397 1425
1398 /* Send a 'SYN-' to destination */ 1426 res = send_msg(NULL, sock, &m, 0);
1399 m.msg_name = dest; 1427 if ((res < 0) && (res != -EWOULDBLOCK))
1400 m.msg_namelen = destlen; 1428 goto exit;
1401 res = send_msg(NULL, sock, &m, 0); 1429
1402 if (res < 0) 1430 /* Just entered SS_CONNECTING state; the only
1431 * difference is that return value in non-blocking
1432 * case is EINPROGRESS, rather than EALREADY.
1433 */
1434 res = -EINPROGRESS;
1435 break;
1436 case SS_CONNECTING:
1437 res = -EALREADY;
1438 break;
1439 case SS_CONNECTED:
1440 res = -EISCONN;
1441 break;
1442 default:
1443 res = -EINVAL;
1403 goto exit; 1444 goto exit;
1445 }
1404 1446
1405 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */ 1447 if (sock->state == SS_CONNECTING) {
1406 timeout = tipc_sk(sk)->conn_timeout; 1448 if (!timeout)
1407 release_sock(sk); 1449 goto exit;
1408 res = wait_event_interruptible_timeout(*sk_sleep(sk),
1409 (!skb_queue_empty(&sk->sk_receive_queue) ||
1410 (sock->state != SS_CONNECTING)),
1411 timeout ? (long)msecs_to_jiffies(timeout)
1412 : MAX_SCHEDULE_TIMEOUT);
1413 lock_sock(sk);
1414 1450
1415 if (res > 0) { 1451 /* Wait until an 'ACK' or 'RST' arrives, or a timeout occurs */
1416 buf = skb_peek(&sk->sk_receive_queue); 1452 release_sock(sk);
1417 if (buf != NULL) { 1453 res = wait_event_interruptible_timeout(*sk_sleep(sk),
1418 msg = buf_msg(buf); 1454 sock->state != SS_CONNECTING,
1419 res = auto_connect(sock, msg); 1455 timeout ? (long)msecs_to_jiffies(timeout)
1420 if (!res) { 1456 : MAX_SCHEDULE_TIMEOUT);
1421 if (!msg_data_sz(msg)) 1457 lock_sock(sk);
1422 advance_rx_queue(sk); 1458 if (res <= 0) {
1423 } 1459 if (res == 0)
1424 } else { 1460 res = -ETIMEDOUT;
1425 if (sock->state == SS_CONNECTED)
1426 res = -EISCONN;
1427 else 1461 else
1428 res = -ECONNREFUSED; 1462 ; /* leave "res" unchanged */
1463 goto exit;
1429 } 1464 }
1430 } else {
1431 if (res == 0)
1432 res = -ETIMEDOUT;
1433 else
1434 ; /* leave "res" unchanged */
1435 sock->state = SS_DISCONNECTING;
1436 } 1465 }
1437 1466
1467 if (unlikely(sock->state == SS_DISCONNECTING))
1468 res = sock_error(sk);
1469 else
1470 res = 0;
1471
1438exit: 1472exit:
1439 release_sock(sk); 1473 release_sock(sk);
1440 return res; 1474 return res;
@@ -1475,8 +1509,13 @@ static int listen(struct socket *sock, int len)
1475 */ 1509 */
1476static int accept(struct socket *sock, struct socket *new_sock, int flags) 1510static int accept(struct socket *sock, struct socket *new_sock, int flags)
1477{ 1511{
1478 struct sock *sk = sock->sk; 1512 struct sock *new_sk, *sk = sock->sk;
1479 struct sk_buff *buf; 1513 struct sk_buff *buf;
1514 struct tipc_sock *new_tsock;
1515 struct tipc_port *new_tport;
1516 struct tipc_msg *msg;
1517 u32 new_ref;
1518
1480 int res; 1519 int res;
1481 1520
1482 lock_sock(sk); 1521 lock_sock(sk);
@@ -1502,48 +1541,51 @@ static int accept(struct socket *sock, struct socket *new_sock, int flags)
1502 buf = skb_peek(&sk->sk_receive_queue); 1541 buf = skb_peek(&sk->sk_receive_queue);
1503 1542
1504 res = tipc_create(sock_net(sock->sk), new_sock, 0, 0); 1543 res = tipc_create(sock_net(sock->sk), new_sock, 0, 0);
1505 if (!res) { 1544 if (res)
1506 struct sock *new_sk = new_sock->sk; 1545 goto exit;
1507 struct tipc_sock *new_tsock = tipc_sk(new_sk);
1508 struct tipc_port *new_tport = new_tsock->p;
1509 u32 new_ref = new_tport->ref;
1510 struct tipc_msg *msg = buf_msg(buf);
1511
1512 lock_sock(new_sk);
1513
1514 /*
1515 * Reject any stray messages received by new socket
1516 * before the socket lock was taken (very, very unlikely)
1517 */
1518 reject_rx_queue(new_sk);
1519
1520 /* Connect new socket to it's peer */
1521 new_tsock->peer_name.ref = msg_origport(msg);
1522 new_tsock->peer_name.node = msg_orignode(msg);
1523 tipc_connect2port(new_ref, &new_tsock->peer_name);
1524 new_sock->state = SS_CONNECTED;
1525
1526 tipc_set_portimportance(new_ref, msg_importance(msg));
1527 if (msg_named(msg)) {
1528 new_tport->conn_type = msg_nametype(msg);
1529 new_tport->conn_instance = msg_nameinst(msg);
1530 }
1531 1546
1532 /* 1547 new_sk = new_sock->sk;
1533 * Respond to 'SYN-' by discarding it & returning 'ACK'-. 1548 new_tsock = tipc_sk(new_sk);
1534 * Respond to 'SYN+' by queuing it on new socket. 1549 new_tport = new_tsock->p;
1535 */ 1550 new_ref = new_tport->ref;
1536 if (!msg_data_sz(msg)) { 1551 msg = buf_msg(buf);
1537 struct msghdr m = {NULL,};
1538 1552
1539 advance_rx_queue(sk); 1553 /* we lock on new_sk; but lockdep sees the lock on sk */
1540 send_packet(NULL, new_sock, &m, 0); 1554 lock_sock_nested(new_sk, SINGLE_DEPTH_NESTING);
1541 } else { 1555
1542 __skb_dequeue(&sk->sk_receive_queue); 1556 /*
1543 __skb_queue_head(&new_sk->sk_receive_queue, buf); 1557 * Reject any stray messages received by new socket
1544 } 1558 * before the socket lock was taken (very, very unlikely)
1545 release_sock(new_sk); 1559 */
1560 reject_rx_queue(new_sk);
1561
1562 /* Connect new socket to it's peer */
1563 new_tsock->peer_name.ref = msg_origport(msg);
1564 new_tsock->peer_name.node = msg_orignode(msg);
1565 tipc_connect(new_ref, &new_tsock->peer_name);
1566 new_sock->state = SS_CONNECTED;
1567
1568 tipc_set_portimportance(new_ref, msg_importance(msg));
1569 if (msg_named(msg)) {
1570 new_tport->conn_type = msg_nametype(msg);
1571 new_tport->conn_instance = msg_nameinst(msg);
1546 } 1572 }
1573
1574 /*
1575 * Respond to 'SYN-' by discarding it & returning 'ACK'-.
1576 * Respond to 'SYN+' by queuing it on new socket.
1577 */
1578 if (!msg_data_sz(msg)) {
1579 struct msghdr m = {NULL,};
1580
1581 advance_rx_queue(sk);
1582 send_packet(NULL, new_sock, &m, 0);
1583 } else {
1584 __skb_dequeue(&sk->sk_receive_queue);
1585 __skb_queue_head(&new_sk->sk_receive_queue, buf);
1586 }
1587 release_sock(new_sk);
1588
1547exit: 1589exit:
1548 release_sock(sk); 1590 release_sock(sk);
1549 return res; 1591 return res;
@@ -1578,7 +1620,6 @@ restart:
1578 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */ 1620 /* Disconnect and send a 'FIN+' or 'FIN-' message to peer */
1579 buf = __skb_dequeue(&sk->sk_receive_queue); 1621 buf = __skb_dequeue(&sk->sk_receive_queue);
1580 if (buf) { 1622 if (buf) {
1581 atomic_dec(&tipc_queue_size);
1582 if (TIPC_SKB_CB(buf)->handle != 0) { 1623 if (TIPC_SKB_CB(buf)->handle != 0) {
1583 kfree_skb(buf); 1624 kfree_skb(buf);
1584 goto restart; 1625 goto restart;
@@ -1717,7 +1758,7 @@ static int getsockopt(struct socket *sock,
1717 /* no need to set "res", since already 0 at this point */ 1758 /* no need to set "res", since already 0 at this point */
1718 break; 1759 break;
1719 case TIPC_NODE_RECVQ_DEPTH: 1760 case TIPC_NODE_RECVQ_DEPTH:
1720 value = (u32)atomic_read(&tipc_queue_size); 1761 value = 0; /* was tipc_queue_size, now obsolete */
1721 break; 1762 break;
1722 case TIPC_SOCK_RECVQ_DEPTH: 1763 case TIPC_SOCK_RECVQ_DEPTH:
1723 value = skb_queue_len(&sk->sk_receive_queue); 1764 value = skb_queue_len(&sk->sk_receive_queue);
diff --git a/net/tipc/subscr.c b/net/tipc/subscr.c
index 0f7d0d007e22..6b42d47029af 100644
--- a/net/tipc/subscr.c
+++ b/net/tipc/subscr.c
@@ -462,7 +462,7 @@ static void subscr_named_msg_event(void *usr_handle,
462 kfree(subscriber); 462 kfree(subscriber);
463 return; 463 return;
464 } 464 }
465 tipc_connect2port(subscriber->port_ref, orig); 465 tipc_connect(subscriber->port_ref, orig);
466 466
467 /* Lock server port (& save lock address for future use) */ 467 /* Lock server port (& save lock address for future use) */
468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock; 468 subscriber->lock = tipc_port_lock(subscriber->port_ref)->lock;