aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2012-05-22 12:41:43 -0400
committerAlex Elder <elder@dreamhost.com>2012-06-01 09:37:56 -0400
commit928443cd9644e7cfd46f687dbeffda2d1a357ff9 (patch)
tree09112335af82c7ce01d93caa55567341f9468c3f
parent15d9882c336db2db73ccf9871ae2398e452f694c (diff)
libceph: start separating connection flags from state
A ceph_connection holds a mixture of connection state (as in "state machine" state) and connection flags in a single "state" field. To make the distinction more clear, define a new "flags" field and use it rather than the "state" field to hold Boolean flag values. Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Sage Weil<sage@inktank.com>
-rw-r--r--include/linux/ceph/messenger.h18
-rw-r--r--net/ceph/messenger.c50
2 files changed, 37 insertions, 31 deletions
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 3fbd4be804ed..920235e114ad 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -103,20 +103,25 @@ struct ceph_msg_pos {
103#define MAX_DELAY_INTERVAL (5 * 60 * HZ) 103#define MAX_DELAY_INTERVAL (5 * 60 * HZ)
104 104
105/* 105/*
106 * ceph_connection state bit flags 106 * ceph_connection flag bits
107 */ 107 */
108
108#define LOSSYTX 0 /* we can close channel or drop messages on errors */ 109#define LOSSYTX 0 /* we can close channel or drop messages on errors */
109#define CONNECTING 1
110#define NEGOTIATING 2
111#define KEEPALIVE_PENDING 3 110#define KEEPALIVE_PENDING 3
112#define WRITE_PENDING 4 /* we have data ready to send */ 111#define WRITE_PENDING 4 /* we have data ready to send */
112#define SOCK_CLOSED 11 /* socket state changed to closed */
113#define BACKOFF 15
114
115/*
116 * ceph_connection states
117 */
118#define CONNECTING 1
119#define NEGOTIATING 2
113#define STANDBY 8 /* no outgoing messages, socket closed. we keep 120#define STANDBY 8 /* no outgoing messages, socket closed. we keep
114 * the ceph_connection around to maintain shared 121 * the ceph_connection around to maintain shared
115 * state with the peer. */ 122 * state with the peer. */
116#define CLOSED 10 /* we've closed the connection */ 123#define CLOSED 10 /* we've closed the connection */
117#define SOCK_CLOSED 11 /* socket state changed to closed */
118#define OPENING 13 /* open connection w/ (possibly new) peer */ 124#define OPENING 13 /* open connection w/ (possibly new) peer */
119#define BACKOFF 15
120 125
121/* 126/*
122 * A single connection with another host. 127 * A single connection with another host.
@@ -133,7 +138,8 @@ struct ceph_connection {
133 138
134 struct ceph_messenger *msgr; 139 struct ceph_messenger *msgr;
135 struct socket *sock; 140 struct socket *sock;
136 unsigned long state; /* connection state (see flags above) */ 141 unsigned long flags;
142 unsigned long state;
137 const char *error_msg; /* error message, if any */ 143 const char *error_msg; /* error message, if any */
138 144
139 struct ceph_entity_addr peer_addr; /* peer address */ 145 struct ceph_entity_addr peer_addr; /* peer address */
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index d8423a3f6698..e84e4fd86bb7 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -176,7 +176,7 @@ static void ceph_sock_write_space(struct sock *sk)
176 * buffer. See net/ipv4/tcp_input.c:tcp_check_space() 176 * buffer. See net/ipv4/tcp_input.c:tcp_check_space()
177 * and net/core/stream.c:sk_stream_write_space(). 177 * and net/core/stream.c:sk_stream_write_space().
178 */ 178 */
179 if (test_bit(WRITE_PENDING, &con->state)) { 179 if (test_bit(WRITE_PENDING, &con->flags)) {
180 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) { 180 if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
181 dout("%s %p queueing write work\n", __func__, con); 181 dout("%s %p queueing write work\n", __func__, con);
182 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags); 182 clear_bit(SOCK_NOSPACE, &sk->sk_socket->flags);
@@ -203,7 +203,7 @@ static void ceph_sock_state_change(struct sock *sk)
203 dout("%s TCP_CLOSE\n", __func__); 203 dout("%s TCP_CLOSE\n", __func__);
204 case TCP_CLOSE_WAIT: 204 case TCP_CLOSE_WAIT:
205 dout("%s TCP_CLOSE_WAIT\n", __func__); 205 dout("%s TCP_CLOSE_WAIT\n", __func__);
206 if (test_and_set_bit(SOCK_CLOSED, &con->state) == 0) { 206 if (test_and_set_bit(SOCK_CLOSED, &con->flags) == 0) {
207 if (test_bit(CONNECTING, &con->state)) 207 if (test_bit(CONNECTING, &con->state))
208 con->error_msg = "connection failed"; 208 con->error_msg = "connection failed";
209 else 209 else
@@ -395,9 +395,9 @@ void ceph_con_close(struct ceph_connection *con)
395 ceph_pr_addr(&con->peer_addr.in_addr)); 395 ceph_pr_addr(&con->peer_addr.in_addr));
396 set_bit(CLOSED, &con->state); /* in case there's queued work */ 396 set_bit(CLOSED, &con->state); /* in case there's queued work */
397 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */ 397 clear_bit(STANDBY, &con->state); /* avoid connect_seq bump */
398 clear_bit(LOSSYTX, &con->state); /* so we retry next connect */ 398 clear_bit(LOSSYTX, &con->flags); /* so we retry next connect */
399 clear_bit(KEEPALIVE_PENDING, &con->state); 399 clear_bit(KEEPALIVE_PENDING, &con->flags);
400 clear_bit(WRITE_PENDING, &con->state); 400 clear_bit(WRITE_PENDING, &con->flags);
401 mutex_lock(&con->mutex); 401 mutex_lock(&con->mutex);
402 reset_connection(con); 402 reset_connection(con);
403 con->peer_global_seq = 0; 403 con->peer_global_seq = 0;
@@ -614,7 +614,7 @@ static void prepare_write_message(struct ceph_connection *con)
614 prepare_write_message_footer(con); 614 prepare_write_message_footer(con);
615 } 615 }
616 616
617 set_bit(WRITE_PENDING, &con->state); 617 set_bit(WRITE_PENDING, &con->flags);
618} 618}
619 619
620/* 620/*
@@ -635,7 +635,7 @@ static void prepare_write_ack(struct ceph_connection *con)
635 &con->out_temp_ack); 635 &con->out_temp_ack);
636 636
637 con->out_more = 1; /* more will follow.. eventually.. */ 637 con->out_more = 1; /* more will follow.. eventually.. */
638 set_bit(WRITE_PENDING, &con->state); 638 set_bit(WRITE_PENDING, &con->flags);
639} 639}
640 640
641/* 641/*
@@ -646,7 +646,7 @@ static void prepare_write_keepalive(struct ceph_connection *con)
646 dout("prepare_write_keepalive %p\n", con); 646 dout("prepare_write_keepalive %p\n", con);
647 con_out_kvec_reset(con); 647 con_out_kvec_reset(con);
648 con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive); 648 con_out_kvec_add(con, sizeof (tag_keepalive), &tag_keepalive);
649 set_bit(WRITE_PENDING, &con->state); 649 set_bit(WRITE_PENDING, &con->flags);
650} 650}
651 651
652/* 652/*
@@ -675,7 +675,7 @@ static struct ceph_auth_handshake *get_connect_authorizer(struct ceph_connection
675 675
676 if (IS_ERR(auth)) 676 if (IS_ERR(auth))
677 return auth; 677 return auth;
678 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->state)) 678 if (test_bit(CLOSED, &con->state) || test_bit(OPENING, &con->flags))
679 return ERR_PTR(-EAGAIN); 679 return ERR_PTR(-EAGAIN);
680 680
681 con->auth_reply_buf = auth->authorizer_reply_buf; 681 con->auth_reply_buf = auth->authorizer_reply_buf;
@@ -695,7 +695,7 @@ static void prepare_write_banner(struct ceph_connection *con)
695 &con->msgr->my_enc_addr); 695 &con->msgr->my_enc_addr);
696 696
697 con->out_more = 0; 697 con->out_more = 0;
698 set_bit(WRITE_PENDING, &con->state); 698 set_bit(WRITE_PENDING, &con->flags);
699} 699}
700 700
701static int prepare_write_connect(struct ceph_connection *con) 701static int prepare_write_connect(struct ceph_connection *con)
@@ -745,7 +745,7 @@ static int prepare_write_connect(struct ceph_connection *con)
745 auth->authorizer_buf); 745 auth->authorizer_buf);
746 746
747 con->out_more = 0; 747 con->out_more = 0;
748 set_bit(WRITE_PENDING, &con->state); 748 set_bit(WRITE_PENDING, &con->flags);
749 749
750 return 0; 750 return 0;
751} 751}
@@ -1492,7 +1492,7 @@ static int process_connect(struct ceph_connection *con)
1492 le32_to_cpu(con->in_reply.connect_seq)); 1492 le32_to_cpu(con->in_reply.connect_seq));
1493 1493
1494 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY) 1494 if (con->in_reply.flags & CEPH_MSG_CONNECT_LOSSY)
1495 set_bit(LOSSYTX, &con->state); 1495 set_bit(LOSSYTX, &con->flags);
1496 1496
1497 prepare_read_tag(con); 1497 prepare_read_tag(con);
1498 break; 1498 break;
@@ -1933,14 +1933,14 @@ do_next:
1933 prepare_write_ack(con); 1933 prepare_write_ack(con);
1934 goto more; 1934 goto more;
1935 } 1935 }
1936 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->state)) { 1936 if (test_and_clear_bit(KEEPALIVE_PENDING, &con->flags)) {
1937 prepare_write_keepalive(con); 1937 prepare_write_keepalive(con);
1938 goto more; 1938 goto more;
1939 } 1939 }
1940 } 1940 }
1941 1941
1942 /* Nothing to do! */ 1942 /* Nothing to do! */
1943 clear_bit(WRITE_PENDING, &con->state); 1943 clear_bit(WRITE_PENDING, &con->flags);
1944 dout("try_write nothing else to write.\n"); 1944 dout("try_write nothing else to write.\n");
1945 ret = 0; 1945 ret = 0;
1946out: 1946out:
@@ -2106,7 +2106,7 @@ static void con_work(struct work_struct *work)
2106 2106
2107 mutex_lock(&con->mutex); 2107 mutex_lock(&con->mutex);
2108restart: 2108restart:
2109 if (test_and_clear_bit(BACKOFF, &con->state)) { 2109 if (test_and_clear_bit(BACKOFF, &con->flags)) {
2110 dout("con_work %p backing off\n", con); 2110 dout("con_work %p backing off\n", con);
2111 if (queue_delayed_work(ceph_msgr_wq, &con->work, 2111 if (queue_delayed_work(ceph_msgr_wq, &con->work,
2112 round_jiffies_relative(con->delay))) { 2112 round_jiffies_relative(con->delay))) {
@@ -2135,7 +2135,7 @@ restart:
2135 con_close_socket(con); 2135 con_close_socket(con);
2136 } 2136 }
2137 2137
2138 if (test_and_clear_bit(SOCK_CLOSED, &con->state)) 2138 if (test_and_clear_bit(SOCK_CLOSED, &con->flags))
2139 goto fault; 2139 goto fault;
2140 2140
2141 ret = try_read(con); 2141 ret = try_read(con);
@@ -2174,7 +2174,7 @@ static void ceph_fault(struct ceph_connection *con)
2174 dout("fault %p state %lu to peer %s\n", 2174 dout("fault %p state %lu to peer %s\n",
2175 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr)); 2175 con, con->state, ceph_pr_addr(&con->peer_addr.in_addr));
2176 2176
2177 if (test_bit(LOSSYTX, &con->state)) { 2177 if (test_bit(LOSSYTX, &con->flags)) {
2178 dout("fault on LOSSYTX channel\n"); 2178 dout("fault on LOSSYTX channel\n");
2179 goto out; 2179 goto out;
2180 } 2180 }
@@ -2196,9 +2196,9 @@ static void ceph_fault(struct ceph_connection *con)
2196 /* If there are no messages queued or keepalive pending, place 2196 /* If there are no messages queued or keepalive pending, place
2197 * the connection in a STANDBY state */ 2197 * the connection in a STANDBY state */
2198 if (list_empty(&con->out_queue) && 2198 if (list_empty(&con->out_queue) &&
2199 !test_bit(KEEPALIVE_PENDING, &con->state)) { 2199 !test_bit(KEEPALIVE_PENDING, &con->flags)) {
2200 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con); 2200 dout("fault %p setting STANDBY clearing WRITE_PENDING\n", con);
2201 clear_bit(WRITE_PENDING, &con->state); 2201 clear_bit(WRITE_PENDING, &con->flags);
2202 set_bit(STANDBY, &con->state); 2202 set_bit(STANDBY, &con->state);
2203 } else { 2203 } else {
2204 /* retry after a delay. */ 2204 /* retry after a delay. */
@@ -2222,7 +2222,7 @@ static void ceph_fault(struct ceph_connection *con)
2222 * that when con_work restarts we schedule the 2222 * that when con_work restarts we schedule the
2223 * delay then. 2223 * delay then.
2224 */ 2224 */
2225 set_bit(BACKOFF, &con->state); 2225 set_bit(BACKOFF, &con->flags);
2226 } 2226 }
2227 } 2227 }
2228 2228
@@ -2278,8 +2278,8 @@ static void clear_standby(struct ceph_connection *con)
2278 mutex_lock(&con->mutex); 2278 mutex_lock(&con->mutex);
2279 dout("clear_standby %p and ++connect_seq\n", con); 2279 dout("clear_standby %p and ++connect_seq\n", con);
2280 con->connect_seq++; 2280 con->connect_seq++;
2281 WARN_ON(test_bit(WRITE_PENDING, &con->state)); 2281 WARN_ON(test_bit(WRITE_PENDING, &con->flags));
2282 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->state)); 2282 WARN_ON(test_bit(KEEPALIVE_PENDING, &con->flags));
2283 mutex_unlock(&con->mutex); 2283 mutex_unlock(&con->mutex);
2284 } 2284 }
2285} 2285}
@@ -2317,7 +2317,7 @@ void ceph_con_send(struct ceph_connection *con, struct ceph_msg *msg)
2317 /* if there wasn't anything waiting to send before, queue 2317 /* if there wasn't anything waiting to send before, queue
2318 * new work */ 2318 * new work */
2319 clear_standby(con); 2319 clear_standby(con);
2320 if (test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2320 if (test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
2321 queue_con(con); 2321 queue_con(con);
2322} 2322}
2323EXPORT_SYMBOL(ceph_con_send); 2323EXPORT_SYMBOL(ceph_con_send);
@@ -2384,8 +2384,8 @@ void ceph_con_keepalive(struct ceph_connection *con)
2384{ 2384{
2385 dout("con_keepalive %p\n", con); 2385 dout("con_keepalive %p\n", con);
2386 clear_standby(con); 2386 clear_standby(con);
2387 if (test_and_set_bit(KEEPALIVE_PENDING, &con->state) == 0 && 2387 if (test_and_set_bit(KEEPALIVE_PENDING, &con->flags) == 0 &&
2388 test_and_set_bit(WRITE_PENDING, &con->state) == 0) 2388 test_and_set_bit(WRITE_PENDING, &con->flags) == 0)
2389 queue_con(con); 2389 queue_con(con);
2390} 2390}
2391EXPORT_SYMBOL(ceph_con_keepalive); 2391EXPORT_SYMBOL(ceph_con_keepalive);