aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph/messenger.c
diff options
context:
space:
mode:
Diffstat (limited to 'fs/ceph/messenger.c')
-rw-r--r--fs/ceph/messenger.c219
1 files changed, 187 insertions, 32 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 2502d76fcec1..17a09b32a591 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -9,6 +9,8 @@
9#include <linux/slab.h> 9#include <linux/slab.h>
10#include <linux/socket.h> 10#include <linux/socket.h>
11#include <linux/string.h> 11#include <linux/string.h>
12#include <linux/bio.h>
13#include <linux/blkdev.h>
12#include <net/tcp.h> 14#include <net/tcp.h>
13 15
14#include "super.h" 16#include "super.h"
@@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
529 if (le32_to_cpu(m->hdr.data_len) > 0) { 531 if (le32_to_cpu(m->hdr.data_len) > 0) {
530 /* initialize page iterator */ 532 /* initialize page iterator */
531 con->out_msg_pos.page = 0; 533 con->out_msg_pos.page = 0;
532 con->out_msg_pos.page_pos = 534 if (m->pages)
533 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK; 535 con->out_msg_pos.page_pos =
536 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
537 else
538 con->out_msg_pos.page_pos = 0;
534 con->out_msg_pos.data_pos = 0; 539 con->out_msg_pos.data_pos = 0;
535 con->out_msg_pos.did_page_crc = 0; 540 con->out_msg_pos.did_page_crc = 0;
536 con->out_more = 1; /* data + footer will follow */ 541 con->out_more = 1; /* data + footer will follow */
@@ -712,6 +717,31 @@ out:
712 return ret; /* done! */ 717 return ret; /* done! */
713} 718}
714 719
720#ifdef CONFIG_BLOCK
721static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
722{
723 if (!bio) {
724 *iter = NULL;
725 *seg = 0;
726 return;
727 }
728 *iter = bio;
729 *seg = bio->bi_idx;
730}
731
732static void iter_bio_next(struct bio **bio_iter, int *seg)
733{
734 if (*bio_iter == NULL)
735 return;
736
737 BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
738
739 (*seg)++;
740 if (*seg == (*bio_iter)->bi_vcnt)
741 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
742}
743#endif
744
715/* 745/*
716 * Write as much message data payload as we can. If we finish, queue 746 * Write as much message data payload as we can. If we finish, queue
717 * up the footer. 747 * up the footer.
@@ -726,21 +756,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
726 size_t len; 756 size_t len;
727 int crc = con->msgr->nocrc; 757 int crc = con->msgr->nocrc;
728 int ret; 758 int ret;
759 int total_max_write;
760 int in_trail = 0;
761 size_t trail_len = (msg->trail ? msg->trail->length : 0);
729 762
730 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 763 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
731 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 764 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
732 con->out_msg_pos.page_pos); 765 con->out_msg_pos.page_pos);
733 766
734 while (con->out_msg_pos.page < con->out_msg->nr_pages) { 767#ifdef CONFIG_BLOCK
768 if (msg->bio && !msg->bio_iter)
769 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
770#endif
771
772 while (data_len > con->out_msg_pos.data_pos) {
735 struct page *page = NULL; 773 struct page *page = NULL;
736 void *kaddr = NULL; 774 void *kaddr = NULL;
775 int max_write = PAGE_SIZE;
776 int page_shift = 0;
777
778 total_max_write = data_len - trail_len -
779 con->out_msg_pos.data_pos;
737 780
738 /* 781 /*
739 * if we are calculating the data crc (the default), we need 782 * if we are calculating the data crc (the default), we need
740 * to map the page. if our pages[] has been revoked, use the 783 * to map the page. if our pages[] has been revoked, use the
741 * zero page. 784 * zero page.
742 */ 785 */
743 if (msg->pages) { 786
787 /* have we reached the trail part of the data? */
788 if (con->out_msg_pos.data_pos >= data_len - trail_len) {
789 in_trail = 1;
790
791 total_max_write = data_len - con->out_msg_pos.data_pos;
792
793 page = list_first_entry(&msg->trail->head,
794 struct page, lru);
795 if (crc)
796 kaddr = kmap(page);
797 max_write = PAGE_SIZE;
798 } else if (msg->pages) {
744 page = msg->pages[con->out_msg_pos.page]; 799 page = msg->pages[con->out_msg_pos.page];
745 if (crc) 800 if (crc)
746 kaddr = kmap(page); 801 kaddr = kmap(page);
@@ -749,13 +804,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
749 struct page, lru); 804 struct page, lru);
750 if (crc) 805 if (crc)
751 kaddr = kmap(page); 806 kaddr = kmap(page);
807#ifdef CONFIG_BLOCK
808 } else if (msg->bio) {
809 struct bio_vec *bv;
810
811 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
812 page = bv->bv_page;
813 page_shift = bv->bv_offset;
814 if (crc)
815 kaddr = kmap(page) + page_shift;
816 max_write = bv->bv_len;
817#endif
752 } else { 818 } else {
753 page = con->msgr->zero_page; 819 page = con->msgr->zero_page;
754 if (crc) 820 if (crc)
755 kaddr = page_address(con->msgr->zero_page); 821 kaddr = page_address(con->msgr->zero_page);
756 } 822 }
757 len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), 823 len = min_t(int, max_write - con->out_msg_pos.page_pos,
758 (int)(data_len - con->out_msg_pos.data_pos)); 824 total_max_write);
825
759 if (crc && !con->out_msg_pos.did_page_crc) { 826 if (crc && !con->out_msg_pos.did_page_crc) {
760 void *base = kaddr + con->out_msg_pos.page_pos; 827 void *base = kaddr + con->out_msg_pos.page_pos;
761 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 828 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -765,13 +832,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
765 cpu_to_le32(crc32c(tmpcrc, base, len)); 832 cpu_to_le32(crc32c(tmpcrc, base, len));
766 con->out_msg_pos.did_page_crc = 1; 833 con->out_msg_pos.did_page_crc = 1;
767 } 834 }
768
769 ret = kernel_sendpage(con->sock, page, 835 ret = kernel_sendpage(con->sock, page,
770 con->out_msg_pos.page_pos, len, 836 con->out_msg_pos.page_pos + page_shift,
837 len,
771 MSG_DONTWAIT | MSG_NOSIGNAL | 838 MSG_DONTWAIT | MSG_NOSIGNAL |
772 MSG_MORE); 839 MSG_MORE);
773 840
774 if (crc && (msg->pages || msg->pagelist)) 841 if (crc &&
842 (msg->pages || msg->pagelist || msg->bio || in_trail))
775 kunmap(page); 843 kunmap(page);
776 844
777 if (ret <= 0) 845 if (ret <= 0)
@@ -783,9 +851,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
783 con->out_msg_pos.page_pos = 0; 851 con->out_msg_pos.page_pos = 0;
784 con->out_msg_pos.page++; 852 con->out_msg_pos.page++;
785 con->out_msg_pos.did_page_crc = 0; 853 con->out_msg_pos.did_page_crc = 0;
786 if (msg->pagelist) 854 if (in_trail)
855 list_move_tail(&page->lru,
856 &msg->trail->head);
857 else if (msg->pagelist)
787 list_move_tail(&page->lru, 858 list_move_tail(&page->lru,
788 &msg->pagelist->head); 859 &msg->pagelist->head);
860#ifdef CONFIG_BLOCK
861 else if (msg->bio)
862 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
863#endif
789 } 864 }
790 } 865 }
791 866
@@ -1305,8 +1380,7 @@ static int read_partial_message_section(struct ceph_connection *con,
1305 struct kvec *section, 1380 struct kvec *section,
1306 unsigned int sec_len, u32 *crc) 1381 unsigned int sec_len, u32 *crc)
1307{ 1382{
1308 int left; 1383 int ret, left;
1309 int ret;
1310 1384
1311 BUG_ON(!section); 1385 BUG_ON(!section);
1312 1386
@@ -1329,13 +1403,83 @@ static int read_partial_message_section(struct ceph_connection *con,
1329static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1403static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1330 struct ceph_msg_header *hdr, 1404 struct ceph_msg_header *hdr,
1331 int *skip); 1405 int *skip);
1406
1407
1408static int read_partial_message_pages(struct ceph_connection *con,
1409 struct page **pages,
1410 unsigned data_len, int datacrc)
1411{
1412 void *p;
1413 int ret;
1414 int left;
1415
1416 left = min((int)(data_len - con->in_msg_pos.data_pos),
1417 (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
1418 /* (page) data */
1419 BUG_ON(pages == NULL);
1420 p = kmap(pages[con->in_msg_pos.page]);
1421 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1422 left);
1423 if (ret > 0 && datacrc)
1424 con->in_data_crc =
1425 crc32c(con->in_data_crc,
1426 p + con->in_msg_pos.page_pos, ret);
1427 kunmap(pages[con->in_msg_pos.page]);
1428 if (ret <= 0)
1429 return ret;
1430 con->in_msg_pos.data_pos += ret;
1431 con->in_msg_pos.page_pos += ret;
1432 if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1433 con->in_msg_pos.page_pos = 0;
1434 con->in_msg_pos.page++;
1435 }
1436
1437 return ret;
1438}
1439
1440#ifdef CONFIG_BLOCK
1441static int read_partial_message_bio(struct ceph_connection *con,
1442 struct bio **bio_iter, int *bio_seg,
1443 unsigned data_len, int datacrc)
1444{
1445 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1446 void *p;
1447 int ret, left;
1448
1449 if (IS_ERR(bv))
1450 return PTR_ERR(bv);
1451
1452 left = min((int)(data_len - con->in_msg_pos.data_pos),
1453 (int)(bv->bv_len - con->in_msg_pos.page_pos));
1454
1455 p = kmap(bv->bv_page) + bv->bv_offset;
1456
1457 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1458 left);
1459 if (ret > 0 && datacrc)
1460 con->in_data_crc =
1461 crc32c(con->in_data_crc,
1462 p + con->in_msg_pos.page_pos, ret);
1463 kunmap(bv->bv_page);
1464 if (ret <= 0)
1465 return ret;
1466 con->in_msg_pos.data_pos += ret;
1467 con->in_msg_pos.page_pos += ret;
1468 if (con->in_msg_pos.page_pos == bv->bv_len) {
1469 con->in_msg_pos.page_pos = 0;
1470 iter_bio_next(bio_iter, bio_seg);
1471 }
1472
1473 return ret;
1474}
1475#endif
1476
1332/* 1477/*
1333 * read (part of) a message. 1478 * read (part of) a message.
1334 */ 1479 */
1335static int read_partial_message(struct ceph_connection *con) 1480static int read_partial_message(struct ceph_connection *con)
1336{ 1481{
1337 struct ceph_msg *m = con->in_msg; 1482 struct ceph_msg *m = con->in_msg;
1338 void *p;
1339 int ret; 1483 int ret;
1340 int to, left; 1484 int to, left;
1341 unsigned front_len, middle_len, data_len, data_off; 1485 unsigned front_len, middle_len, data_len, data_off;
@@ -1422,7 +1566,10 @@ static int read_partial_message(struct ceph_connection *con)
1422 m->middle->vec.iov_len = 0; 1566 m->middle->vec.iov_len = 0;
1423 1567
1424 con->in_msg_pos.page = 0; 1568 con->in_msg_pos.page = 0;
1425 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; 1569 if (m->pages)
1570 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
1571 else
1572 con->in_msg_pos.page_pos = 0;
1426 con->in_msg_pos.data_pos = 0; 1573 con->in_msg_pos.data_pos = 0;
1427 } 1574 }
1428 1575
@@ -1440,27 +1587,29 @@ static int read_partial_message(struct ceph_connection *con)
1440 if (ret <= 0) 1587 if (ret <= 0)
1441 return ret; 1588 return ret;
1442 } 1589 }
1590#ifdef CONFIG_BLOCK
1591 if (m->bio && !m->bio_iter)
1592 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1593#endif
1443 1594
1444 /* (page) data */ 1595 /* (page) data */
1445 while (con->in_msg_pos.data_pos < data_len) { 1596 while (con->in_msg_pos.data_pos < data_len) {
1446 left = min((int)(data_len - con->in_msg_pos.data_pos), 1597 if (m->pages) {
1447 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 1598 ret = read_partial_message_pages(con, m->pages,
1448 BUG_ON(m->pages == NULL); 1599 data_len, datacrc);
1449 p = kmap(m->pages[con->in_msg_pos.page]); 1600 if (ret <= 0)
1450 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1601 return ret;
1451 left); 1602#ifdef CONFIG_BLOCK
1452 if (ret > 0 && datacrc) 1603 } else if (m->bio) {
1453 con->in_data_crc = 1604
1454 crc32c(con->in_data_crc, 1605 ret = read_partial_message_bio(con,
1455 p + con->in_msg_pos.page_pos, ret); 1606 &m->bio_iter, &m->bio_seg,
1456 kunmap(m->pages[con->in_msg_pos.page]); 1607 data_len, datacrc);
1457 if (ret <= 0) 1608 if (ret <= 0)
1458 return ret; 1609 return ret;
1459 con->in_msg_pos.data_pos += ret; 1610#endif
1460 con->in_msg_pos.page_pos += ret; 1611 } else {
1461 if (con->in_msg_pos.page_pos == PAGE_SIZE) { 1612 BUG_ON(1);
1462 con->in_msg_pos.page_pos = 0;
1463 con->in_msg_pos.page++;
1464 } 1613 }
1465 } 1614 }
1466 1615
@@ -2136,6 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2136 m->nr_pages = 0; 2285 m->nr_pages = 0;
2137 m->pages = NULL; 2286 m->pages = NULL;
2138 m->pagelist = NULL; 2287 m->pagelist = NULL;
2288 m->bio = NULL;
2289 m->bio_iter = NULL;
2290 m->bio_seg = 0;
2291 m->trail = NULL;
2139 2292
2140 dout("ceph_msg_new %p front %d\n", m, front_len); 2293 dout("ceph_msg_new %p front %d\n", m, front_len);
2141 return m; 2294 return m;
@@ -2250,6 +2403,8 @@ void ceph_msg_last_put(struct kref *kref)
2250 m->pagelist = NULL; 2403 m->pagelist = NULL;
2251 } 2404 }
2252 2405
2406 m->trail = NULL;
2407
2253 if (m->pool) 2408 if (m->pool)
2254 ceph_msgpool_put(m->pool, m); 2409 ceph_msgpool_put(m->pool, m);
2255 else 2410 else