aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/messenger.c
diff options
context:
space:
mode:
authorYehuda Sadeh <yehuda@hq.newdream.net>2010-04-06 18:01:27 -0400
committerSage Weil <sage@newdream.net>2010-10-20 18:37:18 -0400
commit68b4476b0bc13fef18266b4140309a30e86739d2 (patch)
tree47fab5ea2491c7bc75fe14a3b0d3a091eb6244b7 /fs/ceph/messenger.c
parent3499e8a5d4dbb083324efd942e2c4fb7eb65f27c (diff)
ceph: messenger and osdc changes for rbd
Allow the messenger to send/receive data in a bio. This is added so that we wouldn't need to copy the data into pages or some other buffer when doing IO for an rbd block device. We can now have trailing variable sized data for osd ops. Also osd ops encoding is more modular. Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net> Signed-off-by: Sage Weil <sage@newdream.net>
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r--fs/ceph/messenger.c219
1 files changed, 187 insertions, 32 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 2502d76fcec1..17a09b32a591 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -9,6 +9,8 @@
9#include <linux/slab.h> 9#include <linux/slab.h>
10#include <linux/socket.h> 10#include <linux/socket.h>
11#include <linux/string.h> 11#include <linux/string.h>
12#include <linux/bio.h>
13#include <linux/blkdev.h>
12#include <net/tcp.h> 14#include <net/tcp.h>
13 15
14#include "super.h" 16#include "super.h"
@@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
529 if (le32_to_cpu(m->hdr.data_len) > 0) { 531 if (le32_to_cpu(m->hdr.data_len) > 0) {
530 /* initialize page iterator */ 532 /* initialize page iterator */
531 con->out_msg_pos.page = 0; 533 con->out_msg_pos.page = 0;
532 con->out_msg_pos.page_pos = 534 if (m->pages)
533 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK; 535 con->out_msg_pos.page_pos =
536 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
537 else
538 con->out_msg_pos.page_pos = 0;
534 con->out_msg_pos.data_pos = 0; 539 con->out_msg_pos.data_pos = 0;
535 con->out_msg_pos.did_page_crc = 0; 540 con->out_msg_pos.did_page_crc = 0;
536 con->out_more = 1; /* data + footer will follow */ 541 con->out_more = 1; /* data + footer will follow */
@@ -712,6 +717,31 @@ out:
712 return ret; /* done! */ 717 return ret; /* done! */
713} 718}
714 719
720#ifdef CONFIG_BLOCK
721static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
722{
723 if (!bio) {
724 *iter = NULL;
725 *seg = 0;
726 return;
727 }
728 *iter = bio;
729 *seg = bio->bi_idx;
730}
731
732static void iter_bio_next(struct bio **bio_iter, int *seg)
733{
734 if (*bio_iter == NULL)
735 return;
736
737 BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
738
739 (*seg)++;
740 if (*seg == (*bio_iter)->bi_vcnt)
741 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
742}
743#endif
744
715/* 745/*
716 * Write as much message data payload as we can. If we finish, queue 746 * Write as much message data payload as we can. If we finish, queue
717 * up the footer. 747 * up the footer.
@@ -726,21 +756,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
726 size_t len; 756 size_t len;
727 int crc = con->msgr->nocrc; 757 int crc = con->msgr->nocrc;
728 int ret; 758 int ret;
759 int total_max_write;
760 int in_trail = 0;
761 size_t trail_len = (msg->trail ? msg->trail->length : 0);
729 762
730 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 763 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
731 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 764 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
732 con->out_msg_pos.page_pos); 765 con->out_msg_pos.page_pos);
733 766
734 while (con->out_msg_pos.page < con->out_msg->nr_pages) { 767#ifdef CONFIG_BLOCK
768 if (msg->bio && !msg->bio_iter)
769 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
770#endif
771
772 while (data_len > con->out_msg_pos.data_pos) {
735 struct page *page = NULL; 773 struct page *page = NULL;
736 void *kaddr = NULL; 774 void *kaddr = NULL;
775 int max_write = PAGE_SIZE;
776 int page_shift = 0;
777
778 total_max_write = data_len - trail_len -
779 con->out_msg_pos.data_pos;
737 780
738 /* 781 /*
739 * if we are calculating the data crc (the default), we need 782 * if we are calculating the data crc (the default), we need
740 * to map the page. if our pages[] has been revoked, use the 783 * to map the page. if our pages[] has been revoked, use the
741 * zero page. 784 * zero page.
742 */ 785 */
743 if (msg->pages) { 786
787 /* have we reached the trail part of the data? */
788 if (con->out_msg_pos.data_pos >= data_len - trail_len) {
789 in_trail = 1;
790
791 total_max_write = data_len - con->out_msg_pos.data_pos;
792
793 page = list_first_entry(&msg->trail->head,
794 struct page, lru);
795 if (crc)
796 kaddr = kmap(page);
797 max_write = PAGE_SIZE;
798 } else if (msg->pages) {
744 page = msg->pages[con->out_msg_pos.page]; 799 page = msg->pages[con->out_msg_pos.page];
745 if (crc) 800 if (crc)
746 kaddr = kmap(page); 801 kaddr = kmap(page);
@@ -749,13 +804,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
749 struct page, lru); 804 struct page, lru);
750 if (crc) 805 if (crc)
751 kaddr = kmap(page); 806 kaddr = kmap(page);
807#ifdef CONFIG_BLOCK
808 } else if (msg->bio) {
809 struct bio_vec *bv;
810
811 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
812 page = bv->bv_page;
813 page_shift = bv->bv_offset;
814 if (crc)
815 kaddr = kmap(page) + page_shift;
816 max_write = bv->bv_len;
817#endif
752 } else { 818 } else {
753 page = con->msgr->zero_page; 819 page = con->msgr->zero_page;
754 if (crc) 820 if (crc)
755 kaddr = page_address(con->msgr->zero_page); 821 kaddr = page_address(con->msgr->zero_page);
756 } 822 }
757 len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), 823 len = min_t(int, max_write - con->out_msg_pos.page_pos,
758 (int)(data_len - con->out_msg_pos.data_pos)); 824 total_max_write);
825
759 if (crc && !con->out_msg_pos.did_page_crc) { 826 if (crc && !con->out_msg_pos.did_page_crc) {
760 void *base = kaddr + con->out_msg_pos.page_pos; 827 void *base = kaddr + con->out_msg_pos.page_pos;
761 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 828 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -765,13 +832,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
765 cpu_to_le32(crc32c(tmpcrc, base, len)); 832 cpu_to_le32(crc32c(tmpcrc, base, len));
766 con->out_msg_pos.did_page_crc = 1; 833 con->out_msg_pos.did_page_crc = 1;
767 } 834 }
768
769 ret = kernel_sendpage(con->sock, page, 835 ret = kernel_sendpage(con->sock, page,
770 con->out_msg_pos.page_pos, len, 836 con->out_msg_pos.page_pos + page_shift,
837 len,
771 MSG_DONTWAIT | MSG_NOSIGNAL | 838 MSG_DONTWAIT | MSG_NOSIGNAL |
772 MSG_MORE); 839 MSG_MORE);
773 840
774 if (crc && (msg->pages || msg->pagelist)) 841 if (crc &&
842 (msg->pages || msg->pagelist || msg->bio || in_trail))
775 kunmap(page); 843 kunmap(page);
776 844
777 if (ret <= 0) 845 if (ret <= 0)
@@ -783,9 +851,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
783 con->out_msg_pos.page_pos = 0; 851 con->out_msg_pos.page_pos = 0;
784 con->out_msg_pos.page++; 852 con->out_msg_pos.page++;
785 con->out_msg_pos.did_page_crc = 0; 853 con->out_msg_pos.did_page_crc = 0;
786 if (msg->pagelist) 854 if (in_trail)
855 list_move_tail(&page->lru,
856 &msg->trail->head);
857 else if (msg->pagelist)
787 list_move_tail(&page->lru, 858 list_move_tail(&page->lru,
788 &msg->pagelist->head); 859 &msg->pagelist->head);
860#ifdef CONFIG_BLOCK
861 else if (msg->bio)
862 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
863#endif
789 } 864 }
790 } 865 }
791 866
@@ -1305,8 +1380,7 @@ static int read_partial_message_section(struct ceph_connection *con,
1305 struct kvec *section, 1380 struct kvec *section,
1306 unsigned int sec_len, u32 *crc) 1381 unsigned int sec_len, u32 *crc)
1307{ 1382{
1308 int left; 1383 int ret, left;
1309 int ret;
1310 1384
1311 BUG_ON(!section); 1385 BUG_ON(!section);
1312 1386
@@ -1329,13 +1403,83 @@ static int read_partial_message_section(struct ceph_connection *con,
1329static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1403static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1330 struct ceph_msg_header *hdr, 1404 struct ceph_msg_header *hdr,
1331 int *skip); 1405 int *skip);
1406
1407
1408static int read_partial_message_pages(struct ceph_connection *con,
1409 struct page **pages,
1410 unsigned data_len, int datacrc)
1411{
1412 void *p;
1413 int ret;
1414 int left;
1415
1416 left = min((int)(data_len - con->in_msg_pos.data_pos),
1417 (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
1418 /* (page) data */
1419 BUG_ON(pages == NULL);
1420 p = kmap(pages[con->in_msg_pos.page]);
1421 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1422 left);
1423 if (ret > 0 && datacrc)
1424 con->in_data_crc =
1425 crc32c(con->in_data_crc,
1426 p + con->in_msg_pos.page_pos, ret);
1427 kunmap(pages[con->in_msg_pos.page]);
1428 if (ret <= 0)
1429 return ret;
1430 con->in_msg_pos.data_pos += ret;
1431 con->in_msg_pos.page_pos += ret;
1432 if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1433 con->in_msg_pos.page_pos = 0;
1434 con->in_msg_pos.page++;
1435 }
1436
1437 return ret;
1438}
1439
1440#ifdef CONFIG_BLOCK
1441static int read_partial_message_bio(struct ceph_connection *con,
1442 struct bio **bio_iter, int *bio_seg,
1443 unsigned data_len, int datacrc)
1444{
1445 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1446 void *p;
1447 int ret, left;
1448
1449 if (IS_ERR(bv))
1450 return PTR_ERR(bv);
1451
1452 left = min((int)(data_len - con->in_msg_pos.data_pos),
1453 (int)(bv->bv_len - con->in_msg_pos.page_pos));
1454
1455 p = kmap(bv->bv_page) + bv->bv_offset;
1456
1457 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1458 left);
1459 if (ret > 0 && datacrc)
1460 con->in_data_crc =
1461 crc32c(con->in_data_crc,
1462 p + con->in_msg_pos.page_pos, ret);
1463 kunmap(bv->bv_page);
1464 if (ret <= 0)
1465 return ret;
1466 con->in_msg_pos.data_pos += ret;
1467 con->in_msg_pos.page_pos += ret;
1468 if (con->in_msg_pos.page_pos == bv->bv_len) {
1469 con->in_msg_pos.page_pos = 0;
1470 iter_bio_next(bio_iter, bio_seg);
1471 }
1472
1473 return ret;
1474}
1475#endif
1476
1332/* 1477/*
1333 * read (part of) a message. 1478 * read (part of) a message.
1334 */ 1479 */
1335static int read_partial_message(struct ceph_connection *con) 1480static int read_partial_message(struct ceph_connection *con)
1336{ 1481{
1337 struct ceph_msg *m = con->in_msg; 1482 struct ceph_msg *m = con->in_msg;
1338 void *p;
1339 int ret; 1483 int ret;
1340 int to, left; 1484 int to, left;
1341 unsigned front_len, middle_len, data_len, data_off; 1485 unsigned front_len, middle_len, data_len, data_off;
@@ -1422,7 +1566,10 @@ static int read_partial_message(struct ceph_connection *con)
1422 m->middle->vec.iov_len = 0; 1566 m->middle->vec.iov_len = 0;
1423 1567
1424 con->in_msg_pos.page = 0; 1568 con->in_msg_pos.page = 0;
1425 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; 1569 if (m->pages)
1570 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
1571 else
1572 con->in_msg_pos.page_pos = 0;
1426 con->in_msg_pos.data_pos = 0; 1573 con->in_msg_pos.data_pos = 0;
1427 } 1574 }
1428 1575
@@ -1440,27 +1587,29 @@ static int read_partial_message(struct ceph_connection *con)
1440 if (ret <= 0) 1587 if (ret <= 0)
1441 return ret; 1588 return ret;
1442 } 1589 }
1590#ifdef CONFIG_BLOCK
1591 if (m->bio && !m->bio_iter)
1592 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1593#endif
1443 1594
1444 /* (page) data */ 1595 /* (page) data */
1445 while (con->in_msg_pos.data_pos < data_len) { 1596 while (con->in_msg_pos.data_pos < data_len) {
1446 left = min((int)(data_len - con->in_msg_pos.data_pos), 1597 if (m->pages) {
1447 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 1598 ret = read_partial_message_pages(con, m->pages,
1448 BUG_ON(m->pages == NULL); 1599 data_len, datacrc);
1449 p = kmap(m->pages[con->in_msg_pos.page]); 1600 if (ret <= 0)
1450 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1601 return ret;
1451 left); 1602#ifdef CONFIG_BLOCK
1452 if (ret > 0 && datacrc) 1603 } else if (m->bio) {
1453 con->in_data_crc = 1604
1454 crc32c(con->in_data_crc, 1605 ret = read_partial_message_bio(con,
1455 p + con->in_msg_pos.page_pos, ret); 1606 &m->bio_iter, &m->bio_seg,
1456 kunmap(m->pages[con->in_msg_pos.page]); 1607 data_len, datacrc);
1457 if (ret <= 0) 1608 if (ret <= 0)
1458 return ret; 1609 return ret;
1459 con->in_msg_pos.data_pos += ret; 1610#endif
1460 con->in_msg_pos.page_pos += ret; 1611 } else {
1461 if (con->in_msg_pos.page_pos == PAGE_SIZE) { 1612 BUG_ON(1);
1462 con->in_msg_pos.page_pos = 0;
1463 con->in_msg_pos.page++;
1464 } 1613 }
1465 } 1614 }
1466 1615
@@ -2136,6 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2136 m->nr_pages = 0; 2285 m->nr_pages = 0;
2137 m->pages = NULL; 2286 m->pages = NULL;
2138 m->pagelist = NULL; 2287 m->pagelist = NULL;
2288 m->bio = NULL;
2289 m->bio_iter = NULL;
2290 m->bio_seg = 0;
2291 m->trail = NULL;
2139 2292
2140 dout("ceph_msg_new %p front %d\n", m, front_len); 2293 dout("ceph_msg_new %p front %d\n", m, front_len);
2141 return m; 2294 return m;
@@ -2250,6 +2403,8 @@ void ceph_msg_last_put(struct kref *kref)
2250 m->pagelist = NULL; 2403 m->pagelist = NULL;
2251 } 2404 }
2252 2405
2406 m->trail = NULL;
2407
2253 if (m->pool) 2408 if (m->pool)
2254 ceph_msgpool_put(m->pool, m); 2409 ceph_msgpool_put(m->pool, m);
2255 else 2410 else