aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ocfs2/cluster/tcp.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ocfs2/cluster/tcp.c')
-rw-r--r--fs/ocfs2/cluster/tcp.c240
1 files changed, 180 insertions, 60 deletions
diff --git a/fs/ocfs2/cluster/tcp.c b/fs/ocfs2/cluster/tcp.c
index b650efa8c8be..ae4ff4a6636b 100644
--- a/fs/ocfs2/cluster/tcp.c
+++ b/fs/ocfs2/cluster/tcp.c
@@ -140,13 +140,35 @@ static int o2net_sys_err_translations[O2NET_ERR_MAX] =
140 [O2NET_ERR_DIED] = -EHOSTDOWN,}; 140 [O2NET_ERR_DIED] = -EHOSTDOWN,};
141 141
142/* can't quite avoid *all* internal declarations :/ */ 142/* can't quite avoid *all* internal declarations :/ */
143static void o2net_sc_connect_completed(void *arg); 143static void o2net_sc_connect_completed(struct work_struct *work);
144static void o2net_rx_until_empty(void *arg); 144static void o2net_rx_until_empty(struct work_struct *work);
145static void o2net_shutdown_sc(void *arg); 145static void o2net_shutdown_sc(struct work_struct *work);
146static void o2net_listen_data_ready(struct sock *sk, int bytes); 146static void o2net_listen_data_ready(struct sock *sk, int bytes);
147static void o2net_sc_send_keep_req(void *arg); 147static void o2net_sc_send_keep_req(struct work_struct *work);
148static void o2net_idle_timer(unsigned long data); 148static void o2net_idle_timer(unsigned long data);
149static void o2net_sc_postpone_idle(struct o2net_sock_container *sc); 149static void o2net_sc_postpone_idle(struct o2net_sock_container *sc);
150static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc);
151
152/*
153 * FIXME: These should use to_o2nm_cluster_from_node(), but we end up
154 * losing our parent link to the cluster during shutdown. This can be
155 * solved by adding a pre-removal callback to configfs, or passing
156 * around the cluster with the node. -jeffm
157 */
158static inline int o2net_reconnect_delay(struct o2nm_node *node)
159{
160 return o2nm_single_cluster->cl_reconnect_delay_ms;
161}
162
163static inline int o2net_keepalive_delay(struct o2nm_node *node)
164{
165 return o2nm_single_cluster->cl_keepalive_delay_ms;
166}
167
168static inline int o2net_idle_timeout(struct o2nm_node *node)
169{
170 return o2nm_single_cluster->cl_idle_timeout_ms;
171}
150 172
151static inline int o2net_sys_err_to_errno(enum o2net_system_error err) 173static inline int o2net_sys_err_to_errno(enum o2net_system_error err)
152{ 174{
@@ -271,6 +293,8 @@ static void sc_kref_release(struct kref *kref)
271{ 293{
272 struct o2net_sock_container *sc = container_of(kref, 294 struct o2net_sock_container *sc = container_of(kref,
273 struct o2net_sock_container, sc_kref); 295 struct o2net_sock_container, sc_kref);
296 BUG_ON(timer_pending(&sc->sc_idle_timeout));
297
274 sclog(sc, "releasing\n"); 298 sclog(sc, "releasing\n");
275 299
276 if (sc->sc_sock) { 300 if (sc->sc_sock) {
@@ -300,7 +324,7 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
300 struct page *page = NULL; 324 struct page *page = NULL;
301 325
302 page = alloc_page(GFP_NOFS); 326 page = alloc_page(GFP_NOFS);
303 sc = kcalloc(1, sizeof(*sc), GFP_NOFS); 327 sc = kzalloc(sizeof(*sc), GFP_NOFS);
304 if (sc == NULL || page == NULL) 328 if (sc == NULL || page == NULL)
305 goto out; 329 goto out;
306 330
@@ -308,10 +332,10 @@ static struct o2net_sock_container *sc_alloc(struct o2nm_node *node)
308 o2nm_node_get(node); 332 o2nm_node_get(node);
309 sc->sc_node = node; 333 sc->sc_node = node;
310 334
311 INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed, sc); 335 INIT_WORK(&sc->sc_connect_work, o2net_sc_connect_completed);
312 INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty, sc); 336 INIT_WORK(&sc->sc_rx_work, o2net_rx_until_empty);
313 INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc, sc); 337 INIT_WORK(&sc->sc_shutdown_work, o2net_shutdown_sc);
314 INIT_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req, sc); 338 INIT_DELAYED_WORK(&sc->sc_keepalive_work, o2net_sc_send_keep_req);
315 339
316 init_timer(&sc->sc_idle_timeout); 340 init_timer(&sc->sc_idle_timeout);
317 sc->sc_idle_timeout.function = o2net_idle_timer; 341 sc->sc_idle_timeout.function = o2net_idle_timer;
@@ -342,7 +366,7 @@ static void o2net_sc_queue_work(struct o2net_sock_container *sc,
342 sc_put(sc); 366 sc_put(sc);
343} 367}
344static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc, 368static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
345 struct work_struct *work, 369 struct delayed_work *work,
346 int delay) 370 int delay)
347{ 371{
348 sc_get(sc); 372 sc_get(sc);
@@ -350,12 +374,19 @@ static void o2net_sc_queue_delayed_work(struct o2net_sock_container *sc,
350 sc_put(sc); 374 sc_put(sc);
351} 375}
352static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc, 376static void o2net_sc_cancel_delayed_work(struct o2net_sock_container *sc,
353 struct work_struct *work) 377 struct delayed_work *work)
354{ 378{
355 if (cancel_delayed_work(work)) 379 if (cancel_delayed_work(work))
356 sc_put(sc); 380 sc_put(sc);
357} 381}
358 382
383static atomic_t o2net_connected_peers = ATOMIC_INIT(0);
384
385int o2net_num_connected_peers(void)
386{
387 return atomic_read(&o2net_connected_peers);
388}
389
359static void o2net_set_nn_state(struct o2net_node *nn, 390static void o2net_set_nn_state(struct o2net_node *nn,
360 struct o2net_sock_container *sc, 391 struct o2net_sock_container *sc,
361 unsigned valid, int err) 392 unsigned valid, int err)
@@ -366,6 +397,11 @@ static void o2net_set_nn_state(struct o2net_node *nn,
366 397
367 assert_spin_locked(&nn->nn_lock); 398 assert_spin_locked(&nn->nn_lock);
368 399
400 if (old_sc && !sc)
401 atomic_dec(&o2net_connected_peers);
402 else if (!old_sc && sc)
403 atomic_inc(&o2net_connected_peers);
404
369 /* the node num comparison and single connect/accept path should stop 405 /* the node num comparison and single connect/accept path should stop
370 * an non-null sc from being overwritten with another */ 406 * an non-null sc from being overwritten with another */
371 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc); 407 BUG_ON(sc && nn->nn_sc && nn->nn_sc != sc);
@@ -424,9 +460,9 @@ static void o2net_set_nn_state(struct o2net_node *nn,
424 /* delay if we're withing a RECONNECT_DELAY of the 460 /* delay if we're withing a RECONNECT_DELAY of the
425 * last attempt */ 461 * last attempt */
426 delay = (nn->nn_last_connect_attempt + 462 delay = (nn->nn_last_connect_attempt +
427 msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) 463 msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node)))
428 - jiffies; 464 - jiffies;
429 if (delay > msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS)) 465 if (delay > msecs_to_jiffies(o2net_reconnect_delay(sc->sc_node)))
430 delay = 0; 466 delay = 0;
431 mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay); 467 mlog(ML_CONN, "queueing conn attempt in %lu jiffies\n", delay);
432 queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay); 468 queue_delayed_work(o2net_wq, &nn->nn_connect_work, delay);
@@ -564,9 +600,11 @@ static void o2net_ensure_shutdown(struct o2net_node *nn,
564 * ourselves as state_change couldn't get the nn_lock and call set_nn_state 600 * ourselves as state_change couldn't get the nn_lock and call set_nn_state
565 * itself. 601 * itself.
566 */ 602 */
567static void o2net_shutdown_sc(void *arg) 603static void o2net_shutdown_sc(struct work_struct *work)
568{ 604{
569 struct o2net_sock_container *sc = arg; 605 struct o2net_sock_container *sc =
606 container_of(work, struct o2net_sock_container,
607 sc_shutdown_work);
570 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num); 608 struct o2net_node *nn = o2net_nn_from_num(sc->sc_node->nd_num);
571 609
572 sclog(sc, "shutting down\n"); 610 sclog(sc, "shutting down\n");
@@ -676,7 +714,7 @@ int o2net_register_handler(u32 msg_type, u32 key, u32 max_len,
676 goto out; 714 goto out;
677 } 715 }
678 716
679 nmh = kcalloc(1, sizeof(struct o2net_msg_handler), GFP_NOFS); 717 nmh = kzalloc(sizeof(struct o2net_msg_handler), GFP_NOFS);
680 if (nmh == NULL) { 718 if (nmh == NULL) {
681 ret = -ENOMEM; 719 ret = -ENOMEM;
682 goto out; 720 goto out;
@@ -1097,13 +1135,51 @@ static int o2net_check_handshake(struct o2net_sock_container *sc)
1097 return -1; 1135 return -1;
1098 } 1136 }
1099 1137
1138 /*
1139 * Ensure timeouts are consistent with other nodes, otherwise
1140 * we can end up with one node thinking that the other must be down,
1141 * but isn't. This can ultimately cause corruption.
1142 */
1143 if (be32_to_cpu(hand->o2net_idle_timeout_ms) !=
1144 o2net_idle_timeout(sc->sc_node)) {
1145 mlog(ML_NOTICE, SC_NODEF_FMT " uses a network idle timeout of "
1146 "%u ms, but we use %u ms locally. disconnecting\n",
1147 SC_NODEF_ARGS(sc),
1148 be32_to_cpu(hand->o2net_idle_timeout_ms),
1149 o2net_idle_timeout(sc->sc_node));
1150 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1151 return -1;
1152 }
1153
1154 if (be32_to_cpu(hand->o2net_keepalive_delay_ms) !=
1155 o2net_keepalive_delay(sc->sc_node)) {
1156 mlog(ML_NOTICE, SC_NODEF_FMT " uses a keepalive delay of "
1157 "%u ms, but we use %u ms locally. disconnecting\n",
1158 SC_NODEF_ARGS(sc),
1159 be32_to_cpu(hand->o2net_keepalive_delay_ms),
1160 o2net_keepalive_delay(sc->sc_node));
1161 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1162 return -1;
1163 }
1164
1165 if (be32_to_cpu(hand->o2hb_heartbeat_timeout_ms) !=
1166 O2HB_MAX_WRITE_TIMEOUT_MS) {
1167 mlog(ML_NOTICE, SC_NODEF_FMT " uses a heartbeat timeout of "
1168 "%u ms, but we use %u ms locally. disconnecting\n",
1169 SC_NODEF_ARGS(sc),
1170 be32_to_cpu(hand->o2hb_heartbeat_timeout_ms),
1171 O2HB_MAX_WRITE_TIMEOUT_MS);
1172 o2net_ensure_shutdown(nn, sc, -ENOTCONN);
1173 return -1;
1174 }
1175
1100 sc->sc_handshake_ok = 1; 1176 sc->sc_handshake_ok = 1;
1101 1177
1102 spin_lock(&nn->nn_lock); 1178 spin_lock(&nn->nn_lock);
1103 /* set valid and queue the idle timers only if it hasn't been 1179 /* set valid and queue the idle timers only if it hasn't been
1104 * shut down already */ 1180 * shut down already */
1105 if (nn->nn_sc == sc) { 1181 if (nn->nn_sc == sc) {
1106 o2net_sc_postpone_idle(sc); 1182 o2net_sc_reset_idle_timer(sc);
1107 o2net_set_nn_state(nn, sc, 1, 0); 1183 o2net_set_nn_state(nn, sc, 1, 0);
1108 } 1184 }
1109 spin_unlock(&nn->nn_lock); 1185 spin_unlock(&nn->nn_lock);
@@ -1129,6 +1205,23 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1129 sclog(sc, "receiving\n"); 1205 sclog(sc, "receiving\n");
1130 do_gettimeofday(&sc->sc_tv_advance_start); 1206 do_gettimeofday(&sc->sc_tv_advance_start);
1131 1207
1208 if (unlikely(sc->sc_handshake_ok == 0)) {
1209 if(sc->sc_page_off < sizeof(struct o2net_handshake)) {
1210 data = page_address(sc->sc_page) + sc->sc_page_off;
1211 datalen = sizeof(struct o2net_handshake) - sc->sc_page_off;
1212 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1213 if (ret > 0)
1214 sc->sc_page_off += ret;
1215 }
1216
1217 if (sc->sc_page_off == sizeof(struct o2net_handshake)) {
1218 o2net_check_handshake(sc);
1219 if (unlikely(sc->sc_handshake_ok == 0))
1220 ret = -EPROTO;
1221 }
1222 goto out;
1223 }
1224
1132 /* do we need more header? */ 1225 /* do we need more header? */
1133 if (sc->sc_page_off < sizeof(struct o2net_msg)) { 1226 if (sc->sc_page_off < sizeof(struct o2net_msg)) {
1134 data = page_address(sc->sc_page) + sc->sc_page_off; 1227 data = page_address(sc->sc_page) + sc->sc_page_off;
@@ -1136,15 +1229,6 @@ static int o2net_advance_rx(struct o2net_sock_container *sc)
1136 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen); 1229 ret = o2net_recv_tcp_msg(sc->sc_sock, data, datalen);
1137 if (ret > 0) { 1230 if (ret > 0) {
1138 sc->sc_page_off += ret; 1231 sc->sc_page_off += ret;
1139
1140 /* this working relies on the handshake being
1141 * smaller than the normal message header */
1142 if (sc->sc_page_off >= sizeof(struct o2net_handshake)&&
1143 !sc->sc_handshake_ok && o2net_check_handshake(sc)) {
1144 ret = -EPROTO;
1145 goto out;
1146 }
1147
1148 /* only swab incoming here.. we can 1232 /* only swab incoming here.. we can
1149 * only get here once as we cross from 1233 * only get here once as we cross from
1150 * being under to over */ 1234 * being under to over */
@@ -1201,9 +1285,10 @@ out:
1201/* this work func is triggerd by data ready. it reads until it can read no 1285/* this work func is triggerd by data ready. it reads until it can read no
1202 * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing 1286 * more. it interprets 0, eof, as fatal. if data_ready hits while we're doing
1203 * our work the work struct will be marked and we'll be called again. */ 1287 * our work the work struct will be marked and we'll be called again. */
1204static void o2net_rx_until_empty(void *arg) 1288static void o2net_rx_until_empty(struct work_struct *work)
1205{ 1289{
1206 struct o2net_sock_container *sc = arg; 1290 struct o2net_sock_container *sc =
1291 container_of(work, struct o2net_sock_container, sc_rx_work);
1207 int ret; 1292 int ret;
1208 1293
1209 do { 1294 do {
@@ -1245,26 +1330,43 @@ static int o2net_set_nodelay(struct socket *sock)
1245 return ret; 1330 return ret;
1246} 1331}
1247 1332
1333static void o2net_initialize_handshake(void)
1334{
1335 o2net_hand->o2hb_heartbeat_timeout_ms = cpu_to_be32(
1336 O2HB_MAX_WRITE_TIMEOUT_MS);
1337 o2net_hand->o2net_idle_timeout_ms = cpu_to_be32(
1338 o2net_idle_timeout(NULL));
1339 o2net_hand->o2net_keepalive_delay_ms = cpu_to_be32(
1340 o2net_keepalive_delay(NULL));
1341 o2net_hand->o2net_reconnect_delay_ms = cpu_to_be32(
1342 o2net_reconnect_delay(NULL));
1343}
1344
1248/* ------------------------------------------------------------ */ 1345/* ------------------------------------------------------------ */
1249 1346
1250/* called when a connect completes and after a sock is accepted. the 1347/* called when a connect completes and after a sock is accepted. the
1251 * rx path will see the response and mark the sc valid */ 1348 * rx path will see the response and mark the sc valid */
1252static void o2net_sc_connect_completed(void *arg) 1349static void o2net_sc_connect_completed(struct work_struct *work)
1253{ 1350{
1254 struct o2net_sock_container *sc = arg; 1351 struct o2net_sock_container *sc =
1352 container_of(work, struct o2net_sock_container,
1353 sc_connect_work);
1255 1354
1256 mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n", 1355 mlog(ML_MSG, "sc sending handshake with ver %llu id %llx\n",
1257 (unsigned long long)O2NET_PROTOCOL_VERSION, 1356 (unsigned long long)O2NET_PROTOCOL_VERSION,
1258 (unsigned long long)be64_to_cpu(o2net_hand->connector_id)); 1357 (unsigned long long)be64_to_cpu(o2net_hand->connector_id));
1259 1358
1359 o2net_initialize_handshake();
1260 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1360 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1261 sc_put(sc); 1361 sc_put(sc);
1262} 1362}
1263 1363
1264/* this is called as a work_struct func. */ 1364/* this is called as a work_struct func. */
1265static void o2net_sc_send_keep_req(void *arg) 1365static void o2net_sc_send_keep_req(struct work_struct *work)
1266{ 1366{
1267 struct o2net_sock_container *sc = arg; 1367 struct o2net_sock_container *sc =
1368 container_of(work, struct o2net_sock_container,
1369 sc_keepalive_work.work);
1268 1370
1269 o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req)); 1371 o2net_sendpage(sc, o2net_keep_req, sizeof(*o2net_keep_req));
1270 sc_put(sc); 1372 sc_put(sc);
@@ -1280,8 +1382,10 @@ static void o2net_idle_timer(unsigned long data)
1280 1382
1281 do_gettimeofday(&now); 1383 do_gettimeofday(&now);
1282 1384
1283 printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for 10 " 1385 printk(KERN_INFO "o2net: connection to " SC_NODEF_FMT " has been idle for %u.%u "
1284 "seconds, shutting it down.\n", SC_NODEF_ARGS(sc)); 1386 "seconds, shutting it down.\n", SC_NODEF_ARGS(sc),
1387 o2net_idle_timeout(sc->sc_node) / 1000,
1388 o2net_idle_timeout(sc->sc_node) % 1000);
1285 mlog(ML_NOTICE, "here are some times that might help debug the " 1389 mlog(ML_NOTICE, "here are some times that might help debug the "
1286 "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv " 1390 "situation: (tmr %ld.%ld now %ld.%ld dr %ld.%ld adv "
1287 "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n", 1391 "%ld.%ld:%ld.%ld func (%08x:%u) %ld.%ld:%ld.%ld)\n",
@@ -1299,14 +1403,21 @@ static void o2net_idle_timer(unsigned long data)
1299 o2net_sc_queue_work(sc, &sc->sc_shutdown_work); 1403 o2net_sc_queue_work(sc, &sc->sc_shutdown_work);
1300} 1404}
1301 1405
1302static void o2net_sc_postpone_idle(struct o2net_sock_container *sc) 1406static void o2net_sc_reset_idle_timer(struct o2net_sock_container *sc)
1303{ 1407{
1304 o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work); 1408 o2net_sc_cancel_delayed_work(sc, &sc->sc_keepalive_work);
1305 o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work, 1409 o2net_sc_queue_delayed_work(sc, &sc->sc_keepalive_work,
1306 O2NET_KEEPALIVE_DELAY_SECS * HZ); 1410 msecs_to_jiffies(o2net_keepalive_delay(sc->sc_node)));
1307 do_gettimeofday(&sc->sc_tv_timer); 1411 do_gettimeofday(&sc->sc_tv_timer);
1308 mod_timer(&sc->sc_idle_timeout, 1412 mod_timer(&sc->sc_idle_timeout,
1309 jiffies + (O2NET_IDLE_TIMEOUT_SECS * HZ)); 1413 jiffies + msecs_to_jiffies(o2net_idle_timeout(sc->sc_node)));
1414}
1415
1416static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
1417{
1418 /* Only push out an existing timer */
1419 if (timer_pending(&sc->sc_idle_timeout))
1420 o2net_sc_reset_idle_timer(sc);
1310} 1421}
1311 1422
1312/* this work func is kicked whenever a path sets the nn state which doesn't 1423/* this work func is kicked whenever a path sets the nn state which doesn't
@@ -1314,14 +1425,15 @@ static void o2net_sc_postpone_idle(struct o2net_sock_container *sc)
1314 * having a connect attempt fail, etc. This centralizes the logic which decides 1425 * having a connect attempt fail, etc. This centralizes the logic which decides
1315 * if a connect attempt should be made or if we should give up and all future 1426 * if a connect attempt should be made or if we should give up and all future
1316 * transmit attempts should fail */ 1427 * transmit attempts should fail */
1317static void o2net_start_connect(void *arg) 1428static void o2net_start_connect(struct work_struct *work)
1318{ 1429{
1319 struct o2net_node *nn = arg; 1430 struct o2net_node *nn =
1431 container_of(work, struct o2net_node, nn_connect_work.work);
1320 struct o2net_sock_container *sc = NULL; 1432 struct o2net_sock_container *sc = NULL;
1321 struct o2nm_node *node = NULL, *mynode = NULL; 1433 struct o2nm_node *node = NULL, *mynode = NULL;
1322 struct socket *sock = NULL; 1434 struct socket *sock = NULL;
1323 struct sockaddr_in myaddr = {0, }, remoteaddr = {0, }; 1435 struct sockaddr_in myaddr = {0, }, remoteaddr = {0, };
1324 int ret = 0; 1436 int ret = 0, stop;
1325 1437
1326 /* if we're greater we initiate tx, otherwise we accept */ 1438 /* if we're greater we initiate tx, otherwise we accept */
1327 if (o2nm_this_node() <= o2net_num_from_nn(nn)) 1439 if (o2nm_this_node() <= o2net_num_from_nn(nn))
@@ -1342,10 +1454,9 @@ static void o2net_start_connect(void *arg)
1342 1454
1343 spin_lock(&nn->nn_lock); 1455 spin_lock(&nn->nn_lock);
1344 /* see if we already have one pending or have given up */ 1456 /* see if we already have one pending or have given up */
1345 if (nn->nn_sc || nn->nn_persistent_error) 1457 stop = (nn->nn_sc || nn->nn_persistent_error);
1346 arg = NULL;
1347 spin_unlock(&nn->nn_lock); 1458 spin_unlock(&nn->nn_lock);
1348 if (arg == NULL) /* *shrug*, needed some indicator */ 1459 if (stop)
1349 goto out; 1460 goto out;
1350 1461
1351 nn->nn_last_connect_attempt = jiffies; 1462 nn->nn_last_connect_attempt = jiffies;
@@ -1421,24 +1532,29 @@ out:
1421 return; 1532 return;
1422} 1533}
1423 1534
1424static void o2net_connect_expired(void *arg) 1535static void o2net_connect_expired(struct work_struct *work)
1425{ 1536{
1426 struct o2net_node *nn = arg; 1537 struct o2net_node *nn =
1538 container_of(work, struct o2net_node, nn_connect_expired.work);
1427 1539
1428 spin_lock(&nn->nn_lock); 1540 spin_lock(&nn->nn_lock);
1429 if (!nn->nn_sc_valid) { 1541 if (!nn->nn_sc_valid) {
1542 struct o2nm_node *node = nn->nn_sc->sc_node;
1430 mlog(ML_ERROR, "no connection established with node %u after " 1543 mlog(ML_ERROR, "no connection established with node %u after "
1431 "%u seconds, giving up and returning errors.\n", 1544 "%u.%u seconds, giving up and returning errors.\n",
1432 o2net_num_from_nn(nn), O2NET_IDLE_TIMEOUT_SECS); 1545 o2net_num_from_nn(nn),
1546 o2net_idle_timeout(node) / 1000,
1547 o2net_idle_timeout(node) % 1000);
1433 1548
1434 o2net_set_nn_state(nn, NULL, 0, -ENOTCONN); 1549 o2net_set_nn_state(nn, NULL, 0, -ENOTCONN);
1435 } 1550 }
1436 spin_unlock(&nn->nn_lock); 1551 spin_unlock(&nn->nn_lock);
1437} 1552}
1438 1553
1439static void o2net_still_up(void *arg) 1554static void o2net_still_up(struct work_struct *work)
1440{ 1555{
1441 struct o2net_node *nn = arg; 1556 struct o2net_node *nn =
1557 container_of(work, struct o2net_node, nn_still_up.work);
1442 1558
1443 o2quo_hb_still_up(o2net_num_from_nn(nn)); 1559 o2quo_hb_still_up(o2net_num_from_nn(nn));
1444} 1560}
@@ -1469,6 +1585,8 @@ static void o2net_hb_node_down_cb(struct o2nm_node *node, int node_num,
1469 1585
1470 if (node_num != o2nm_this_node()) 1586 if (node_num != o2nm_this_node())
1471 o2net_disconnect_node(node); 1587 o2net_disconnect_node(node);
1588
1589 BUG_ON(atomic_read(&o2net_connected_peers) < 0);
1472} 1590}
1473 1591
1474static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num, 1592static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
@@ -1480,14 +1598,14 @@ static void o2net_hb_node_up_cb(struct o2nm_node *node, int node_num,
1480 1598
1481 /* ensure an immediate connect attempt */ 1599 /* ensure an immediate connect attempt */
1482 nn->nn_last_connect_attempt = jiffies - 1600 nn->nn_last_connect_attempt = jiffies -
1483 (msecs_to_jiffies(O2NET_RECONNECT_DELAY_MS) + 1); 1601 (msecs_to_jiffies(o2net_reconnect_delay(node)) + 1);
1484 1602
1485 if (node_num != o2nm_this_node()) { 1603 if (node_num != o2nm_this_node()) {
1486 /* heartbeat doesn't work unless a local node number is 1604 /* heartbeat doesn't work unless a local node number is
1487 * configured and doing so brings up the o2net_wq, so we can 1605 * configured and doing so brings up the o2net_wq, so we can
1488 * use it.. */ 1606 * use it.. */
1489 queue_delayed_work(o2net_wq, &nn->nn_connect_expired, 1607 queue_delayed_work(o2net_wq, &nn->nn_connect_expired,
1490 O2NET_IDLE_TIMEOUT_SECS * HZ); 1608 msecs_to_jiffies(o2net_idle_timeout(node)));
1491 1609
1492 /* believe it or not, accept and node hearbeating testing 1610 /* believe it or not, accept and node hearbeating testing
1493 * can succeed for this node before we got here.. so 1611 * can succeed for this node before we got here.. so
@@ -1632,6 +1750,7 @@ static int o2net_accept_one(struct socket *sock)
1632 o2net_register_callbacks(sc->sc_sock->sk, sc); 1750 o2net_register_callbacks(sc->sc_sock->sk, sc);
1633 o2net_sc_queue_work(sc, &sc->sc_rx_work); 1751 o2net_sc_queue_work(sc, &sc->sc_rx_work);
1634 1752
1753 o2net_initialize_handshake();
1635 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand)); 1754 o2net_sendpage(sc, o2net_hand, sizeof(*o2net_hand));
1636 1755
1637out: 1756out:
@@ -1644,9 +1763,9 @@ out:
1644 return ret; 1763 return ret;
1645} 1764}
1646 1765
1647static void o2net_accept_many(void *arg) 1766static void o2net_accept_many(struct work_struct *work)
1648{ 1767{
1649 struct socket *sock = arg; 1768 struct socket *sock = o2net_listen_sock;
1650 while (o2net_accept_one(sock) == 0) 1769 while (o2net_accept_one(sock) == 0)
1651 cond_resched(); 1770 cond_resched();
1652} 1771}
@@ -1700,7 +1819,7 @@ static int o2net_open_listening_sock(__be16 port)
1700 write_unlock_bh(&sock->sk->sk_callback_lock); 1819 write_unlock_bh(&sock->sk->sk_callback_lock);
1701 1820
1702 o2net_listen_sock = sock; 1821 o2net_listen_sock = sock;
1703 INIT_WORK(&o2net_listen_work, o2net_accept_many, sock); 1822 INIT_WORK(&o2net_listen_work, o2net_accept_many);
1704 1823
1705 sock->sk->sk_reuse = 1; 1824 sock->sk->sk_reuse = 1;
1706 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin)); 1825 ret = sock->ops->bind(sock, (struct sockaddr *)&sin, sizeof(sin));
@@ -1799,9 +1918,9 @@ int o2net_init(void)
1799 1918
1800 o2quo_init(); 1919 o2quo_init();
1801 1920
1802 o2net_hand = kcalloc(1, sizeof(struct o2net_handshake), GFP_KERNEL); 1921 o2net_hand = kzalloc(sizeof(struct o2net_handshake), GFP_KERNEL);
1803 o2net_keep_req = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); 1922 o2net_keep_req = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
1804 o2net_keep_resp = kcalloc(1, sizeof(struct o2net_msg), GFP_KERNEL); 1923 o2net_keep_resp = kzalloc(sizeof(struct o2net_msg), GFP_KERNEL);
1805 if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) { 1924 if (!o2net_hand || !o2net_keep_req || !o2net_keep_resp) {
1806 kfree(o2net_hand); 1925 kfree(o2net_hand);
1807 kfree(o2net_keep_req); 1926 kfree(o2net_keep_req);
@@ -1819,9 +1938,10 @@ int o2net_init(void)
1819 struct o2net_node *nn = o2net_nn_from_num(i); 1938 struct o2net_node *nn = o2net_nn_from_num(i);
1820 1939
1821 spin_lock_init(&nn->nn_lock); 1940 spin_lock_init(&nn->nn_lock);
1822 INIT_WORK(&nn->nn_connect_work, o2net_start_connect, nn); 1941 INIT_DELAYED_WORK(&nn->nn_connect_work, o2net_start_connect);
1823 INIT_WORK(&nn->nn_connect_expired, o2net_connect_expired, nn); 1942 INIT_DELAYED_WORK(&nn->nn_connect_expired,
1824 INIT_WORK(&nn->nn_still_up, o2net_still_up, nn); 1943 o2net_connect_expired);
1944 INIT_DELAYED_WORK(&nn->nn_still_up, o2net_still_up);
1825 /* until we see hb from a node we'll return einval */ 1945 /* until we see hb from a node we'll return einval */
1826 nn->nn_persistent_error = -ENOTCONN; 1946 nn->nn_persistent_error = -ENOTCONN;
1827 init_waitqueue_head(&nn->nn_sc_wq); 1947 init_waitqueue_head(&nn->nn_sc_wq);