aboutsummaryrefslogtreecommitdiffstats
path: root/net/ceph/messenger.c
diff options
context:
space:
mode:
authorAlex Elder <elder@inktank.com>2013-03-12 00:34:23 -0400
committerSage Weil <sage@inktank.com>2013-05-02 00:17:30 -0400
commit4c59b4a278f9b7a418ad8af933fd7b341df64393 (patch)
tree8f42d880e2ed3fb97c64ea293b42ff08f891537a /net/ceph/messenger.c
parent686be20875db63c6103573565c63db20153ee6e1 (diff)
libceph: collapse all data items into one
It turns out that only one of the data item types is ever used at any one time in a single message (currently). - A page array is used by the osd client (on behalf of the file system) and by rbd. Only one osd op (and therefore at most one data item) is ever used at a time by rbd. And the only time the file system sends two, the second op contains no data. - A bio is only used by the rbd client (and again, only one data item per message) - A page list is used by the file system and by rbd for outgoing data, but only one op (and one data item) at a time. We can therefore collapse all three of our data item fields into a single field "data", and depend on the messenger code to properly handle it based on its type. This allows us to eliminate quite a bit of duplicated code. This is related to: http://tracker.ceph.com/issues/4429 Signed-off-by: Alex Elder <elder@inktank.com> Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
Diffstat (limited to 'net/ceph/messenger.c')
-rw-r--r--net/ceph/messenger.c123
1 files changed, 38 insertions, 85 deletions
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index a19ba00ce777..6b5b5c625547 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -1085,22 +1085,15 @@ static void prepare_message_data(struct ceph_msg *msg,
1085 1085
1086 /* initialize page iterator */ 1086 /* initialize page iterator */
1087 msg_pos->page = 0; 1087 msg_pos->page = 0;
1088 if (ceph_msg_has_pages(msg)) 1088 if (ceph_msg_has_data(msg))
1089 msg_pos->page_pos = msg->p.alignment; 1089 msg_pos->page_pos = msg->data.alignment;
1090 else 1090 else
1091 msg_pos->page_pos = 0; 1091 msg_pos->page_pos = 0;
1092 msg_pos->data_pos = 0; 1092 msg_pos->data_pos = 0;
1093 1093
1094 /* Initialize data cursors */ 1094 /* Initialize data cursor */
1095 1095
1096#ifdef CONFIG_BLOCK 1096 ceph_msg_data_cursor_init(&msg->data, data_len);
1097 if (ceph_msg_has_bio(msg))
1098 ceph_msg_data_cursor_init(&msg->b, data_len);
1099#endif /* CONFIG_BLOCK */
1100 if (ceph_msg_has_pages(msg))
1101 ceph_msg_data_cursor_init(&msg->p, data_len);
1102 if (ceph_msg_has_pagelist(msg))
1103 ceph_msg_data_cursor_init(&msg->l, data_len);
1104 1097
1105 msg_pos->did_page_crc = false; 1098 msg_pos->did_page_crc = false;
1106} 1099}
@@ -1166,10 +1159,10 @@ static void prepare_write_message(struct ceph_connection *con)
1166 m->needs_out_seq = false; 1159 m->needs_out_seq = false;
1167 } 1160 }
1168 1161
1169 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d (%zd)\n", 1162 dout("prepare_write_message %p seq %lld type %d len %d+%d+%d\n",
1170 m, con->out_seq, le16_to_cpu(m->hdr.type), 1163 m, con->out_seq, le16_to_cpu(m->hdr.type),
1171 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len), 1164 le32_to_cpu(m->hdr.front_len), le32_to_cpu(m->hdr.middle_len),
1172 le32_to_cpu(m->hdr.data_len), m->p.length); 1165 le32_to_cpu(m->hdr.data_len));
1173 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len); 1166 BUG_ON(le32_to_cpu(m->hdr.front_len) != m->front.iov_len);
1174 1167
1175 /* tag + hdr + front + middle */ 1168 /* tag + hdr + front + middle */
@@ -1411,14 +1404,7 @@ static void out_msg_pos_next(struct ceph_connection *con, struct page *page,
1411 1404
1412 msg_pos->data_pos += sent; 1405 msg_pos->data_pos += sent;
1413 msg_pos->page_pos += sent; 1406 msg_pos->page_pos += sent;
1414 if (ceph_msg_has_pages(msg)) 1407 need_crc = ceph_msg_data_advance(&msg->data, sent);
1415 need_crc = ceph_msg_data_advance(&msg->p, sent);
1416 else if (ceph_msg_has_pagelist(msg))
1417 need_crc = ceph_msg_data_advance(&msg->l, sent);
1418#ifdef CONFIG_BLOCK
1419 else if (ceph_msg_has_bio(msg))
1420 need_crc = ceph_msg_data_advance(&msg->b, sent);
1421#endif /* CONFIG_BLOCK */
1422 BUG_ON(need_crc && sent != len); 1408 BUG_ON(need_crc && sent != len);
1423 1409
1424 if (sent < len) 1410 if (sent < len)
@@ -1441,12 +1427,8 @@ static void in_msg_pos_next(struct ceph_connection *con, size_t len,
1441 1427
1442 msg_pos->data_pos += received; 1428 msg_pos->data_pos += received;
1443 msg_pos->page_pos += received; 1429 msg_pos->page_pos += received;
1444 if (ceph_msg_has_pages(msg)) 1430 (void) ceph_msg_data_advance(&msg->data, received);
1445 (void) ceph_msg_data_advance(&msg->p, received); 1431
1446#ifdef CONFIG_BLOCK
1447 else if (ceph_msg_has_bio(msg))
1448 (void) ceph_msg_data_advance(&msg->b, received);
1449#endif /* CONFIG_BLOCK */
1450 if (received < len) 1432 if (received < len)
1451 return; 1433 return;
1452 1434
@@ -1486,6 +1468,9 @@ static int write_partial_message_data(struct ceph_connection *con)
1486 dout("%s %p msg %p page %d offset %d\n", __func__, 1468 dout("%s %p msg %p page %d offset %d\n", __func__,
1487 con, msg, msg_pos->page, msg_pos->page_pos); 1469 con, msg, msg_pos->page, msg_pos->page_pos);
1488 1470
1471 if (WARN_ON(!ceph_msg_has_data(msg)))
1472 return -EINVAL;
1473
1489 /* 1474 /*
1490 * Iterate through each page that contains data to be 1475 * Iterate through each page that contains data to be
1491 * written, and send as much as possible for each. 1476 * written, and send as much as possible for each.
@@ -1500,23 +1485,8 @@ static int write_partial_message_data(struct ceph_connection *con)
1500 size_t length; 1485 size_t length;
1501 bool last_piece; 1486 bool last_piece;
1502 1487
1503 if (ceph_msg_has_pages(msg)) { 1488 page = ceph_msg_data_next(&msg->data, &page_offset, &length,
1504 page = ceph_msg_data_next(&msg->p, &page_offset, 1489 &last_piece);
1505 &length, &last_piece);
1506 } else if (ceph_msg_has_pagelist(msg)) {
1507 page = ceph_msg_data_next(&msg->l, &page_offset,
1508 &length, &last_piece);
1509#ifdef CONFIG_BLOCK
1510 } else if (ceph_msg_has_bio(msg)) {
1511 page = ceph_msg_data_next(&msg->b, &page_offset,
1512 &length, &last_piece);
1513#endif
1514 } else {
1515 WARN(1, "con %p data_len %u but no outbound data\n",
1516 con, data_len);
1517 ret = -EINVAL;
1518 goto out;
1519 }
1520 if (do_datacrc && !msg_pos->did_page_crc) { 1490 if (do_datacrc && !msg_pos->did_page_crc) {
1521 u32 crc = le32_to_cpu(msg->footer.data_crc); 1491 u32 crc = le32_to_cpu(msg->footer.data_crc);
1522 1492
@@ -2197,20 +2167,13 @@ static int read_partial_msg_data(struct ceph_connection *con)
2197 int ret; 2167 int ret;
2198 2168
2199 BUG_ON(!msg); 2169 BUG_ON(!msg);
2170 if (WARN_ON(!ceph_msg_has_data(msg)))
2171 return -EIO;
2200 2172
2201 data_len = le32_to_cpu(con->in_hdr.data_len); 2173 data_len = le32_to_cpu(con->in_hdr.data_len);
2202 while (msg_pos->data_pos < data_len) { 2174 while (msg_pos->data_pos < data_len) {
2203 if (ceph_msg_has_pages(msg)) { 2175 page = ceph_msg_data_next(&msg->data, &page_offset, &length,
2204 page = ceph_msg_data_next(&msg->p, &page_offset, 2176 NULL);
2205 &length, NULL);
2206#ifdef CONFIG_BLOCK
2207 } else if (ceph_msg_has_bio(msg)) {
2208 page = ceph_msg_data_next(&msg->b, &page_offset,
2209 &length, NULL);
2210#endif
2211 } else {
2212 BUG_ON(1);
2213 }
2214 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length); 2177 ret = ceph_tcp_recvpage(con->sock, page, page_offset, length);
2215 if (ret <= 0) 2178 if (ret <= 0)
2216 return ret; 2179 return ret;
@@ -2218,7 +2181,6 @@ static int read_partial_msg_data(struct ceph_connection *con)
2218 if (do_datacrc) 2181 if (do_datacrc)
2219 con->in_data_crc = ceph_crc32c_page(con->in_data_crc, 2182 con->in_data_crc = ceph_crc32c_page(con->in_data_crc,
2220 page, page_offset, ret); 2183 page, page_offset, ret);
2221
2222 in_msg_pos_next(con, length, ret); 2184 in_msg_pos_next(con, length, ret);
2223 } 2185 }
2224 2186
@@ -3043,12 +3005,12 @@ void ceph_msg_data_set_pages(struct ceph_msg *msg, struct page **pages,
3043{ 3005{
3044 BUG_ON(!pages); 3006 BUG_ON(!pages);
3045 BUG_ON(!length); 3007 BUG_ON(!length);
3046 BUG_ON(msg->p.type != CEPH_MSG_DATA_NONE); 3008 BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
3047 3009
3048 msg->p.type = CEPH_MSG_DATA_PAGES; 3010 msg->data.type = CEPH_MSG_DATA_PAGES;
3049 msg->p.pages = pages; 3011 msg->data.pages = pages;
3050 msg->p.length = length; 3012 msg->data.length = length;
3051 msg->p.alignment = alignment & ~PAGE_MASK; 3013 msg->data.alignment = alignment & ~PAGE_MASK;
3052} 3014}
3053EXPORT_SYMBOL(ceph_msg_data_set_pages); 3015EXPORT_SYMBOL(ceph_msg_data_set_pages);
3054 3016
@@ -3057,20 +3019,20 @@ void ceph_msg_data_set_pagelist(struct ceph_msg *msg,
3057{ 3019{
3058 BUG_ON(!pagelist); 3020 BUG_ON(!pagelist);
3059 BUG_ON(!pagelist->length); 3021 BUG_ON(!pagelist->length);
3060 BUG_ON(msg->l.type != CEPH_MSG_DATA_NONE); 3022 BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
3061 3023
3062 msg->l.type = CEPH_MSG_DATA_PAGELIST; 3024 msg->data.type = CEPH_MSG_DATA_PAGELIST;
3063 msg->l.pagelist = pagelist; 3025 msg->data.pagelist = pagelist;
3064} 3026}
3065EXPORT_SYMBOL(ceph_msg_data_set_pagelist); 3027EXPORT_SYMBOL(ceph_msg_data_set_pagelist);
3066 3028
3067void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio) 3029void ceph_msg_data_set_bio(struct ceph_msg *msg, struct bio *bio)
3068{ 3030{
3069 BUG_ON(!bio); 3031 BUG_ON(!bio);
3070 BUG_ON(msg->b.type != CEPH_MSG_DATA_NONE); 3032 BUG_ON(msg->data.type != CEPH_MSG_DATA_NONE);
3071 3033
3072 msg->b.type = CEPH_MSG_DATA_BIO; 3034 msg->data.type = CEPH_MSG_DATA_BIO;
3073 msg->b.bio = bio; 3035 msg->data.bio = bio;
3074} 3036}
3075EXPORT_SYMBOL(ceph_msg_data_set_bio); 3037EXPORT_SYMBOL(ceph_msg_data_set_bio);
3076 3038
@@ -3094,9 +3056,7 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags,
3094 INIT_LIST_HEAD(&m->list_head); 3056 INIT_LIST_HEAD(&m->list_head);
3095 kref_init(&m->kref); 3057 kref_init(&m->kref);
3096 3058
3097 ceph_msg_data_init(&m->p); 3059 ceph_msg_data_init(&m->data);
3098 ceph_msg_data_init(&m->l);
3099 ceph_msg_data_init(&m->b);
3100 3060
3101 /* front */ 3061 /* front */
3102 m->front_max = front_len; 3062 m->front_max = front_len;
@@ -3251,20 +3211,13 @@ void ceph_msg_last_put(struct kref *kref)
3251 ceph_buffer_put(m->middle); 3211 ceph_buffer_put(m->middle);
3252 m->middle = NULL; 3212 m->middle = NULL;
3253 } 3213 }
3254 if (ceph_msg_has_pages(m)) { 3214 if (ceph_msg_has_data(m)) {
3255 m->p.length = 0; 3215 if (m->data.type == CEPH_MSG_DATA_PAGELIST) {
3256 m->p.pages = NULL; 3216 ceph_pagelist_release(m->data.pagelist);
3257 m->p.type = CEPH_OSD_DATA_TYPE_NONE; 3217 kfree(m->data.pagelist);
3258 } 3218 }
3259 if (ceph_msg_has_pagelist(m)) { 3219 memset(&m->data, 0, sizeof m->data);
3260 ceph_pagelist_release(m->l.pagelist); 3220 ceph_msg_data_init(&m->data);
3261 kfree(m->l.pagelist);
3262 m->l.pagelist = NULL;
3263 m->l.type = CEPH_OSD_DATA_TYPE_NONE;
3264 }
3265 if (ceph_msg_has_bio(m)) {
3266 m->b.bio = NULL;
3267 m->b.type = CEPH_OSD_DATA_TYPE_NONE;
3268 } 3221 }
3269 3222
3270 if (m->pool) 3223 if (m->pool)
@@ -3277,7 +3230,7 @@ EXPORT_SYMBOL(ceph_msg_last_put);
3277void ceph_msg_dump(struct ceph_msg *msg) 3230void ceph_msg_dump(struct ceph_msg *msg)
3278{ 3231{
3279 pr_debug("msg_dump %p (front_max %d length %zd)\n", msg, 3232 pr_debug("msg_dump %p (front_max %d length %zd)\n", msg,
3280 msg->front_max, msg->p.length); 3233 msg->front_max, msg->data.length);
3281 print_hex_dump(KERN_DEBUG, "header: ", 3234 print_hex_dump(KERN_DEBUG, "header: ",
3282 DUMP_PREFIX_OFFSET, 16, 1, 3235 DUMP_PREFIX_OFFSET, 16, 1,
3283 &msg->hdr, sizeof(msg->hdr), true); 3236 &msg->hdr, sizeof(msg->hdr), true);