aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
authorYan, Zheng <zyan@redhat.com>2015-12-23 08:23:38 -0500
committerIlya Dryomov <idryomov@gmail.com>2016-01-21 13:36:07 -0500
commitc8fe9b17d055fe80e1a1591f5900ce41fbf6b796 (patch)
tree6cece642189c7c68c8b6db89262eccc6167f998d /fs/ceph
parent458c4703aea04674f0b5f63b43cf5669a602a110 (diff)
ceph: Asynchronous IO support
The basic idea of AIO support is simple: just call kiocb::ki_complete() in the OSD request's completion callback. But there are several special cases. When an IO spans multiple objects, we need to wait until all OSD requests are complete before calling kiocb::ki_complete(). Error handling in this case is tricky too. For simplicity, an AIO that both spans multiple objects and extends i_size is not allowed. Another special case is checking EOF for reads (another client can write to the file and extend i_size concurrently). For simplicity, the direct-IO/AIO code path does the check and falls back to a normal sync read instead. Signed-off-by: Yan, Zheng <zyan@redhat.com>
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/file.c397
1 files changed, 278 insertions, 119 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 3c68e6aee2f0..8e924b7dd498 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file)
397} 397}
398 398
399enum { 399enum {
400 CHECK_EOF = 1, 400 HAVE_RETRIED = 1,
401 READ_INLINE = 2, 401 CHECK_EOF = 2,
402 READ_INLINE = 3,
402}; 403};
403 404
404/* 405/*
@@ -411,17 +412,14 @@ enum {
411static int striped_read(struct inode *inode, 412static int striped_read(struct inode *inode,
412 u64 off, u64 len, 413 u64 off, u64 len,
413 struct page **pages, int num_pages, 414 struct page **pages, int num_pages,
414 int *checkeof, bool o_direct, 415 int *checkeof)
415 unsigned long buf_align)
416{ 416{
417 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 417 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
418 struct ceph_inode_info *ci = ceph_inode(inode); 418 struct ceph_inode_info *ci = ceph_inode(inode);
419 u64 pos, this_len, left; 419 u64 pos, this_len, left;
420 int io_align, page_align; 420 int page_align, pages_left;
421 int pages_left; 421 int read, ret;
422 int read;
423 struct page **page_pos; 422 struct page **page_pos;
424 int ret;
425 bool hit_stripe, was_short; 423 bool hit_stripe, was_short;
426 424
427 /* 425 /*
@@ -432,13 +430,9 @@ static int striped_read(struct inode *inode,
432 page_pos = pages; 430 page_pos = pages;
433 pages_left = num_pages; 431 pages_left = num_pages;
434 read = 0; 432 read = 0;
435 io_align = off & ~PAGE_MASK;
436 433
437more: 434more:
438 if (o_direct) 435 page_align = pos & ~PAGE_MASK;
439 page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
440 else
441 page_align = pos & ~PAGE_MASK;
442 this_len = left; 436 this_len = left;
443 ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), 437 ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
444 &ci->i_layout, pos, &this_len, 438 &ci->i_layout, pos, &this_len,
@@ -457,8 +451,7 @@ more:
457 if (was_short && (pos + ret < inode->i_size)) { 451 if (was_short && (pos + ret < inode->i_size)) {
458 int zlen = min(this_len - ret, 452 int zlen = min(this_len - ret,
459 inode->i_size - pos - ret); 453 inode->i_size - pos - ret);
460 int zoff = (o_direct ? buf_align : io_align) + 454 int zoff = (off & ~PAGE_MASK) + read + ret;
461 read + ret;
462 dout(" zero gap %llu to %llu\n", 455 dout(" zero gap %llu to %llu\n",
463 pos + ret, pos + ret + zlen); 456 pos + ret, pos + ret + zlen);
464 ceph_zero_page_vector_range(zoff, zlen, pages); 457 ceph_zero_page_vector_range(zoff, zlen, pages);
@@ -521,54 +514,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
521 if (ret < 0) 514 if (ret < 0)
522 return ret; 515 return ret;
523 516
524 if (iocb->ki_flags & IOCB_DIRECT) { 517 num_pages = calc_pages_for(off, len);
525 while (iov_iter_count(i)) { 518 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
526 size_t start; 519 if (IS_ERR(pages))
527 ssize_t n; 520 return PTR_ERR(pages);
528 521 ret = striped_read(inode, off, len, pages,
529 n = dio_get_pagev_size(i); 522 num_pages, checkeof);
530 pages = dio_get_pages_alloc(i, n, &start, &num_pages); 523 if (ret > 0) {
531 if (IS_ERR(pages)) 524 int l, k = 0;
532 return PTR_ERR(pages); 525 size_t left = ret;
533 526
534 ret = striped_read(inode, off, n, 527 while (left) {
535 pages, num_pages, checkeof, 528 size_t page_off = off & ~PAGE_MASK;
536 1, start); 529 size_t copy = min_t(size_t, left,
537 530 PAGE_SIZE - page_off);
538 ceph_put_page_vector(pages, num_pages, true); 531 l = copy_page_to_iter(pages[k++], page_off, copy, i);
539 532 off += l;
540 if (ret <= 0) 533 left -= l;
541 break; 534 if (l < copy)
542 off += ret;
543 iov_iter_advance(i, ret);
544 if (ret < n)
545 break; 535 break;
546 } 536 }
547 } else {
548 num_pages = calc_pages_for(off, len);
549 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
550 if (IS_ERR(pages))
551 return PTR_ERR(pages);
552 ret = striped_read(inode, off, len, pages,
553 num_pages, checkeof, 0, 0);
554 if (ret > 0) {
555 int l, k = 0;
556 size_t left = ret;
557
558 while (left) {
559 size_t page_off = off & ~PAGE_MASK;
560 size_t copy = min_t(size_t,
561 PAGE_SIZE - page_off, left);
562 l = copy_page_to_iter(pages[k++], page_off,
563 copy, i);
564 off += l;
565 left -= l;
566 if (l < copy)
567 break;
568 }
569 }
570 ceph_release_page_vector(pages, num_pages);
571 } 537 }
538 ceph_release_page_vector(pages, num_pages);
572 539
573 if (off > iocb->ki_pos) { 540 if (off > iocb->ki_pos) {
574 ret = off - iocb->ki_pos; 541 ret = off - iocb->ki_pos;
@@ -579,6 +546,113 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
579 return ret; 546 return ret;
580} 547}
581 548
/*
 * Per-AIO bookkeeping shared by all OSD requests issued for one kiocb.
 * Allocated in ceph_direct_read_write(); freed by the last completing
 * request in ceph_aio_complete().
 */
549struct ceph_aio_request {
550	struct kiocb *iocb;
551	size_t total_len;	/* bytes covered by all sub-requests */
552	int write;		/* non-zero: write AIO, else read */
553	int error;		/* first error seen (0 if none); set via cmpxchg */
554	struct list_head osd_reqs;	/* sub-requests queued before submission */
555	unsigned num_reqs;
556	atomic_t pending_reqs;	/* in-flight sub-requests; last one completes the AIO */
557	struct ceph_cap_flush *prealloc_cf;	/* for dirtying caps on write completion */
558};
559
/*
 * Finish an AIO once its final in-flight OSD request has completed.
 * Every sub-request completion calls this; only the caller that drops
 * pending_reqs to zero proceeds.  On a successful write, the i_size is
 * grown if needed, the inline version is cleared and Fw caps are marked
 * dirty.  Finally the cap reference taken at submission is dropped, the
 * caller is notified via ki_complete(), and the request is freed.
 */
560static void ceph_aio_complete(struct inode *inode,
561 struct ceph_aio_request *aio_req)
562{
563 struct ceph_inode_info *ci = ceph_inode(inode);
564 int ret;
565
 /* not the last sub-request yet — nothing to do */
566 if (!atomic_dec_and_test(&aio_req->pending_reqs))
567 return;
568
 /* report the first recorded error, otherwise the total byte count */
569 ret = aio_req->error;
570 if (!ret)
571 ret = aio_req->total_len;
572
573 dout("ceph_aio_complete %p rc %d\n", inode, ret);
574
575 if (ret >= 0 && aio_req->write) {
576 int dirty;
577
 /* write may have extended the file; update size and poke caps */
578 loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
579 if (endoff > i_size_read(inode)) {
580 if (ceph_inode_set_size(inode, endoff))
581 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
582 }
583
584 spin_lock(&ci->i_ceph_lock);
585 ci->i_inline_version = CEPH_INLINE_NONE;
586 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
587 &aio_req->prealloc_cf);
588 spin_unlock(&ci->i_ceph_lock);
589 if (dirty)
590 __mark_inode_dirty(inode, dirty);
591
592 }
593
 /* drop the Fw/Fr cap reference held for the duration of the AIO */
594 ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
595 CEPH_CAP_FILE_RD));
596
597 aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);
598
599 ceph_free_cap_flush(aio_req->prealloc_cf);
600 kfree(aio_req);
601}
602
/*
 * Completion callback for one OSD sub-request of an AIO.  For reads it
 * normalizes the result (ENOENT -> 0 bytes, i.e. a hole) and zero-fills
 * the unread tail of the page vector; the first failing sub-request's
 * error code is recorded, then the shared AIO is (possibly) finished via
 * ceph_aio_complete().
 */
603static void ceph_aio_complete_req(struct ceph_osd_request *req,
604 struct ceph_msg *msg)
605{
606 int rc = req->r_result;
607 struct inode *inode = req->r_inode;
608 struct ceph_aio_request *aio_req = req->r_priv;
609 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
610 int num_pages = calc_pages_for((u64)osd_data->alignment,
611 osd_data->length);
612
613 dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
614 inode, rc, osd_data->length);
615
 /* ORDERSNAP retries are not handled on this path — must not happen */
616 if (rc == -EOLDSNAPC) {
617 BUG_ON(1);
618 }
619
620 if (!aio_req->write) {
 /* nonexistent object reads as all zeros */
621 if (rc == -ENOENT)
622 rc = 0;
 /* short read: zero the remainder of the buffer */
623 if (rc >= 0 && osd_data->length > rc) {
624 int zoff = osd_data->alignment + rc;
625 int zlen = osd_data->length - rc;
626 /*
627 * If read is satisfied by single OSD request,
628 * it can pass EOF. Otherwise read is within
629 * i_size.
630 */
631 if (aio_req->num_reqs == 1) {
632 loff_t i_size = i_size_read(inode);
633 loff_t endoff = aio_req->iocb->ki_pos + rc;
634 if (endoff < i_size)
635 zlen = min_t(size_t, zlen,
636 i_size - endoff);
637 aio_req->total_len = rc + zlen;
638 }
639
640 if (zlen > 0)
641 ceph_zero_page_vector_range(zoff, zlen,
642 osd_data->pages);
643 }
644 }
645
 /* NOTE(review): pages released without the dirty flag even for reads
 * into user buffers — confirm the data is not lost for direct reads */
646 ceph_put_page_vector(osd_data->pages, num_pages, false);
647 ceph_osdc_put_request(req);
648
 /* record only the first error across all sub-requests */
649 if (rc < 0)
650 cmpxchg(&aio_req->error, 0, rc);
651
652 ceph_aio_complete(inode, aio_req);
653 return;
654}
655
582/* 656/*
583 * Write commit request unsafe callback, called to tell us when a 657 * Write commit request unsafe callback, called to tell us when a
584 * request is unsafe (that is, in flight--has been handed to the 658 * request is unsafe (that is, in flight--has been handed to the
@@ -612,16 +686,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
612} 686}
613 687
614 688
615/*
616 * Synchronous write, straight from __user pointer or user pages.
617 *
618 * If write spans object boundary, just do multiple writes. (For a
619 * correct atomic write, we should e.g. take write locks on all
620 * objects, rollback on failure, etc.)
621 */
622static ssize_t 689static ssize_t
623ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, 690ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
624 struct ceph_snap_context *snapc) 691 struct ceph_snap_context *snapc,
692 struct ceph_cap_flush **pcf)
625{ 693{
626 struct file *file = iocb->ki_filp; 694 struct file *file = iocb->ki_filp;
627 struct inode *inode = file_inode(file); 695 struct inode *inode = file_inode(file);
@@ -630,44 +698,52 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
630 struct ceph_vino vino; 698 struct ceph_vino vino;
631 struct ceph_osd_request *req; 699 struct ceph_osd_request *req;
632 struct page **pages; 700 struct page **pages;
633 int num_pages; 701 struct ceph_aio_request *aio_req = NULL;
634 int written = 0; 702 int num_pages = 0;
635 int flags; 703 int flags;
636 int check_caps = 0;
637 int ret; 704 int ret;
638 struct timespec mtime = CURRENT_TIME; 705 struct timespec mtime = CURRENT_TIME;
639 size_t count = iov_iter_count(from); 706 size_t count = iov_iter_count(iter);
707 loff_t pos = iocb->ki_pos;
708 bool write = iov_iter_rw(iter) == WRITE;
640 709
641 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 710 if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
642 return -EROFS; 711 return -EROFS;
643 712
644 dout("sync_direct_write on file %p %lld~%u\n", file, pos, 713 dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
645 (unsigned)count); 714 (write ? "write" : "read"), file, pos, (unsigned)count);
646 715
647 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); 716 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
648 if (ret < 0) 717 if (ret < 0)
649 return ret; 718 return ret;
650 719
651 ret = invalidate_inode_pages2_range(inode->i_mapping, 720 if (write) {
652 pos >> PAGE_CACHE_SHIFT, 721 ret = invalidate_inode_pages2_range(inode->i_mapping,
653 (pos + count) >> PAGE_CACHE_SHIFT); 722 pos >> PAGE_CACHE_SHIFT,
654 if (ret < 0) 723 (pos + count) >> PAGE_CACHE_SHIFT);
655 dout("invalidate_inode_pages2_range returned %d\n", ret); 724 if (ret < 0)
725 dout("invalidate_inode_pages2_range returned %d\n", ret);
656 726
657 flags = CEPH_OSD_FLAG_ORDERSNAP | 727 flags = CEPH_OSD_FLAG_ORDERSNAP |
658 CEPH_OSD_FLAG_ONDISK | 728 CEPH_OSD_FLAG_ONDISK |
659 CEPH_OSD_FLAG_WRITE; 729 CEPH_OSD_FLAG_WRITE;
730 } else {
731 flags = CEPH_OSD_FLAG_READ;
732 }
660 733
661 while (iov_iter_count(from) > 0) { 734 while (iov_iter_count(iter) > 0) {
662 u64 len = dio_get_pagev_size(from); 735 u64 size = dio_get_pagev_size(iter);
663 size_t start; 736 size_t start = 0;
664 ssize_t n; 737 ssize_t len;
665 738
666 vino = ceph_vino(inode); 739 vino = ceph_vino(inode);
667 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 740 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
668 vino, pos, &len, 0, 741 vino, pos, &size, 0,
669 2,/*include a 'startsync' command*/ 742 /*include a 'startsync' command*/
670 CEPH_OSD_OP_WRITE, flags, snapc, 743 write ? 2 : 1,
744 write ? CEPH_OSD_OP_WRITE :
745 CEPH_OSD_OP_READ,
746 flags, snapc,
671 ci->i_truncate_seq, 747 ci->i_truncate_seq,
672 ci->i_truncate_size, 748 ci->i_truncate_size,
673 false); 749 false);
@@ -676,10 +752,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
676 break; 752 break;
677 } 753 }
678 754
679 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); 755 len = size;
680 756 pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
681 n = len;
682 pages = dio_get_pages_alloc(from, len, &start, &num_pages);
683 if (IS_ERR(pages)) { 757 if (IS_ERR(pages)) {
684 ceph_osdc_put_request(req); 758 ceph_osdc_put_request(req);
685 ret = PTR_ERR(pages); 759 ret = PTR_ERR(pages);
@@ -687,47 +761,126 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
687 } 761 }
688 762
689 /* 763 /*
690 * throw out any page cache pages in this range. this 764 * To simplify error handling, allow AIO when IO within i_size
691 * may block. 765 * or IO can be satisfied by single OSD request.
692 */ 766 */
693 truncate_inode_pages_range(inode->i_mapping, pos, 767 if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
694 (pos+n) | (PAGE_CACHE_SIZE-1)); 768 (len == count || pos + count <= i_size_read(inode))) {
695 osd_req_op_extent_osd_data_pages(req, 0, pages, n, start, 769 aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
696 false, false); 770 if (aio_req) {
771 aio_req->iocb = iocb;
772 aio_req->write = write;
773 INIT_LIST_HEAD(&aio_req->osd_reqs);
774 if (write) {
775 swap(aio_req->prealloc_cf, *pcf);
776 }
777 }
778 /* ignore error */
779 }
780
781 if (write) {
782 /*
783 * throw out any page cache pages in this range. this
784 * may block.
785 */
786 truncate_inode_pages_range(inode->i_mapping, pos,
787 (pos+len) | (PAGE_CACHE_SIZE - 1));
788
789 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
790 }
791
792
793 osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
794 false, false);
697 795
698 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
699 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); 796 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
700 797
701 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 798 if (aio_req) {
799 aio_req->total_len += len;
800 aio_req->num_reqs++;
801 atomic_inc(&aio_req->pending_reqs);
802
803 req->r_callback = ceph_aio_complete_req;
804 req->r_inode = inode;
805 req->r_priv = aio_req;
806 list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
807
808 pos += len;
809 iov_iter_advance(iter, len);
810 continue;
811 }
812
813 ret = ceph_osdc_start_request(req->r_osdc, req, false);
702 if (!ret) 814 if (!ret)
703 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 815 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
704 816
817 size = i_size_read(inode);
818 if (!write) {
819 if (ret == -ENOENT)
820 ret = 0;
821 if (ret >= 0 && ret < len && pos + ret < size) {
822 int zlen = min_t(size_t, len - ret,
823 size - pos - ret);
824 ceph_zero_page_vector_range(start + ret, zlen,
825 pages);
826 ret += zlen;
827 }
828 if (ret >= 0)
829 len = ret;
830 }
831
705 ceph_put_page_vector(pages, num_pages, false); 832 ceph_put_page_vector(pages, num_pages, false);
706 833
707 ceph_osdc_put_request(req); 834 ceph_osdc_put_request(req);
708 if (ret) 835 if (ret < 0)
709 break; 836 break;
710 pos += n;
711 written += n;
712 iov_iter_advance(from, n);
713 837
714 if (pos > i_size_read(inode)) { 838 pos += len;
715 check_caps = ceph_inode_set_size(inode, pos); 839 iov_iter_advance(iter, len);
716 if (check_caps) 840
841 if (!write && pos >= size)
842 break;
843
844 if (write && pos > size) {
845 if (ceph_inode_set_size(inode, pos))
717 ceph_check_caps(ceph_inode(inode), 846 ceph_check_caps(ceph_inode(inode),
718 CHECK_CAPS_AUTHONLY, 847 CHECK_CAPS_AUTHONLY,
719 NULL); 848 NULL);
720 } 849 }
721 } 850 }
722 851
723 if (ret != -EOLDSNAPC && written > 0) { 852 if (aio_req) {
853 if (aio_req->num_reqs == 0) {
854 kfree(aio_req);
855 return ret;
856 }
857
858 ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
859 CEPH_CAP_FILE_RD);
860
861 while (!list_empty(&aio_req->osd_reqs)) {
862 req = list_first_entry(&aio_req->osd_reqs,
863 struct ceph_osd_request,
864 r_unsafe_item);
865 list_del_init(&req->r_unsafe_item);
866 if (ret >= 0)
867 ret = ceph_osdc_start_request(req->r_osdc,
868 req, false);
869 if (ret < 0) {
870 req->r_result = ret;
871 ceph_aio_complete_req(req, NULL);
872 }
873 }
874 return -EIOCBQUEUED;
875 }
876
877 if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
878 ret = pos - iocb->ki_pos;
724 iocb->ki_pos = pos; 879 iocb->ki_pos = pos;
725 ret = written;
726 } 880 }
727 return ret; 881 return ret;
728} 882}
729 883
730
731/* 884/*
732 * Synchronous write, straight from __user pointer or user pages. 885 * Synchronous write, straight from __user pointer or user pages.
733 * 886 *
@@ -897,8 +1050,14 @@ again:
897 ceph_cap_string(got)); 1050 ceph_cap_string(got));
898 1051
899 if (ci->i_inline_version == CEPH_INLINE_NONE) { 1052 if (ci->i_inline_version == CEPH_INLINE_NONE) {
900 /* hmm, this isn't really async... */ 1053 if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
901 ret = ceph_sync_read(iocb, to, &retry_op); 1054 ret = ceph_direct_read_write(iocb, to,
1055 NULL, NULL);
1056 if (ret >= 0 && ret < len)
1057 retry_op = CHECK_EOF;
1058 } else {
1059 ret = ceph_sync_read(iocb, to, &retry_op);
1060 }
902 } else { 1061 } else {
903 retry_op = READ_INLINE; 1062 retry_op = READ_INLINE;
904 } 1063 }
@@ -916,7 +1075,7 @@ again:
916 pinned_page = NULL; 1075 pinned_page = NULL;
917 } 1076 }
918 ceph_put_cap_refs(ci, got); 1077 ceph_put_cap_refs(ci, got);
919 if (retry_op && ret >= 0) { 1078 if (retry_op > HAVE_RETRIED && ret >= 0) {
920 int statret; 1079 int statret;
921 struct page *page = NULL; 1080 struct page *page = NULL;
922 loff_t i_size; 1081 loff_t i_size;
@@ -973,7 +1132,7 @@ again:
973 1132
974 read += ret; 1133 read += ret;
975 len -= ret; 1134 len -= ret;
976 retry_op = 0; 1135 retry_op = HAVE_RETRIED;
977 goto again; 1136 goto again;
978 } 1137 }
979 } 1138 }
@@ -1088,8 +1247,8 @@ retry_snap:
1088 /* we might need to revert back to that point */ 1247 /* we might need to revert back to that point */
1089 data = *from; 1248 data = *from;
1090 if (iocb->ki_flags & IOCB_DIRECT) 1249 if (iocb->ki_flags & IOCB_DIRECT)
1091 written = ceph_sync_direct_write(iocb, &data, pos, 1250 written = ceph_direct_read_write(iocb, &data, snapc,
1092 snapc); 1251 &prealloc_cf);
1093 else 1252 else
1094 written = ceph_sync_write(iocb, &data, pos, snapc); 1253 written = ceph_sync_write(iocb, &data, pos, snapc);
1095 if (written == -EOLDSNAPC) { 1254 if (written == -EOLDSNAPC) {