aboutsummaryrefslogtreecommitdiffstats
path: root/fs
diff options
context:
space:
mode:
author Linus Torvalds <torvalds@linux-foundation.org> 2016-01-24 15:34:13 -0500
committer Linus Torvalds <torvalds@linux-foundation.org> 2016-01-24 15:34:13 -0500
commit 00e3f5cc305c8a056a22cecedab3a71d59dae1fc (patch)
tree b2f3cff7d986768aab7ebc84b9efefa8ea3ecc00 /fs
parent 772950ed21c36f4157ff34e7d10fb61975f64558 (diff)
parent 7e01726a6853e032536ed7e75c1e1232872ff318 (diff)
Merge branch 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client
Pull Ceph updates from Sage Weil:
 "The two main changes are aio support in CephFS, and a series that fixes several issues in the authentication key timeout/renewal code.

  On top of that are a variety of cleanups and minor bug fixes"

* 'for-linus' of git://git.kernel.org/pub/scm/linux/kernel/git/sage/ceph-client:
  libceph: remove outdated comment
  libceph: kill off ceph_x_ticket_handler::validity
  libceph: invalidate AUTH in addition to a service ticket
  libceph: fix authorizer invalidation, take 2
  libceph: clear messenger auth_retry flag if we fault
  libceph: fix ceph_msg_revoke()
  libceph: use list_for_each_entry_safe
  ceph: use i_size_{read,write} to get/set i_size
  ceph: re-send AIO write request when getting -EOLDSNAP error
  ceph: Asynchronous IO support
  ceph: Avoid to propagate the invalid page point
  ceph: fix double page_unlock() in page_mkwrite()
  rbd: delete an unnecessary check before rbd_dev_destroy()
  libceph: use list_next_entry instead of list_entry_next
  ceph: ceph_frag_contains_value can be boolean
  ceph: remove unused functions in ceph_frag.h
Diffstat (limited to 'fs')
-rw-r--r-- fs/ceph/addr.c 14
-rw-r--r-- fs/ceph/cache.c 8
-rw-r--r-- fs/ceph/file.c 509
-rw-r--r-- fs/ceph/inode.c 8
4 files changed, 389 insertions, 150 deletions
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index b7d218a168fb..c22213789090 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1108,7 +1108,7 @@ retry_locked:
1108 return 0; 1108 return 0;
1109 1109
1110 /* past end of file? */ 1110 /* past end of file? */
1111 i_size = inode->i_size; /* caller holds i_mutex */ 1111 i_size = i_size_read(inode);
1112 1112
1113 if (page_off >= i_size || 1113 if (page_off >= i_size ||
1114 (pos_in_page == 0 && (pos+len) >= i_size && 1114 (pos_in_page == 0 && (pos+len) >= i_size &&
@@ -1149,7 +1149,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
1149 page = grab_cache_page_write_begin(mapping, index, 0); 1149 page = grab_cache_page_write_begin(mapping, index, 0);
1150 if (!page) 1150 if (!page)
1151 return -ENOMEM; 1151 return -ENOMEM;
1152 *pagep = page;
1153 1152
1154 dout("write_begin file %p inode %p page %p %d~%d\n", file, 1153 dout("write_begin file %p inode %p page %p %d~%d\n", file,
1155 inode, page, (int)pos, (int)len); 1154 inode, page, (int)pos, (int)len);
@@ -1184,8 +1183,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
1184 zero_user_segment(page, from+copied, len); 1183 zero_user_segment(page, from+copied, len);
1185 1184
1186 /* did file size increase? */ 1185 /* did file size increase? */
1187 /* (no need for i_size_read(); we caller holds i_mutex */ 1186 if (pos+copied > i_size_read(inode))
1188 if (pos+copied > inode->i_size)
1189 check_cap = ceph_inode_set_size(inode, pos+copied); 1187 check_cap = ceph_inode_set_size(inode, pos+copied);
1190 1188
1191 if (!PageUptodate(page)) 1189 if (!PageUptodate(page))
@@ -1378,11 +1376,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1378 1376
1379 ret = VM_FAULT_NOPAGE; 1377 ret = VM_FAULT_NOPAGE;
1380 if ((off > size) || 1378 if ((off > size) ||
1381 (page->mapping != inode->i_mapping)) 1379 (page->mapping != inode->i_mapping)) {
1380 unlock_page(page);
1382 goto out; 1381 goto out;
1382 }
1383 1383
1384 ret = ceph_update_writeable_page(vma->vm_file, off, len, page); 1384 ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
1385 if (ret == 0) { 1385 if (ret >= 0) {
1386 /* success. we'll keep the page locked. */ 1386 /* success. we'll keep the page locked. */
1387 set_page_dirty(page); 1387 set_page_dirty(page);
1388 ret = VM_FAULT_LOCKED; 1388 ret = VM_FAULT_LOCKED;
@@ -1393,8 +1393,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1393 ret = VM_FAULT_SIGBUS; 1393 ret = VM_FAULT_SIGBUS;
1394 } 1394 }
1395out: 1395out:
1396 if (ret != VM_FAULT_LOCKED)
1397 unlock_page(page);
1398 if (ret == VM_FAULT_LOCKED || 1396 if (ret == VM_FAULT_LOCKED ||
1399 ci->i_inline_version != CEPH_INLINE_NONE) { 1397 ci->i_inline_version != CEPH_INLINE_NONE) {
1400 int dirty; 1398 int dirty;
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 7680e2626815..a351480dbabc 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -106,7 +106,7 @@ static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
106 106
107 memset(&aux, 0, sizeof(aux)); 107 memset(&aux, 0, sizeof(aux));
108 aux.mtime = inode->i_mtime; 108 aux.mtime = inode->i_mtime;
109 aux.size = inode->i_size; 109 aux.size = i_size_read(inode);
110 110
111 memcpy(buffer, &aux, sizeof(aux)); 111 memcpy(buffer, &aux, sizeof(aux));
112 112
@@ -117,9 +117,7 @@ static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data,
117 uint64_t *size) 117 uint64_t *size)
118{ 118{
119 const struct ceph_inode_info* ci = cookie_netfs_data; 119 const struct ceph_inode_info* ci = cookie_netfs_data;
120 const struct inode* inode = &ci->vfs_inode; 120 *size = i_size_read(&ci->vfs_inode);
121
122 *size = inode->i_size;
123} 121}
124 122
125static enum fscache_checkaux ceph_fscache_inode_check_aux( 123static enum fscache_checkaux ceph_fscache_inode_check_aux(
@@ -134,7 +132,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux(
134 132
135 memset(&aux, 0, sizeof(aux)); 133 memset(&aux, 0, sizeof(aux));
136 aux.mtime = inode->i_mtime; 134 aux.mtime = inode->i_mtime;
137 aux.size = inode->i_size; 135 aux.size = i_size_read(inode);
138 136
139 if (memcmp(data, &aux, sizeof(aux)) != 0) 137 if (memcmp(data, &aux, sizeof(aux)) != 0)
140 return FSCACHE_CHECKAUX_OBSOLETE; 138 return FSCACHE_CHECKAUX_OBSOLETE;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 10c5ae79696e..86a9c383955e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file)
397} 397}
398 398
399enum { 399enum {
400 CHECK_EOF = 1, 400 HAVE_RETRIED = 1,
401 READ_INLINE = 2, 401 CHECK_EOF = 2,
402 READ_INLINE = 3,
402}; 403};
403 404
404/* 405/*
@@ -411,17 +412,15 @@ enum {
411static int striped_read(struct inode *inode, 412static int striped_read(struct inode *inode,
412 u64 off, u64 len, 413 u64 off, u64 len,
413 struct page **pages, int num_pages, 414 struct page **pages, int num_pages,
414 int *checkeof, bool o_direct, 415 int *checkeof)
415 unsigned long buf_align)
416{ 416{
417 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 417 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
418 struct ceph_inode_info *ci = ceph_inode(inode); 418 struct ceph_inode_info *ci = ceph_inode(inode);
419 u64 pos, this_len, left; 419 u64 pos, this_len, left;
420 int io_align, page_align; 420 loff_t i_size;
421 int pages_left; 421 int page_align, pages_left;
422 int read; 422 int read, ret;
423 struct page **page_pos; 423 struct page **page_pos;
424 int ret;
425 bool hit_stripe, was_short; 424 bool hit_stripe, was_short;
426 425
427 /* 426 /*
@@ -432,13 +431,9 @@ static int striped_read(struct inode *inode,
432 page_pos = pages; 431 page_pos = pages;
433 pages_left = num_pages; 432 pages_left = num_pages;
434 read = 0; 433 read = 0;
435 io_align = off & ~PAGE_MASK;
436 434
437more: 435more:
438 if (o_direct) 436 page_align = pos & ~PAGE_MASK;
439 page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
440 else
441 page_align = pos & ~PAGE_MASK;
442 this_len = left; 437 this_len = left;
443 ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), 438 ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
444 &ci->i_layout, pos, &this_len, 439 &ci->i_layout, pos, &this_len,
@@ -452,13 +447,12 @@ more:
452 dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, 447 dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
453 ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); 448 ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
454 449
450 i_size = i_size_read(inode);
455 if (ret >= 0) { 451 if (ret >= 0) {
456 int didpages; 452 int didpages;
457 if (was_short && (pos + ret < inode->i_size)) { 453 if (was_short && (pos + ret < i_size)) {
458 int zlen = min(this_len - ret, 454 int zlen = min(this_len - ret, i_size - pos - ret);
459 inode->i_size - pos - ret); 455 int zoff = (off & ~PAGE_MASK) + read + ret;
460 int zoff = (o_direct ? buf_align : io_align) +
461 read + ret;
462 dout(" zero gap %llu to %llu\n", 456 dout(" zero gap %llu to %llu\n",
463 pos + ret, pos + ret + zlen); 457 pos + ret, pos + ret + zlen);
464 ceph_zero_page_vector_range(zoff, zlen, pages); 458 ceph_zero_page_vector_range(zoff, zlen, pages);
@@ -473,14 +467,14 @@ more:
473 pages_left -= didpages; 467 pages_left -= didpages;
474 468
475 /* hit stripe and need continue*/ 469 /* hit stripe and need continue*/
476 if (left && hit_stripe && pos < inode->i_size) 470 if (left && hit_stripe && pos < i_size)
477 goto more; 471 goto more;
478 } 472 }
479 473
480 if (read > 0) { 474 if (read > 0) {
481 ret = read; 475 ret = read;
482 /* did we bounce off eof? */ 476 /* did we bounce off eof? */
483 if (pos + left > inode->i_size) 477 if (pos + left > i_size)
484 *checkeof = CHECK_EOF; 478 *checkeof = CHECK_EOF;
485 } 479 }
486 480
@@ -521,54 +515,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
521 if (ret < 0) 515 if (ret < 0)
522 return ret; 516 return ret;
523 517
524 if (iocb->ki_flags & IOCB_DIRECT) { 518 num_pages = calc_pages_for(off, len);
525 while (iov_iter_count(i)) { 519 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
526 size_t start; 520 if (IS_ERR(pages))
527 ssize_t n; 521 return PTR_ERR(pages);
528 522 ret = striped_read(inode, off, len, pages,
529 n = dio_get_pagev_size(i); 523 num_pages, checkeof);
530 pages = dio_get_pages_alloc(i, n, &start, &num_pages); 524 if (ret > 0) {
531 if (IS_ERR(pages)) 525 int l, k = 0;
532 return PTR_ERR(pages); 526 size_t left = ret;
533 527
534 ret = striped_read(inode, off, n, 528 while (left) {
535 pages, num_pages, checkeof, 529 size_t page_off = off & ~PAGE_MASK;
536 1, start); 530 size_t copy = min_t(size_t, left,
537 531 PAGE_SIZE - page_off);
538 ceph_put_page_vector(pages, num_pages, true); 532 l = copy_page_to_iter(pages[k++], page_off, copy, i);
539 533 off += l;
540 if (ret <= 0) 534 left -= l;
541 break; 535 if (l < copy)
542 off += ret;
543 iov_iter_advance(i, ret);
544 if (ret < n)
545 break; 536 break;
546 } 537 }
547 } else {
548 num_pages = calc_pages_for(off, len);
549 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
550 if (IS_ERR(pages))
551 return PTR_ERR(pages);
552 ret = striped_read(inode, off, len, pages,
553 num_pages, checkeof, 0, 0);
554 if (ret > 0) {
555 int l, k = 0;
556 size_t left = ret;
557
558 while (left) {
559 size_t page_off = off & ~PAGE_MASK;
560 size_t copy = min_t(size_t,
561 PAGE_SIZE - page_off, left);
562 l = copy_page_to_iter(pages[k++], page_off,
563 copy, i);
564 off += l;
565 left -= l;
566 if (l < copy)
567 break;
568 }
569 }
570 ceph_release_page_vector(pages, num_pages);
571 } 538 }
539 ceph_release_page_vector(pages, num_pages);
572 540
573 if (off > iocb->ki_pos) { 541 if (off > iocb->ki_pos) {
574 ret = off - iocb->ki_pos; 542 ret = off - iocb->ki_pos;
@@ -579,6 +547,193 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
579 return ret; 547 return ret;
580} 548}
581 549
550struct ceph_aio_request {
551 struct kiocb *iocb;
552 size_t total_len;
553 int write;
554 int error;
555 struct list_head osd_reqs;
556 unsigned num_reqs;
557 atomic_t pending_reqs;
558 struct timespec mtime;
559 struct ceph_cap_flush *prealloc_cf;
560};
561
562struct ceph_aio_work {
563 struct work_struct work;
564 struct ceph_osd_request *req;
565};
566
567static void ceph_aio_retry_work(struct work_struct *work);
568
569static void ceph_aio_complete(struct inode *inode,
570 struct ceph_aio_request *aio_req)
571{
572 struct ceph_inode_info *ci = ceph_inode(inode);
573 int ret;
574
575 if (!atomic_dec_and_test(&aio_req->pending_reqs))
576 return;
577
578 ret = aio_req->error;
579 if (!ret)
580 ret = aio_req->total_len;
581
582 dout("ceph_aio_complete %p rc %d\n", inode, ret);
583
584 if (ret >= 0 && aio_req->write) {
585 int dirty;
586
587 loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
588 if (endoff > i_size_read(inode)) {
589 if (ceph_inode_set_size(inode, endoff))
590 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
591 }
592
593 spin_lock(&ci->i_ceph_lock);
594 ci->i_inline_version = CEPH_INLINE_NONE;
595 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
596 &aio_req->prealloc_cf);
597 spin_unlock(&ci->i_ceph_lock);
598 if (dirty)
599 __mark_inode_dirty(inode, dirty);
600
601 }
602
603 ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
604 CEPH_CAP_FILE_RD));
605
606 aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);
607
608 ceph_free_cap_flush(aio_req->prealloc_cf);
609 kfree(aio_req);
610}
611
612static void ceph_aio_complete_req(struct ceph_osd_request *req,
613 struct ceph_msg *msg)
614{
615 int rc = req->r_result;
616 struct inode *inode = req->r_inode;
617 struct ceph_aio_request *aio_req = req->r_priv;
618 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
619 int num_pages = calc_pages_for((u64)osd_data->alignment,
620 osd_data->length);
621
622 dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
623 inode, rc, osd_data->length);
624
625 if (rc == -EOLDSNAPC) {
626 struct ceph_aio_work *aio_work;
627 BUG_ON(!aio_req->write);
628
629 aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
630 if (aio_work) {
631 INIT_WORK(&aio_work->work, ceph_aio_retry_work);
632 aio_work->req = req;
633 queue_work(ceph_inode_to_client(inode)->wb_wq,
634 &aio_work->work);
635 return;
636 }
637 rc = -ENOMEM;
638 } else if (!aio_req->write) {
639 if (rc == -ENOENT)
640 rc = 0;
641 if (rc >= 0 && osd_data->length > rc) {
642 int zoff = osd_data->alignment + rc;
643 int zlen = osd_data->length - rc;
644 /*
645 * If read is satisfied by single OSD request,
646 * it can pass EOF. Otherwise read is within
647 * i_size.
648 */
649 if (aio_req->num_reqs == 1) {
650 loff_t i_size = i_size_read(inode);
651 loff_t endoff = aio_req->iocb->ki_pos + rc;
652 if (endoff < i_size)
653 zlen = min_t(size_t, zlen,
654 i_size - endoff);
655 aio_req->total_len = rc + zlen;
656 }
657
658 if (zlen > 0)
659 ceph_zero_page_vector_range(zoff, zlen,
660 osd_data->pages);
661 }
662 }
663
664 ceph_put_page_vector(osd_data->pages, num_pages, false);
665 ceph_osdc_put_request(req);
666
667 if (rc < 0)
668 cmpxchg(&aio_req->error, 0, rc);
669
670 ceph_aio_complete(inode, aio_req);
671 return;
672}
673
674static void ceph_aio_retry_work(struct work_struct *work)
675{
676 struct ceph_aio_work *aio_work =
677 container_of(work, struct ceph_aio_work, work);
678 struct ceph_osd_request *orig_req = aio_work->req;
679 struct ceph_aio_request *aio_req = orig_req->r_priv;
680 struct inode *inode = orig_req->r_inode;
681 struct ceph_inode_info *ci = ceph_inode(inode);
682 struct ceph_snap_context *snapc;
683 struct ceph_osd_request *req;
684 int ret;
685
686 spin_lock(&ci->i_ceph_lock);
687 if (__ceph_have_pending_cap_snap(ci)) {
688 struct ceph_cap_snap *capsnap =
689 list_last_entry(&ci->i_cap_snaps,
690 struct ceph_cap_snap,
691 ci_item);
692 snapc = ceph_get_snap_context(capsnap->context);
693 } else {
694 BUG_ON(!ci->i_head_snapc);
695 snapc = ceph_get_snap_context(ci->i_head_snapc);
696 }
697 spin_unlock(&ci->i_ceph_lock);
698
699 req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
700 false, GFP_NOFS);
701 if (IS_ERR(req)) {
702 ret = PTR_ERR(req);
703 req = orig_req;
704 goto out;
705 }
706
707 req->r_flags = CEPH_OSD_FLAG_ORDERSNAP |
708 CEPH_OSD_FLAG_ONDISK |
709 CEPH_OSD_FLAG_WRITE;
710 req->r_base_oloc = orig_req->r_base_oloc;
711 req->r_base_oid = orig_req->r_base_oid;
712
713 req->r_ops[0] = orig_req->r_ops[0];
714 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
715
716 ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
717 snapc, CEPH_NOSNAP, &aio_req->mtime);
718
719 ceph_put_snap_context(snapc);
720 ceph_osdc_put_request(orig_req);
721
722 req->r_callback = ceph_aio_complete_req;
723 req->r_inode = inode;
724 req->r_priv = aio_req;
725
726 ret = ceph_osdc_start_request(req->r_osdc, req, false);
727out:
728 if (ret < 0) {
729 BUG_ON(ret == -EOLDSNAPC);
730 req->r_result = ret;
731 ceph_aio_complete_req(req, NULL);
732 }
733
734 kfree(aio_work);
735}
736
582/* 737/*
583 * Write commit request unsafe callback, called to tell us when a 738 * Write commit request unsafe callback, called to tell us when a
584 * request is unsafe (that is, in flight--has been handed to the 739 * request is unsafe (that is, in flight--has been handed to the
@@ -612,16 +767,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
612} 767}
613 768
614 769
615/*
616 * Synchronous write, straight from __user pointer or user pages.
617 *
618 * If write spans object boundary, just do multiple writes. (For a
619 * correct atomic write, we should e.g. take write locks on all
620 * objects, rollback on failure, etc.)
621 */
622static ssize_t 770static ssize_t
623ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, 771ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
624 struct ceph_snap_context *snapc) 772 struct ceph_snap_context *snapc,
773 struct ceph_cap_flush **pcf)
625{ 774{
626 struct file *file = iocb->ki_filp; 775 struct file *file = iocb->ki_filp;
627 struct inode *inode = file_inode(file); 776 struct inode *inode = file_inode(file);
@@ -630,44 +779,52 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
630 struct ceph_vino vino; 779 struct ceph_vino vino;
631 struct ceph_osd_request *req; 780 struct ceph_osd_request *req;
632 struct page **pages; 781 struct page **pages;
633 int num_pages; 782 struct ceph_aio_request *aio_req = NULL;
634 int written = 0; 783 int num_pages = 0;
635 int flags; 784 int flags;
636 int check_caps = 0;
637 int ret; 785 int ret;
638 struct timespec mtime = CURRENT_TIME; 786 struct timespec mtime = CURRENT_TIME;
639 size_t count = iov_iter_count(from); 787 size_t count = iov_iter_count(iter);
788 loff_t pos = iocb->ki_pos;
789 bool write = iov_iter_rw(iter) == WRITE;
640 790
641 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 791 if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
642 return -EROFS; 792 return -EROFS;
643 793
644 dout("sync_direct_write on file %p %lld~%u\n", file, pos, 794 dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
645 (unsigned)count); 795 (write ? "write" : "read"), file, pos, (unsigned)count);
646 796
647 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); 797 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
648 if (ret < 0) 798 if (ret < 0)
649 return ret; 799 return ret;
650 800
651 ret = invalidate_inode_pages2_range(inode->i_mapping, 801 if (write) {
652 pos >> PAGE_CACHE_SHIFT, 802 ret = invalidate_inode_pages2_range(inode->i_mapping,
653 (pos + count) >> PAGE_CACHE_SHIFT); 803 pos >> PAGE_CACHE_SHIFT,
654 if (ret < 0) 804 (pos + count) >> PAGE_CACHE_SHIFT);
655 dout("invalidate_inode_pages2_range returned %d\n", ret); 805 if (ret < 0)
806 dout("invalidate_inode_pages2_range returned %d\n", ret);
656 807
657 flags = CEPH_OSD_FLAG_ORDERSNAP | 808 flags = CEPH_OSD_FLAG_ORDERSNAP |
658 CEPH_OSD_FLAG_ONDISK | 809 CEPH_OSD_FLAG_ONDISK |
659 CEPH_OSD_FLAG_WRITE; 810 CEPH_OSD_FLAG_WRITE;
811 } else {
812 flags = CEPH_OSD_FLAG_READ;
813 }
660 814
661 while (iov_iter_count(from) > 0) { 815 while (iov_iter_count(iter) > 0) {
662 u64 len = dio_get_pagev_size(from); 816 u64 size = dio_get_pagev_size(iter);
663 size_t start; 817 size_t start = 0;
664 ssize_t n; 818 ssize_t len;
665 819
666 vino = ceph_vino(inode); 820 vino = ceph_vino(inode);
667 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 821 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
668 vino, pos, &len, 0, 822 vino, pos, &size, 0,
669 2,/*include a 'startsync' command*/ 823 /*include a 'startsync' command*/
670 CEPH_OSD_OP_WRITE, flags, snapc, 824 write ? 2 : 1,
825 write ? CEPH_OSD_OP_WRITE :
826 CEPH_OSD_OP_READ,
827 flags, snapc,
671 ci->i_truncate_seq, 828 ci->i_truncate_seq,
672 ci->i_truncate_size, 829 ci->i_truncate_size,
673 false); 830 false);
@@ -676,10 +833,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
676 break; 833 break;
677 } 834 }
678 835
679 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); 836 len = size;
680 837 pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
681 n = len;
682 pages = dio_get_pages_alloc(from, len, &start, &num_pages);
683 if (IS_ERR(pages)) { 838 if (IS_ERR(pages)) {
684 ceph_osdc_put_request(req); 839 ceph_osdc_put_request(req);
685 ret = PTR_ERR(pages); 840 ret = PTR_ERR(pages);
@@ -687,47 +842,128 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
687 } 842 }
688 843
689 /* 844 /*
690 * throw out any page cache pages in this range. this 845 * To simplify error handling, allow AIO when IO within i_size
691 * may block. 846 * or IO can be satisfied by single OSD request.
692 */ 847 */
693 truncate_inode_pages_range(inode->i_mapping, pos, 848 if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
694 (pos+n) | (PAGE_CACHE_SIZE-1)); 849 (len == count || pos + count <= i_size_read(inode))) {
695 osd_req_op_extent_osd_data_pages(req, 0, pages, n, start, 850 aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
696 false, false); 851 if (aio_req) {
852 aio_req->iocb = iocb;
853 aio_req->write = write;
854 INIT_LIST_HEAD(&aio_req->osd_reqs);
855 if (write) {
856 aio_req->mtime = mtime;
857 swap(aio_req->prealloc_cf, *pcf);
858 }
859 }
860 /* ignore error */
861 }
862
863 if (write) {
864 /*
865 * throw out any page cache pages in this range. this
866 * may block.
867 */
868 truncate_inode_pages_range(inode->i_mapping, pos,
869 (pos+len) | (PAGE_CACHE_SIZE - 1));
870
871 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
872 }
873
874
875 osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
876 false, false);
697 877
698 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
699 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); 878 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
700 879
701 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 880 if (aio_req) {
881 aio_req->total_len += len;
882 aio_req->num_reqs++;
883 atomic_inc(&aio_req->pending_reqs);
884
885 req->r_callback = ceph_aio_complete_req;
886 req->r_inode = inode;
887 req->r_priv = aio_req;
888 list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
889
890 pos += len;
891 iov_iter_advance(iter, len);
892 continue;
893 }
894
895 ret = ceph_osdc_start_request(req->r_osdc, req, false);
702 if (!ret) 896 if (!ret)
703 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 897 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
704 898
899 size = i_size_read(inode);
900 if (!write) {
901 if (ret == -ENOENT)
902 ret = 0;
903 if (ret >= 0 && ret < len && pos + ret < size) {
904 int zlen = min_t(size_t, len - ret,
905 size - pos - ret);
906 ceph_zero_page_vector_range(start + ret, zlen,
907 pages);
908 ret += zlen;
909 }
910 if (ret >= 0)
911 len = ret;
912 }
913
705 ceph_put_page_vector(pages, num_pages, false); 914 ceph_put_page_vector(pages, num_pages, false);
706 915
707 ceph_osdc_put_request(req); 916 ceph_osdc_put_request(req);
708 if (ret) 917 if (ret < 0)
709 break; 918 break;
710 pos += n;
711 written += n;
712 iov_iter_advance(from, n);
713 919
714 if (pos > i_size_read(inode)) { 920 pos += len;
715 check_caps = ceph_inode_set_size(inode, pos); 921 iov_iter_advance(iter, len);
716 if (check_caps) 922
923 if (!write && pos >= size)
924 break;
925
926 if (write && pos > size) {
927 if (ceph_inode_set_size(inode, pos))
717 ceph_check_caps(ceph_inode(inode), 928 ceph_check_caps(ceph_inode(inode),
718 CHECK_CAPS_AUTHONLY, 929 CHECK_CAPS_AUTHONLY,
719 NULL); 930 NULL);
720 } 931 }
721 } 932 }
722 933
723 if (ret != -EOLDSNAPC && written > 0) { 934 if (aio_req) {
935 if (aio_req->num_reqs == 0) {
936 kfree(aio_req);
937 return ret;
938 }
939
940 ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
941 CEPH_CAP_FILE_RD);
942
943 while (!list_empty(&aio_req->osd_reqs)) {
944 req = list_first_entry(&aio_req->osd_reqs,
945 struct ceph_osd_request,
946 r_unsafe_item);
947 list_del_init(&req->r_unsafe_item);
948 if (ret >= 0)
949 ret = ceph_osdc_start_request(req->r_osdc,
950 req, false);
951 if (ret < 0) {
952 BUG_ON(ret == -EOLDSNAPC);
953 req->r_result = ret;
954 ceph_aio_complete_req(req, NULL);
955 }
956 }
957 return -EIOCBQUEUED;
958 }
959
960 if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
961 ret = pos - iocb->ki_pos;
724 iocb->ki_pos = pos; 962 iocb->ki_pos = pos;
725 ret = written;
726 } 963 }
727 return ret; 964 return ret;
728} 965}
729 966
730
731/* 967/*
732 * Synchronous write, straight from __user pointer or user pages. 968 * Synchronous write, straight from __user pointer or user pages.
733 * 969 *
@@ -897,8 +1133,14 @@ again:
897 ceph_cap_string(got)); 1133 ceph_cap_string(got));
898 1134
899 if (ci->i_inline_version == CEPH_INLINE_NONE) { 1135 if (ci->i_inline_version == CEPH_INLINE_NONE) {
900 /* hmm, this isn't really async... */ 1136 if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
901 ret = ceph_sync_read(iocb, to, &retry_op); 1137 ret = ceph_direct_read_write(iocb, to,
1138 NULL, NULL);
1139 if (ret >= 0 && ret < len)
1140 retry_op = CHECK_EOF;
1141 } else {
1142 ret = ceph_sync_read(iocb, to, &retry_op);
1143 }
902 } else { 1144 } else {
903 retry_op = READ_INLINE; 1145 retry_op = READ_INLINE;
904 } 1146 }
@@ -916,7 +1158,7 @@ again:
916 pinned_page = NULL; 1158 pinned_page = NULL;
917 } 1159 }
918 ceph_put_cap_refs(ci, got); 1160 ceph_put_cap_refs(ci, got);
919 if (retry_op && ret >= 0) { 1161 if (retry_op > HAVE_RETRIED && ret >= 0) {
920 int statret; 1162 int statret;
921 struct page *page = NULL; 1163 struct page *page = NULL;
922 loff_t i_size; 1164 loff_t i_size;
@@ -968,12 +1210,11 @@ again:
968 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && 1210 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
969 ret < len) { 1211 ret < len) {
970 dout("sync_read hit hole, ppos %lld < size %lld" 1212 dout("sync_read hit hole, ppos %lld < size %lld"
971 ", reading more\n", iocb->ki_pos, 1213 ", reading more\n", iocb->ki_pos, i_size);
972 inode->i_size);
973 1214
974 read += ret; 1215 read += ret;
975 len -= ret; 1216 len -= ret;
976 retry_op = 0; 1217 retry_op = HAVE_RETRIED;
977 goto again; 1218 goto again;
978 } 1219 }
979 } 1220 }
@@ -1052,7 +1293,7 @@ retry_snap:
1052 } 1293 }
1053 1294
1054 dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", 1295 dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
1055 inode, ceph_vinop(inode), pos, count, inode->i_size); 1296 inode, ceph_vinop(inode), pos, count, i_size_read(inode));
1056 if (fi->fmode & CEPH_FILE_MODE_LAZY) 1297 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1057 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 1298 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1058 else 1299 else
@@ -1088,8 +1329,8 @@ retry_snap:
1088 /* we might need to revert back to that point */ 1329 /* we might need to revert back to that point */
1089 data = *from; 1330 data = *from;
1090 if (iocb->ki_flags & IOCB_DIRECT) 1331 if (iocb->ki_flags & IOCB_DIRECT)
1091 written = ceph_sync_direct_write(iocb, &data, pos, 1332 written = ceph_direct_read_write(iocb, &data, snapc,
1092 snapc); 1333 &prealloc_cf);
1093 else 1334 else
1094 written = ceph_sync_write(iocb, &data, pos, snapc); 1335 written = ceph_sync_write(iocb, &data, pos, snapc);
1095 if (written == -EOLDSNAPC) { 1336 if (written == -EOLDSNAPC) {
@@ -1104,7 +1345,7 @@ retry_snap:
1104 iov_iter_advance(from, written); 1345 iov_iter_advance(from, written);
1105 ceph_put_snap_context(snapc); 1346 ceph_put_snap_context(snapc);
1106 } else { 1347 } else {
1107 loff_t old_size = inode->i_size; 1348 loff_t old_size = i_size_read(inode);
1108 /* 1349 /*
1109 * No need to acquire the i_truncate_mutex. Because 1350 * No need to acquire the i_truncate_mutex. Because
1110 * the MDS revokes Fwb caps before sending truncate 1351 * the MDS revokes Fwb caps before sending truncate
@@ -1115,7 +1356,7 @@ retry_snap:
1115 written = generic_perform_write(file, from, pos); 1356 written = generic_perform_write(file, from, pos);
1116 if (likely(written >= 0)) 1357 if (likely(written >= 0))
1117 iocb->ki_pos = pos + written; 1358 iocb->ki_pos = pos + written;
1118 if (inode->i_size > old_size) 1359 if (i_size_read(inode) > old_size)
1119 ceph_fscache_update_objectsize(inode); 1360 ceph_fscache_update_objectsize(inode);
1120 inode_unlock(inode); 1361 inode_unlock(inode);
1121 } 1362 }
@@ -1160,6 +1401,7 @@ out_unlocked:
1160static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) 1401static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
1161{ 1402{
1162 struct inode *inode = file->f_mapping->host; 1403 struct inode *inode = file->f_mapping->host;
1404 loff_t i_size;
1163 int ret; 1405 int ret;
1164 1406
1165 inode_lock(inode); 1407 inode_lock(inode);
@@ -1172,9 +1414,10 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
1172 } 1414 }
1173 } 1415 }
1174 1416
1417 i_size = i_size_read(inode);
1175 switch (whence) { 1418 switch (whence) {
1176 case SEEK_END: 1419 case SEEK_END:
1177 offset += inode->i_size; 1420 offset += i_size;
1178 break; 1421 break;
1179 case SEEK_CUR: 1422 case SEEK_CUR:
1180 /* 1423 /*
@@ -1190,17 +1433,17 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
1190 offset += file->f_pos; 1433 offset += file->f_pos;
1191 break; 1434 break;
1192 case SEEK_DATA: 1435 case SEEK_DATA:
1193 if (offset >= inode->i_size) { 1436 if (offset >= i_size) {
1194 ret = -ENXIO; 1437 ret = -ENXIO;
1195 goto out; 1438 goto out;
1196 } 1439 }
1197 break; 1440 break;
1198 case SEEK_HOLE: 1441 case SEEK_HOLE:
1199 if (offset >= inode->i_size) { 1442 if (offset >= i_size) {
1200 ret = -ENXIO; 1443 ret = -ENXIO;
1201 goto out; 1444 goto out;
1202 } 1445 }
1203 offset = inode->i_size; 1446 offset = i_size;
1204 break; 1447 break;
1205 } 1448 }
1206 1449
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index da55eb8bcffa..fb4ba2e4e2a5 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -548,7 +548,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
548 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 || 548 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
549 (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) { 549 (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
550 dout("size %lld -> %llu\n", inode->i_size, size); 550 dout("size %lld -> %llu\n", inode->i_size, size);
551 inode->i_size = size; 551 i_size_write(inode, size);
552 inode->i_blocks = (size + (1<<9) - 1) >> 9; 552 inode->i_blocks = (size + (1<<9) - 1) >> 9;
553 ci->i_reported_size = size; 553 ci->i_reported_size = size;
554 if (truncate_seq != ci->i_truncate_seq) { 554 if (truncate_seq != ci->i_truncate_seq) {
@@ -808,7 +808,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
808 spin_unlock(&ci->i_ceph_lock); 808 spin_unlock(&ci->i_ceph_lock);
809 809
810 err = -EINVAL; 810 err = -EINVAL;
811 if (WARN_ON(symlen != inode->i_size)) 811 if (WARN_ON(symlen != i_size_read(inode)))
812 goto out; 812 goto out;
813 813
814 err = -ENOMEM; 814 err = -ENOMEM;
@@ -1549,7 +1549,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
1549 1549
1550 spin_lock(&ci->i_ceph_lock); 1550 spin_lock(&ci->i_ceph_lock);
1551 dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size); 1551 dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
1552 inode->i_size = size; 1552 i_size_write(inode, size);
1553 inode->i_blocks = (size + (1 << 9) - 1) >> 9; 1553 inode->i_blocks = (size + (1 << 9) - 1) >> 9;
1554 1554
1555 /* tell the MDS if we are approaching max_size */ 1555 /* tell the MDS if we are approaching max_size */
@@ -1911,7 +1911,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1911 inode->i_size, attr->ia_size); 1911 inode->i_size, attr->ia_size);
1912 if ((issued & CEPH_CAP_FILE_EXCL) && 1912 if ((issued & CEPH_CAP_FILE_EXCL) &&
1913 attr->ia_size > inode->i_size) { 1913 attr->ia_size > inode->i_size) {
1914 inode->i_size = attr->ia_size; 1914 i_size_write(inode, attr->ia_size);
1915 inode->i_blocks = 1915 inode->i_blocks =
1916 (attr->ia_size + (1 << 9) - 1) >> 9; 1916 (attr->ia_size + (1 << 9) - 1) >> 9;
1917 inode->i_ctime = attr->ia_ctime; 1917 inode->i_ctime = attr->ia_ctime;