aboutsummaryrefslogtreecommitdiffstats
path: root/net
diff options
context:
space:
mode:
authorIlya Dryomov <idryomov@gmail.com>2015-12-28 05:18:34 -0500
committerIlya Dryomov <idryomov@gmail.com>2016-01-21 13:36:08 -0500
commit67645d7619738e51c668ca69f097cb90b5470422 (patch)
tree1812146b41776faa99f49294cb5b628cebd11d23 /net
parent10bcee149f62e7c5122f79aefc30d610b413280b (diff)
libceph: fix ceph_msg_revoke()
There are a number of problems with revoking a "was sending" message: (1) We never make any attempt to revoke data - only kvecs contibute to con->out_skip. However, once the header (envelope) is written to the socket, our peer learns data_len and sets itself to expect at least data_len bytes to follow front or front+middle. If ceph_msg_revoke() is called while the messenger is sending message's data portion, anything we send after that call is counted by the OSD towards the now revoked message's data portion. The effects vary, the most common one is the eventual hang - higher layers get stuck waiting for the reply to the message that was sent out after ceph_msg_revoke() returned and treated by the OSD as a bunch of data bytes. This is what Matt ran into. (2) Flat out zeroing con->out_kvec_bytes worth of bytes to handle kvecs is wrong. If ceph_msg_revoke() is called before the tag is sent out or while the messenger is sending the header, we will get a connection reset, either due to a bad tag (0 is not a valid tag) or a bad header CRC, which kind of defeats the purpose of revoke. Currently the kernel client refuses to work with header CRCs disabled, but that will likely change in the future, making this even worse. (3) con->out_skip is not reset on connection reset, leading to one or more spurious connection resets if we happen to get a real one between con->out_skip is set in ceph_msg_revoke() and before it's cleared in write_partial_skip(). Fixing (1) and (3) is trivial. The idea behind fixing (2) is to never zero the tag or the header, i.e. send out tag+header regardless of when ceph_msg_revoke() is called. That way the header is always correct, no unnecessary resets are induced and revoke stands ready for disabled CRCs. Since ceph_msg_revoke() rips out con->out_msg, introduce a new "message out temp" and copy the header into it before sending. Cc: stable@vger.kernel.org # 4.0+ Reported-by: Matt Conner <matt.conner@keepertech.com> Signed-off-by: Ilya Dryomov <idryomov@gmail.com> Tested-by: Matt Conner <matt.conner@keepertech.com> Reviewed-by: Sage Weil <sage@redhat.com>
Diffstat (limited to 'net')
-rw-r--r--net/ceph/messenger.c76
1 files changed, 58 insertions, 18 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index de3eb19a6968..3850d1a5bd7c 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -669,6 +669,8 @@ static void reset_connection(struct ceph_connection *con)
669 } 669 }
670 con->in_seq = 0; 670 con->in_seq = 0;
671 con->in_seq_acked = 0; 671 con->in_seq_acked = 0;
672
673 con->out_skip = 0;
672} 674}
673 675
674/* 676/*
@@ -768,6 +770,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
768 770
769static void con_out_kvec_reset(struct ceph_connection *con) 771static void con_out_kvec_reset(struct ceph_connection *con)
770{ 772{
773 BUG_ON(con->out_skip);
774
771 con->out_kvec_left = 0; 775 con->out_kvec_left = 0;
772 con->out_kvec_bytes = 0; 776 con->out_kvec_bytes = 0;
773 con->out_kvec_cur = &con->out_kvec[0]; 777 con->out_kvec_cur = &con->out_kvec[0];
@@ -776,9 +780,9 @@ static void con_out_kvec_reset(struct ceph_connection *con)
776static void con_out_kvec_add(struct ceph_connection *con, 780static void con_out_kvec_add(struct ceph_connection *con,
777 size_t size, void *data) 781 size_t size, void *data)
778{ 782{
779 int index; 783 int index = con->out_kvec_left;
780 784
781 index = con->out_kvec_left; 785 BUG_ON(con->out_skip);
782 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); 786 BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
783 787
784 con->out_kvec[index].iov_len = size; 788 con->out_kvec[index].iov_len = size;
@@ -787,6 +791,27 @@ static void con_out_kvec_add(struct ceph_connection *con,
787 con->out_kvec_bytes += size; 791 con->out_kvec_bytes += size;
788} 792}
789 793
794/*
795 * Chop off a kvec from the end. Return residual number of bytes for
796 * that kvec, i.e. how many bytes would have been written if the kvec
797 * hadn't been nuked.
798 */
799static int con_out_kvec_skip(struct ceph_connection *con)
800{
801 int off = con->out_kvec_cur - con->out_kvec;
802 int skip = 0;
803
804 if (con->out_kvec_bytes > 0) {
805 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
806 BUG_ON(con->out_kvec_bytes < skip);
807 BUG_ON(!con->out_kvec_left);
808 con->out_kvec_bytes -= skip;
809 con->out_kvec_left--;
810 }
811
812 return skip;
813}
814
790#ifdef CONFIG_BLOCK 815#ifdef CONFIG_BLOCK
791 816
792/* 817/*
@@ -1194,7 +1219,6 @@ static void prepare_write_message_footer(struct ceph_connection *con)
1194 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; 1219 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
1195 1220
1196 dout("prepare_write_message_footer %p\n", con); 1221 dout("prepare_write_message_footer %p\n", con);
1197 con->out_kvec_is_msg = true;
1198 con->out_kvec[v].iov_base = &m->footer; 1222 con->out_kvec[v].iov_base = &m->footer;
1199 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { 1223 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
1200 if (con->ops->sign_message) 1224 if (con->ops->sign_message)
@@ -1222,7 +1246,6 @@ static void prepare_write_message(struct ceph_connection *con)
1222 u32 crc; 1246 u32 crc;
1223 1247
1224 con_out_kvec_reset(con); 1248 con_out_kvec_reset(con);
1225 con->out_kvec_is_msg = true;
1226 con->out_msg_done = false; 1249 con->out_msg_done = false;
1227 1250
1228 /* Sneak an ack in there first? If we can get it into the same 1251 /* Sneak an ack in there first? If we can get it into the same
@@ -1262,18 +1285,19 @@ static void prepare_write_message(struct ceph_connection *con)
1262 1285
1263 /* tag + hdr + front + middle */ 1286 /* tag + hdr + front + middle */
1264 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 1287 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
1265 con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); 1288 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
1266 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); 1289 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
1267 1290
1268 if (m->middle) 1291 if (m->middle)
1269 con_out_kvec_add(con, m->middle->vec.iov_len, 1292 con_out_kvec_add(con, m->middle->vec.iov_len,
1270 m->middle->vec.iov_base); 1293 m->middle->vec.iov_base);
1271 1294
1272 /* fill in crc (except data pages), footer */ 1295 /* fill in hdr crc and finalize hdr */
1273 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); 1296 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
1274 con->out_msg->hdr.crc = cpu_to_le32(crc); 1297 con->out_msg->hdr.crc = cpu_to_le32(crc);
1275 con->out_msg->footer.flags = 0; 1298 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
1276 1299
1300 /* fill in front and middle crc, footer */
1277 crc = crc32c(0, m->front.iov_base, m->front.iov_len); 1301 crc = crc32c(0, m->front.iov_base, m->front.iov_len);
1278 con->out_msg->footer.front_crc = cpu_to_le32(crc); 1302 con->out_msg->footer.front_crc = cpu_to_le32(crc);
1279 if (m->middle) { 1303 if (m->middle) {
@@ -1285,6 +1309,7 @@ static void prepare_write_message(struct ceph_connection *con)
1285 dout("%s front_crc %u middle_crc %u\n", __func__, 1309 dout("%s front_crc %u middle_crc %u\n", __func__,
1286 le32_to_cpu(con->out_msg->footer.front_crc), 1310 le32_to_cpu(con->out_msg->footer.front_crc),
1287 le32_to_cpu(con->out_msg->footer.middle_crc)); 1311 le32_to_cpu(con->out_msg->footer.middle_crc));
1312 con->out_msg->footer.flags = 0;
1288 1313
1289 /* is there a data payload? */ 1314 /* is there a data payload? */
1290 con->out_msg->footer.data_crc = 0; 1315 con->out_msg->footer.data_crc = 0;
@@ -1489,7 +1514,6 @@ static int write_partial_kvec(struct ceph_connection *con)
1489 } 1514 }
1490 } 1515 }
1491 con->out_kvec_left = 0; 1516 con->out_kvec_left = 0;
1492 con->out_kvec_is_msg = false;
1493 ret = 1; 1517 ret = 1;
1494out: 1518out:
1495 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, 1519 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
@@ -1581,6 +1605,7 @@ static int write_partial_skip(struct ceph_connection *con)
1581{ 1605{
1582 int ret; 1606 int ret;
1583 1607
1608 dout("%s %p %d left\n", __func__, con, con->out_skip);
1584 while (con->out_skip > 0) { 1609 while (con->out_skip > 0) {
1585 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); 1610 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
1586 1611
@@ -2503,13 +2528,13 @@ more:
2503 2528
2504more_kvec: 2529more_kvec:
2505 /* kvec data queued? */ 2530 /* kvec data queued? */
2506 if (con->out_skip) { 2531 if (con->out_kvec_left) {
2507 ret = write_partial_skip(con); 2532 ret = write_partial_kvec(con);
2508 if (ret <= 0) 2533 if (ret <= 0)
2509 goto out; 2534 goto out;
2510 } 2535 }
2511 if (con->out_kvec_left) { 2536 if (con->out_skip) {
2512 ret = write_partial_kvec(con); 2537 ret = write_partial_skip(con);
2513 if (ret <= 0) 2538 if (ret <= 0)
2514 goto out; 2539 goto out;
2515 } 2540 }
@@ -3047,16 +3072,31 @@ void ceph_msg_revoke(struct ceph_msg *msg)
3047 ceph_msg_put(msg); 3072 ceph_msg_put(msg);
3048 } 3073 }
3049 if (con->out_msg == msg) { 3074 if (con->out_msg == msg) {
3050 dout("%s %p msg %p - was sending\n", __func__, con, msg); 3075 BUG_ON(con->out_skip);
3051 con->out_msg = NULL; 3076 /* footer */
3052 if (con->out_kvec_is_msg) { 3077 if (con->out_msg_done) {
3053 con->out_skip = con->out_kvec_bytes; 3078 con->out_skip += con_out_kvec_skip(con);
3054 con->out_kvec_is_msg = false; 3079 } else {
3080 BUG_ON(!msg->data_length);
3081 if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
3082 con->out_skip += sizeof(msg->footer);
3083 else
3084 con->out_skip += sizeof(msg->old_footer);
3055 } 3085 }
3086 /* data, middle, front */
3087 if (msg->data_length)
3088 con->out_skip += msg->cursor.total_resid;
3089 if (msg->middle)
3090 con->out_skip += con_out_kvec_skip(con);
3091 con->out_skip += con_out_kvec_skip(con);
3092
3093 dout("%s %p msg %p - was sending, will write %d skip %d\n",
3094 __func__, con, msg, con->out_kvec_bytes, con->out_skip);
3056 msg->hdr.seq = 0; 3095 msg->hdr.seq = 0;
3057 3096 con->out_msg = NULL;
3058 ceph_msg_put(msg); 3097 ceph_msg_put(msg);
3059 } 3098 }
3099
3060 mutex_unlock(&con->mutex); 3100 mutex_unlock(&con->mutex);
3061} 3101}
3062 3102