about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author Yehuda Sadeh <yehuda@hq.newdream.net> 2010-04-06 18:01:27 -0400
committer Sage Weil <sage@newdream.net> 2010-10-20 18:37:18 -0400
commit 68b4476b0bc13fef18266b4140309a30e86739d2 (patch)
tree 47fab5ea2491c7bc75fe14a3b0d3a091eb6244b7
parent 3499e8a5d4dbb083324efd942e2c4fb7eb65f27c (diff)
ceph: messenger and osdc changes for rbd
Allow the messenger to send/receive data in a bio. This is added so that we wouldn't need to copy the data into pages or some other buffer when doing IO for an rbd block device.

We can now have trailing variable sized data for osd ops. Also osd ops encoding is more modular.

Signed-off-by: Yehuda Sadeh <yehuda@hq.newdream.net>
Signed-off-by: Sage Weil <sage@newdream.net>
-rw-r--r-- fs/ceph/messenger.c 219
-rw-r--r-- fs/ceph/messenger.h 4
-rw-r--r-- fs/ceph/osd_client.c 249
-rw-r--r-- fs/ceph/osd_client.h 61
-rw-r--r-- fs/ceph/pagelist.c 2
-rw-r--r-- fs/ceph/pagelist.h 2
6 files changed, 436 insertions, 101 deletions
diff --git a/fs/ceph/messenger.c b/fs/ceph/messenger.c
index 2502d76fcec1..17a09b32a591 100644
--- a/fs/ceph/messenger.c
+++ b/fs/ceph/messenger.c
@@ -9,6 +9,8 @@
9#include <linux/slab.h> 9#include <linux/slab.h>
10#include <linux/socket.h> 10#include <linux/socket.h>
11#include <linux/string.h> 11#include <linux/string.h>
12#include <linux/bio.h>
13#include <linux/blkdev.h>
12#include <net/tcp.h> 14#include <net/tcp.h>
13 15
14#include "super.h" 16#include "super.h"
@@ -529,8 +531,11 @@ static void prepare_write_message(struct ceph_connection *con)
529 if (le32_to_cpu(m->hdr.data_len) > 0) { 531 if (le32_to_cpu(m->hdr.data_len) > 0) {
530 /* initialize page iterator */ 532 /* initialize page iterator */
531 con->out_msg_pos.page = 0; 533 con->out_msg_pos.page = 0;
532 con->out_msg_pos.page_pos = 534 if (m->pages)
533 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK; 535 con->out_msg_pos.page_pos =
536 le16_to_cpu(m->hdr.data_off) & ~PAGE_MASK;
537 else
538 con->out_msg_pos.page_pos = 0;
534 con->out_msg_pos.data_pos = 0; 539 con->out_msg_pos.data_pos = 0;
535 con->out_msg_pos.did_page_crc = 0; 540 con->out_msg_pos.did_page_crc = 0;
536 con->out_more = 1; /* data + footer will follow */ 541 con->out_more = 1; /* data + footer will follow */
@@ -712,6 +717,31 @@ out:
712 return ret; /* done! */ 717 return ret; /* done! */
713} 718}
714 719
720#ifdef CONFIG_BLOCK
721static void init_bio_iter(struct bio *bio, struct bio **iter, int *seg)
722{
723 if (!bio) {
724 *iter = NULL;
725 *seg = 0;
726 return;
727 }
728 *iter = bio;
729 *seg = bio->bi_idx;
730}
731
732static void iter_bio_next(struct bio **bio_iter, int *seg)
733{
734 if (*bio_iter == NULL)
735 return;
736
737 BUG_ON(*seg >= (*bio_iter)->bi_vcnt);
738
739 (*seg)++;
740 if (*seg == (*bio_iter)->bi_vcnt)
741 init_bio_iter((*bio_iter)->bi_next, bio_iter, seg);
742}
743#endif
744
715/* 745/*
716 * Write as much message data payload as we can. If we finish, queue 746 * Write as much message data payload as we can. If we finish, queue
717 * up the footer. 747 * up the footer.
@@ -726,21 +756,46 @@ static int write_partial_msg_pages(struct ceph_connection *con)
726 size_t len; 756 size_t len;
727 int crc = con->msgr->nocrc; 757 int crc = con->msgr->nocrc;
728 int ret; 758 int ret;
759 int total_max_write;
760 int in_trail = 0;
761 size_t trail_len = (msg->trail ? msg->trail->length : 0);
729 762
730 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n", 763 dout("write_partial_msg_pages %p msg %p page %d/%d offset %d\n",
731 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages, 764 con, con->out_msg, con->out_msg_pos.page, con->out_msg->nr_pages,
732 con->out_msg_pos.page_pos); 765 con->out_msg_pos.page_pos);
733 766
734 while (con->out_msg_pos.page < con->out_msg->nr_pages) { 767#ifdef CONFIG_BLOCK
768 if (msg->bio && !msg->bio_iter)
769 init_bio_iter(msg->bio, &msg->bio_iter, &msg->bio_seg);
770#endif
771
772 while (data_len > con->out_msg_pos.data_pos) {
735 struct page *page = NULL; 773 struct page *page = NULL;
736 void *kaddr = NULL; 774 void *kaddr = NULL;
775 int max_write = PAGE_SIZE;
776 int page_shift = 0;
777
778 total_max_write = data_len - trail_len -
779 con->out_msg_pos.data_pos;
737 780
738 /* 781 /*
739 * if we are calculating the data crc (the default), we need 782 * if we are calculating the data crc (the default), we need
740 * to map the page. if our pages[] has been revoked, use the 783 * to map the page. if our pages[] has been revoked, use the
741 * zero page. 784 * zero page.
742 */ 785 */
743 if (msg->pages) { 786
787 /* have we reached the trail part of the data? */
788 if (con->out_msg_pos.data_pos >= data_len - trail_len) {
789 in_trail = 1;
790
791 total_max_write = data_len - con->out_msg_pos.data_pos;
792
793 page = list_first_entry(&msg->trail->head,
794 struct page, lru);
795 if (crc)
796 kaddr = kmap(page);
797 max_write = PAGE_SIZE;
798 } else if (msg->pages) {
744 page = msg->pages[con->out_msg_pos.page]; 799 page = msg->pages[con->out_msg_pos.page];
745 if (crc) 800 if (crc)
746 kaddr = kmap(page); 801 kaddr = kmap(page);
@@ -749,13 +804,25 @@ static int write_partial_msg_pages(struct ceph_connection *con)
749 struct page, lru); 804 struct page, lru);
750 if (crc) 805 if (crc)
751 kaddr = kmap(page); 806 kaddr = kmap(page);
807#ifdef CONFIG_BLOCK
808 } else if (msg->bio) {
809 struct bio_vec *bv;
810
811 bv = bio_iovec_idx(msg->bio_iter, msg->bio_seg);
812 page = bv->bv_page;
813 page_shift = bv->bv_offset;
814 if (crc)
815 kaddr = kmap(page) + page_shift;
816 max_write = bv->bv_len;
817#endif
752 } else { 818 } else {
753 page = con->msgr->zero_page; 819 page = con->msgr->zero_page;
754 if (crc) 820 if (crc)
755 kaddr = page_address(con->msgr->zero_page); 821 kaddr = page_address(con->msgr->zero_page);
756 } 822 }
757 len = min((int)(PAGE_SIZE - con->out_msg_pos.page_pos), 823 len = min_t(int, max_write - con->out_msg_pos.page_pos,
758 (int)(data_len - con->out_msg_pos.data_pos)); 824 total_max_write);
825
759 if (crc && !con->out_msg_pos.did_page_crc) { 826 if (crc && !con->out_msg_pos.did_page_crc) {
760 void *base = kaddr + con->out_msg_pos.page_pos; 827 void *base = kaddr + con->out_msg_pos.page_pos;
761 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc); 828 u32 tmpcrc = le32_to_cpu(con->out_msg->footer.data_crc);
@@ -765,13 +832,14 @@ static int write_partial_msg_pages(struct ceph_connection *con)
765 cpu_to_le32(crc32c(tmpcrc, base, len)); 832 cpu_to_le32(crc32c(tmpcrc, base, len));
766 con->out_msg_pos.did_page_crc = 1; 833 con->out_msg_pos.did_page_crc = 1;
767 } 834 }
768
769 ret = kernel_sendpage(con->sock, page, 835 ret = kernel_sendpage(con->sock, page,
770 con->out_msg_pos.page_pos, len, 836 con->out_msg_pos.page_pos + page_shift,
837 len,
771 MSG_DONTWAIT | MSG_NOSIGNAL | 838 MSG_DONTWAIT | MSG_NOSIGNAL |
772 MSG_MORE); 839 MSG_MORE);
773 840
774 if (crc && (msg->pages || msg->pagelist)) 841 if (crc &&
842 (msg->pages || msg->pagelist || msg->bio || in_trail))
775 kunmap(page); 843 kunmap(page);
776 844
777 if (ret <= 0) 845 if (ret <= 0)
@@ -783,9 +851,16 @@ static int write_partial_msg_pages(struct ceph_connection *con)
783 con->out_msg_pos.page_pos = 0; 851 con->out_msg_pos.page_pos = 0;
784 con->out_msg_pos.page++; 852 con->out_msg_pos.page++;
785 con->out_msg_pos.did_page_crc = 0; 853 con->out_msg_pos.did_page_crc = 0;
786 if (msg->pagelist) 854 if (in_trail)
855 list_move_tail(&page->lru,
856 &msg->trail->head);
857 else if (msg->pagelist)
787 list_move_tail(&page->lru, 858 list_move_tail(&page->lru,
788 &msg->pagelist->head); 859 &msg->pagelist->head);
860#ifdef CONFIG_BLOCK
861 else if (msg->bio)
862 iter_bio_next(&msg->bio_iter, &msg->bio_seg);
863#endif
789 } 864 }
790 } 865 }
791 866
@@ -1305,8 +1380,7 @@ static int read_partial_message_section(struct ceph_connection *con,
1305 struct kvec *section, 1380 struct kvec *section,
1306 unsigned int sec_len, u32 *crc) 1381 unsigned int sec_len, u32 *crc)
1307{ 1382{
1308 int left; 1383 int ret, left;
1309 int ret;
1310 1384
1311 BUG_ON(!section); 1385 BUG_ON(!section);
1312 1386
@@ -1329,13 +1403,83 @@ static int read_partial_message_section(struct ceph_connection *con,
1329static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con, 1403static struct ceph_msg *ceph_alloc_msg(struct ceph_connection *con,
1330 struct ceph_msg_header *hdr, 1404 struct ceph_msg_header *hdr,
1331 int *skip); 1405 int *skip);
1406
1407
1408static int read_partial_message_pages(struct ceph_connection *con,
1409 struct page **pages,
1410 unsigned data_len, int datacrc)
1411{
1412 void *p;
1413 int ret;
1414 int left;
1415
1416 left = min((int)(data_len - con->in_msg_pos.data_pos),
1417 (int)(PAGE_SIZE - con->in_msg_pos.page_pos));
1418 /* (page) data */
1419 BUG_ON(pages == NULL);
1420 p = kmap(pages[con->in_msg_pos.page]);
1421 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1422 left);
1423 if (ret > 0 && datacrc)
1424 con->in_data_crc =
1425 crc32c(con->in_data_crc,
1426 p + con->in_msg_pos.page_pos, ret);
1427 kunmap(pages[con->in_msg_pos.page]);
1428 if (ret <= 0)
1429 return ret;
1430 con->in_msg_pos.data_pos += ret;
1431 con->in_msg_pos.page_pos += ret;
1432 if (con->in_msg_pos.page_pos == PAGE_SIZE) {
1433 con->in_msg_pos.page_pos = 0;
1434 con->in_msg_pos.page++;
1435 }
1436
1437 return ret;
1438}
1439
1440#ifdef CONFIG_BLOCK
1441static int read_partial_message_bio(struct ceph_connection *con,
1442 struct bio **bio_iter, int *bio_seg,
1443 unsigned data_len, int datacrc)
1444{
1445 struct bio_vec *bv = bio_iovec_idx(*bio_iter, *bio_seg);
1446 void *p;
1447 int ret, left;
1448
1449 if (IS_ERR(bv))
1450 return PTR_ERR(bv);
1451
1452 left = min((int)(data_len - con->in_msg_pos.data_pos),
1453 (int)(bv->bv_len - con->in_msg_pos.page_pos));
1454
1455 p = kmap(bv->bv_page) + bv->bv_offset;
1456
1457 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos,
1458 left);
1459 if (ret > 0 && datacrc)
1460 con->in_data_crc =
1461 crc32c(con->in_data_crc,
1462 p + con->in_msg_pos.page_pos, ret);
1463 kunmap(bv->bv_page);
1464 if (ret <= 0)
1465 return ret;
1466 con->in_msg_pos.data_pos += ret;
1467 con->in_msg_pos.page_pos += ret;
1468 if (con->in_msg_pos.page_pos == bv->bv_len) {
1469 con->in_msg_pos.page_pos = 0;
1470 iter_bio_next(bio_iter, bio_seg);
1471 }
1472
1473 return ret;
1474}
1475#endif
1476
1332/* 1477/*
1333 * read (part of) a message. 1478 * read (part of) a message.
1334 */ 1479 */
1335static int read_partial_message(struct ceph_connection *con) 1480static int read_partial_message(struct ceph_connection *con)
1336{ 1481{
1337 struct ceph_msg *m = con->in_msg; 1482 struct ceph_msg *m = con->in_msg;
1338 void *p;
1339 int ret; 1483 int ret;
1340 int to, left; 1484 int to, left;
1341 unsigned front_len, middle_len, data_len, data_off; 1485 unsigned front_len, middle_len, data_len, data_off;
@@ -1422,7 +1566,10 @@ static int read_partial_message(struct ceph_connection *con)
1422 m->middle->vec.iov_len = 0; 1566 m->middle->vec.iov_len = 0;
1423 1567
1424 con->in_msg_pos.page = 0; 1568 con->in_msg_pos.page = 0;
1425 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK; 1569 if (m->pages)
1570 con->in_msg_pos.page_pos = data_off & ~PAGE_MASK;
1571 else
1572 con->in_msg_pos.page_pos = 0;
1426 con->in_msg_pos.data_pos = 0; 1573 con->in_msg_pos.data_pos = 0;
1427 } 1574 }
1428 1575
@@ -1440,27 +1587,29 @@ static int read_partial_message(struct ceph_connection *con)
1440 if (ret <= 0) 1587 if (ret <= 0)
1441 return ret; 1588 return ret;
1442 } 1589 }
1590#ifdef CONFIG_BLOCK
1591 if (m->bio && !m->bio_iter)
1592 init_bio_iter(m->bio, &m->bio_iter, &m->bio_seg);
1593#endif
1443 1594
1444 /* (page) data */ 1595 /* (page) data */
1445 while (con->in_msg_pos.data_pos < data_len) { 1596 while (con->in_msg_pos.data_pos < data_len) {
1446 left = min((int)(data_len - con->in_msg_pos.data_pos), 1597 if (m->pages) {
1447 (int)(PAGE_SIZE - con->in_msg_pos.page_pos)); 1598 ret = read_partial_message_pages(con, m->pages,
1448 BUG_ON(m->pages == NULL); 1599 data_len, datacrc);
1449 p = kmap(m->pages[con->in_msg_pos.page]); 1600 if (ret <= 0)
1450 ret = ceph_tcp_recvmsg(con->sock, p + con->in_msg_pos.page_pos, 1601 return ret;
1451 left); 1602#ifdef CONFIG_BLOCK
1452 if (ret > 0 && datacrc) 1603 } else if (m->bio) {
1453 con->in_data_crc = 1604
1454 crc32c(con->in_data_crc, 1605 ret = read_partial_message_bio(con,
1455 p + con->in_msg_pos.page_pos, ret); 1606 &m->bio_iter, &m->bio_seg,
1456 kunmap(m->pages[con->in_msg_pos.page]); 1607 data_len, datacrc);
1457 if (ret <= 0) 1608 if (ret <= 0)
1458 return ret; 1609 return ret;
1459 con->in_msg_pos.data_pos += ret; 1610#endif
1460 con->in_msg_pos.page_pos += ret; 1611 } else {
1461 if (con->in_msg_pos.page_pos == PAGE_SIZE) { 1612 BUG_ON(1);
1462 con->in_msg_pos.page_pos = 0;
1463 con->in_msg_pos.page++;
1464 } 1613 }
1465 } 1614 }
1466 1615
@@ -2136,6 +2285,10 @@ struct ceph_msg *ceph_msg_new(int type, int front_len, gfp_t flags)
2136 m->nr_pages = 0; 2285 m->nr_pages = 0;
2137 m->pages = NULL; 2286 m->pages = NULL;
2138 m->pagelist = NULL; 2287 m->pagelist = NULL;
2288 m->bio = NULL;
2289 m->bio_iter = NULL;
2290 m->bio_seg = 0;
2291 m->trail = NULL;
2139 2292
2140 dout("ceph_msg_new %p front %d\n", m, front_len); 2293 dout("ceph_msg_new %p front %d\n", m, front_len);
2141 return m; 2294 return m;
@@ -2250,6 +2403,8 @@ void ceph_msg_last_put(struct kref *kref)
2250 m->pagelist = NULL; 2403 m->pagelist = NULL;
2251 } 2404 }
2252 2405
2406 m->trail = NULL;
2407
2253 if (m->pool) 2408 if (m->pool)
2254 ceph_msgpool_put(m->pool, m); 2409 ceph_msgpool_put(m->pool, m);
2255 else 2410 else
diff --git a/fs/ceph/messenger.h b/fs/ceph/messenger.h
index 76fbc957bc13..5a79450604ef 100644
--- a/fs/ceph/messenger.h
+++ b/fs/ceph/messenger.h
@@ -82,6 +82,10 @@ struct ceph_msg {
82 struct ceph_pagelist *pagelist; /* instead of pages */ 82 struct ceph_pagelist *pagelist; /* instead of pages */
83 struct list_head list_head; 83 struct list_head list_head;
84 struct kref kref; 84 struct kref kref;
85 struct bio *bio; /* instead of pages/pagelist */
86 struct bio *bio_iter; /* bio iterator */
87 int bio_seg; /* current bio segment */
88 struct ceph_pagelist *trail; /* the trailing part of the data */
85 bool front_is_vmalloc; 89 bool front_is_vmalloc;
86 bool more_to_follow; 90 bool more_to_follow;
87 bool needs_out_seq; 91 bool needs_out_seq;
diff --git a/fs/ceph/osd_client.c b/fs/ceph/osd_client.c
index 2647dafd96f5..c5d818e73add 100644
--- a/fs/ceph/osd_client.c
+++ b/fs/ceph/osd_client.c
@@ -6,12 +6,16 @@
6#include <linux/pagemap.h> 6#include <linux/pagemap.h>
7#include <linux/slab.h> 7#include <linux/slab.h>
8#include <linux/uaccess.h> 8#include <linux/uaccess.h>
9#ifdef CONFIG_BLOCK
10#include <linux/bio.h>
11#endif
9 12
10#include "super.h" 13#include "super.h"
11#include "osd_client.h" 14#include "osd_client.h"
12#include "messenger.h" 15#include "messenger.h"
13#include "decode.h" 16#include "decode.h"
14#include "auth.h" 17#include "auth.h"
18#include "pagelist.h"
15 19
16#define OSD_OP_FRONT_LEN 4096 20#define OSD_OP_FRONT_LEN 4096
17#define OSD_OPREPLY_FRONT_LEN 512 21#define OSD_OPREPLY_FRONT_LEN 512
@@ -22,29 +26,50 @@ static int __kick_requests(struct ceph_osd_client *osdc,
22 26
23static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd); 27static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
24 28
29static int op_needs_trail(int op)
30{
31 switch (op) {
32 case CEPH_OSD_OP_GETXATTR:
33 case CEPH_OSD_OP_SETXATTR:
34 case CEPH_OSD_OP_CMPXATTR:
35 case CEPH_OSD_OP_CALL:
36 return 1;
37 default:
38 return 0;
39 }
40}
41
42static int op_has_extent(int op)
43{
44 return (op == CEPH_OSD_OP_READ ||
45 op == CEPH_OSD_OP_WRITE);
46}
47
25void ceph_calc_raw_layout(struct ceph_osd_client *osdc, 48void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
26 struct ceph_file_layout *layout, 49 struct ceph_file_layout *layout,
27 u64 snapid, 50 u64 snapid,
28 u64 off, u64 len, u64 *bno, 51 u64 off, u64 *plen, u64 *bno,
29 struct ceph_osd_request *req) 52 struct ceph_osd_request *req,
53 struct ceph_osd_req_op *op)
30{ 54{
31 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base; 55 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
32 struct ceph_osd_op *op = (void *)(reqhead + 1); 56 u64 orig_len = *plen;
33 u64 orig_len = len;
34 u64 objoff, objlen; /* extent in object */ 57 u64 objoff, objlen; /* extent in object */
35 58
36 reqhead->snapid = cpu_to_le64(snapid); 59 reqhead->snapid = cpu_to_le64(snapid);
37 60
38 /* object extent? */ 61 /* object extent? */
39 ceph_calc_file_object_mapping(layout, off, &len, bno, 62 ceph_calc_file_object_mapping(layout, off, plen, bno,
40 &objoff, &objlen); 63 &objoff, &objlen);
41 if (len < orig_len) 64 if (*plen < orig_len)
42 dout(" skipping last %llu, final file extent %llu~%llu\n", 65 dout(" skipping last %llu, final file extent %llu~%llu\n",
43 orig_len - len, off, len); 66 orig_len - *plen, off, *plen);
44 67
45 op->extent.offset = cpu_to_le64(objoff); 68 if (op_has_extent(op->op)) {
46 op->extent.length = cpu_to_le64(objlen); 69 op->extent.offset = objoff;
47 req->r_num_pages = calc_pages_for(off, len); 70 op->extent.length = objlen;
71 }
72 req->r_num_pages = calc_pages_for(off, *plen);
48 73
49 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n", 74 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
50 *bno, objoff, objlen, req->r_num_pages); 75 *bno, objoff, objlen, req->r_num_pages);
@@ -80,11 +105,13 @@ static void calc_layout(struct ceph_osd_client *osdc,
80 struct ceph_vino vino, 105 struct ceph_vino vino,
81 struct ceph_file_layout *layout, 106 struct ceph_file_layout *layout,
82 u64 off, u64 *plen, 107 u64 off, u64 *plen,
83 struct ceph_osd_request *req) 108 struct ceph_osd_request *req,
109 struct ceph_osd_req_op *op)
84{ 110{
85 u64 bno; 111 u64 bno;
86 112
87 ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req); 113 ceph_calc_raw_layout(osdc, layout, vino.snap, off,
114 plen, &bno, req, op);
88 115
89 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno); 116 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
90 req->r_oid_len = strlen(req->r_oid); 117 req->r_oid_len = strlen(req->r_oid);
@@ -113,35 +140,64 @@ void ceph_osdc_release_request(struct kref *kref)
113 if (req->r_own_pages) 140 if (req->r_own_pages)
114 ceph_release_page_vector(req->r_pages, 141 ceph_release_page_vector(req->r_pages,
115 req->r_num_pages); 142 req->r_num_pages);
143#ifdef CONFIG_BLOCK
144 if (req->r_bio)
145 bio_put(req->r_bio);
146#endif
116 ceph_put_snap_context(req->r_snapc); 147 ceph_put_snap_context(req->r_snapc);
148 if (req->r_trail) {
149 ceph_pagelist_release(req->r_trail);
150 kfree(req->r_trail);
151 }
117 if (req->r_mempool) 152 if (req->r_mempool)
118 mempool_free(req, req->r_osdc->req_mempool); 153 mempool_free(req, req->r_osdc->req_mempool);
119 else 154 else
120 kfree(req); 155 kfree(req);
121} 156}
122 157
158static int op_needs_trail(int op)
159{
160 switch (op) {
161 case CEPH_OSD_OP_GETXATTR:
162 case CEPH_OSD_OP_SETXATTR:
163 case CEPH_OSD_OP_CMPXATTR:
164 return 1;
165 default:
166 return 0;
167 }
168}
169
170static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
171{
172 int i = 0;
173
174 if (needs_trail)
175 *needs_trail = 0;
176 while (ops[i].op) {
177 if (needs_trail && op_needs_trail(ops[i].op))
178 *needs_trail = 1;
179 i++;
180 }
181
182 return i;
183}
184
123struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 185struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
124 int flags, 186 int flags,
125 struct ceph_snap_context *snapc, 187 struct ceph_snap_context *snapc,
126 int do_sync, 188 struct ceph_osd_req_op *ops,
127 bool use_mempool, 189 bool use_mempool,
128 gfp_t gfp_flags, 190 gfp_t gfp_flags,
129 struct page **pages) 191 struct page **pages,
192 struct bio *bio)
130{ 193{
131 struct ceph_osd_request *req; 194 struct ceph_osd_request *req;
132 struct ceph_msg *msg; 195 struct ceph_msg *msg;
133 int num_op = 1 + do_sync; 196 int needs_trail;
134 size_t msg_size = sizeof(struct ceph_osd_request_head) + 197 int num_op = get_num_ops(ops, &needs_trail);
135 num_op*sizeof(struct ceph_osd_op); 198 size_t msg_size = sizeof(struct ceph_osd_request_head);
136 199
137 if (use_mempool) { 200 msg_size += num_op*sizeof(struct ceph_osd_op);
138 req = mempool_alloc(osdc->req_mempool, gfp_flags);
139 memset(req, 0, sizeof(*req));
140 } else {
141 req = kzalloc(sizeof(*req), gfp_flags);
142 }
143 if (!req)
144 return NULL;
145 201
146 if (use_mempool) { 202 if (use_mempool) {
147 req = mempool_alloc(osdc->req_mempool, gfp_flags); 203 req = mempool_alloc(osdc->req_mempool, gfp_flags);
@@ -154,6 +210,7 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
154 210
155 req->r_osdc = osdc; 211 req->r_osdc = osdc;
156 req->r_mempool = use_mempool; 212 req->r_mempool = use_mempool;
213
157 kref_init(&req->r_kref); 214 kref_init(&req->r_kref);
158 init_completion(&req->r_completion); 215 init_completion(&req->r_completion);
159 init_completion(&req->r_safe_completion); 216 init_completion(&req->r_safe_completion);
@@ -174,6 +231,15 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
174 } 231 }
175 req->r_reply = msg; 232 req->r_reply = msg;
176 233
234 /* allocate space for the trailing data */
235 if (needs_trail) {
236 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
237 if (!req->r_trail) {
238 ceph_osdc_put_request(req);
239 return NULL;
240 }
241 ceph_pagelist_init(req->r_trail);
242 }
177 /* create request message; allow space for oid */ 243 /* create request message; allow space for oid */
178 msg_size += 40; 244 msg_size += 40;
179 if (snapc) 245 if (snapc)
@@ -186,38 +252,87 @@ struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
186 ceph_osdc_put_request(req); 252 ceph_osdc_put_request(req);
187 return NULL; 253 return NULL;
188 } 254 }
255
189 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP); 256 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
190 memset(msg->front.iov_base, 0, msg->front.iov_len); 257 memset(msg->front.iov_base, 0, msg->front.iov_len);
191 258
192 req->r_request = msg; 259 req->r_request = msg;
193 req->r_pages = pages; 260 req->r_pages = pages;
261#ifdef CONFIG_BLOCK
262 if (bio) {
263 req->r_bio = bio;
264 bio_get(req->r_bio);
265 }
266#endif
194 267
195 return req; 268 return req;
196} 269}
197 270
271static void osd_req_encode_op(struct ceph_osd_request *req,
272 struct ceph_osd_op *dst,
273 struct ceph_osd_req_op *src)
274{
275 dst->op = cpu_to_le16(src->op);
276
277 switch (dst->op) {
278 case CEPH_OSD_OP_READ:
279 case CEPH_OSD_OP_WRITE:
280 dst->extent.offset =
281 cpu_to_le64(src->extent.offset);
282 dst->extent.length =
283 cpu_to_le64(src->extent.length);
284 dst->extent.truncate_size =
285 cpu_to_le64(src->extent.truncate_size);
286 dst->extent.truncate_seq =
287 cpu_to_le32(src->extent.truncate_seq);
288 break;
289
290 case CEPH_OSD_OP_GETXATTR:
291 case CEPH_OSD_OP_SETXATTR:
292 case CEPH_OSD_OP_CMPXATTR:
293 BUG_ON(!req->r_trail);
294
295 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
296 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
297 dst->xattr.cmp_op = src->xattr.cmp_op;
298 dst->xattr.cmp_mode = src->xattr.cmp_mode;
299 ceph_pagelist_append(req->r_trail, src->xattr.name,
300 src->xattr.name_len);
301 ceph_pagelist_append(req->r_trail, src->xattr.val,
302 src->xattr.value_len);
303 break;
304 case CEPH_OSD_OP_STARTSYNC:
305 break;
306 default:
307 pr_err("unrecognized osd opcode %d\n", dst->op);
308 WARN_ON(1);
309 break;
310 }
311 dst->payload_len = cpu_to_le32(src->payload_len);
312}
313
198/* 314/*
199 * build new request AND message 315 * build new request AND message
200 * 316 *
201 */ 317 */
202void ceph_osdc_build_request(struct ceph_osd_request *req, 318void ceph_osdc_build_request(struct ceph_osd_request *req,
203 u64 off, u64 *plen, 319 u64 off, u64 *plen,
204 int opcode, 320 struct ceph_osd_req_op *src_ops,
205 struct ceph_snap_context *snapc, 321 struct ceph_snap_context *snapc,
206 int do_sync, 322 struct timespec *mtime,
207 u32 truncate_seq, 323 const char *oid,
208 u64 truncate_size, 324 int oid_len)
209 struct timespec *mtime,
210 const char *oid,
211 int oid_len)
212{ 325{
213 struct ceph_msg *msg = req->r_request; 326 struct ceph_msg *msg = req->r_request;
214 struct ceph_osd_request_head *head; 327 struct ceph_osd_request_head *head;
328 struct ceph_osd_req_op *src_op;
215 struct ceph_osd_op *op; 329 struct ceph_osd_op *op;
216 void *p; 330 void *p;
217 int num_op = 1 + do_sync; 331 int num_op = get_num_ops(src_ops, NULL);
218 size_t msg_size = sizeof(*head) + num_op*sizeof(*op); 332 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
219 int i;
220 int flags = req->r_flags; 333 int flags = req->r_flags;
334 u64 data_len = 0;
335 int i;
221 336
222 head = msg->front.iov_base; 337 head = msg->front.iov_base;
223 op = (void *)(head + 1); 338 op = (void *)(head + 1);
@@ -230,25 +345,23 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
230 if (flags & CEPH_OSD_FLAG_WRITE) 345 if (flags & CEPH_OSD_FLAG_WRITE)
231 ceph_encode_timespec(&head->mtime, mtime); 346 ceph_encode_timespec(&head->mtime, mtime);
232 head->num_ops = cpu_to_le16(num_op); 347 head->num_ops = cpu_to_le16(num_op);
233 op->op = cpu_to_le16(opcode);
234 348
235 if (flags & CEPH_OSD_FLAG_WRITE) {
236 req->r_request->hdr.data_off = cpu_to_le16(off);
237 req->r_request->hdr.data_len = cpu_to_le32(*plen);
238 op->payload_len = cpu_to_le32(*plen);
239 }
240 op->extent.truncate_size = cpu_to_le64(truncate_size);
241 op->extent.truncate_seq = cpu_to_le32(truncate_seq);
242 349
243 /* fill in oid */ 350 /* fill in oid */
244 head->object_len = cpu_to_le32(oid_len); 351 head->object_len = cpu_to_le32(oid_len);
245 memcpy(p, oid, oid_len); 352 memcpy(p, oid, oid_len);
246 p += oid_len; 353 p += oid_len;
247 354
248 if (do_sync) { 355 src_op = src_ops;
356 while (src_op->op) {
357 osd_req_encode_op(req, op, src_op);
358 src_op++;
249 op++; 359 op++;
250 op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
251 } 360 }
361
362 if (req->r_trail)
363 data_len += req->r_trail->length;
364
252 if (snapc) { 365 if (snapc) {
253 head->snap_seq = cpu_to_le64(snapc->seq); 366 head->snap_seq = cpu_to_le64(snapc->seq);
254 head->num_snaps = cpu_to_le32(snapc->num_snaps); 367 head->num_snaps = cpu_to_le32(snapc->num_snaps);
@@ -258,6 +371,14 @@ void ceph_osdc_build_request(struct ceph_osd_request *req,
258 } 371 }
259 } 372 }
260 373
374 if (flags & CEPH_OSD_FLAG_WRITE) {
375 req->r_request->hdr.data_off = cpu_to_le16(off);
376 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
377 } else if (data_len) {
378 req->r_request->hdr.data_off = 0;
379 req->r_request->hdr.data_len = cpu_to_le32(data_len);
380 }
381
261 BUG_ON(p > msg->front.iov_base + msg->front.iov_len); 382 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
262 msg_size = p - msg->front.iov_base; 383 msg_size = p - msg->front.iov_base;
263 msg->front.iov_len = msg_size; 384 msg->front.iov_len = msg_size;
@@ -288,21 +409,34 @@ struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
288 struct timespec *mtime, 409 struct timespec *mtime,
289 bool use_mempool, int num_reply) 410 bool use_mempool, int num_reply)
290{ 411{
291 struct ceph_osd_request *req = 412 struct ceph_osd_req_op ops[3];
292 ceph_osdc_alloc_request(osdc, flags, 413 struct ceph_osd_request *req;
293 snapc, do_sync, 414
415 ops[0].op = opcode;
416 ops[0].extent.truncate_seq = truncate_seq;
417 ops[0].extent.truncate_size = truncate_size;
418 ops[0].payload_len = 0;
419
420 if (do_sync) {
421 ops[1].op = CEPH_OSD_OP_STARTSYNC;
422 ops[1].payload_len = 0;
423 ops[2].op = 0;
424 } else
425 ops[1].op = 0;
426
427 req = ceph_osdc_alloc_request(osdc, flags,
428 snapc, ops,
294 use_mempool, 429 use_mempool,
295 GFP_NOFS, NULL); 430 GFP_NOFS, NULL, NULL);
296 if (IS_ERR(req)) 431 if (IS_ERR(req))
297 return req; 432 return req;
298 433
299 /* calculate max write size */ 434 /* calculate max write size */
300 calc_layout(osdc, vino, layout, off, plen, req); 435 calc_layout(osdc, vino, layout, off, plen, req, ops);
301 req->r_file_layout = *layout; /* keep a copy */ 436 req->r_file_layout = *layout; /* keep a copy */
302 437
303 ceph_osdc_build_request(req, off, plen, opcode, 438 ceph_osdc_build_request(req, off, plen, ops,
304 snapc, do_sync, 439 snapc,
305 truncate_seq, truncate_size,
306 mtime, 440 mtime,
307 req->r_oid, req->r_oid_len); 441 req->r_oid, req->r_oid_len);
308 442
@@ -1177,6 +1311,10 @@ int ceph_osdc_start_request(struct ceph_osd_client *osdc,
1177 1311
1178 req->r_request->pages = req->r_pages; 1312 req->r_request->pages = req->r_pages;
1179 req->r_request->nr_pages = req->r_num_pages; 1313 req->r_request->nr_pages = req->r_num_pages;
1314#ifdef CONFIG_BLOCK
1315 req->r_request->bio = req->r_bio;
1316#endif
1317 req->r_request->trail = req->r_trail;
1180 1318
1181 register_request(osdc, req); 1319 register_request(osdc, req);
1182 1320
@@ -1493,6 +1631,9 @@ static struct ceph_msg *get_reply(struct ceph_connection *con,
1493 } 1631 }
1494 m->pages = req->r_pages; 1632 m->pages = req->r_pages;
1495 m->nr_pages = req->r_num_pages; 1633 m->nr_pages = req->r_num_pages;
1634#ifdef CONFIG_BLOCK
1635 m->bio = req->r_bio;
1636#endif
1496 } 1637 }
1497 *skip = 0; 1638 *skip = 0;
1498 req->r_con_filling_msg = ceph_con_get(con); 1639 req->r_con_filling_msg = ceph_con_get(con);
diff --git a/fs/ceph/osd_client.h b/fs/ceph/osd_client.h
index b687c2ea72e6..d583d1bf6cd9 100644
--- a/fs/ceph/osd_client.h
+++ b/fs/ceph/osd_client.h
@@ -15,6 +15,7 @@ struct ceph_snap_context;
15struct ceph_osd_request; 15struct ceph_osd_request;
16struct ceph_osd_client; 16struct ceph_osd_client;
17struct ceph_authorizer; 17struct ceph_authorizer;
18struct ceph_pagelist;
18 19
19/* 20/*
20 * completion callback for async writepages 21 * completion callback for async writepages
@@ -80,6 +81,11 @@ struct ceph_osd_request {
80 struct page **r_pages; /* pages for data payload */ 81 struct page **r_pages; /* pages for data payload */
81 int r_pages_from_pool; 82 int r_pages_from_pool;
82 int r_own_pages; /* if true, i own page list */ 83 int r_own_pages; /* if true, i own page list */
84#ifdef CONFIG_BLOCK
85 struct bio *r_bio; /* instead of pages */
86#endif
87
88 struct ceph_pagelist *r_trail; /* trailing part of the data */
83}; 89};
84 90
85struct ceph_osd_client { 91struct ceph_osd_client {
@@ -110,6 +116,36 @@ struct ceph_osd_client {
110 struct ceph_msgpool msgpool_op_reply; 116 struct ceph_msgpool msgpool_op_reply;
111}; 117};
112 118
119struct ceph_osd_req_op {
120 u16 op; /* CEPH_OSD_OP_* */
121 u32 flags; /* CEPH_OSD_FLAG_* */
122 union {
123 struct {
124 u64 offset, length;
125 u64 truncate_size;
126 u32 truncate_seq;
127 } extent;
128 struct {
129 const char *name;
130 u32 name_len;
131 const char *val;
132 u32 value_len;
133 __u8 cmp_op; /* CEPH_OSD_CMPXATTR_OP_* */
134 __u8 cmp_mode; /* CEPH_OSD_CMPXATTR_MODE_* */
135 } xattr;
136 struct {
137 __u8 class_len;
138 __u8 method_len;
139 __u8 argc;
140 u32 indata_len;
141 } cls;
142 struct {
143 u64 cookie, count;
144 } pgls;
145 };
146 u32 payload_len;
147};
148
113extern int ceph_osdc_init(struct ceph_osd_client *osdc, 149extern int ceph_osdc_init(struct ceph_osd_client *osdc,
114 struct ceph_client *client); 150 struct ceph_client *client);
115extern void ceph_osdc_stop(struct ceph_osd_client *osdc); 151extern void ceph_osdc_stop(struct ceph_osd_client *osdc);
@@ -122,27 +158,26 @@ extern void ceph_osdc_handle_map(struct ceph_osd_client *osdc,
122extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc, 158extern void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
123 struct ceph_file_layout *layout, 159 struct ceph_file_layout *layout,
124 u64 snapid, 160 u64 snapid,
125 u64 off, u64 len, u64 *bno, 161 u64 off, u64 *plen, u64 *bno,
126 struct ceph_osd_request *req); 162 struct ceph_osd_request *req,
163 struct ceph_osd_req_op *op);
127 164
128extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc, 165extern struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
129 int flags, 166 int flags,
130 struct ceph_snap_context *snapc, 167 struct ceph_snap_context *snapc,
131 int do_sync, 168 struct ceph_osd_req_op *ops,
132 bool use_mempool, 169 bool use_mempool,
133 gfp_t gfp_flags, 170 gfp_t gfp_flags,
134 struct page **pages); 171 struct page **pages,
172 struct bio *bio);
135 173
136extern void ceph_osdc_build_request(struct ceph_osd_request *req, 174extern void ceph_osdc_build_request(struct ceph_osd_request *req,
137 u64 off, u64 *plen, 175 u64 off, u64 *plen,
138 int opcode, 176 struct ceph_osd_req_op *src_ops,
139 struct ceph_snap_context *snapc, 177 struct ceph_snap_context *snapc,
140 int do_sync, 178 struct timespec *mtime,
141 u32 truncate_seq, 179 const char *oid,
142 u64 truncate_size, 180 int oid_len);
143 struct timespec *mtime,
144 const char *oid,
145 int oid_len);
146 181
147extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *, 182extern struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *,
148 struct ceph_file_layout *layout, 183 struct ceph_file_layout *layout,
diff --git a/fs/ceph/pagelist.c b/fs/ceph/pagelist.c
index 46a368b6dce5..326e1c04176f 100644
--- a/fs/ceph/pagelist.c
+++ b/fs/ceph/pagelist.c
@@ -39,7 +39,7 @@ static int ceph_pagelist_addpage(struct ceph_pagelist *pl)
39 return 0; 39 return 0;
40} 40}
41 41
42int ceph_pagelist_append(struct ceph_pagelist *pl, void *buf, size_t len) 42int ceph_pagelist_append(struct ceph_pagelist *pl, const void *buf, size_t len)
43{ 43{
44 while (pl->room < len) { 44 while (pl->room < len) {
45 size_t bit = pl->room; 45 size_t bit = pl->room;
diff --git a/fs/ceph/pagelist.h b/fs/ceph/pagelist.h
index e8a4187e1087..cc9327aa1c98 100644
--- a/fs/ceph/pagelist.h
+++ b/fs/ceph/pagelist.h
@@ -19,7 +19,7 @@ static inline void ceph_pagelist_init(struct ceph_pagelist *pl)
19} 19}
20extern int ceph_pagelist_release(struct ceph_pagelist *pl); 20extern int ceph_pagelist_release(struct ceph_pagelist *pl);
21 21
22extern int ceph_pagelist_append(struct ceph_pagelist *pl, void *d, size_t l); 22extern int ceph_pagelist_append(struct ceph_pagelist *pl, const void *d, size_t l);
23 23
24static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v) 24static inline int ceph_pagelist_encode_64(struct ceph_pagelist *pl, u64 v)
25{ 25{