author	Alex Elder <elder@inktank.com>	2013-02-14 13:16:43 -0500
committer	Sage Weil <sage@inktank.com>	2013-05-02 00:16:27 -0400
commit	0fff87ec798abdb4a99f01cbb0197266bb68c5dc (patch)
tree	33c853319e28ed9dd20835c1f3f066be404b50a1 /fs/ceph
parent	2ac2b7a6d4976bd6b5dc0751aa77d12d48d3ac4c (diff)
libceph: separate read and write data
An osd request defines information about where data to be read should
be placed as well as where data to write comes from.  Currently these
are represented by common fields.

Keep information about data for writing separate from data to be read
by splitting these into data_in and data_out fields.

This is the key patch in this whole series, in that it actually
identifies which osd requests generate outgoing data and which
generate incoming data.  It's less obvious (currently) that an osd
CALL op generates both outgoing and incoming data; that's the focus
of some upcoming work.

This resolves:
    http://tracker.ceph.com/issues/4127

Signed-off-by: Alex Elder <elder@inktank.com>
Reviewed-by: Josh Durgin <josh.durgin@inktank.com>
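For orientation, here is a minimal sketch of the shape of this change.
The field names (type, pages, num_pages, alignment, pages_from_pool,
own_pages) are taken from the diff below; the struct layout and member
types are illustrative assumptions, not the kernel's actual definitions
(those live in the libceph headers).

	/* Sketch only: field names mirror those touched in this diff;
	 * member types here are assumptions for illustration. */
	struct ceph_osd_data {
		int		type;		/* e.g. CEPH_OSD_DATA_TYPE_PAGES */
		struct page	**pages;
		int		num_pages;
		int		alignment;
		unsigned int	pages_from_pool:1;
		unsigned int	own_pages:1;
	};

	struct ceph_osd_request {
		/* ... */
		/* Previously one shared r_data field served both
		 * directions; now each direction is described on its own. */
		struct ceph_osd_data	r_data_in;	/* where read data lands */
		struct ceph_osd_data	r_data_out;	/* where written data comes from */
		/* ... */
	};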
Diffstat (limited to 'fs/ceph')
-rw-r--r--	fs/ceph/addr.c	67
-rw-r--r--	fs/ceph/file.c	10
2 files changed, 41 insertions(+), 36 deletions(-)
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index 276fe96f12e3..c117c51741d5 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -243,9 +243,9 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 	dout("finish_read %p req %p rc %d bytes %d\n", inode, req, rc, bytes);

 	/* unlock all pages, zeroing any data we didn't read */
-	BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES);
-	for (i = 0; i < req->r_data.num_pages; i++, bytes -= PAGE_CACHE_SIZE) {
-		struct page *page = req->r_data.pages[i];
+	BUG_ON(req->r_data_in.type != CEPH_OSD_DATA_TYPE_PAGES);
+	for (i = 0; i < req->r_data_in.num_pages; i++) {
+		struct page *page = req->r_data_in.pages[i];

 		if (bytes < (int)PAGE_CACHE_SIZE) {
 			/* zero (remainder of) page */
@@ -258,8 +258,9 @@ static void finish_read(struct ceph_osd_request *req, struct ceph_msg *msg)
 		SetPageUptodate(page);
 		unlock_page(page);
 		page_cache_release(page);
+		bytes -= PAGE_CACHE_SIZE;
 	}
-	kfree(req->r_data.pages);
+	kfree(req->r_data_in.pages);
 }

 static void ceph_unlock_page_vector(struct page **pages, int num_pages)
@@ -337,10 +338,10 @@ static int start_read(struct inode *inode, struct list_head *page_list, int max)
 		}
 		pages[i] = page;
 	}
-	req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
-	req->r_data.pages = pages;
-	req->r_data.num_pages = nr_pages;
-	req->r_data.alignment = 0;
+	req->r_data_in.type = CEPH_OSD_DATA_TYPE_PAGES;
+	req->r_data_in.pages = pages;
+	req->r_data_in.num_pages = nr_pages;
+	req->r_data_in.alignment = 0;
 	req->r_callback = finish_read;
 	req->r_inode = inode;

@@ -563,7 +564,7 @@ static void writepages_finish(struct ceph_osd_request *req,
 	long writeback_stat;
 	unsigned issued = ceph_caps_issued(ci);

-	BUG_ON(req->r_data.type != CEPH_OSD_DATA_TYPE_PAGES);
+	BUG_ON(req->r_data_out.type != CEPH_OSD_DATA_TYPE_PAGES);
 	if (rc >= 0) {
 		/*
 		 * Assume we wrote the pages we originally sent.  The
@@ -571,7 +572,7 @@ static void writepages_finish(struct ceph_osd_request *req,
 		 * raced with a truncation and was adjusted at the osd,
 		 * so don't believe the reply.
 		 */
-		wrote = req->r_data.num_pages;
+		wrote = req->r_data_out.num_pages;
 	} else {
 		wrote = 0;
 		mapping_set_error(mapping, rc);
@@ -580,8 +581,8 @@ static void writepages_finish(struct ceph_osd_request *req,
 	     inode, rc, bytes, wrote);

 	/* clean all pages */
-	for (i = 0; i < req->r_data.num_pages; i++) {
-		page = req->r_data.pages[i];
+	for (i = 0; i < req->r_data_out.num_pages; i++) {
+		page = req->r_data_out.pages[i];
 		BUG_ON(!page);
 		WARN_ON(!PageUptodate(page));

@@ -610,31 +611,34 @@ static void writepages_finish(struct ceph_osd_request *req,
 		unlock_page(page);
 	}
 	dout("%p wrote+cleaned %d pages\n", inode, wrote);
-	ceph_put_wrbuffer_cap_refs(ci, req->r_data.num_pages, snapc);
+	ceph_put_wrbuffer_cap_refs(ci, req->r_data_out.num_pages, snapc);

-	ceph_release_pages(req->r_data.pages, req->r_data.num_pages);
-	if (req->r_data.pages_from_pool)
-		mempool_free(req->r_data.pages,
+	ceph_release_pages(req->r_data_out.pages, req->r_data_out.num_pages);
+	if (req->r_data_out.pages_from_pool)
+		mempool_free(req->r_data_out.pages,
 			     ceph_sb_to_client(inode->i_sb)->wb_pagevec_pool);
 	else
-		kfree(req->r_data.pages);
+		kfree(req->r_data_out.pages);
 	ceph_osdc_put_request(req);
 }

 /*
  * allocate a page vec, either directly, or if necessary, via a the
- * mempool.  we avoid the mempool if we can because req->r_data.num_pages
+ * mempool.  we avoid the mempool if we can because req->r_data_out.num_pages
  * may be less than the maximum write size.
  */
 static void alloc_page_vec(struct ceph_fs_client *fsc,
 			   struct ceph_osd_request *req)
 {
-	req->r_data.pages = kmalloc(sizeof(struct page *) * req->r_data.num_pages,
-			       GFP_NOFS);
-	if (!req->r_data.pages) {
-		req->r_data.pages = mempool_alloc(fsc->wb_pagevec_pool, GFP_NOFS);
-		req->r_data.pages_from_pool = 1;
-		WARN_ON(!req->r_data.pages);
+	size_t size;
+
+	size = sizeof (struct page *) * req->r_data_out.num_pages;
+	req->r_data_out.pages = kmalloc(size, GFP_NOFS);
+	if (!req->r_data_out.pages) {
+		req->r_data_out.pages = mempool_alloc(fsc->wb_pagevec_pool,
+							GFP_NOFS);
+		req->r_data_out.pages_from_pool = 1;
+		WARN_ON(!req->r_data_out.pages);
 	}
 }

@@ -833,10 +837,11 @@ get_more_pages:
 			break;
 		}

-		req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
-		req->r_data.num_pages = calc_pages_for(0, len);
-		req->r_data.alignment = 0;
-		max_pages = req->r_data.num_pages;
+		req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES;
+		req->r_data_out.num_pages =
+				calc_pages_for(0, len);
+		req->r_data_out.alignment = 0;
+		max_pages = req->r_data_out.num_pages;

 		alloc_page_vec(fsc, req);
 		req->r_callback = writepages_finish;
@@ -858,7 +863,7 @@ get_more_pages:
 		}

 		set_page_writeback(page);
-		req->r_data.pages[locked_pages] = page;
+		req->r_data_out.pages[locked_pages] = page;
 		locked_pages++;
 		next = page->index + 1;
 	}
@@ -888,14 +893,14 @@ get_more_pages:
 	}

 	/* submit the write */
-	offset = req->r_data.pages[0]->index << PAGE_CACHE_SHIFT;
+	offset = req->r_data_out.pages[0]->index << PAGE_CACHE_SHIFT;
 	len = min((snap_size ? snap_size : i_size_read(inode)) - offset,
 		  (u64)locked_pages << PAGE_CACHE_SHIFT);
 	dout("writepages got %d pages at %llu~%llu\n",
 	     locked_pages, offset, len);

 	/* revise final length, page count */
-	req->r_data.num_pages = locked_pages;
+	req->r_data_out.num_pages = locked_pages;
 	req->r_request_ops[0].extent.length = cpu_to_le64(len);
 	req->r_request_ops[0].payload_len = cpu_to_le32(len);
 	req->r_request->hdr.data_len = cpu_to_le32(len);
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3643a386ab23..501fb37b81a2 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -568,13 +568,13 @@ more:
 		if ((file->f_flags & O_SYNC) == 0) {
 			/* get a second commit callback */
 			req->r_safe_callback = sync_write_commit;
-			req->r_data.own_pages = 1;
+			req->r_data_out.own_pages = 1;
 		}
 	}
-	req->r_data.type = CEPH_OSD_DATA_TYPE_PAGES;
-	req->r_data.pages = pages;
-	req->r_data.num_pages = num_pages;
-	req->r_data.alignment = page_align;
+	req->r_data_out.type = CEPH_OSD_DATA_TYPE_PAGES;
+	req->r_data_out.pages = pages;
+	req->r_data_out.num_pages = num_pages;
+	req->r_data_out.alignment = page_align;
 	req->r_inode = inode;

 	ret = ceph_osdc_start_request(&fsc->client->osdc, req, false);