aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c130
1 files changed, 68 insertions, 62 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e7320cd5fdbc..563e46aa4d6d 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -77,6 +77,17 @@
77#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ 77#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */
78#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ 78#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */
79 79
80/*
81 * connection states
82 */
83#define CON_STATE_CLOSED 1 /* -> PREOPEN */
84#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */
85#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */
86#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */
87#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */
88#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */
89
90
80/* static tag bytes (protocol control messages) */ 91/* static tag bytes (protocol control messages) */
81static char tag_msg = CEPH_MSGR_TAG_MSG; 92static char tag_msg = CEPH_MSGR_TAG_MSG;
82static char tag_ack = CEPH_MSGR_TAG_ACK; 93static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con)
503 mutex_lock(&con->mutex); 514 mutex_lock(&con->mutex);
504 dout("con_close %p peer %s\n", con, 515 dout("con_close %p peer %s\n", con,
505 ceph_pr_addr(&con->peer_addr.in_addr)); 516 ceph_pr_addr(&con->peer_addr.in_addr));
506 clear_bit(NEGOTIATING, &con->state); 517 con->state = CON_STATE_CLOSED;
507 clear_bit(CONNECTING, &con->state);
508 clear_bit(CONNECTED, &con->state);
509 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
510 set_bit(CLOSED, &con->state);
511 518
512 clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ 519 clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */
513 clear_bit(KEEPALIVE_PENDING, &con->flags); 520 clear_bit(KEEPALIVE_PENDING, &con->flags);
@@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con,
530{ 537{
531 mutex_lock(&con->mutex); 538 mutex_lock(&con->mutex);
532 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); 539 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
533 set_bit(OPENING, &con->state); 540
534 WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); 541 BUG_ON(con->state != CON_STATE_CLOSED);
542 con->state = CON_STATE_PREOPEN;
535 543
536 con->peer_name.type = (__u8) entity_type; 544 con->peer_name.type = (__u8) entity_type;
537 con->peer_name.num = cpu_to_le64(entity_num); 545 con->peer_name.num = cpu_to_le64(entity_num);
@@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
571 INIT_LIST_HEAD(&con->out_sent); 579 INIT_LIST_HEAD(&con->out_sent);
572 INIT_DELAYED_WORK(&con->work, con_work); 580 INIT_DELAYED_WORK(&con->work, con_work);
573 581
574 set_bit(CLOSED, &con->state); 582 con->state = CON_STATE_CLOSED;
575} 583}
576EXPORT_SYMBOL(ceph_con_init); 584EXPORT_SYMBOL(ceph_con_init);
577 585
@@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
809 if (!con->ops->get_authorizer) { 817 if (!con->ops->get_authorizer) {
810 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 818 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
811 con->out_connect.authorizer_len = 0; 819 con->out_connect.authorizer_len = 0;
812
813 return NULL; 820 return NULL;
814 } 821 }
815 822
816 /* Can't hold the mutex while getting authorizer */ 823 /* Can't hold the mutex while getting authorizer */
817
818 mutex_unlock(&con->mutex); 824 mutex_unlock(&con->mutex);
819
820 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); 825 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
821
822 mutex_lock(&con->mutex); 826 mutex_lock(&con->mutex);
823 827
824 if (IS_ERR(auth)) 828 if (IS_ERR(auth))
825 return auth; 829 return auth;
826 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) 830 if (con->state != CON_STATE_NEGOTIATING)
827 return ERR_PTR(-EAGAIN); 831 return ERR_PTR(-EAGAIN);
828 832
829 con->auth_reply_buf = auth->authorizer_reply_buf; 833 con->auth_reply_buf = auth->authorizer_reply_buf;
830 con->auth_reply_buf_len = auth->authorizer_reply_buf_len; 834 con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
831
832
833 return auth; 835 return auth;
834} 836}
835 837
@@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con)
1484static void fail_protocol(struct ceph_connection *con) 1486static void fail_protocol(struct ceph_connection *con)
1485{ 1487{
1486 reset_connection(con); 1488 reset_connection(con);
1487 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1489 BUG_ON(con->state != CON_STATE_NEGOTIATING);
1490 con->state = CON_STATE_CLOSED;
1488} 1491}
1489 1492
1490static int process_connect(struct ceph_connection *con) 1493static int process_connect(struct ceph_connection *con)
@@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con)
1558 if (con->ops->peer_reset) 1561 if (con->ops->peer_reset)
1559 con->ops->peer_reset(con); 1562 con->ops->peer_reset(con);
1560 mutex_lock(&con->mutex); 1563 mutex_lock(&con->mutex);
1561 if (test_bit(CLOSED, &con->state) || 1564 if (con->state != CON_STATE_NEGOTIATING)
1562 test_bit(OPENING, &con->state))
1563 return -EAGAIN; 1565 return -EAGAIN;
1564 break; 1566 break;
1565 1567
@@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con)
1605 fail_protocol(con); 1607 fail_protocol(con);
1606 return -1; 1608 return -1;
1607 } 1609 }
1608 clear_bit(NEGOTIATING, &con->state); 1610
1609 set_bit(CONNECTED, &con->state); 1611 BUG_ON(con->state != CON_STATE_NEGOTIATING);
1612 con->state = CON_STATE_OPEN;
1613
1610 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1614 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1611 con->connect_seq++; 1615 con->connect_seq++;
1612 con->peer_features = server_feat; 1616 con->peer_features = server_feat;
@@ -1994,8 +1998,9 @@ more:
1994 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1998 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1995 1999
1996 /* open the socket first? */ 2000 /* open the socket first? */
1997 if (con->sock == NULL) { 2001 if (con->state == CON_STATE_PREOPEN) {
1998 set_bit(CONNECTING, &con->state); 2002 BUG_ON(con->sock);
2003 con->state = CON_STATE_CONNECTING;
1999 2004
2000 con_out_kvec_reset(con); 2005 con_out_kvec_reset(con);
2001 prepare_write_banner(con); 2006 prepare_write_banner(con);
@@ -2046,8 +2051,7 @@ more_kvec:
2046 } 2051 }
2047 2052
2048do_next: 2053do_next:
2049 if (!test_bit(CONNECTING, &con->state) && 2054 if (con->state == CON_STATE_OPEN) {
2050 !test_bit(NEGOTIATING, &con->state)) {
2051 /* is anything else pending? */ 2055 /* is anything else pending? */
2052 if (!list_empty(&con->out_queue)) { 2056 if (!list_empty(&con->out_queue)) {
2053 prepare_write_message(con); 2057 prepare_write_message(con);
@@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con)
2081{ 2085{
2082 int ret = -1; 2086 int ret = -1;
2083 2087
2084 if (!con->sock) 2088more:
2085 return 0; 2089 dout("try_read start on %p state %lu\n", con, con->state);
2086 2090 if (con->state != CON_STATE_CONNECTING &&
2087 if (test_bit(STANDBY, &con->state)) 2091 con->state != CON_STATE_NEGOTIATING &&
2092 con->state != CON_STATE_OPEN)
2088 return 0; 2093 return 0;
2089 2094
2090 dout("try_read start on %p\n", con); 2095 BUG_ON(!con->sock);
2091 2096
2092more:
2093 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 2097 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
2094 con->in_base_pos); 2098 con->in_base_pos);
2095 2099
2096 /* 2100 if (con->state == CON_STATE_CONNECTING) {
2097 * process_connect and process_message drop and re-take
2098 * con->mutex. make sure we handle a racing close or reopen.
2099 */
2100 if (test_bit(CLOSED, &con->state) ||
2101 test_bit(OPENING, &con->state)) {
2102 ret = -EAGAIN;
2103 goto out;
2104 }
2105
2106 if (test_bit(CONNECTING, &con->state)) {
2107 dout("try_read connecting\n"); 2101 dout("try_read connecting\n");
2108 ret = read_partial_banner(con); 2102 ret = read_partial_banner(con);
2109 if (ret <= 0) 2103 if (ret <= 0)
@@ -2112,8 +2106,8 @@ more:
2112 if (ret < 0) 2106 if (ret < 0)
2113 goto out; 2107 goto out;
2114 2108
2115 clear_bit(CONNECTING, &con->state); 2109 BUG_ON(con->state != CON_STATE_CONNECTING);
2116 set_bit(NEGOTIATING, &con->state); 2110 con->state = CON_STATE_NEGOTIATING;
2117 2111
2118 /* Banner is good, exchange connection info */ 2112 /* Banner is good, exchange connection info */
2119 ret = prepare_write_connect(con); 2113 ret = prepare_write_connect(con);
@@ -2125,7 +2119,7 @@ more:
2125 goto out; 2119 goto out;
2126 } 2120 }
2127 2121
2128 if (test_bit(NEGOTIATING, &con->state)) { 2122 if (con->state == CON_STATE_NEGOTIATING) {
2129 dout("try_read negotiating\n"); 2123 dout("try_read negotiating\n");
2130 ret = read_partial_connect(con); 2124 ret = read_partial_connect(con);
2131 if (ret <= 0) 2125 if (ret <= 0)
@@ -2136,6 +2130,8 @@ more:
2136 goto more; 2130 goto more;
2137 } 2131 }
2138 2132
2133 BUG_ON(con->state != CON_STATE_OPEN);
2134
2139 if (con->in_base_pos < 0) { 2135 if (con->in_base_pos < 0) {
2140 /* 2136 /*
2141 * skipping + discarding content. 2137 * skipping + discarding content.
@@ -2169,8 +2165,8 @@ more:
2169 prepare_read_ack(con); 2165 prepare_read_ack(con);
2170 break; 2166 break;
2171 case CEPH_MSGR_TAG_CLOSE: 2167 case CEPH_MSGR_TAG_CLOSE:
2172 clear_bit(CONNECTED, &con->state); 2168 con_close_socket(con);
2173 set_bit(CLOSED, &con->state); /* fixme */ 2169 con->state = CON_STATE_CLOSED;
2174 goto out; 2170 goto out;
2175 default: 2171 default:
2176 goto bad_tag; 2172 goto bad_tag;
@@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work)
2246 mutex_lock(&con->mutex); 2242 mutex_lock(&con->mutex);
2247restart: 2243restart:
2248 if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { 2244 if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) {
2249 if (test_and_clear_bit(CONNECTED, &con->state)) 2245 switch (con->state) {
2250 con->error_msg = "socket closed"; 2246 case CON_STATE_CONNECTING:
2251 else if (test_and_clear_bit(NEGOTIATING, &con->state))
2252 con->error_msg = "negotiation failed";
2253 else if (test_and_clear_bit(CONNECTING, &con->state))
2254 con->error_msg = "connection failed"; 2247 con->error_msg = "connection failed";
2255 else 2248 break;
2249 case CON_STATE_NEGOTIATING:
2250 con->error_msg = "negotiation failed";
2251 break;
2252 case CON_STATE_OPEN:
2253 con->error_msg = "socket closed";
2254 break;
2255 default:
2256 dout("unrecognized con state %d\n", (int)con->state);
2256 con->error_msg = "unrecognized con state"; 2257 con->error_msg = "unrecognized con state";
2258 BUG();
2259 }
2257 goto fault; 2260 goto fault;
2258 } 2261 }
2259 2262
@@ -2271,17 +2274,16 @@ restart:
2271 } 2274 }
2272 } 2275 }
2273 2276
2274 if (test_bit(STANDBY, &con->state)) { 2277 if (con->state == CON_STATE_STANDBY) {
2275 dout("con_work %p STANDBY\n", con); 2278 dout("con_work %p STANDBY\n", con);
2276 goto done; 2279 goto done;
2277 } 2280 }
2278 if (test_bit(CLOSED, &con->state)) { 2281 if (con->state == CON_STATE_CLOSED) {
2279 dout("con_work %p CLOSED\n", con); 2282 dout("con_work %p CLOSED\n", con);
2280 BUG_ON(con->sock); 2283 BUG_ON(con->sock);
2281 goto done; 2284 goto done;
2282 } 2285 }
2283 if (test_and_clear_bit(OPENING, &con->state)) { 2286 if (con->state == CON_STATE_PREOPEN) {
2284 /* reopen w/ new peer */
2285 dout("con_work OPENING\n"); 2287 dout("con_work OPENING\n");
2286 BUG_ON(con->sock); 2288 BUG_ON(con->sock);
2287 } 2289 }
@@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con)
2328 dout("fault %p state %lu to peer %s\n", 2330 dout("fault %p state %lu to peer %s\n",
2329 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2331 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2330 2332
2331 if (test_bit(CLOSED, &con->state)) 2333 BUG_ON(con->state != CON_STATE_CONNECTING &&
2332 goto out_unlock; 2334 con->state != CON_STATE_NEGOTIATING &&
2335 con->state != CON_STATE_OPEN);
2333 2336
2334 con_close_socket(con); 2337 con_close_socket(con);
2335 2338
2336 if (test_bit(LOSSYTX, &con->flags)) { 2339 if (test_bit(LOSSYTX, &con->flags)) {
2337 dout("fault on LOSSYTX channel\n"); 2340 dout("fault on LOSSYTX channel, marking CLOSED\n");
2341 con->state = CON_STATE_CLOSED;
2338 goto out_unlock; 2342 goto out_unlock;
2339 } 2343 }
2340 2344
@@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con)
2355 !test_bit(KEEPALIVE_PENDING, &con->flags)) { 2359 !test_bit(KEEPALIVE_PENDING, &con->flags)) {
2356 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2360 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2357 clear_bit(WRITE_PENDING, &con->flags); 2361 clear_bit(WRITE_PENDING, &con->flags);
2358 set_bit(STANDBY, &con->state); 2362 con->state = CON_STATE_STANDBY;
2359 } else { 2363 } else {
2360 /* retry after a delay. */ 2364 /* retry after a delay. */
2365 con->state = CON_STATE_PREOPEN;
2361 if (con->delay == 0) 2366 if (con->delay == 0)
2362 con->delay = BASE_DELAY_INTERVAL; 2367 con->delay = BASE_DELAY_INTERVAL;
2363 else if (con->delay < MAX_DELAY_INTERVAL) 2368 else if (con->delay < MAX_DELAY_INTERVAL)
@@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init);
2431static void clear_standby(struct ceph_connection *con) 2436static void clear_standby(struct ceph_connection *con)
2432{ 2437{
2433 /* come back from STANDBY? */ 2438 /* come back from STANDBY? */
2434 if (test_and_clear_bit(STANDBY, &con->state)) { 2439 if (con->state == CON_STATE_STANDBY) {
2435 dout("clear_standby %p and ++connect_seq\n", con); 2440 dout("clear_standby %p and ++connect_seq\n", con);
2441 con->state = CON_STATE_PREOPEN;
2436 con->connect_seq++; 2442 con->connect_seq++;
2437 WARN_ON(test_bit(WRITE_PENDING, &con->flags)); 2443 WARN_ON(test_bit(WRITE_PENDING, &con->flags));
2438 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); 2444 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
@@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2451 2457
2452 mutex_lock(&con->mutex); 2458 mutex_lock(&con->mutex);
2453 2459
2454 if (test_bit(CLOSED, &con->state)) { 2460 if (con->state == CON_STATE_CLOSED) {
2455 dout("con_send %p closed, dropping %p\n", con, msg); 2461 dout("con_send %p closed, dropping %p\n", con, msg);
2456 ceph_msg_put(msg); 2462 ceph_msg_put(msg);
2457 mutex_unlock(&con->mutex); 2463 mutex_unlock(&con->mutex);