aboutsummaryrefslogtreecommitdiffstats
path: root/fs/ceph
diff options
context:
space:
mode:
authormajianpeng <majianpeng@gmail.com>2013-09-26 02:42:17 -0400
committerSage Weil <sage@inktank.com>2013-12-13 12:13:17 -0500
commit8eb4efb091c8d8f70a0e6822288b043f8691ec51 (patch)
treeefd1b0ca17ea9b8625517094ba8fc057fc10941f /fs/ceph
parente8344e668915a7488def414f016dbf7d9fce84b5 (diff)
ceph: implement readv/preadv for sync operation
For readv/preadv sync operations, ceph only handled the first iov. Implement handling of all the iovs. Signed-off-by: Jianpeng Ma <majianpeng@gmail.com> Reviewed-by: Yan, Zheng <zheng.z.yan@intel.com>
Diffstat (limited to 'fs/ceph')
-rw-r--r--fs/ceph/file.c162
1 files changed, 116 insertions, 46 deletions
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 5cf034e915bb..c4419e848a4f 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -408,51 +408,92 @@ more:
408 * 408 *
409 * If the read spans object boundary, just do multiple reads. 409 * If the read spans object boundary, just do multiple reads.
410 */ 410 */
411static ssize_t ceph_sync_read(struct file *file, char __user *data, 411static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
412 unsigned len, loff_t *poff, int *checkeof) 412 int *checkeof)
413{ 413{
414 struct file *file = iocb->ki_filp;
414 struct inode *inode = file_inode(file); 415 struct inode *inode = file_inode(file);
415 struct page **pages; 416 struct page **pages;
416 u64 off = *poff; 417 u64 off = iocb->ki_pos;
417 int num_pages, ret; 418 int num_pages, ret;
419 size_t len = i->count;
418 420
419 dout("sync_read on file %p %llu~%u %s\n", file, off, len, 421 dout("sync_read on file %p %llu~%u %s\n", file, off,
422 (unsigned)len,
420 (file->f_flags & O_DIRECT) ? "O_DIRECT" : ""); 423 (file->f_flags & O_DIRECT) ? "O_DIRECT" : "");
421
422 if (file->f_flags & O_DIRECT) {
423 num_pages = calc_pages_for((unsigned long)data, len);
424 pages = ceph_get_direct_page_vector(data, num_pages, true);
425 } else {
426 num_pages = calc_pages_for(off, len);
427 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
428 }
429 if (IS_ERR(pages))
430 return PTR_ERR(pages);
431
432 /* 424 /*
433 * flush any page cache pages in this range. this 425 * flush any page cache pages in this range. this
434 * will make concurrent normal and sync io slow, 426 * will make concurrent normal and sync io slow,
435 * but it will at least behave sensibly when they are 427 * but it will at least behave sensibly when they are
436 * in sequence. 428 * in sequence.
437 */ 429 */
438 ret = filemap_write_and_wait(inode->i_mapping); 430 ret = filemap_write_and_wait_range(inode->i_mapping, off,
431 off + len);
439 if (ret < 0) 432 if (ret < 0)
440 goto done; 433 return ret;
441 434
442 ret = striped_read(inode, off, len, pages, num_pages, checkeof, 435 if (file->f_flags & O_DIRECT) {
443 file->f_flags & O_DIRECT, 436 while (iov_iter_count(i)) {
444 (unsigned long)data & ~PAGE_MASK); 437 void __user *data = i->iov[0].iov_base + i->iov_offset;
438 size_t len = i->iov[0].iov_len - i->iov_offset;
439
440 num_pages = calc_pages_for((unsigned long)data, len);
441 pages = ceph_get_direct_page_vector(data,
442 num_pages, true);
443 if (IS_ERR(pages))
444 return PTR_ERR(pages);
445
446 ret = striped_read(inode, off, len,
447 pages, num_pages, checkeof,
448 1, (unsigned long)data & ~PAGE_MASK);
449 ceph_put_page_vector(pages, num_pages, true);
450
451 if (ret <= 0)
452 break;
453 off += ret;
454 iov_iter_advance(i, ret);
455 if (ret < len)
456 break;
457 }
458 } else {
459 num_pages = calc_pages_for(off, len);
460 pages = ceph_alloc_page_vector(num_pages, GFP_NOFS);
461 if (IS_ERR(pages))
462 return PTR_ERR(pages);
463 ret = striped_read(inode, off, len, pages,
464 num_pages, checkeof, 0, 0);
465 if (ret > 0) {
466 int l, k = 0;
467 size_t left = len = ret;
468
469 while (left) {
470 void __user *data = i->iov[0].iov_base
471 + i->iov_offset;
472 l = min(i->iov[0].iov_len - i->iov_offset,
473 left);
474
475 ret = ceph_copy_page_vector_to_user(&pages[k],
476 data, off,
477 l);
478 if (ret > 0) {
479 iov_iter_advance(i, ret);
480 left -= ret;
481 off += ret;
482 k = calc_pages_for(iocb->ki_pos,
483 len - left + 1) - 1;
484 BUG_ON(k >= num_pages && left);
485 } else
486 break;
487 }
488 }
489 ceph_release_page_vector(pages, num_pages);
490 }
445 491
446 if (ret >= 0 && (file->f_flags & O_DIRECT) == 0) 492 if (off > iocb->ki_pos) {
447 ret = ceph_copy_page_vector_to_user(pages, data, off, ret); 493 ret = off - iocb->ki_pos;
448 if (ret >= 0) 494 iocb->ki_pos = off;
449 *poff = off + ret; 495 }
450 496
451done:
452 if (file->f_flags & O_DIRECT)
453 ceph_put_page_vector(pages, num_pages, true);
454 else
455 ceph_release_page_vector(pages, num_pages);
456 dout("sync_read result %d\n", ret); 497 dout("sync_read result %d\n", ret);
457 return ret; 498 return ret;
458} 499}
@@ -758,55 +799,84 @@ static ssize_t ceph_aio_read(struct kiocb *iocb, const struct iovec *iov,
758{ 799{
759 struct file *filp = iocb->ki_filp; 800 struct file *filp = iocb->ki_filp;
760 struct ceph_file_info *fi = filp->private_data; 801 struct ceph_file_info *fi = filp->private_data;
761 loff_t *ppos = &iocb->ki_pos; 802 size_t len = iocb->ki_nbytes;
762 size_t len = iov->iov_len;
763 struct inode *inode = file_inode(filp); 803 struct inode *inode = file_inode(filp);
764 struct ceph_inode_info *ci = ceph_inode(inode); 804 struct ceph_inode_info *ci = ceph_inode(inode);
765 void __user *base = iov->iov_base;
766 ssize_t ret; 805 ssize_t ret;
767 int want, got = 0; 806 int want, got = 0;
768 int checkeof = 0, read = 0; 807 int checkeof = 0, read = 0;
769 808
770 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
771 inode, ceph_vinop(inode), pos, (unsigned)len, inode);
772again: 809again:
810 dout("aio_read %p %llx.%llx %llu~%u trying to get caps on %p\n",
811 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len, inode);
812
773 if (fi->fmode & CEPH_FILE_MODE_LAZY) 813 if (fi->fmode & CEPH_FILE_MODE_LAZY)
774 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO; 814 want = CEPH_CAP_FILE_CACHE | CEPH_CAP_FILE_LAZYIO;
775 else 815 else
776 want = CEPH_CAP_FILE_CACHE; 816 want = CEPH_CAP_FILE_CACHE;
777 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1); 817 ret = ceph_get_caps(ci, CEPH_CAP_FILE_RD, want, &got, -1);
778 if (ret < 0) 818 if (ret < 0)
779 goto out; 819 return ret;
780 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
781 inode, ceph_vinop(inode), pos, (unsigned)len,
782 ceph_cap_string(got));
783 820
784 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 || 821 if ((got & (CEPH_CAP_FILE_CACHE|CEPH_CAP_FILE_LAZYIO)) == 0 ||
785 (iocb->ki_filp->f_flags & O_DIRECT) || 822 (iocb->ki_filp->f_flags & O_DIRECT) ||
786 (fi->flags & CEPH_F_SYNC)) 823 (fi->flags & CEPH_F_SYNC)) {
824 struct iov_iter i;
825
826 dout("aio_sync_read %p %llx.%llx %llu~%u got cap refs on %s\n",
827 inode, ceph_vinop(inode), iocb->ki_pos, (unsigned)len,
828 ceph_cap_string(got));
829
830 if (!read) {
831 ret = generic_segment_checks(iov, &nr_segs,
832 &len, VERIFY_WRITE);
833 if (ret)
834 goto out;
835 }
836
837 iov_iter_init(&i, iov, nr_segs, len, read);
838
787 /* hmm, this isn't really async... */ 839 /* hmm, this isn't really async... */
788 ret = ceph_sync_read(filp, base, len, ppos, &checkeof); 840 ret = ceph_sync_read(iocb, &i, &checkeof);
789 else 841 } else {
790 ret = generic_file_aio_read(iocb, iov, nr_segs, pos); 842 /*
843 * We can't modify the content of iov,
844 * so we only read from beginning.
845 */
846 if (read) {
847 iocb->ki_pos = pos;
848 len = iocb->ki_nbytes;
849 read = 0;
850 }
851 dout("aio_read %p %llx.%llx %llu~%u got cap refs on %s\n",
852 inode, ceph_vinop(inode), pos, (unsigned)len,
853 ceph_cap_string(got));
791 854
855 ret = generic_file_aio_read(iocb, iov, nr_segs, pos);
856 }
792out: 857out:
793 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n", 858 dout("aio_read %p %llx.%llx dropping cap refs on %s = %d\n",
794 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret); 859 inode, ceph_vinop(inode), ceph_cap_string(got), (int)ret);
795 ceph_put_cap_refs(ci, got); 860 ceph_put_cap_refs(ci, got);
796 861
797 if (checkeof && ret >= 0) { 862 if (checkeof && ret >= 0) {
798 int statret = ceph_do_getattr(inode, CEPH_STAT_CAP_SIZE); 863 int statret = ceph_do_getattr(inode,
864 CEPH_STAT_CAP_SIZE);
799 865
800 /* hit EOF or hole? */ 866 /* hit EOF or hole? */
801 if (statret == 0 && *ppos < inode->i_size) { 867 if (statret == 0 && iocb->ki_pos < inode->i_size &&
802 dout("aio_read sync_read hit hole, ppos %lld < size %lld, reading more\n", *ppos, inode->i_size); 868 ret < len) {
869 dout("sync_read hit hole, ppos %lld < size %lld"
870 ", reading more\n", iocb->ki_pos,
871 inode->i_size);
872
803 read += ret; 873 read += ret;
804 base += ret;
805 len -= ret; 874 len -= ret;
806 checkeof = 0; 875 checkeof = 0;
807 goto again; 876 goto again;
808 } 877 }
809 } 878 }
879
810 if (ret >= 0) 880 if (ret >= 0)
811 ret += read; 881 ret += read;
812 882