diff options
author | Sage Weil <sage@inktank.com> | 2012-07-20 20:24:40 -0400 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2012-07-30 21:16:00 -0400 |
commit | 8dacc7da69a491c515851e68de6036f21b5663ce (patch) | |
tree | 72e96abb5bc27d2a185cdea2f08a090302f0ede3 /net/ceph/messenger.c | |
parent | d7353dd5aaf22ed611fbcd0d4a4a12fb30659290 (diff) |
libceph: replace connection state bits with states
Use a simple set of 6 enumerated values for the socket states (CON_STATE_*)
and use those instead of the state bits. All of the con->state checks are
now under the protection of the con mutex, so this is safe. It also
simplifies many of the state checks because we can check for anything other
than the expected state instead of various bits for races we can think of.
This appears to hold up well to stress testing both with and without socket
failure injection on the server side.
Signed-off-by: Sage Weil <sage@inktank.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r-- | net/ceph/messenger.c | 130 |
1 files changed, 68 insertions, 62 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index e7320cd5fdbc..563e46aa4d6d 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c | |||
@@ -77,6 +77,17 @@ | |||
77 | #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ | 77 | #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ |
78 | #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ | 78 | #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ |
79 | 79 | ||
80 | /* | ||
81 | * connection states | ||
82 | */ | ||
83 | #define CON_STATE_CLOSED 1 /* -> PREOPEN */ | ||
84 | #define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */ | ||
85 | #define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */ | ||
86 | #define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */ | ||
87 | #define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ | ||
88 | #define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ | ||
89 | |||
90 | |||
80 | /* static tag bytes (protocol control messages) */ | 91 | /* static tag bytes (protocol control messages) */ |
81 | static char tag_msg = CEPH_MSGR_TAG_MSG; | 92 | static char tag_msg = CEPH_MSGR_TAG_MSG; |
82 | static char tag_ack = CEPH_MSGR_TAG_ACK; | 93 | static char tag_ack = CEPH_MSGR_TAG_ACK; |
@@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con) | |||
503 | mutex_lock(&con->mutex); | 514 | mutex_lock(&con->mutex); |
504 | dout("con_close %p peer %s\n", con, | 515 | dout("con_close %p peer %s\n", con, |
505 | ceph_pr_addr(&con->peer_addr.in_addr)); | 516 | ceph_pr_addr(&con->peer_addr.in_addr)); |
506 | clear_bit(NEGOTIATING, &con->state); | 517 | con->state = CON_STATE_CLOSED; |
507 | clear_bit(CONNECTING, &con->state); | ||
508 | clear_bit(CONNECTED, &con->state); | ||
509 | clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ | ||
510 | set_bit(CLOSED, &con->state); | ||
511 | 518 | ||
512 | clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ | 519 | clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ |
513 | clear_bit(KEEPALIVE_PENDING, &con->flags); | 520 | clear_bit(KEEPALIVE_PENDING, &con->flags); |
@@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con, | |||
530 | { | 537 | { |
531 | mutex_lock(&con->mutex); | 538 | mutex_lock(&con->mutex); |
532 | dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); | 539 | dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); |
533 | set_bit(OPENING, &con->state); | 540 | |
534 | WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); | 541 | BUG_ON(con->state != CON_STATE_CLOSED); |
542 | con->state = CON_STATE_PREOPEN; | ||
535 | 543 | ||
536 | con->peer_name.type = (__u8) entity_type; | 544 | con->peer_name.type = (__u8) entity_type; |
537 | con->peer_name.num = cpu_to_le64(entity_num); | 545 | con->peer_name.num = cpu_to_le64(entity_num); |
@@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, | |||
571 | INIT_LIST_HEAD(&con->out_sent); | 579 | INIT_LIST_HEAD(&con->out_sent); |
572 | INIT_DELAYED_WORK(&con->work, con_work); | 580 | INIT_DELAYED_WORK(&con->work, con_work); |
573 | 581 | ||
574 | set_bit(CLOSED, &con->state); | 582 | con->state = CON_STATE_CLOSED; |
575 | } | 583 | } |
576 | EXPORT_SYMBOL(ceph_con_init); | 584 | EXPORT_SYMBOL(ceph_con_init); |
577 | 585 | ||
@@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection | |||
809 | if (!con->ops->get_authorizer) { | 817 | if (!con->ops->get_authorizer) { |
810 | con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; | 818 | con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; |
811 | con->out_connect.authorizer_len = 0; | 819 | con->out_connect.authorizer_len = 0; |
812 | |||
813 | return NULL; | 820 | return NULL; |
814 | } | 821 | } |
815 | 822 | ||
816 | /* Can't hold the mutex while getting authorizer */ | 823 | /* Can't hold the mutex while getting authorizer */ |
817 | |||
818 | mutex_unlock(&con->mutex); | 824 | mutex_unlock(&con->mutex); |
819 | |||
820 | auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); | 825 | auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); |
821 | |||
822 | mutex_lock(&con->mutex); | 826 | mutex_lock(&con->mutex); |
823 | 827 | ||
824 | if (IS_ERR(auth)) | 828 | if (IS_ERR(auth)) |
825 | return auth; | 829 | return auth; |
826 | if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) | 830 | if (con->state != CON_STATE_NEGOTIATING) |
827 | return ERR_PTR(-EAGAIN); | 831 | return ERR_PTR(-EAGAIN); |
828 | 832 | ||
829 | con->auth_reply_buf = auth->authorizer_reply_buf; | 833 | con->auth_reply_buf = auth->authorizer_reply_buf; |
830 | con->auth_reply_buf_len = auth->authorizer_reply_buf_len; | 834 | con->auth_reply_buf_len = auth->authorizer_reply_buf_len; |
831 | |||
832 | |||
833 | return auth; | 835 | return auth; |
834 | } | 836 | } |
835 | 837 | ||
@@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con) | |||
1484 | static void fail_protocol(struct ceph_connection *con) | 1486 | static void fail_protocol(struct ceph_connection *con) |
1485 | { | 1487 | { |
1486 | reset_connection(con); | 1488 | reset_connection(con); |
1487 | set_bit(CLOSED, &con->state); /* in case there's queued work */ | 1489 | BUG_ON(con->state != CON_STATE_NEGOTIATING); |
1490 | con->state = CON_STATE_CLOSED; | ||
1488 | } | 1491 | } |
1489 | 1492 | ||
1490 | static int process_connect(struct ceph_connection *con) | 1493 | static int process_connect(struct ceph_connection *con) |
@@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con) | |||
1558 | if (con->ops->peer_reset) | 1561 | if (con->ops->peer_reset) |
1559 | con->ops->peer_reset(con); | 1562 | con->ops->peer_reset(con); |
1560 | mutex_lock(&con->mutex); | 1563 | mutex_lock(&con->mutex); |
1561 | if (test_bit(CLOSED, &con->state) || | 1564 | if (con->state != CON_STATE_NEGOTIATING) |
1562 | test_bit(OPENING, &con->state)) | ||
1563 | return -EAGAIN; | 1565 | return -EAGAIN; |
1564 | break; | 1566 | break; |
1565 | 1567 | ||
@@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con) | |||
1605 | fail_protocol(con); | 1607 | fail_protocol(con); |
1606 | return -1; | 1608 | return -1; |
1607 | } | 1609 | } |
1608 | clear_bit(NEGOTIATING, &con->state); | 1610 | |
1609 | set_bit(CONNECTED, &con->state); | 1611 | BUG_ON(con->state != CON_STATE_NEGOTIATING); |
1612 | con->state = CON_STATE_OPEN; | ||
1613 | |||
1610 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); | 1614 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); |
1611 | con->connect_seq++; | 1615 | con->connect_seq++; |
1612 | con->peer_features = server_feat; | 1616 | con->peer_features = server_feat; |
@@ -1994,8 +1998,9 @@ more: | |||
1994 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); | 1998 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); |
1995 | 1999 | ||
1996 | /* open the socket first? */ | 2000 | /* open the socket first? */ |
1997 | if (con->sock == NULL) { | 2001 | if (con->state == CON_STATE_PREOPEN) { |
1998 | set_bit(CONNECTING, &con->state); | 2002 | BUG_ON(con->sock); |
2003 | con->state = CON_STATE_CONNECTING; | ||
1999 | 2004 | ||
2000 | con_out_kvec_reset(con); | 2005 | con_out_kvec_reset(con); |
2001 | prepare_write_banner(con); | 2006 | prepare_write_banner(con); |
@@ -2046,8 +2051,7 @@ more_kvec: | |||
2046 | } | 2051 | } |
2047 | 2052 | ||
2048 | do_next: | 2053 | do_next: |
2049 | if (!test_bit(CONNECTING, &con->state) && | 2054 | if (con->state == CON_STATE_OPEN) { |
2050 | !test_bit(NEGOTIATING, &con->state)) { | ||
2051 | /* is anything else pending? */ | 2055 | /* is anything else pending? */ |
2052 | if (!list_empty(&con->out_queue)) { | 2056 | if (!list_empty(&con->out_queue)) { |
2053 | prepare_write_message(con); | 2057 | prepare_write_message(con); |
@@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con) | |||
2081 | { | 2085 | { |
2082 | int ret = -1; | 2086 | int ret = -1; |
2083 | 2087 | ||
2084 | if (!con->sock) | 2088 | more: |
2085 | return 0; | 2089 | dout("try_read start on %p state %lu\n", con, con->state); |
2086 | 2090 | if (con->state != CON_STATE_CONNECTING && | |
2087 | if (test_bit(STANDBY, &con->state)) | 2091 | con->state != CON_STATE_NEGOTIATING && |
2092 | con->state != CON_STATE_OPEN) | ||
2088 | return 0; | 2093 | return 0; |
2089 | 2094 | ||
2090 | dout("try_read start on %p\n", con); | 2095 | BUG_ON(!con->sock); |
2091 | 2096 | ||
2092 | more: | ||
2093 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, | 2097 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
2094 | con->in_base_pos); | 2098 | con->in_base_pos); |
2095 | 2099 | ||
2096 | /* | 2100 | if (con->state == CON_STATE_CONNECTING) { |
2097 | * process_connect and process_message drop and re-take | ||
2098 | * con->mutex. make sure we handle a racing close or reopen. | ||
2099 | */ | ||
2100 | if (test_bit(CLOSED, &con->state) || | ||
2101 | test_bit(OPENING, &con->state)) { | ||
2102 | ret = -EAGAIN; | ||
2103 | goto out; | ||
2104 | } | ||
2105 | |||
2106 | if (test_bit(CONNECTING, &con->state)) { | ||
2107 | dout("try_read connecting\n"); | 2101 | dout("try_read connecting\n"); |
2108 | ret = read_partial_banner(con); | 2102 | ret = read_partial_banner(con); |
2109 | if (ret <= 0) | 2103 | if (ret <= 0) |
@@ -2112,8 +2106,8 @@ more: | |||
2112 | if (ret < 0) | 2106 | if (ret < 0) |
2113 | goto out; | 2107 | goto out; |
2114 | 2108 | ||
2115 | clear_bit(CONNECTING, &con->state); | 2109 | BUG_ON(con->state != CON_STATE_CONNECTING); |
2116 | set_bit(NEGOTIATING, &con->state); | 2110 | con->state = CON_STATE_NEGOTIATING; |
2117 | 2111 | ||
2118 | /* Banner is good, exchange connection info */ | 2112 | /* Banner is good, exchange connection info */ |
2119 | ret = prepare_write_connect(con); | 2113 | ret = prepare_write_connect(con); |
@@ -2125,7 +2119,7 @@ more: | |||
2125 | goto out; | 2119 | goto out; |
2126 | } | 2120 | } |
2127 | 2121 | ||
2128 | if (test_bit(NEGOTIATING, &con->state)) { | 2122 | if (con->state == CON_STATE_NEGOTIATING) { |
2129 | dout("try_read negotiating\n"); | 2123 | dout("try_read negotiating\n"); |
2130 | ret = read_partial_connect(con); | 2124 | ret = read_partial_connect(con); |
2131 | if (ret <= 0) | 2125 | if (ret <= 0) |
@@ -2136,6 +2130,8 @@ more: | |||
2136 | goto more; | 2130 | goto more; |
2137 | } | 2131 | } |
2138 | 2132 | ||
2133 | BUG_ON(con->state != CON_STATE_OPEN); | ||
2134 | |||
2139 | if (con->in_base_pos < 0) { | 2135 | if (con->in_base_pos < 0) { |
2140 | /* | 2136 | /* |
2141 | * skipping + discarding content. | 2137 | * skipping + discarding content. |
@@ -2169,8 +2165,8 @@ more: | |||
2169 | prepare_read_ack(con); | 2165 | prepare_read_ack(con); |
2170 | break; | 2166 | break; |
2171 | case CEPH_MSGR_TAG_CLOSE: | 2167 | case CEPH_MSGR_TAG_CLOSE: |
2172 | clear_bit(CONNECTED, &con->state); | 2168 | con_close_socket(con); |
2173 | set_bit(CLOSED, &con->state); /* fixme */ | 2169 | con->state = CON_STATE_CLOSED; |
2174 | goto out; | 2170 | goto out; |
2175 | default: | 2171 | default: |
2176 | goto bad_tag; | 2172 | goto bad_tag; |
@@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work) | |||
2246 | mutex_lock(&con->mutex); | 2242 | mutex_lock(&con->mutex); |
2247 | restart: | 2243 | restart: |
2248 | if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { | 2244 | if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { |
2249 | if (test_and_clear_bit(CONNECTED, &con->state)) | 2245 | switch (con->state) { |
2250 | con->error_msg = "socket closed"; | 2246 | case CON_STATE_CONNECTING: |
2251 | else if (test_and_clear_bit(NEGOTIATING, &con->state)) | ||
2252 | con->error_msg = "negotiation failed"; | ||
2253 | else if (test_and_clear_bit(CONNECTING, &con->state)) | ||
2254 | con->error_msg = "connection failed"; | 2247 | con->error_msg = "connection failed"; |
2255 | else | 2248 | break; |
2249 | case CON_STATE_NEGOTIATING: | ||
2250 | con->error_msg = "negotiation failed"; | ||
2251 | break; | ||
2252 | case CON_STATE_OPEN: | ||
2253 | con->error_msg = "socket closed"; | ||
2254 | break; | ||
2255 | default: | ||
2256 | dout("unrecognized con state %d\n", (int)con->state); | ||
2256 | con->error_msg = "unrecognized con state"; | 2257 | con->error_msg = "unrecognized con state"; |
2258 | BUG(); | ||
2259 | } | ||
2257 | goto fault; | 2260 | goto fault; |
2258 | } | 2261 | } |
2259 | 2262 | ||
@@ -2271,17 +2274,16 @@ restart: | |||
2271 | } | 2274 | } |
2272 | } | 2275 | } |
2273 | 2276 | ||
2274 | if (test_bit(STANDBY, &con->state)) { | 2277 | if (con->state == CON_STATE_STANDBY) { |
2275 | dout("con_work %p STANDBY\n", con); | 2278 | dout("con_work %p STANDBY\n", con); |
2276 | goto done; | 2279 | goto done; |
2277 | } | 2280 | } |
2278 | if (test_bit(CLOSED, &con->state)) { | 2281 | if (con->state == CON_STATE_CLOSED) { |
2279 | dout("con_work %p CLOSED\n", con); | 2282 | dout("con_work %p CLOSED\n", con); |
2280 | BUG_ON(con->sock); | 2283 | BUG_ON(con->sock); |
2281 | goto done; | 2284 | goto done; |
2282 | } | 2285 | } |
2283 | if (test_and_clear_bit(OPENING, &con->state)) { | 2286 | if (con->state == CON_STATE_PREOPEN) { |
2284 | /* reopen w/ new peer */ | ||
2285 | dout("con_work OPENING\n"); | 2287 | dout("con_work OPENING\n"); |
2286 | BUG_ON(con->sock); | 2288 | BUG_ON(con->sock); |
2287 | } | 2289 | } |
@@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con) | |||
2328 | dout("fault %p state %lu to peer %s\n", | 2330 | dout("fault %p state %lu to peer %s\n", |
2329 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); | 2331 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); |
2330 | 2332 | ||
2331 | if (test_bit(CLOSED, &con->state)) | 2333 | BUG_ON(con->state != CON_STATE_CONNECTING && |
2332 | goto out_unlock; | 2334 | con->state != CON_STATE_NEGOTIATING && |
2335 | con->state != CON_STATE_OPEN); | ||
2333 | 2336 | ||
2334 | con_close_socket(con); | 2337 | con_close_socket(con); |
2335 | 2338 | ||
2336 | if (test_bit(LOSSYTX, &con->flags)) { | 2339 | if (test_bit(LOSSYTX, &con->flags)) { |
2337 | dout("fault on LOSSYTX channel\n"); | 2340 | dout("fault on LOSSYTX channel, marking CLOSED\n"); |
2341 | con->state = CON_STATE_CLOSED; | ||
2338 | goto out_unlock; | 2342 | goto out_unlock; |
2339 | } | 2343 | } |
2340 | 2344 | ||
@@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con) | |||
2355 | !test_bit(KEEPALIVE_PENDING, &con->flags)) { | 2359 | !test_bit(KEEPALIVE_PENDING, &con->flags)) { |
2356 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); | 2360 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); |
2357 | clear_bit(WRITE_PENDING, &con->flags); | 2361 | clear_bit(WRITE_PENDING, &con->flags); |
2358 | set_bit(STANDBY, &con->state); | 2362 | con->state = CON_STATE_STANDBY; |
2359 | } else { | 2363 | } else { |
2360 | /* retry after a delay. */ | 2364 | /* retry after a delay. */ |
2365 | con->state = CON_STATE_PREOPEN; | ||
2361 | if (con->delay == 0) | 2366 | if (con->delay == 0) |
2362 | con->delay = BASE_DELAY_INTERVAL; | 2367 | con->delay = BASE_DELAY_INTERVAL; |
2363 | else if (con->delay < MAX_DELAY_INTERVAL) | 2368 | else if (con->delay < MAX_DELAY_INTERVAL) |
@@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init); | |||
2431 | static void clear_standby(struct ceph_connection *con) | 2436 | static void clear_standby(struct ceph_connection *con) |
2432 | { | 2437 | { |
2433 | /* come back from STANDBY? */ | 2438 | /* come back from STANDBY? */ |
2434 | if (test_and_clear_bit(STANDBY, &con->state)) { | 2439 | if (con->state == CON_STATE_STANDBY) { |
2435 | dout("clear_standby %p and ++connect_seq\n", con); | 2440 | dout("clear_standby %p and ++connect_seq\n", con); |
2441 | con->state = CON_STATE_PREOPEN; | ||
2436 | con->connect_seq++; | 2442 | con->connect_seq++; |
2437 | WARN_ON(test_bit(WRITE_PENDING, &con->flags)); | 2443 | WARN_ON(test_bit(WRITE_PENDING, &con->flags)); |
2438 | WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); | 2444 | WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); |
@@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
2451 | 2457 | ||
2452 | mutex_lock(&con->mutex); | 2458 | mutex_lock(&con->mutex); |
2453 | 2459 | ||
2454 | if (test_bit(CLOSED, &con->state)) { | 2460 | if (con->state == CON_STATE_CLOSED) { |
2455 | dout("con_send %p closed, dropping %p\n", con, msg); | 2461 | dout("con_send %p closed, dropping %p\n", con, msg); |
2456 | ceph_msg_put(msg); | 2462 | ceph_msg_put(msg); |
2457 | mutex_unlock(&con->mutex); | 2463 | mutex_unlock(&con->mutex); |