diff options
author | Sage Weil <sage@inktank.com> | 2013-03-25 11:47:40 -0400 |
---|---|---|
committer | Sage Weil <sage@inktank.com> | 2013-05-02 00:17:09 -0400 |
commit | 3a23083bda56850a1dc0e1c6d270b1f5dc789f07 (patch) | |
tree | ddd52f6a7c8da4334d68ae3e28825d89155d29a0 /net | |
parent | 022f3e2ee2354599faccf5a764a5a24a5dd194c9 (diff) |
libceph: implement RECONNECT_SEQ feature
This is an old protocol extension that allows the client and server to
avoid resending old messages after a reconnect (following a socket error).
Instead, the exchange their sequence numbers during the handshake. This
avoids sending a bunch of useless data over the socket.
It has been supported in the server code since v0.22 (Sep 2010).
Signed-off-by: Sage Weil <sage@inktank.com>
Reviewed-by: Alex Elder <elder@inktank.com>
Diffstat (limited to 'net')
-rw-r--r-- | net/ceph/messenger.c | 43 |
1 files changed, 38 insertions, 5 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c index 997daccf973a..e8491db43f5e 100644 --- a/net/ceph/messenger.c +++ b/net/ceph/messenger.c | |||
@@ -1247,6 +1247,24 @@ static void prepare_write_ack(struct ceph_connection *con) | |||
1247 | } | 1247 | } |
1248 | 1248 | ||
1249 | /* | 1249 | /* |
1250 | * Prepare to share the seq during handshake | ||
1251 | */ | ||
1252 | static void prepare_write_seq(struct ceph_connection *con) | ||
1253 | { | ||
1254 | dout("prepare_write_seq %p %llu -> %llu\n", con, | ||
1255 | con->in_seq_acked, con->in_seq); | ||
1256 | con->in_seq_acked = con->in_seq; | ||
1257 | |||
1258 | con_out_kvec_reset(con); | ||
1259 | |||
1260 | con->out_temp_ack = cpu_to_le64(con->in_seq_acked); | ||
1261 | con_out_kvec_add(con, sizeof (con->out_temp_ack), | ||
1262 | &con->out_temp_ack); | ||
1263 | |||
1264 | con_flag_set(con, CON_FLAG_WRITE_PENDING); | ||
1265 | } | ||
1266 | |||
1267 | /* | ||
1250 | * Prepare to write keepalive byte. | 1268 | * Prepare to write keepalive byte. |
1251 | */ | 1269 | */ |
1252 | static void prepare_write_keepalive(struct ceph_connection *con) | 1270 | static void prepare_write_keepalive(struct ceph_connection *con) |
@@ -1582,6 +1600,13 @@ static void prepare_read_ack(struct ceph_connection *con) | |||
1582 | con->in_base_pos = 0; | 1600 | con->in_base_pos = 0; |
1583 | } | 1601 | } |
1584 | 1602 | ||
1603 | static void prepare_read_seq(struct ceph_connection *con) | ||
1604 | { | ||
1605 | dout("prepare_read_seq %p\n", con); | ||
1606 | con->in_base_pos = 0; | ||
1607 | con->in_tag = CEPH_MSGR_TAG_SEQ; | ||
1608 | } | ||
1609 | |||
1585 | static void prepare_read_tag(struct ceph_connection *con) | 1610 | static void prepare_read_tag(struct ceph_connection *con) |
1586 | { | 1611 | { |
1587 | dout("prepare_read_tag %p\n", con); | 1612 | dout("prepare_read_tag %p\n", con); |
@@ -2059,6 +2084,7 @@ static int process_connect(struct ceph_connection *con) | |||
2059 | prepare_read_connect(con); | 2084 | prepare_read_connect(con); |
2060 | break; | 2085 | break; |
2061 | 2086 | ||
2087 | case CEPH_MSGR_TAG_SEQ: | ||
2062 | case CEPH_MSGR_TAG_READY: | 2088 | case CEPH_MSGR_TAG_READY: |
2063 | if (req_feat & ~server_feat) { | 2089 | if (req_feat & ~server_feat) { |
2064 | pr_err("%s%lld %s protocol feature mismatch," | 2090 | pr_err("%s%lld %s protocol feature mismatch," |
@@ -2089,7 +2115,12 @@ static int process_connect(struct ceph_connection *con) | |||
2089 | 2115 | ||
2090 | con->delay = 0; /* reset backoff memory */ | 2116 | con->delay = 0; /* reset backoff memory */ |
2091 | 2117 | ||
2092 | prepare_read_tag(con); | 2118 | if (con->in_reply.tag == CEPH_MSGR_TAG_SEQ) { |
2119 | prepare_write_seq(con); | ||
2120 | prepare_read_seq(con); | ||
2121 | } else { | ||
2122 | prepare_read_tag(con); | ||
2123 | } | ||
2093 | break; | 2124 | break; |
2094 | 2125 | ||
2095 | case CEPH_MSGR_TAG_WAIT: | 2126 | case CEPH_MSGR_TAG_WAIT: |
@@ -2123,7 +2154,6 @@ static int read_partial_ack(struct ceph_connection *con) | |||
2123 | return read_partial(con, end, size, &con->in_temp_ack); | 2154 | return read_partial(con, end, size, &con->in_temp_ack); |
2124 | } | 2155 | } |
2125 | 2156 | ||
2126 | |||
2127 | /* | 2157 | /* |
2128 | * We can finally discard anything that's been acked. | 2158 | * We can finally discard anything that's been acked. |
2129 | */ | 2159 | */ |
@@ -2148,8 +2178,6 @@ static void process_ack(struct ceph_connection *con) | |||
2148 | } | 2178 | } |
2149 | 2179 | ||
2150 | 2180 | ||
2151 | |||
2152 | |||
2153 | static int read_partial_message_section(struct ceph_connection *con, | 2181 | static int read_partial_message_section(struct ceph_connection *con, |
2154 | struct kvec *section, | 2182 | struct kvec *section, |
2155 | unsigned int sec_len, u32 *crc) | 2183 | unsigned int sec_len, u32 *crc) |
@@ -2672,7 +2700,12 @@ more: | |||
2672 | prepare_read_tag(con); | 2700 | prepare_read_tag(con); |
2673 | goto more; | 2701 | goto more; |
2674 | } | 2702 | } |
2675 | if (con->in_tag == CEPH_MSGR_TAG_ACK) { | 2703 | if (con->in_tag == CEPH_MSGR_TAG_ACK || |
2704 | con->in_tag == CEPH_MSGR_TAG_SEQ) { | ||
2705 | /* | ||
2706 | * the final handshake seq exchange is semantically | ||
2707 | * equivalent to an ACK | ||
2708 | */ | ||
2676 | ret = read_partial_ack(con); | 2709 | ret = read_partial_ack(con); |
2677 | if (ret <= 0) | 2710 | if (ret <= 0) |
2678 | goto out; | 2711 | goto out; |