aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
authorSage Weil <sage@inktank.com>2012-07-20 20:24:40 -0400
committerSage Weil <sage@inktank.com>2012-07-30 21:16:00 -0400
commit8dacc7da69a491c515851e68de6036f21b5663ce (patch)
tree72e96abb5bc27d2a185cdea2f08a090302f0ede3 /net/ceph/messenger.c
parentd7353dd5aaf22ed611fbcd0d4a4a12fb30659290 (diff)
libceph: replace connection state bits with states
Use a simple set of 6 enumerated values for the socket states (CON_STATE_*) and use those instead of the state bits. All of the con->state checks are now under the protection of the con mutex, so this is safe. It also simplifies many of the state checks because we can check for anything other than the expected state instead of various bits for races we can think of. This appears to hold up well to stress testing both with and without socket failure injection on the server side. Signed-off-by: Sage Weil <sage@inktank.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c130
1 files changed, 68 insertions, 62 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index e7320cd5fdbc..563e46aa4d6d 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -77,6 +77,17 @@
77#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */ 77#define CON_SOCK_STATE_CONNECTED 3 /* -> CLOSING or -> CLOSED */
78#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */ 78#define CON_SOCK_STATE_CLOSING 4 /* -> CLOSED */
79 79
80/*
81 * connection states
82 */
83#define CON_STATE_CLOSED 1 /* -> PREOPEN */
84#define CON_STATE_PREOPEN 2 /* -> CONNECTING, CLOSED */
85#define CON_STATE_CONNECTING 3 /* -> NEGOTIATING, CLOSED */
86#define CON_STATE_NEGOTIATING 4 /* -> OPEN, CLOSED */
87#define CON_STATE_OPEN 5 /* -> STANDBY, CLOSED */
88#define CON_STATE_STANDBY 6 /* -> PREOPEN, CLOSED */
89
90
80/* static tag bytes (protocol control messages) */ 91/* static tag bytes (protocol control messages) */
81static char tag_msg = CEPH_MSGR_TAG_MSG; 92static char tag_msg = CEPH_MSGR_TAG_MSG;
82static char tag_ack = CEPH_MSGR_TAG_ACK; 93static char tag_ack = CEPH_MSGR_TAG_ACK;
@@ -503,11 +514,7 @@ void ceph_con_close(struct ceph_connection *con)
503 mutex_lock(&con->mutex); 514 mutex_lock(&con->mutex);
504 dout("con_close %p peer %s\n", con, 515 dout("con_close %p peer %s\n", con,
505 ceph_pr_addr(&con->peer_addr.in_addr)); 516 ceph_pr_addr(&con->peer_addr.in_addr));
506 clear_bit(NEGOTIATING, &con->state); 517 con->state = CON_STATE_CLOSED;
507 clear_bit(CONNECTING, &con->state);
508 clear_bit(CONNECTED, &con->state);
509 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
510 set_bit(CLOSED, &con->state);
511 518
512 clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ 519 clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */
513 clear_bit(KEEPALIVE_PENDING, &con->flags); 520 clear_bit(KEEPALIVE_PENDING, &con->flags);
@@ -530,8 +537,9 @@ void ceph_con_open(struct ceph_connection *con,
530{ 537{
531 mutex_lock(&con->mutex); 538 mutex_lock(&con->mutex);
532 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr)); 539 dout("con_open %p %s\n", con, ceph_pr_addr(&addr->in_addr));
533 set_bit(OPENING, &con->state); 540
534 WARN_ON(!test_and_clear_bit(CLOSED, &con->state)); 541 BUG_ON(con->state != CON_STATE_CLOSED);
542 con->state = CON_STATE_PREOPEN;
535 543
536 con->peer_name.type = (__u8) entity_type; 544 con->peer_name.type = (__u8) entity_type;
537 con->peer_name.num = cpu_to_le64(entity_num); 545 con->peer_name.num = cpu_to_le64(entity_num);
@@ -571,7 +579,7 @@ void ceph_con_init(struct ceph_connection *con, void *private,
571 INIT_LIST_HEAD(&con->out_sent); 579 INIT_LIST_HEAD(&con->out_sent);
572 INIT_DELAYED_WORK(&con->work, con_work); 580 INIT_DELAYED_WORK(&con->work, con_work);
573 581
574 set_bit(CLOSED, &con->state); 582 con->state = CON_STATE_CLOSED;
575} 583}
576EXPORT_SYMBOL(ceph_con_init); 584EXPORT_SYMBOL(ceph_con_init);
577 585
@@ -809,27 +817,21 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
809 if (!con->ops->get_authorizer) { 817 if (!con->ops->get_authorizer) {
810 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN; 818 con->out_connect.authorizer_protocol = CEPH_AUTH_UNKNOWN;
811 con->out_connect.authorizer_len = 0; 819 con->out_connect.authorizer_len = 0;
812
813 return NULL; 820 return NULL;
814 } 821 }
815 822
816 /* Can't hold the mutex while getting authorizer */ 823 /* Can't hold the mutex while getting authorizer */
817
818 mutex_unlock(&con->mutex); 824 mutex_unlock(&con->mutex);
819
820 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry); 825 auth = con->ops->get_authorizer(con, auth_proto, con->auth_retry);
821
822 mutex_lock(&con->mutex); 826 mutex_lock(&con->mutex);
823 827
824 if (IS_ERR(auth)) 828 if (IS_ERR(auth))
825 return auth; 829 return auth;
826 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) 830 if (con->state != CON_STATE_NEGOTIATING)
827 return ERR_PTR(-EAGAIN); 831 return ERR_PTR(-EAGAIN);
828 832
829 con->auth_reply_buf = auth->authorizer_reply_buf; 833 con->auth_reply_buf = auth->authorizer_reply_buf;
830 con->auth_reply_buf_len = auth->authorizer_reply_buf_len; 834 con->auth_reply_buf_len = auth->authorizer_reply_buf_len;
831
832
833 return auth; 835 return auth;
834} 836}
835 837
@@ -1484,7 +1486,8 @@ static int process_banner(struct ceph_connection *con)
1484static void fail_protocol(struct ceph_connection *con) 1486static void fail_protocol(struct ceph_connection *con)
1485{ 1487{
1486 reset_connection(con); 1488 reset_connection(con);
1487 set_bit(CLOSED, &con->state); /* in case there's queued work */ 1489 BUG_ON(con->state != CON_STATE_NEGOTIATING);
1490 con->state = CON_STATE_CLOSED;
1488} 1491}
1489 1492
1490static int process_connect(struct ceph_connection *con) 1493static int process_connect(struct ceph_connection *con)
@@ -1558,8 +1561,7 @@ static int process_connect(struct ceph_connection *con)
1558 if (con->ops->peer_reset) 1561 if (con->ops->peer_reset)
1559 con->ops->peer_reset(con); 1562 con->ops->peer_reset(con);
1560 mutex_lock(&con->mutex); 1563 mutex_lock(&con->mutex);
1561 if (test_bit(CLOSED, &con->state) || 1564 if (con->state != CON_STATE_NEGOTIATING)
1562 test_bit(OPENING, &con->state))
1563 return -EAGAIN; 1565 return -EAGAIN;
1564 break; 1566 break;
1565 1567
@@ -1605,8 +1607,10 @@ static int process_connect(struct ceph_connection *con)
1605 fail_protocol(con); 1607 fail_protocol(con);
1606 return -1; 1608 return -1;
1607 } 1609 }
1608 clear_bit(NEGOTIATING, &con->state); 1610
1609 set_bit(CONNECTED, &con->state); 1611 BUG_ON(con->state != CON_STATE_NEGOTIATING);
1612 con->state = CON_STATE_OPEN;
1613
1610 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq); 1614 con->peer_global_seq = le32_to_cpu(con->in_reply.global_seq);
1611 con->connect_seq++; 1615 con->connect_seq++;
1612 con->peer_features = server_feat; 1616 con->peer_features = server_feat;
@@ -1994,8 +1998,9 @@ more:
1994 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes); 1998 dout("try_write out_kvec_bytes %d\n", con->out_kvec_bytes);
1995 1999
1996 /* open the socket first? */ 2000 /* open the socket first? */
1997 if (con->sock == NULL) { 2001 if (con->state == CON_STATE_PREOPEN) {
1998 set_bit(CONNECTING, &con->state); 2002 BUG_ON(con->sock);
2003 con->state = CON_STATE_CONNECTING;
1999 2004
2000 con_out_kvec_reset(con); 2005 con_out_kvec_reset(con);
2001 prepare_write_banner(con); 2006 prepare_write_banner(con);
@@ -2046,8 +2051,7 @@ more_kvec:
2046 } 2051 }
2047 2052
2048do_next: 2053do_next:
2049 if (!test_bit(CONNECTING, &con->state) && 2054 if (con->state == CON_STATE_OPEN) {
2050 !test_bit(NEGOTIATING, &con->state)) {
2051 /* is anything else pending? */ 2055 /* is anything else pending? */
2052 if (!list_empty(&con->out_queue)) { 2056 if (!list_empty(&con->out_queue)) {
2053 prepare_write_message(con); 2057 prepare_write_message(con);
@@ -2081,29 +2085,19 @@ static int try_read(struct ceph_connection *con)
2081{ 2085{
2082 int ret = -1; 2086 int ret = -1;
2083 2087
2084 if (!con->sock) 2088more:
2085 return 0; 2089 dout("try_read start on %p state %lu\n", con, con->state);
2086 2090 if (con->state != CON_STATE_CONNECTING &&
2087 if (test_bit(STANDBY, &con->state)) 2091 con->state != CON_STATE_NEGOTIATING &&
2092 con->state != CON_STATE_OPEN)
2088 return 0; 2093 return 0;
2089 2094
2090 dout("try_read start on %p\n", con); 2095 BUG_ON(!con->sock);
2091 2096
2092more:
2093 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag, 2097 dout("try_read tag %d in_base_pos %d\n", (int)con->in_tag,
2094 con->in_base_pos); 2098 con->in_base_pos);
2095 2099
2096 /* 2100 if (con->state == CON_STATE_CONNECTING) {
2097 * process_connect and process_message drop and re-take
2098 * con->mutex. make sure we handle a racing close or reopen.
2099 */
2100 if (test_bit(CLOSED, &con->state) ||
2101 test_bit(OPENING, &con->state)) {
2102 ret = -EAGAIN;
2103 goto out;
2104 }
2105
2106 if (test_bit(CONNECTING, &con->state)) {
2107 dout("try_read connecting\n"); 2101 dout("try_read connecting\n");
2108 ret = read_partial_banner(con); 2102 ret = read_partial_banner(con);
2109 if (ret <= 0) 2103 if (ret <= 0)
@@ -2112,8 +2106,8 @@ more:
2112 if (ret < 0) 2106 if (ret < 0)
2113 goto out; 2107 goto out;
2114 2108
2115 clear_bit(CONNECTING, &con->state); 2109 BUG_ON(con->state != CON_STATE_CONNECTING);
2116 set_bit(NEGOTIATING, &con->state); 2110 con->state = CON_STATE_NEGOTIATING;
2117 2111
2118 /* Banner is good, exchange connection info */ 2112 /* Banner is good, exchange connection info */
2119 ret = prepare_write_connect(con); 2113 ret = prepare_write_connect(con);
@@ -2125,7 +2119,7 @@ more:
2125 goto out; 2119 goto out;
2126 } 2120 }
2127 2121
2128 if (test_bit(NEGOTIATING, &con->state)) { 2122 if (con->state == CON_STATE_NEGOTIATING) {
2129 dout("try_read negotiating\n"); 2123 dout("try_read negotiating\n");
2130 ret = read_partial_connect(con); 2124 ret = read_partial_connect(con);
2131 if (ret <= 0) 2125 if (ret <= 0)
@@ -2136,6 +2130,8 @@ more:
2136 goto more; 2130 goto more;
2137 } 2131 }
2138 2132
2133 BUG_ON(con->state != CON_STATE_OPEN);
2134
2139 if (con->in_base_pos < 0) { 2135 if (con->in_base_pos < 0) {
2140 /* 2136 /*
2141 * skipping + discarding content. 2137 * skipping + discarding content.
@@ -2169,8 +2165,8 @@ more:
2169 prepare_read_ack(con); 2165 prepare_read_ack(con);
2170 break; 2166 break;
2171 case CEPH_MSGR_TAG_CLOSE: 2167 case CEPH_MSGR_TAG_CLOSE:
2172 clear_bit(CONNECTED, &con->state); 2168 con_close_socket(con);
2173 set_bit(CLOSED, &con->state); /* fixme */ 2169 con->state = CON_STATE_CLOSED;
2174 goto out; 2170 goto out;
2175 default: 2171 default:
2176 goto bad_tag; 2172 goto bad_tag;
@@ -2246,14 +2242,21 @@ static void con_work(struct work_struct *work)
2246 mutex_lock(&con->mutex); 2242 mutex_lock(&con->mutex);
2247restart: 2243restart:
2248 if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) { 2244 if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) {
2249 if (test_and_clear_bit(CONNECTED, &con->state)) 2245 switch (con->state) {
2250 con->error_msg = "socket closed"; 2246 case CON_STATE_CONNECTING:
2251 else if (test_and_clear_bit(NEGOTIATING, &con->state))
2252 con->error_msg = "negotiation failed";
2253 else if (test_and_clear_bit(CONNECTING, &con->state))
2254 con->error_msg = "connection failed"; 2247 con->error_msg = "connection failed";
2255 else 2248 break;
2249 case CON_STATE_NEGOTIATING:
2250 con->error_msg = "negotiation failed";
2251 break;
2252 case CON_STATE_OPEN:
2253 con->error_msg = "socket closed";
2254 break;
2255 default:
2256 dout("unrecognized con state %d\n", (int)con->state);
2256 con->error_msg = "unrecognized con state"; 2257 con->error_msg = "unrecognized con state";
2258 BUG();
2259 }
2257 goto fault; 2260 goto fault;
2258 } 2261 }
2259 2262
@@ -2271,17 +2274,16 @@ restart:
2271 } 2274 }
2272 } 2275 }
2273 2276
2274 if (test_bit(STANDBY, &con->state)) { 2277 if (con->state == CON_STATE_STANDBY) {
2275 dout("con_work %p STANDBY\n", con); 2278 dout("con_work %p STANDBY\n", con);
2276 goto done; 2279 goto done;
2277 } 2280 }
2278 if (test_bit(CLOSED, &con->state)) { 2281 if (con->state == CON_STATE_CLOSED) {
2279 dout("con_work %p CLOSED\n", con); 2282 dout("con_work %p CLOSED\n", con);
2280 BUG_ON(con->sock); 2283 BUG_ON(con->sock);
2281 goto done; 2284 goto done;
2282 } 2285 }
2283 if (test_and_clear_bit(OPENING, &con->state)) { 2286 if (con->state == CON_STATE_PREOPEN) {
2284 /* reopen w/ new peer */
2285 dout("con_work OPENING\n"); 2287 dout("con_work OPENING\n");
2286 BUG_ON(con->sock); 2288 BUG_ON(con->sock);
2287 } 2289 }
@@ -2328,13 +2330,15 @@ static void ceph_fault(struct ceph_connection *con)
2328 dout("fault %p state %lu to peer %s\n", 2330 dout("fault %p state %lu to peer %s\n",
2329 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2331 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2330 2332
2331 if (test_bit(CLOSED, &con->state)) 2333 BUG_ON(con->state != CON_STATE_CONNECTING &&
2332 goto out_unlock; 2334 con->state != CON_STATE_NEGOTIATING &&
2335 con->state != CON_STATE_OPEN);
2333 2336
2334 con_close_socket(con); 2337 con_close_socket(con);
2335 2338
2336 if (test_bit(LOSSYTX, &con->flags)) { 2339 if (test_bit(LOSSYTX, &con->flags)) {
2337 dout("fault on LOSSYTX channel\n"); 2340 dout("fault on LOSSYTX channel, marking CLOSED\n");
2341 con->state = CON_STATE_CLOSED;
2338 goto out_unlock; 2342 goto out_unlock;
2339 } 2343 }
2340 2344
@@ -2355,9 +2359,10 @@ static void ceph_fault(struct ceph_connection *con)
2355 !test_bit(KEEPALIVE_PENDING, &con->flags)) { 2359 !test_bit(KEEPALIVE_PENDING, &con->flags)) {
2356 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2360 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2357 clear_bit(WRITE_PENDING, &con->flags); 2361 clear_bit(WRITE_PENDING, &con->flags);
2358 set_bit(STANDBY, &con->state); 2362 con->state = CON_STATE_STANDBY;
2359 } else { 2363 } else {
2360 /* retry after a delay. */ 2364 /* retry after a delay. */
2365 con->state = CON_STATE_PREOPEN;
2361 if (con->delay == 0) 2366 if (con->delay == 0)
2362 con->delay = BASE_DELAY_INTERVAL; 2367 con->delay = BASE_DELAY_INTERVAL;
2363 else if (con->delay < MAX_DELAY_INTERVAL) 2368 else if (con->delay < MAX_DELAY_INTERVAL)
@@ -2431,8 +2436,9 @@ EXPORT_SYMBOL(ceph_messenger_init);
2431static void clear_standby(struct ceph_connection *con) 2436static void clear_standby(struct ceph_connection *con)
2432{ 2437{
2433 /* come back from STANDBY? */ 2438 /* come back from STANDBY? */
2434 if (test_and_clear_bit(STANDBY, &con->state)) { 2439 if (con->state == CON_STATE_STANDBY) {
2435 dout("clear_standby %p and ++connect_seq\n", con); 2440 dout("clear_standby %p and ++connect_seq\n", con);
2441 con->state = CON_STATE_PREOPEN;
2436 con->connect_seq++; 2442 con->connect_seq++;
2437 WARN_ON(test_bit(WRITE_PENDING, &con->flags)); 2443 WARN_ON(test_bit(WRITE_PENDING, &con->flags));
2438 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); 2444 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
@@ -2451,7 +2457,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2451 2457
2452 mutex_lock(&con->mutex); 2458 mutex_lock(&con->mutex);
2453 2459
2454 if (test_bit(CLOSED, &con->state)) { 2460 if (con->state == CON_STATE_CLOSED) {
2455 dout("con_send %p closed, dropping %p\n", con, msg); 2461 dout("con_send %p closed, dropping %p\n", con, msg);
2456 ceph_msg_put(msg); 2462 ceph_msg_put(msg);
2457 mutex_unlock(&con->mutex); 2463 mutex_unlock(&con->mutex);