diff options
Diffstat (limited to 'net/ceph')
-rw-r--r-- | net/ceph/messenger.c | 130 |
1 file changed, 68 insertions, 62 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e7320cd5fdbc..563e46aa4d6d 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -77,6 +77,17 @@ | |||
77 | #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ | 77 | #define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ |
78 | #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ | 78 | #define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ |
79 | 79 | ||
80 | /* | ||
81 | * connection states | ||
82 | */ | ||
83 | #define CON_STATE_CLOSED 1 /* -> PREOPEN */ | ||
84 | #define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */ | ||
85 | #define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */ | ||
86 | #define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */ | ||
87 | #define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */ | ||
88 | #define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */ | ||
89 | |||
90 | |||
80 | /* static tag bytes (protocol control messages) */ | 91 | /* static tag bytes (protocol control messages) */ |
81 | static char tag_msg = CEPH_MSGR_TAG_MSG; | 92 | static char tag_msg = CEPH_MSGR_TAG_MSG; |
82 | static char tag_ack = CEPH_MSGR_TAG_ACK; | 93 | static char tag_ack = CEPH_MSGR_TAG_ACK; |
@@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con) | |||
503 | mutex_lock(&con->mutex); | 514 | mutex_lock(&con->mutex); |
504 | dout("con_close %p peer %s\n", con, | 515 | dout("con_close %p peer %s\n", con, |
505 | ceph_pr_addr(&con->peer_addr.in_addr)); | 516 | ceph_pr_addr(&con->peer_addr.in_addr)); |
506 | clear_bit(NEGOTIATING, &con->state); | 517 | con->state = CON_STATE_CLOSED; |
507 | clear_bit(CONNECTING, &con->state); | ||
508 | clear_bit(CONNECTED, &con->state); | ||
509 | clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ | ||
510 | set_bit(CLOSED, &con->state); | ||
511 | 518 | ||
512 | clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ | 519 | clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ |
513 | clear_bit(KEEPALIVE_PENDING, &con->flags); | 520 | clear_bit(KEEPALIVE_PENDING, &con->flags); |
@@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con, | |||
530 | { | 537 | { |
531 | mutex_lock(&con->mutex); | 538 | mutex_lock(&con->mutex); |
532 | dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); | 539 | dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); |
533 | set_bit(OPENING, &con->state); | 540 | |
534 | WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); | 541 | BUG_ON(con->state != CON_STATE_CLOSED); |
542 | con->state = CON_STATE_PREOPEN; | ||
535 | 543 | ||
536 | con->peer_name.type = (__u8) entity_type; | 544 | con->peer_name.type = (__u8) entity_type; |
537 | con->peer_name.num = cpu_to_le64(entity_num); | 545 | con->peer_name.num = cpu_to_le64(entity_num); |
@@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private, | |||
571 | INIT_LIST_HEAD(&con->out_sent); | 579 | INIT_LIST_HEAD(&con->out_sent); |
572 | INIT_DELAYED_WORK(&con->work, con_work); | 580 | INIT_DELAYED_WORK(&con->work, con_work); |
573 | 581 | ||
574 | set_bit(CLOSED, &con->state); | 582 | con->state = CON_STATE_CLOSED; |
575 | } | 583 | } |
576 | EXPORT_SYMBOL(ceph_con_init); | 584 | EXPORT_SYMBOL(ceph_con_init); |
577 | 585 | ||
@@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection | |||
809 | if (!con->ops->get_authorizer) { | 817 | if (!con->ops->get_authorizer) { |
810 | con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; | 818 | con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; |
811 | con->out_connect.authorizer_len = 0; | 819 | con->out_connect.authorizer_len = 0; |
812 | |||
813 | return NULL; | 820 | return NULL; |
814 | } | 821 | } |
815 | 822 | ||
816 | /* Can't hold the mutex while getting authorizer */ | 823 | /* Can't hold the mutex while getting authorizer */ |
817 | |||
818 | mutex_unlock(&con->mutex); | 824 | mutex_unlock(&con->mutex); |
819 | |||
820 | auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); | 825 | auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); |
821 | |||
822 | mutex_lock(&con->mutex); | 826 | mutex_lock(&con->mutex); |
823 | 827 | ||
824 | if (IS_ERR(auth)) | 828 | if (IS_ERR(auth)) |
825 | return auth; | 829 | return auth; |
826 | if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) | 830 | if (con->state != CON_STATE_NEGOTIATING) |
827 | return ERR_PTR(-EAGAIN); | 831 | return ERR_PTR(-EAGAIN); |
828 | 832 | ||
829 | con->auth_reply_buf = auth->authorizer_reply_buf; | 833 | con->auth_reply_buf = auth->authorizer_reply_buf; |
830 | con->auth_reply_buf_len = auth->authorizer_reply_buf_len; | 834 | con->auth_reply_buf_len = auth->authorizer_reply_buf_len; |
831 | |||
832 | |||
833 | return auth; | 835 | return auth; |
834 | } | 836 | } |
835 | 837 | ||
@@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con) | |||
1484 | static void fail_protocol(struct ceph_connection *con) | 1486 | static void fail_protocol(struct ceph_connection *con) |
1485 | { | 1487 | { |
1486 | reset_connection(con); | 1488 | reset_connection(con); |
1487 | set_bit(CLOSED, &con->state); /* in case there's queued work */ | 1489 | BUG_ON(con->state != CON_STATE_NEGOTIATING); |
1490 | con->state = CON_STATE_CLOSED; | ||
1488 | } | 1491 | } |
1489 | 1492 | ||
1490 | static int process_connect(struct ceph_connection *con) | 1493 | static int process_connect(struct ceph_connection *con) |
@@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con) | |||
1558 | if (con->ops->peer_reset) | 1561 | if (con->ops->peer_reset) |
1559 | con->ops->peer_reset(con); | 1562 | con->ops->peer_reset(con); |
1560 | mutex_lock(&con->mutex); | 1563 | mutex_lock(&con->mutex); |
1561 | if (test_bit(CLOSED, &con->state) || | 1564 | if (con->state != CON_STATE_NEGOTIATING) |
1562 | test_bit(OPENING, &con->state)) | ||
1563 | return -EAGAIN; | 1565 | return -EAGAIN; |
1564 | break; | 1566 | break; |
1565 | 1567 | ||
@@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con) | |||
1605 | fail_protocol(con); | 1607 | fail_protocol(con); |
1606 | return -1; | 1608 | return -1; |
1607 | } | 1609 | } |
1608 | clear_bit(NEGOTIATING, &con->state); | 1610 | |
1609 | set_bit(CONNECTED, &con->state); | 1611 | BUG_ON(con->state != CON_STATE_NEGOTIATING); |
1612 | con->state = CON_STATE_OPEN; | ||
1613 | |||
1610 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); | 1614 | con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); |
1611 | con->connect_seq++; | 1615 | con->connect_seq++; |
1612 | con->peer_features = server_feat; | 1616 | con->peer_features = server_feat; |
@@ -1994,8 +1998,9 @@ more: | |||
1994 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); | 1998 | dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); |
1995 | 1999 | ||
1996 | /* open the socket first? */ | 2000 | /* open the socket first? */ |
1997 | if (con->sock == NULL) { | 2001 | if (con->state == CON_STATE_PREOPEN) { |
1998 | set_bit(CONNECTING, &con->state); | 2002 | BUG_ON(con->sock); |
2003 | con->state = CON_STATE_CONNECTING; | ||
1999 | 2004 | ||
2000 | con_out_kvec_reset(con); | 2005 | con_out_kvec_reset(con); |
2001 | prepare_write_banner(con); | 2006 | prepare_write_banner(con); |
@@ -2046,8 +2051,7 @@ more_kvec: | |||
2046 | } | 2051 | } |
2047 | 2052 | ||
2048 | do_next: | 2053 | do_next: |
2049 | if (!test_bit(CONNECTING, &con->state) && | 2054 | if (con->state == CON_STATE_OPEN) { |
2050 | !test_bit(NEGOTIATING, &con->state)) { | ||
2051 | /* is anything else pending? */ | 2055 | /* is anything else pending? */ |
2052 | if (!list_empty(&con->out_queue)) { | 2056 | if (!list_empty(&con->out_queue)) { |
2053 | prepare_write_message(con); | 2057 | prepare_write_message(con); |
@@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con) | |||
2081 | { | 2085 | { |
2082 | int ret = -1; | 2086 | int ret = -1; |
2083 | 2087 | ||
2084 | if (!con->sock) | 2088 | more: |
2085 | return 0; | 2089 | dout("try_read start on %p state %lu\n", con, con->state); |
2086 | 2090 | if (con->state != CON_STATE_CONNECTING && | |
2087 | if (test_bit(STANDBY, &con->state)) | 2091 | con->state != CON_STATE_NEGOTIATING && |
2092 | con->state != CON_STATE_OPEN) | ||
2088 | return 0; | 2093 | return 0; |
2089 | 2094 | ||
2090 | dout("try_read start on %p\n", con); | 2095 | BUG_ON(!con->sock); |
2091 | 2096 | ||
2092 | more: | ||
2093 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, | 2097 | dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, |
2094 | con->in_base_pos); | 2098 | con->in_base_pos); |
2095 | 2099 | ||
2096 | /* | 2100 | if (con->state == CON_STATE_CONNECTING) { |
2097 | * process_connect and process_message drop and re-take | ||
2098 | * con->mutex. make sure we handle a racing close or reopen. | ||
2099 | */ | ||
2100 | if (test_bit(CLOSED, &con->state) || | ||
2101 | test_bit(OPENING, &con->state)) { | ||
2102 | ret = -EAGAIN; | ||
2103 | goto out; | ||
2104 | } | ||
2105 | |||
2106 | if (test_bit(CONNECTING, &con->state)) { | ||
2107 | dout("try_read connecting\n"); | 2101 | dout("try_read connecting\n"); |
2108 | ret = read_partial_banner(con); | 2102 | ret = read_partial_banner(con); |
2109 | if (ret <= 0) | 2103 | if (ret <= 0) |
@@ -2112,8 +2106,8 @@ more: | |||
2112 | if (ret < 0) | 2106 | if (ret < 0) |
2113 | goto out; | 2107 | goto out; |
2114 | 2108 | ||
2115 | clear_bit(CONNECTING, &con->state); | 2109 | BUG_ON(con->state != CON_STATE_CONNECTING); |
2116 | set_bit(NEGOTIATING, &con->state); | 2110 | con->state = CON_STATE_NEGOTIATING; |
2117 | 2111 | ||
2118 | /* Banner is good, exchange connection info */ | 2112 | /* Banner is good, exchange connection info */ |
2119 | ret = prepare_write_connect(con); | 2113 | ret = prepare_write_connect(con); |
@@ -2125,7 +2119,7 @@ more: | |||
2125 | goto out; | 2119 | goto out; |
2126 | } | 2120 | } |
2127 | 2121 | ||
2128 | if (test_bit(NEGOTIATING, &con->state)) { | 2122 | if (con->state == CON_STATE_NEGOTIATING) { |
2129 | dout("try_read negotiating\n"); | 2123 | dout("try_read negotiating\n"); |
2130 | ret = read_partial_connect(con); | 2124 | ret = read_partial_connect(con); |
2131 | if (ret <= 0) | 2125 | if (ret <= 0) |
@@ -2136,6 +2130,8 @@ more: | |||
2136 | goto more; | 2130 | goto more; |
2137 | } | 2131 | } |
2138 | 2132 | ||
2133 | BUG_ON(con->state != CON_STATE_OPEN); | ||
2134 | |||
2139 | if (con->in_base_pos < 0) { | 2135 | if (con->in_base_pos < 0) { |
2140 | /* | 2136 | /* |
2141 | * skipping + discarding content. | 2137 | * skipping + discarding content. |
@@ -2169,8 +2165,8 @@ more: | |||
2169 | prepare_read_ack(con); | 2165 | prepare_read_ack(con); |
2170 | break; | 2166 | break; |
2171 | case CEPH_MSGR_TAG_CLOSE: | 2167 | case CEPH_MSGR_TAG_CLOSE: |
2172 | clear_bit(CONNECTED, &con->state); | 2168 | con_close_socket(con); |
2173 | set_bit(CLOSED, &con->state); /* fixme */ | 2169 | con->state = CON_STATE_CLOSED; |
2174 | goto out; | 2170 | goto out; |
2175 | default: | 2171 | default: |
2176 | goto bad_tag; | 2172 | goto bad_tag; |
@@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work) | |||
2246 | mutex_lock(&con->mutex); | 2242 | mutex_lock(&con->mutex); |
2247 | restart: | 2243 | restart: |
2248 | if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { | 2244 | if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { |
2249 | if (test_and_clear_bit(CONNECTED, &con->state)) | 2245 | switch (con->state) { |
2250 | con->error_msg = "socket closed"; | 2246 | case CON_STATE_CONNECTING: |
2251 | else if (test_and_clear_bit(NEGOTIATING, &con->state)) | ||
2252 | con->error_msg = "negotiation failed"; | ||
2253 | else if (test_and_clear_bit(CONNECTING, &con->state)) | ||
2254 | con->error_msg = "connection failed"; | 2247 | con->error_msg = "connection failed"; |
2255 | else | 2248 | break; |
2249 | case CON_STATE_NEGOTIATING: | ||
2250 | con->error_msg = "negotiation failed"; | ||
2251 | break; | ||
2252 | case CON_STATE_OPEN: | ||
2253 | con->error_msg = "socket closed"; | ||
2254 | break; | ||
2255 | default: | ||
2256 | dout("unrecognized con state %d\n", (int)con->state); | ||
2256 | con->error_msg = "unrecognized con state"; | 2257 | con->error_msg = "unrecognized con state"; |
2258 | BUG(); | ||
2259 | } | ||
2257 | goto fault; | 2260 | goto fault; |
2258 | } | 2261 | } |
2259 | 2262 | ||
@@ -2271,17 +2274,16 @@ restart: | |||
2271 | } | 2274 | } |
2272 | } | 2275 | } |
2273 | 2276 | ||
2274 | if (test_bit(STANDBY, &con->state)) { | 2277 | if (con->state == CON_STATE_STANDBY) { |
2275 | dout("con_work %p STANDBY\n", con); | 2278 | dout("con_work %p STANDBY\n", con); |
2276 | goto done; | 2279 | goto done; |
2277 | } | 2280 | } |
2278 | if (test_bit(CLOSED, &con->state)) { | 2281 | if (con->state == CON_STATE_CLOSED) { |
2279 | dout("con_work %p CLOSED\n", con); | 2282 | dout("con_work %p CLOSED\n", con); |
2280 | BUG_ON(con->sock); | 2283 | BUG_ON(con->sock); |
2281 | goto done; | 2284 | goto done; |
2282 | } | 2285 | } |
2283 | if (test_and_clear_bit(OPENING, &con->state)) { | 2286 | if (con->state == CON_STATE_PREOPEN) { |
2284 | /* reopen w/ new peer */ | ||
2285 | dout("con_work OPENING\n"); | 2287 | dout("con_work OPENING\n"); |
2286 | BUG_ON(con->sock); | 2288 | BUG_ON(con->sock); |
2287 | } | 2289 | } |
@@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con) | |||
2328 | dout("fault %p state %lu to peer %s\n", | 2330 | dout("fault %p state %lu to peer %s\n", |
2329 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); | 2331 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); |
2330 | 2332 | ||
2331 | if (test_bit(CLOSED, &con->state)) | 2333 | BUG_ON(con->state != CON_STATE_CONNECTING && |
2332 | goto out_unlock; | 2334 | con->state != CON_STATE_NEGOTIATING && |
2335 | con->state != CON_STATE_OPEN); | ||
2333 | 2336 | ||
2334 | con_close_socket(con); | 2337 | con_close_socket(con); |
2335 | 2338 | ||
2336 | if (test_bit(LOSSYTX, &con->flags)) { | 2339 | if (test_bit(LOSSYTX, &con->flags)) { |
2337 | dout("fault on LOSSYTX channel\n"); | 2340 | dout("fault on LOSSYTX channel, marking CLOSED\n"); |
2341 | con->state = CON_STATE_CLOSED; | ||
2338 | goto out_unlock; | 2342 | goto out_unlock; |
2339 | } | 2343 | } |
2340 | 2344 | ||
@@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con) | |||
2355 | !test_bit(KEEPALIVE_PENDING, &con->flags)) { | 2359 | !test_bit(KEEPALIVE_PENDING, &con->flags)) { |
2356 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); | 2360 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); |
2357 | clear_bit(WRITE_PENDING, &con->flags); | 2361 | clear_bit(WRITE_PENDING, &con->flags); |
2358 | set_bit(STANDBY, &con->state); | 2362 | con->state = CON_STATE_STANDBY; |
2359 | } else { | 2363 | } else { |
2360 | /* retry after a delay. */ | 2364 | /* retry after a delay. */ |
2365 | con->state = CON_STATE_PREOPEN; | ||
2361 | if (con->delay == 0) | 2366 | if (con->delay == 0) |
2362 | con->delay = BASE_DELAY_INTERVAL; | 2367 | con->delay = BASE_DELAY_INTERVAL; |
2363 | else if (con->delay < MAX_DELAY_INTERVAL) | 2368 | else if (con->delay < MAX_DELAY_INTERVAL) |
@@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init); | |||
2431 | static void clear_standby(struct ceph_connection *con) | 2436 | static void clear_standby(struct ceph_connection *con) |
2432 | { | 2437 | { |
2433 | /* come back from STANDBY? */ | 2438 | /* come back from STANDBY? */ |
2434 | if (test_and_clear_bit(STANDBY, &con->state)) { | 2439 | if (con->state == CON_STATE_STANDBY) { |
2435 | dout("clear_standby %p and ++connect_seq\n", con); | 2440 | dout("clear_standby %p and ++connect_seq\n", con); |
2441 | con->state = CON_STATE_PREOPEN; | ||
2436 | con->connect_seq++; | 2442 | con->connect_seq++; |
2437 | WARN_ON(test_bit(WRITE_PENDING, &con->flags)); | 2443 | WARN_ON(test_bit(WRITE_PENDING, &con->flags)); |
2438 | WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); | 2444 | WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); |
@@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
2451 | 2457 | ||
2452 | mutex_lock(&con->mutex); | 2458 | mutex_lock(&con->mutex); |
2453 | 2459 | ||
2454 | if (test_bit(CLOSED, &con->state)) { | 2460 | if (con->state == CON_STATE_CLOSED) { |
2455 | dout("con_send %p closed, dropping %p\n", con, msg); | 2461 | dout("con_send %p closed, dropping %p\n", con, msg); |
2456 | ceph_msg_put(msg); | 2462 | ceph_msg_put(msg); |
2457 | mutex_unlock(&con->mutex); | 2463 | mutex_unlock(&con->mutex); |