aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-12 00:34:23 -0400
committerSage Weil <sage@inktank.com>2013-05-02 00:17:34 -0400
commitf5db90bcf2c69d099f9d828a8104796f41de6bc5 (patch)
treee0f5e2c76beeea42d41c47f5ebb9b8130bf8a171 /net/ceph
parent859a35d5523e8e6a5c3568c12febe2e1270bc3a1 (diff)
libceph: kill last of ceph_msg_pos
The only remaining field in the ceph_msg_pos structure is did_page_crc. In the new cursor model of things that flag (or something like it) belongs in the cursor. Define a new field "need_crc" in the cursor (which applies to all types of data) and initialize it to true whenever a cursor is initialized. In write_partial_message_data(), the data CRC still will be computed as before, but it will check the cursor->need_crc field to determine whether it's needed. Any time the cursor is advanced to a new piece of a data item, need_crc will be set, and this will cause the crc for that entire piece to be accumulated into the data crc. In write_partial_message_data() the intermediate crc value is now held in a local variable so it doesn't have to be byte-swapped so many times. In read_partial_msg_data() we do something similar (but mainly for consistency there). With that, the ceph_msg_pos structure can go away, and it no longer needs to be passed as an argument to prepare_message_data(). This cleanup is related to: http://tracker.ceph.com/issues/4428 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net/ceph')
-rw-r--r--net/ceph/messenger.c56
1 files changed, 31 insertions, 25 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 19f9fffc170c..eee7a878dbfb 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1002,6 +1002,7 @@ static void ceph_msg_data_cursor_init(struct ceph_msg_data *data,
1002 /* BUG(); */ 1002 /* BUG(); */
1003 break; 1003 break;
1004 } 1004 }
1005 data->cursor.need_crc = true;
1005} 1006}
1006 1007
1007/* 1008/*
@@ -1069,12 +1070,12 @@ static bool ceph_msg_data_advance(struct ceph_msg_data *data, size_t bytes)
1069 BUG(); 1070 BUG();
1070 break; 1071 break;
1071 } 1072 }
1073 data->cursor.need_crc = new_piece;
1072 1074
1073 return new_piece; 1075 return new_piece;
1074} 1076}
1075 1077
1076static void prepare_message_data(struct ceph_msg *msg, 1078static void prepare_message_data(struct ceph_msg *msg)
1077 struct ceph_msg_pos *msg_pos)
1078{ 1079{
1079 size_t data_len; 1080 size_t data_len;
1080 1081
@@ -1086,8 +1087,6 @@ static void prepare_message_data(struct ceph_msg *msg,
1086 /* Initialize data cursor */ 1087 /* Initialize data cursor */
1087 1088
1088 ceph_msg_data_cursor_init(&msg->data, data_len); 1089 ceph_msg_data_cursor_init(&msg->data, data_len);
1089
1090 msg_pos->did_page_crc = false;
1091} 1090}
1092 1091
1093/* 1092/*
@@ -1186,7 +1185,7 @@ static void prepare_write_message(struct ceph_connection *con)
1186 /* is there a data payload? */ 1185 /* is there a data payload? */
1187 con->out_msg->footer.data_crc = 0; 1186 con->out_msg->footer.data_crc = 0;
1188 if (m->hdr.data_len) { 1187 if (m->hdr.data_len) {
1189 prepare_message_data(con->out_msg, &con->out_msg_pos); 1188 prepare_message_data(con->out_msg);
1190 con->out_more = 1; /* data + footer will follow */ 1189 con->out_more = 1; /* data + footer will follow */
1191 } else { 1190 } else {
1192 /* no, queue up footer too and be done */ 1191 /* no, queue up footer too and be done */
@@ -1388,8 +1387,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1388 size_t len, size_t sent) 1387 size_t len, size_t sent)
1389{ 1388{
1390 struct ceph_msg *msg = con->out_msg; 1389 struct ceph_msg *msg = con->out_msg;
1391 struct ceph_msg_pos *msg_pos = &con->out_msg_pos; 1390 bool need_crc;
1392 bool need_crc = false;
1393 1391
1394 BUG_ON(!msg); 1392 BUG_ON(!msg);
1395 BUG_ON(!sent); 1393 BUG_ON(!sent);
@@ -1401,7 +1399,6 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1401 return; 1399 return;
1402 1400
1403 BUG_ON(sent != len); 1401 BUG_ON(sent != len);
1404 msg_pos->did_page_crc = false;
1405} 1402}
1406 1403
1407static void in_msg_pos_next(struct ceph_connection *con, size_t len, 1404static void in_msg_pos_next(struct ceph_connection *con, size_t len,
@@ -1444,9 +1441,8 @@ static int write_partial_message_data(struct ceph_connection *con)
1444{ 1441{
1445 struct ceph_msg *msg = con->out_msg; 1442 struct ceph_msg *msg = con->out_msg;
1446 struct ceph_msg_data_cursor *cursor = &msg->data.cursor; 1443 struct ceph_msg_data_cursor *cursor = &msg->data.cursor;
1447 struct ceph_msg_pos *msg_pos = &con->out_msg_pos;
1448 bool do_datacrc = !con->msgr->nocrc; 1444 bool do_datacrc = !con->msgr->nocrc;
1449 int ret; 1445 u32 crc;
1450 1446
1451 dout("%s %p msg %p\n", __func__, con, msg); 1447 dout("%s %p msg %p\n", __func__, con, msg);
1452 1448
@@ -1461,38 +1457,40 @@ static int write_partial_message_data(struct ceph_connection *con)
1461 * need to map the page. If we have no pages, they have 1457 * need to map the page. If we have no pages, they have
1462 * been revoked, so use the zero page. 1458 * been revoked, so use the zero page.
1463 */ 1459 */
1460 crc = do_datacrc ? le32_to_cpu(msg->footer.data_crc) : 0;
1464 while (cursor->resid) { 1461 while (cursor->resid) {
1465 struct page *page; 1462 struct page *page;
1466 size_t page_offset; 1463 size_t page_offset;
1467 size_t length; 1464 size_t length;
1468 bool last_piece; 1465 bool last_piece;
1466 int ret;
1469 1467
1470 page = ceph_msg_data_next(&msg->data, &page_offset, &length, 1468 page = ceph_msg_data_next(&msg->data, &page_offset, &length,
1471 &last_piece); 1469 &last_piece);
1472 if (do_datacrc && !msg_pos->did_page_crc) { 1470 if (do_datacrc && cursor->need_crc)
1473 u32 crc = le32_to_cpu(msg->footer.data_crc);
1474 crc = ceph_crc32c_page(crc, page, page_offset, length); 1471 crc = ceph_crc32c_page(crc, page, page_offset, length);
1475 msg->footer.data_crc = cpu_to_le32(crc);
1476 msg_pos->did_page_crc = true;
1477 }
1478 ret = ceph_tcp_sendpage(con->sock, page, page_offset, 1472 ret = ceph_tcp_sendpage(con->sock, page, page_offset,
1479 length, last_piece); 1473 length, last_piece);
1480 if (ret <= 0) 1474 if (ret <= 0) {
1481 goto out; 1475 if (do_datacrc)
1476 msg->footer.data_crc = cpu_to_le32(crc);
1482 1477
1478 return ret;
1479 }
1483 out_msg_pos_next(con, page, length, (size_t) ret); 1480 out_msg_pos_next(con, page, length, (size_t) ret);
1484 } 1481 }
1485 1482
1486 dout("%s %p msg %p done\n", __func__, con, msg); 1483 dout("%s %p msg %p done\n", __func__, con, msg);
1487 1484
1488 /* prepare and queue up footer, too */ 1485 /* prepare and queue up footer, too */
1489 if (!do_datacrc) 1486 if (do_datacrc)
1487 msg->footer.data_crc = cpu_to_le32(crc);
1488 else
1490 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC; 1489 msg->footer.flags |= CEPH_MSG_FOOTER_NOCRC;
1491 con_out_kvec_reset(con); 1490 con_out_kvec_reset(con);
1492 prepare_write_message_footer(con); 1491 prepare_write_message_footer(con);
1493 ret = 1; 1492
1494out: 1493 return 1; /* must return > 0 to indicate success */
1495 return ret;
1496} 1494}
1497 1495
1498/* 1496/*
@@ -2144,24 +2142,32 @@ static int read_partial_msg_data(struct ceph_connection *con)
2144 struct page *page; 2142 struct page *page;
2145 size_t page_offset; 2143 size_t page_offset;
2146 size_t length; 2144 size_t length;
2145 u32 crc = 0;
2147 int ret; 2146 int ret;
2148 2147
2149 BUG_ON(!msg); 2148 BUG_ON(!msg);
2150 if (WARN_ON(!ceph_msg_has_data(msg))) 2149 if (WARN_ON(!ceph_msg_has_data(msg)))
2151 return -EIO; 2150 return -EIO;
2152 2151
2152 if (do_datacrc)
2153 crc = con->in_data_crc;
2153 while (cursor->resid) { 2154 while (cursor->resid) {
2154 page = ceph_msg_data_next(&msg->data, &page_offset, &length, 2155 page = ceph_msg_data_next(&msg->data, &page_offset, &length,
2155 NULL); 2156 NULL);
2156 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); 2157 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2157 if (ret <= 0) 2158 if (ret <= 0) {
2159 if (do_datacrc)
2160 con->in_data_crc = crc;
2161
2158 return ret; 2162 return ret;
2163 }
2159 2164
2160 if (do_datacrc) 2165 if (do_datacrc)
2161 con->in_data_crc = ceph_crc32c_page(con->in_data_crc, 2166 crc = ceph_crc32c_page(crc, page, page_offset, ret);
2162 page, page_offset, ret);
2163 in_msg_pos_next(con, length, ret); 2167 in_msg_pos_next(con, length, ret);
2164 } 2168 }
2169 if (do_datacrc)
2170 con->in_data_crc = crc;
2165 2171
2166 return 1; /* must return > 0 to indicate success */ 2172 return 1; /* must return > 0 to indicate success */
2167} 2173}
@@ -2257,7 +2263,7 @@ static int read_partial_message(struct ceph_connection *con)
2257 /* prepare for data payload, if any */ 2263 /* prepare for data payload, if any */
2258 2264
2259 if (data_len) 2265 if (data_len)
2260 prepare_message_data(con->in_msg, &con->in_msg_pos); 2266 prepare_message_data(con->in_msg);
2261 } 2267 }
2262 2268
2263 /* front */ 2269 /* front */