diff options
author | Alex Elder <elder@inktank.com> | 2012-05-22 12:41:43 -0400 |
---|---|---|
committer | Alex Elder <elder@dreamhost.com> | 2012-06-01 09:37:56 -0400 |
commit | 928443cd9644e7cfd46f687dbeffda2d1a357ff9 (patch) | |
tree | 09112335af82c7ce01d93caa55567341f9468c3f | |
parent | 15d9882c336db2db73ccf9871ae2398e452f694c (diff) |
libceph: start separating connection flags from state
A ceph_connection holds a mixture of connection state (as in "state
machine" state) and connection flags in a single "state" field. To
make the distinction more clear, define a new "flags" field and use
it rather than the "state" field to hold Boolean flag values.
Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Sage Weil<sage@inktank.com>
-rw-r--r-- | include/linux/ceph/messenger.h | 18 | ||||
-rw-r--r-- | net/ceph/messenger.c | 50 |
2 files changed, 37 insertions, 31 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h index 3fbd4be804ed..920235e114ad 100644 --- a/include/linux/ceph/messenger.h +++ b/include/linux/ceph/messenger.h | |||
@@ -103,20 +103,25 @@ struct ceph_msg_pos { | |||
103 | #define MAX_DELAY_INTERVAL (5 * 60 * HZ) | 103 | #define MAX_DELAY_INTERVAL (5 * 60 * HZ) |
104 | 104 | ||
105 | /* | 105 | /* |
106 | * ceph_connection state bit flags | 106 | * ceph_connection flag bits |
107 | */ | 107 | */ |
108 | |||
108 | #define LOSSYTX 0 /* we can close channel or drop messages on errors */ | 109 | #define LOSSYTX 0 /* we can close channel or drop messages on errors */ |
109 | #define CONNECTING 1 | ||
110 | #define NEGOTIATING 2 | ||
111 | #define KEEPALIVE_PENDING 3 | 110 | #define KEEPALIVE_PENDING 3 |
112 | #define WRITE_PENDING 4 /* we have data ready to send */ | 111 | #define WRITE_PENDING 4 /* we have data ready to send */ |
112 | #define SOCK_CLOSED 11 /* socket state changed to closed */ | ||
113 | #define BACKOFF 15 | ||
114 | |||
115 | /* | ||
116 | * ceph_connection states | ||
117 | */ | ||
118 | #define CONNECTING 1 | ||
119 | #define NEGOTIATING 2 | ||
113 | #define STANDBY 8 /* no outgoing messages, socket closed. we keep | 120 | #define STANDBY 8 /* no outgoing messages, socket closed. we keep |
114 | * the ceph_connection around to maintain shared | 121 | * the ceph_connection around to maintain shared |
115 | * state with the peer. */ | 122 | * state with the peer. */ |
116 | #define CLOSED 10 /* we've closed the connection */ | 123 | #define CLOSED 10 /* we've closed the connection */ |
117 | #define SOCK_CLOSED 11 /* socket state changed to closed */ | ||
118 | #define OPENING 13 /* open connection w/ (possibly new) peer */ | 124 | #define OPENING 13 /* open connection w/ (possibly new) peer */ |
119 | #define BACKOFF 15 | ||
120 | 125 | ||
121 | /* | 126 | /* |
122 | * A single connection with another host. | 127 | * A single connection with another host. |
@@ -133,7 +138,8 @@ struct ceph_connection { | |||
133 | 138 | ||
134 | struct ceph_messenger *msgr; | 139 | struct ceph_messenger *msgr; |
135 | struct socket *sock; | 140 | struct socket *sock; |
136 | unsigned long state; /* connection state (see flags above) */ | 141 | unsigned long flags; |
142 | unsigned long state; | ||
137 | const char *error_msg; /* error message, if any */ | 143 | const char *error_msg; /* error message, if any */ |
138 | 144 | ||
139 | struct ceph_entity_addr peer_addr; /* peer address */ | 145 | struct ceph_entity_addr peer_addr; /* peer address */ |
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index d8423a3f6698..e84e4fd86bb7 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c | |||
@@ -176,7 +176,7 @@ static void ceph_sock_write_space(struct sock *sk) | |||
176 | * buffer. See net/ipv4/tcp_input.c:tcp_check_space() | 176 | * buffer. See net/ipv4/tcp_input.c:tcp_check_space() |
177 | * and net/core/stream.c:sk_stream_write_space(). | 177 | * and net/core/stream.c:sk_stream_write_space(). |
178 | */ | 178 | */ |
179 | if (test_bit(WRITE_PENDING, &con->state)) { | 179 | if (test_bit(WRITE_PENDING, &con->flags)) { |
180 | if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { | 180 | if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { |
181 | dout("%s %p queueing write work\n", __func__, con); | 181 | dout("%s %p queueing write work\n", __func__, con); |
182 | clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); | 182 | clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); |
@@ -203,7 +203,7 @@ static void ceph_sock_state_change(struct sock *sk) | |||
203 | dout("%s TCP_CLOSE\n", __func__); | 203 | dout("%s TCP_CLOSE\n", __func__); |
204 | case TCP_CLOSE_WAIT: | 204 | case TCP_CLOSE_WAIT: |
205 | dout("%s TCP_CLOSE_WAIT\n", __func__); | 205 | dout("%s TCP_CLOSE_WAIT\n", __func__); |
206 | if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { | 206 | if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) { |
207 | if (test_bit(CONNECTING, &con->state)) | 207 | if (test_bit(CONNECTING, &con->state)) |
208 | con->error_msg = "connection failed"; | 208 | con->error_msg = "connection failed"; |
209 | else | 209 | else |
@@ -395,9 +395,9 @@ void ceph_con_close(struct ceph_connection *con) | |||
395 | ceph_pr_addr(&con->peer_addr.in_addr)); | 395 | ceph_pr_addr(&con->peer_addr.in_addr)); |
396 | set_bit(CLOSED, &con->state); /* in case there's queued work */ | 396 | set_bit(CLOSED, &con->state); /* in case there's queued work */ |
397 | clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ | 397 | clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ |
398 | clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ | 398 | clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */ |
399 | clear_bit(KEEPALIVE_PENDING, &con->state); | 399 | clear_bit(KEEPALIVE_PENDING, &con->flags); |
400 | clear_bit(WRITE_PENDING, &con->state); | 400 | clear_bit(WRITE_PENDING, &con->flags); |
401 | mutex_lock(&con->mutex); | 401 | mutex_lock(&con->mutex); |
402 | reset_connection(con); | 402 | reset_connection(con); |
403 | con->peer_global_seq = 0; | 403 | con->peer_global_seq = 0; |
@@ -614,7 +614,7 @@ static void prepare_write_message(struct ceph_connection *con) | |||
614 | prepare_write_message_footer(con); | 614 | prepare_write_message_footer(con); |
615 | } | 615 | } |
616 | 616 | ||
617 | set_bit(WRITE_PENDING, &con->state); | 617 | set_bit(WRITE_PENDING, &con->flags); |
618 | } | 618 | } |
619 | 619 | ||
620 | /* | 620 | /* |
@@ -635,7 +635,7 @@ static void prepare_write_ack(struct ceph_connection *con) | |||
635 | &con->out_temp_ack); | 635 | &con->out_temp_ack); |
636 | 636 | ||
637 | con->out_more = 1; /* more will follow.. eventually.. */ | 637 | con->out_more = 1; /* more will follow.. eventually.. */ |
638 | set_bit(WRITE_PENDING, &con->state); | 638 | set_bit(WRITE_PENDING, &con->flags); |
639 | } | 639 | } |
640 | 640 | ||
641 | /* | 641 | /* |
@@ -646,7 +646,7 @@ static void prepare_write_keepalive(struct ceph_connection *con) | |||
646 | dout("prepare_write_keepalive %p\n", con); | 646 | dout("prepare_write_keepalive %p\n", con); |
647 | con_out_kvec_reset(con); | 647 | con_out_kvec_reset(con); |
648 | con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); | 648 | con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); |
649 | set_bit(WRITE_PENDING, &con->state); | 649 | set_bit(WRITE_PENDING, &con->flags); |
650 | } | 650 | } |
651 | 651 | ||
652 | /* | 652 | /* |
@@ -675,7 +675,7 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection | |||
675 | 675 | ||
676 | if (IS_ERR(auth)) | 676 | if (IS_ERR(auth)) |
677 | return auth; | 677 | return auth; |
678 | if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) | 678 | if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags)) |
679 | return ERR_PTR(-EAGAIN); | 679 | return ERR_PTR(-EAGAIN); |
680 | 680 | ||
681 | con->auth_reply_buf = auth->authorizer_reply_buf; | 681 | con->auth_reply_buf = auth->authorizer_reply_buf; |
@@ -695,7 +695,7 @@ static void prepare_write_banner(struct ceph_connection *con) | |||
695 | &con->msgr->my_enc_addr); | 695 | &con->msgr->my_enc_addr); |
696 | 696 | ||
697 | con->out_more = 0; | 697 | con->out_more = 0; |
698 | set_bit(WRITE_PENDING, &con->state); | 698 | set_bit(WRITE_PENDING, &con->flags); |
699 | } | 699 | } |
700 | 700 | ||
701 | static int prepare_write_connect(struct ceph_connection *con) | 701 | static int prepare_write_connect(struct ceph_connection *con) |
@@ -745,7 +745,7 @@ static int prepare_write_connect(struct ceph_connection *con) | |||
745 | auth->authorizer_buf); | 745 | auth->authorizer_buf); |
746 | 746 | ||
747 | con->out_more = 0; | 747 | con->out_more = 0; |
748 | set_bit(WRITE_PENDING, &con->state); | 748 | set_bit(WRITE_PENDING, &con->flags); |
749 | 749 | ||
750 | return 0; | 750 | return 0; |
751 | } | 751 | } |
@@ -1492,7 +1492,7 @@ static int process_connect(struct ceph_connection *con) | |||
1492 | le32_to_cpu(con->in_reply.connect_seq)); | 1492 | le32_to_cpu(con->in_reply.connect_seq)); |
1493 | 1493 | ||
1494 | if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) | 1494 | if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) |
1495 | set_bit(LOSSYTX, &con->state); | 1495 | set_bit(LOSSYTX, &con->flags); |
1496 | 1496 | ||
1497 | prepare_read_tag(con); | 1497 | prepare_read_tag(con); |
1498 | break; | 1498 | break; |
@@ -1933,14 +1933,14 @@ do_next: | |||
1933 | prepare_write_ack(con); | 1933 | prepare_write_ack(con); |
1934 | goto more; | 1934 | goto more; |
1935 | } | 1935 | } |
1936 | if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { | 1936 | if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) { |
1937 | prepare_write_keepalive(con); | 1937 | prepare_write_keepalive(con); |
1938 | goto more; | 1938 | goto more; |
1939 | } | 1939 | } |
1940 | } | 1940 | } |
1941 | 1941 | ||
1942 | /* Nothing to do! */ | 1942 | /* Nothing to do! */ |
1943 | clear_bit(WRITE_PENDING, &con->state); | 1943 | clear_bit(WRITE_PENDING, &con->flags); |
1944 | dout("try_write nothing else to write.\n"); | 1944 | dout("try_write nothing else to write.\n"); |
1945 | ret = 0; | 1945 | ret = 0; |
1946 | out: | 1946 | out: |
@@ -2106,7 +2106,7 @@ static void con_work(struct work_struct *work) | |||
2106 | 2106 | ||
2107 | mutex_lock(&con->mutex); | 2107 | mutex_lock(&con->mutex); |
2108 | restart: | 2108 | restart: |
2109 | if (test_and_clear_bit(BACKOFF, &con->state)) { | 2109 | if (test_and_clear_bit(BACKOFF, &con->flags)) { |
2110 | dout("con_work %p backing off\n", con); | 2110 | dout("con_work %p backing off\n", con); |
2111 | if (queue_delayed_work(ceph_msgr_wq, &con->work, | 2111 | if (queue_delayed_work(ceph_msgr_wq, &con->work, |
2112 | round_jiffies_relative(con->delay))) { | 2112 | round_jiffies_relative(con->delay))) { |
@@ -2135,7 +2135,7 @@ restart: | |||
2135 | con_close_socket(con); | 2135 | con_close_socket(con); |
2136 | } | 2136 | } |
2137 | 2137 | ||
2138 | if (test_and_clear_bit(SOCK_CLOSED, &con->state)) | 2138 | if (test_and_clear_bit(SOCK_CLOSED, &con->flags)) |
2139 | goto fault; | 2139 | goto fault; |
2140 | 2140 | ||
2141 | ret = try_read(con); | 2141 | ret = try_read(con); |
@@ -2174,7 +2174,7 @@ static void ceph_fault(struct ceph_connection *con) | |||
2174 | dout("fault %p state %lu to peer %s\n", | 2174 | dout("fault %p state %lu to peer %s\n", |
2175 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); | 2175 | con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); |
2176 | 2176 | ||
2177 | if (test_bit(LOSSYTX, &con->state)) { | 2177 | if (test_bit(LOSSYTX, &con->flags)) { |
2178 | dout("fault on LOSSYTX channel\n"); | 2178 | dout("fault on LOSSYTX channel\n"); |
2179 | goto out; | 2179 | goto out; |
2180 | } | 2180 | } |
@@ -2196,9 +2196,9 @@ static void ceph_fault(struct ceph_connection *con) | |||
2196 | /* If there are no messages queued or keepalive pending, place | 2196 | /* If there are no messages queued or keepalive pending, place |
2197 | * the connection in a STANDBY state */ | 2197 | * the connection in a STANDBY state */ |
2198 | if (list_empty(&con->out_queue) && | 2198 | if (list_empty(&con->out_queue) && |
2199 | !test_bit(KEEPALIVE_PENDING, &con->state)) { | 2199 | !test_bit(KEEPALIVE_PENDING, &con->flags)) { |
2200 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); | 2200 | dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); |
2201 | clear_bit(WRITE_PENDING, &con->state); | 2201 | clear_bit(WRITE_PENDING, &con->flags); |
2202 | set_bit(STANDBY, &con->state); | 2202 | set_bit(STANDBY, &con->state); |
2203 | } else { | 2203 | } else { |
2204 | /* retry after a delay. */ | 2204 | /* retry after a delay. */ |
@@ -2222,7 +2222,7 @@ static void ceph_fault(struct ceph_connection *con) | |||
2222 | * that when con_work restarts we schedule the | 2222 | * that when con_work restarts we schedule the |
2223 | * delay then. | 2223 | * delay then. |
2224 | */ | 2224 | */ |
2225 | set_bit(BACKOFF, &con->state); | 2225 | set_bit(BACKOFF, &con->flags); |
2226 | } | 2226 | } |
2227 | } | 2227 | } |
2228 | 2228 | ||
@@ -2278,8 +2278,8 @@ static void clear_standby(struct ceph_connection *con) | |||
2278 | mutex_lock(&con->mutex); | 2278 | mutex_lock(&con->mutex); |
2279 | dout("clear_standby %p and ++connect_seq\n", con); | 2279 | dout("clear_standby %p and ++connect_seq\n", con); |
2280 | con->connect_seq++; | 2280 | con->connect_seq++; |
2281 | WARN_ON(test_bit(WRITE_PENDING, &con->state)); | 2281 | WARN_ON(test_bit(WRITE_PENDING, &con->flags)); |
2282 | WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); | 2282 | WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags)); |
2283 | mutex_unlock(&con->mutex); | 2283 | mutex_unlock(&con->mutex); |
2284 | } | 2284 | } |
2285 | } | 2285 | } |
@@ -2317,7 +2317,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg) | |||
2317 | /* if there wasn't anything waiting to send before, queue | 2317 | /* if there wasn't anything waiting to send before, queue |
2318 | * new work */ | 2318 | * new work */ |
2319 | clear_standby(con); | 2319 | clear_standby(con); |
2320 | if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) | 2320 | if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0) |
2321 | queue_con(con); | 2321 | queue_con(con); |
2322 | } | 2322 | } |
2323 | EXPORT_SYMBOL(ceph_con_send); | 2323 | EXPORT_SYMBOL(ceph_con_send); |
@@ -2384,8 +2384,8 @@ void ceph_con_keepalive(struct ceph_connection *con) | |||
2384 | { | 2384 | { |
2385 | dout("con_keepalive %p\n", con); | 2385 | dout("con_keepalive %p\n", con); |
2386 | clear_standby(con); | 2386 | clear_standby(con); |
2387 | if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && | 2387 | if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 && |
2388 | test_and_set_bit(WRITE_PENDING, &con->state) == 0) | 2388 | test_and_set_bit(WRITE_PENDING, &con->flags) == 0) |
2389 | queue_con(con); | 2389 | queue_con(con); |
2390 | } | 2390 | } |
2391 | EXPORT_SYMBOL(ceph_con_keepalive); | 2391 | EXPORT_SYMBOL(ceph_con_keepalive); |