-rw-r--r--  drivers/block/rbd.c              |   3
-rw-r--r--  fs/ceph/addr.c                   |  14
-rw-r--r--  fs/ceph/cache.c                  |   8
-rw-r--r--  fs/ceph/file.c                   | 509
-rw-r--r--  fs/ceph/inode.c                  |   8
-rw-r--r--  include/linux/ceph/ceph_frag.h   |  37
-rw-r--r--  include/linux/ceph/messenger.h   |   2
-rw-r--r--  net/ceph/auth_x.c                |  49
-rw-r--r--  net/ceph/auth_x.h                |   2
-rw-r--r--  net/ceph/messenger.c             | 105
-rw-r--r--  net/ceph/mon_client.c            |   4
11 files changed, 501 insertions(+), 240 deletions(-)
diff --git a/drivers/block/rbd.c b/drivers/block/rbd.c
index 81ea69fee7ca..4a876785b68c 100644
--- a/drivers/block/rbd.c
+++ b/drivers/block/rbd.c
@@ -5185,8 +5185,7 @@ static int rbd_dev_probe_parent(struct rbd_device *rbd_dev, int depth)
5185 5185
5186out_err: 5186out_err:
5187 rbd_dev_unparent(rbd_dev); 5187 rbd_dev_unparent(rbd_dev);
5188 if (parent) 5188 rbd_dev_destroy(parent);
5189 rbd_dev_destroy(parent);
5190 return ret; 5189 return ret;
5191} 5190}
5192 5191
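
Note: the hunk above drops the NULL guard in front of rbd_dev_destroy(parent). That is the usual kfree()-style convention, where a destroy helper either accepts NULL or the caller otherwise guarantees a non-NULL pointer; rbd_dev_destroy() itself is not shown in this diff, so the sketch below only illustrates the convention with hypothetical demo_* names.

#include <linux/slab.h>

struct demo_dev {
        char *name;
};

/* A destroy helper that is a no-op on NULL, like kfree(), lets callers
 * drop "if (ptr)" guards on their error paths. */
static void demo_dev_destroy(struct demo_dev *dev)
{
        if (!dev)
                return;
        kfree(dev->name);
        kfree(dev);
}
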
diff --git a/fs/ceph/addr.c b/fs/ceph/addr.c
index b7d218a168fb..c22213789090 100644
--- a/fs/ceph/addr.c
+++ b/fs/ceph/addr.c
@@ -1108,7 +1108,7 @@ retry_locked:
1108 return 0; 1108 return 0;
1109 1109
1110 /* past end of file? */ 1110 /* past end of file? */
1111 i_size = inode->i_size; /* caller holds i_mutex */ 1111 i_size = i_size_read(inode);
1112 1112
1113 if (page_off >= i_size || 1113 if (page_off >= i_size ||
1114 (pos_in_page == 0 && (pos+len) >= i_size && 1114 (pos_in_page == 0 && (pos+len) >= i_size &&
@@ -1149,7 +1149,6 @@ static int ceph_write_begin(struct file *file, struct address_space *mapping,
1149 page = grab_cache_page_write_begin(mapping, index, 0); 1149 page = grab_cache_page_write_begin(mapping, index, 0);
1150 if (!page) 1150 if (!page)
1151 return -ENOMEM; 1151 return -ENOMEM;
1152 *pagep = page;
1153 1152
1154 dout("write_begin file %p inode %p page %p %d~%d\n", file, 1153 dout("write_begin file %p inode %p page %p %d~%d\n", file,
1155 inode, page, (int)pos, (int)len); 1154 inode, page, (int)pos, (int)len);
@@ -1184,8 +1183,7 @@ static int ceph_write_end(struct file *file, struct address_space *mapping,
1184 zero_user_segment(page, from+copied, len); 1183 zero_user_segment(page, from+copied, len);
1185 1184
1186 /* did file size increase? */ 1185 /* did file size increase? */
1187 /* (no need for i_size_read(); we caller holds i_mutex */ 1186 if (pos+copied > i_size_read(inode))
1188 if (pos+copied > inode->i_size)
1189 check_cap = ceph_inode_set_size(inode, pos+copied); 1187 check_cap = ceph_inode_set_size(inode, pos+copied);
1190 1188
1191 if (!PageUptodate(page)) 1189 if (!PageUptodate(page))
@@ -1378,11 +1376,13 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1378 1376
1379 ret = VM_FAULT_NOPAGE; 1377 ret = VM_FAULT_NOPAGE;
1380 if ((off > size) || 1378 if ((off > size) ||
1381 (page->mapping != inode->i_mapping)) 1379 (page->mapping != inode->i_mapping)) {
1380 unlock_page(page);
1382 goto out; 1381 goto out;
1382 }
1383 1383
1384 ret = ceph_update_writeable_page(vma->vm_file, off, len, page); 1384 ret = ceph_update_writeable_page(vma->vm_file, off, len, page);
1385 if (ret == 0) { 1385 if (ret >= 0) {
1386 /* success. we'll keep the page locked. */ 1386 /* success. we'll keep the page locked. */
1387 set_page_dirty(page); 1387 set_page_dirty(page);
1388 ret = VM_FAULT_LOCKED; 1388 ret = VM_FAULT_LOCKED;
@@ -1393,8 +1393,6 @@ static int ceph_page_mkwrite(struct vm_area_struct *vma, struct vm_fault *vmf)
1393 ret = VM_FAULT_SIGBUS; 1393 ret = VM_FAULT_SIGBUS;
1394 } 1394 }
1395out: 1395out:
1396 if (ret != VM_FAULT_LOCKED)
1397 unlock_page(page);
1398 if (ret == VM_FAULT_LOCKED || 1396 if (ret == VM_FAULT_LOCKED ||
1399 ci->i_inline_version != CEPH_INLINE_NONE) { 1397 ci->i_inline_version != CEPH_INLINE_NONE) {
1400 int dirty; 1398 int dirty;
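
Note: several hunks above (and the fs/ceph/cache.c, fs/ceph/file.c and fs/ceph/inode.c hunks below) replace open-coded inode->i_size accesses with i_size_read()/i_size_write(), dropping the old "caller holds i_mutex" assumption on the read side. A minimal sketch of how that accessor pair is meant to be used; the demo_* names are illustrative, not ceph code:

#include <linux/fs.h>

/*
 * i_size_read() returns a consistent loff_t even on 32-bit SMP, where
 * it is backed by a seqcount, so readers do not need i_mutex just to
 * look at the size.  i_size_write() still requires the caller to hold
 * a lock that excludes other size writers.
 */
static bool demo_write_extends_file(struct inode *inode, loff_t pos,
                                    size_t count)
{
        return pos + (loff_t)count > i_size_read(inode);
}

static void demo_grow_file(struct inode *inode, loff_t new_size)
{
        /* assumed: caller serializes size updates, e.g. inode_lock() */
        if (new_size > i_size_read(inode)) {
                i_size_write(inode, new_size);
                inode->i_blocks = (new_size + 511) >> 9;
        }
}
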
diff --git a/fs/ceph/cache.c b/fs/ceph/cache.c
index 7680e2626815..a351480dbabc 100644
--- a/fs/ceph/cache.c
+++ b/fs/ceph/cache.c
@@ -106,7 +106,7 @@ static uint16_t ceph_fscache_inode_get_aux(const void *cookie_netfs_data,
106 106
107 memset(&aux, 0, sizeof(aux)); 107 memset(&aux, 0, sizeof(aux));
108 aux.mtime = inode->i_mtime; 108 aux.mtime = inode->i_mtime;
109 aux.size = inode->i_size; 109 aux.size = i_size_read(inode);
110 110
111 memcpy(buffer, &aux, sizeof(aux)); 111 memcpy(buffer, &aux, sizeof(aux));
112 112
@@ -117,9 +117,7 @@ static void ceph_fscache_inode_get_attr(const void *cookie_netfs_data,
117 uint64_t *size) 117 uint64_t *size)
118{ 118{
119 const struct ceph_inode_info* ci = cookie_netfs_data; 119 const struct ceph_inode_info* ci = cookie_netfs_data;
120 const struct inode* inode = &ci->vfs_inode; 120 *size = i_size_read(&ci->vfs_inode);
121
122 *size = inode->i_size;
123} 121}
124 122
125static enum fscache_checkaux ceph_fscache_inode_check_aux( 123static enum fscache_checkaux ceph_fscache_inode_check_aux(
@@ -134,7 +132,7 @@ static enum fscache_checkaux ceph_fscache_inode_check_aux(
134 132
135 memset(&aux, 0, sizeof(aux)); 133 memset(&aux, 0, sizeof(aux));
136 aux.mtime = inode->i_mtime; 134 aux.mtime = inode->i_mtime;
137 aux.size = inode->i_size; 135 aux.size = i_size_read(inode);
138 136
139 if (memcmp(data, &aux, sizeof(aux)) != 0) 137 if (memcmp(data, &aux, sizeof(aux)) != 0)
140 return FSCACHE_CHECKAUX_OBSOLETE; 138 return FSCACHE_CHECKAUX_OBSOLETE;
diff --git a/fs/ceph/file.c b/fs/ceph/file.c
index 10c5ae79696e..86a9c383955e 100644
--- a/fs/ceph/file.c
+++ b/fs/ceph/file.c
@@ -397,8 +397,9 @@ int ceph_release(struct inode *inode, struct file *file)
397} 397}
398 398
399enum { 399enum {
400 CHECK_EOF = 1, 400 HAVE_RETRIED = 1,
401 READ_INLINE = 2, 401 CHECK_EOF = 2,
402 READ_INLINE = 3,
402}; 403};
403 404
404/* 405/*
@@ -411,17 +412,15 @@ enum {
411static int striped_read(struct inode *inode, 412static int striped_read(struct inode *inode,
412 u64 off, u64 len, 413 u64 off, u64 len,
413 struct page **pages, int num_pages, 414 struct page **pages, int num_pages,
414 int *checkeof, bool o_direct, 415 int *checkeof)
415 unsigned long buf_align)
416{ 416{
417 struct ceph_fs_client *fsc = ceph_inode_to_client(inode); 417 struct ceph_fs_client *fsc = ceph_inode_to_client(inode);
418 struct ceph_inode_info *ci = ceph_inode(inode); 418 struct ceph_inode_info *ci = ceph_inode(inode);
419 u64 pos, this_len, left; 419 u64 pos, this_len, left;
420 int io_align, page_align; 420 loff_t i_size;
421 int pages_left; 421 int page_align, pages_left;
422 int read; 422 int read, ret;
423 struct page **page_pos; 423 struct page **page_pos;
424 int ret;
425 bool hit_stripe, was_short; 424 bool hit_stripe, was_short;
426 425
427 /* 426 /*
@@ -432,13 +431,9 @@ static int striped_read(struct inode *inode,
432 page_pos = pages; 431 page_pos = pages;
433 pages_left = num_pages; 432 pages_left = num_pages;
434 read = 0; 433 read = 0;
435 io_align = off & ~PAGE_MASK;
436 434
437more: 435more:
438 if (o_direct) 436 page_align = pos & ~PAGE_MASK;
439 page_align = (pos - io_align + buf_align) & ~PAGE_MASK;
440 else
441 page_align = pos & ~PAGE_MASK;
442 this_len = left; 437 this_len = left;
443 ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode), 438 ret = ceph_osdc_readpages(&fsc->client->osdc, ceph_vino(inode),
444 &ci->i_layout, pos, &this_len, 439 &ci->i_layout, pos, &this_len,
@@ -452,13 +447,12 @@ more:
452 dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read, 447 dout("striped_read %llu~%llu (read %u) got %d%s%s\n", pos, left, read,
453 ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : ""); 448 ret, hit_stripe ? " HITSTRIPE" : "", was_short ? " SHORT" : "");
454 449
450 i_size = i_size_read(inode);
455 if (ret >= 0) { 451 if (ret >= 0) {
456 int didpages; 452 int didpages;
457 if (was_short && (pos + ret < inode->i_size)) { 453 if (was_short && (pos + ret < i_size)) {
458 int zlen = min(this_len - ret, 454 int zlen = min(this_len - ret, i_size - pos - ret);
459 inode->i_size - pos - ret); 455 int zoff = (off & ~PAGE_MASK) + read + ret;
460 int zoff = (o_direct ? buf_align : io_align) +
461 read + ret;
462 dout(" zero gap %llu to %llu\n", 456 dout(" zero gap %llu to %llu\n",
463 pos + ret, pos + ret + zlen); 457 pos + ret, pos + ret + zlen);
464 ceph_zero_page_vector_range(zoff, zlen, pages); 458 ceph_zero_page_vector_range(zoff, zlen, pages);
@@ -473,14 +467,14 @@ more:
473 pages_left -= didpages; 467 pages_left -= didpages;
474 468
475 /* hit stripe and need continue*/ 469 /* hit stripe and need continue*/
476 if (left && hit_stripe && pos < inode->i_size) 470 if (left && hit_stripe && pos < i_size)
477 goto more; 471 goto more;
478 } 472 }
479 473
480 if (read > 0) { 474 if (read > 0) {
481 ret = read; 475 ret = read;
482 /* did we bounce off eof? */ 476 /* did we bounce off eof? */
483 if (pos + left > inode->i_size) 477 if (pos + left > i_size)
484 *checkeof = CHECK_EOF; 478 *checkeof = CHECK_EOF;
485 } 479 }
486 480
@@ -521,54 +515,28 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
521 if (ret < 0) 515 if (ret < 0)
522 return ret; 516 return ret;
523 517
524 if (iocb->ki_flags & IOCB_DIRECT) { 518 num_pages = calc_pages_for(off, len);
525 while (iov_iter_count(i)) { 519 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
526 size_t start; 520 if (IS_ERR(pages))
527 ssize_t n; 521 return PTR_ERR(pages);
528 522 ret = striped_read(inode, off, len, pages,
529 n = dio_get_pagev_size(i); 523 num_pages, checkeof);
530 pages = dio_get_pages_alloc(i, n, &start, &num_pages); 524 if (ret > 0) {
531 if (IS_ERR(pages)) 525 int l, k = 0;
532 return PTR_ERR(pages); 526 size_t left = ret;
533 527
534 ret = striped_read(inode, off, n, 528 while (left) {
535 pages, num_pages, checkeof, 529 size_t page_off = off & ~PAGE_MASK;
536 1, start); 530 size_t copy = min_t(size_t, left,
537 531 PAGE_SIZE - page_off);
538 ceph_put_page_vector(pages, num_pages, true); 532 l = copy_page_to_iter(pages[k++], page_off, copy, i);
539 533 off += l;
540 if (ret <= 0) 534 left -= l;
541 break; 535 if (l < copy)
542 off += ret;
543 iov_iter_advance(i, ret);
544 if (ret < n)
545 break; 536 break;
546 } 537 }
547 } else {
548 num_pages = calc_pages_for(off, len);
549 pages = ceph_alloc_page_vector(num_pages, GFP_KERNEL);
550 if (IS_ERR(pages))
551 return PTR_ERR(pages);
552 ret = striped_read(inode, off, len, pages,
553 num_pages, checkeof, 0, 0);
554 if (ret > 0) {
555 int l, k = 0;
556 size_t left = ret;
557
558 while (left) {
559 size_t page_off = off & ~PAGE_MASK;
560 size_t copy = min_t(size_t,
561 PAGE_SIZE - page_off, left);
562 l = copy_page_to_iter(pages[k++], page_off,
563 copy, i);
564 off += l;
565 left -= l;
566 if (l < copy)
567 break;
568 }
569 }
570 ceph_release_page_vector(pages, num_pages);
571 } 538 }
539 ceph_release_page_vector(pages, num_pages);
572 540
573 if (off > iocb->ki_pos) { 541 if (off > iocb->ki_pos) {
574 ret = off - iocb->ki_pos; 542 ret = off - iocb->ki_pos;
@@ -579,6 +547,193 @@ static ssize_t ceph_sync_read(struct kiocb *iocb, struct iov_iter *i,
579 return ret; 547 return ret;
580} 548}
581 549
550struct ceph_aio_request {
551 struct kiocb *iocb;
552 size_t total_len;
553 int write;
554 int error;
555 struct list_head osd_reqs;
556 unsigned num_reqs;
557 atomic_t pending_reqs;
558 struct timespec mtime;
559 struct ceph_cap_flush *prealloc_cf;
560};
561
562struct ceph_aio_work {
563 struct work_struct work;
564 struct ceph_osd_request *req;
565};
566
567static void ceph_aio_retry_work(struct work_struct *work);
568
569static void ceph_aio_complete(struct inode *inode,
570 struct ceph_aio_request *aio_req)
571{
572 struct ceph_inode_info *ci = ceph_inode(inode);
573 int ret;
574
575 if (!atomic_dec_and_test(&aio_req->pending_reqs))
576 return;
577
578 ret = aio_req->error;
579 if (!ret)
580 ret = aio_req->total_len;
581
582 dout("ceph_aio_complete %p rc %d\n", inode, ret);
583
584 if (ret >= 0 && aio_req->write) {
585 int dirty;
586
587 loff_t endoff = aio_req->iocb->ki_pos + aio_req->total_len;
588 if (endoff > i_size_read(inode)) {
589 if (ceph_inode_set_size(inode, endoff))
590 ceph_check_caps(ci, CHECK_CAPS_AUTHONLY, NULL);
591 }
592
593 spin_lock(&ci->i_ceph_lock);
594 ci->i_inline_version = CEPH_INLINE_NONE;
595 dirty = __ceph_mark_dirty_caps(ci, CEPH_CAP_FILE_WR,
596 &aio_req->prealloc_cf);
597 spin_unlock(&ci->i_ceph_lock);
598 if (dirty)
599 __mark_inode_dirty(inode, dirty);
600
601 }
602
603 ceph_put_cap_refs(ci, (aio_req->write ? CEPH_CAP_FILE_WR :
604 CEPH_CAP_FILE_RD));
605
606 aio_req->iocb->ki_complete(aio_req->iocb, ret, 0);
607
608 ceph_free_cap_flush(aio_req->prealloc_cf);
609 kfree(aio_req);
610}
611
612static void ceph_aio_complete_req(struct ceph_osd_request *req,
613 struct ceph_msg *msg)
614{
615 int rc = req->r_result;
616 struct inode *inode = req->r_inode;
617 struct ceph_aio_request *aio_req = req->r_priv;
618 struct ceph_osd_data *osd_data = osd_req_op_extent_osd_data(req, 0);
619 int num_pages = calc_pages_for((u64)osd_data->alignment,
620 osd_data->length);
621
622 dout("ceph_aio_complete_req %p rc %d bytes %llu\n",
623 inode, rc, osd_data->length);
624
625 if (rc == -EOLDSNAPC) {
626 struct ceph_aio_work *aio_work;
627 BUG_ON(!aio_req->write);
628
629 aio_work = kmalloc(sizeof(*aio_work), GFP_NOFS);
630 if (aio_work) {
631 INIT_WORK(&aio_work->work, ceph_aio_retry_work);
632 aio_work->req = req;
633 queue_work(ceph_inode_to_client(inode)->wb_wq,
634 &aio_work->work);
635 return;
636 }
637 rc = -ENOMEM;
638 } else if (!aio_req->write) {
639 if (rc == -ENOENT)
640 rc = 0;
641 if (rc >= 0 && osd_data->length > rc) {
642 int zoff = osd_data->alignment + rc;
643 int zlen = osd_data->length - rc;
644 /*
645 * If read is satisfied by single OSD request,
646 * it can pass EOF. Otherwise read is within
647 * i_size.
648 */
649 if (aio_req->num_reqs == 1) {
650 loff_t i_size = i_size_read(inode);
651 loff_t endoff = aio_req->iocb->ki_pos + rc;
652 if (endoff < i_size)
653 zlen = min_t(size_t, zlen,
654 i_size - endoff);
655 aio_req->total_len = rc + zlen;
656 }
657
658 if (zlen > 0)
659 ceph_zero_page_vector_range(zoff, zlen,
660 osd_data->pages);
661 }
662 }
663
664 ceph_put_page_vector(osd_data->pages, num_pages, false);
665 ceph_osdc_put_request(req);
666
667 if (rc < 0)
668 cmpxchg(&aio_req->error, 0, rc);
669
670 ceph_aio_complete(inode, aio_req);
671 return;
672}
673
674static void ceph_aio_retry_work(struct work_struct *work)
675{
676 struct ceph_aio_work *aio_work =
677 container_of(work, struct ceph_aio_work, work);
678 struct ceph_osd_request *orig_req = aio_work->req;
679 struct ceph_aio_request *aio_req = orig_req->r_priv;
680 struct inode *inode = orig_req->r_inode;
681 struct ceph_inode_info *ci = ceph_inode(inode);
682 struct ceph_snap_context *snapc;
683 struct ceph_osd_request *req;
684 int ret;
685
686 spin_lock(&ci->i_ceph_lock);
687 if (__ceph_have_pending_cap_snap(ci)) {
688 struct ceph_cap_snap *capsnap =
689 list_last_entry(&ci->i_cap_snaps,
690 struct ceph_cap_snap,
691 ci_item);
692 snapc = ceph_get_snap_context(capsnap->context);
693 } else {
694 BUG_ON(!ci->i_head_snapc);
695 snapc = ceph_get_snap_context(ci->i_head_snapc);
696 }
697 spin_unlock(&ci->i_ceph_lock);
698
699 req = ceph_osdc_alloc_request(orig_req->r_osdc, snapc, 2,
700 false, GFP_NOFS);
701 if (IS_ERR(req)) {
702 ret = PTR_ERR(req);
703 req = orig_req;
704 goto out;
705 }
706
707 req->r_flags = CEPH_OSD_FLAG_ORDERSNAP |
708 CEPH_OSD_FLAG_ONDISK |
709 CEPH_OSD_FLAG_WRITE;
710 req->r_base_oloc = orig_req->r_base_oloc;
711 req->r_base_oid = orig_req->r_base_oid;
712
713 req->r_ops[0] = orig_req->r_ops[0];
714 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
715
716 ceph_osdc_build_request(req, req->r_ops[0].extent.offset,
717 snapc, CEPH_NOSNAP, &aio_req->mtime);
718
719 ceph_put_snap_context(snapc);
720 ceph_osdc_put_request(orig_req);
721
722 req->r_callback = ceph_aio_complete_req;
723 req->r_inode = inode;
724 req->r_priv = aio_req;
725
726 ret = ceph_osdc_start_request(req->r_osdc, req, false);
727out:
728 if (ret < 0) {
729 BUG_ON(ret == -EOLDSNAPC);
730 req->r_result = ret;
731 ceph_aio_complete_req(req, NULL);
732 }
733
734 kfree(aio_work);
735}
736
582/* 737/*
583 * Write commit request unsafe callback, called to tell us when a 738 * Write commit request unsafe callback, called to tell us when a
584 * request is unsafe (that is, in flight--has been handed to the 739 * request is unsafe (that is, in flight--has been handed to the
@@ -612,16 +767,10 @@ static void ceph_sync_write_unsafe(struct ceph_osd_request *req, bool unsafe)
612} 767}
613 768
614 769
615/*
616 * Synchronous write, straight from __user pointer or user pages.
617 *
618 * If write spans object boundary, just do multiple writes. (For a
619 * correct atomic write, we should e.g. take write locks on all
620 * objects, rollback on failure, etc.)
621 */
622static ssize_t 770static ssize_t
623ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos, 771ceph_direct_read_write(struct kiocb *iocb, struct iov_iter *iter,
624 struct ceph_snap_context *snapc) 772 struct ceph_snap_context *snapc,
773 struct ceph_cap_flush **pcf)
625{ 774{
626 struct file *file = iocb->ki_filp; 775 struct file *file = iocb->ki_filp;
627 struct inode *inode = file_inode(file); 776 struct inode *inode = file_inode(file);
@@ -630,44 +779,52 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
630 struct ceph_vino vino; 779 struct ceph_vino vino;
631 struct ceph_osd_request *req; 780 struct ceph_osd_request *req;
632 struct page **pages; 781 struct page **pages;
633 int num_pages; 782 struct ceph_aio_request *aio_req = NULL;
634 int written = 0; 783 int num_pages = 0;
635 int flags; 784 int flags;
636 int check_caps = 0;
637 int ret; 785 int ret;
638 struct timespec mtime = CURRENT_TIME; 786 struct timespec mtime = CURRENT_TIME;
639 size_t count = iov_iter_count(from); 787 size_t count = iov_iter_count(iter);
788 loff_t pos = iocb->ki_pos;
789 bool write = iov_iter_rw(iter) == WRITE;
640 790
641 if (ceph_snap(file_inode(file)) != CEPH_NOSNAP) 791 if (write && ceph_snap(file_inode(file)) != CEPH_NOSNAP)
642 return -EROFS; 792 return -EROFS;
643 793
644 dout("sync_direct_write on file %p %lld~%u\n", file, pos, 794 dout("sync_direct_read_write (%s) on file %p %lld~%u\n",
645 (unsigned)count); 795 (write ? "write" : "read"), file, pos, (unsigned)count);
646 796
647 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count); 797 ret = filemap_write_and_wait_range(inode->i_mapping, pos, pos + count);
648 if (ret < 0) 798 if (ret < 0)
649 return ret; 799 return ret;
650 800
651 ret = invalidate_inode_pages2_range(inode->i_mapping, 801 if (write) {
652 pos >> PAGE_CACHE_SHIFT, 802 ret = invalidate_inode_pages2_range(inode->i_mapping,
653 (pos + count) >> PAGE_CACHE_SHIFT); 803 pos >> PAGE_CACHE_SHIFT,
654 if (ret < 0) 804 (pos + count) >> PAGE_CACHE_SHIFT);
655 dout("invalidate_inode_pages2_range returned %d\n", ret); 805 if (ret < 0)
806 dout("invalidate_inode_pages2_range returned %d\n", ret);
656 807
657 flags = CEPH_OSD_FLAG_ORDERSNAP | 808 flags = CEPH_OSD_FLAG_ORDERSNAP |
658 CEPH_OSD_FLAG_ONDISK | 809 CEPH_OSD_FLAG_ONDISK |
659 CEPH_OSD_FLAG_WRITE; 810 CEPH_OSD_FLAG_WRITE;
811 } else {
812 flags = CEPH_OSD_FLAG_READ;
813 }
660 814
661 while (iov_iter_count(from) > 0) { 815 while (iov_iter_count(iter) > 0) {
662 u64 len = dio_get_pagev_size(from); 816 u64 size = dio_get_pagev_size(iter);
663 size_t start; 817 size_t start = 0;
664 ssize_t n; 818 ssize_t len;
665 819
666 vino = ceph_vino(inode); 820 vino = ceph_vino(inode);
667 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout, 821 req = ceph_osdc_new_request(&fsc->client->osdc, &ci->i_layout,
668 vino, pos, &len, 0, 822 vino, pos, &size, 0,
669 2,/*include a 'startsync' command*/ 823 /*include a 'startsync' command*/
670 CEPH_OSD_OP_WRITE, flags, snapc, 824 write ? 2 : 1,
825 write ? CEPH_OSD_OP_WRITE :
826 CEPH_OSD_OP_READ,
827 flags, snapc,
671 ci->i_truncate_seq, 828 ci->i_truncate_seq,
672 ci->i_truncate_size, 829 ci->i_truncate_size,
673 false); 830 false);
@@ -676,10 +833,8 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
676 break; 833 break;
677 } 834 }
678 835
679 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0); 836 len = size;
680 837 pages = dio_get_pages_alloc(iter, len, &start, &num_pages);
681 n = len;
682 pages = dio_get_pages_alloc(from, len, &start, &num_pages);
683 if (IS_ERR(pages)) { 838 if (IS_ERR(pages)) {
684 ceph_osdc_put_request(req); 839 ceph_osdc_put_request(req);
685 ret = PTR_ERR(pages); 840 ret = PTR_ERR(pages);
@@ -687,47 +842,128 @@ ceph_sync_direct_write(struct kiocb *iocb, struct iov_iter *from, loff_t pos,
687 } 842 }
688 843
689 /* 844 /*
690 * throw out any page cache pages in this range. this 845 * To simplify error handling, allow AIO when IO within i_size
691 * may block. 846 * or IO can be satisfied by single OSD request.
692 */ 847 */
693 truncate_inode_pages_range(inode->i_mapping, pos, 848 if (pos == iocb->ki_pos && !is_sync_kiocb(iocb) &&
694 (pos+n) | (PAGE_CACHE_SIZE-1)); 849 (len == count || pos + count <= i_size_read(inode))) {
695 osd_req_op_extent_osd_data_pages(req, 0, pages, n, start, 850 aio_req = kzalloc(sizeof(*aio_req), GFP_KERNEL);
696 false, false); 851 if (aio_req) {
852 aio_req->iocb = iocb;
853 aio_req->write = write;
854 INIT_LIST_HEAD(&aio_req->osd_reqs);
855 if (write) {
856 aio_req->mtime = mtime;
857 swap(aio_req->prealloc_cf, *pcf);
858 }
859 }
860 /* ignore error */
861 }
862
863 if (write) {
864 /*
865 * throw out any page cache pages in this range. this
866 * may block.
867 */
868 truncate_inode_pages_range(inode->i_mapping, pos,
869 (pos+len) | (PAGE_CACHE_SIZE - 1));
870
871 osd_req_op_init(req, 1, CEPH_OSD_OP_STARTSYNC, 0);
872 }
873
874
875 osd_req_op_extent_osd_data_pages(req, 0, pages, len, start,
876 false, false);
697 877
698 /* BUG_ON(vino.snap != CEPH_NOSNAP); */
699 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime); 878 ceph_osdc_build_request(req, pos, snapc, vino.snap, &mtime);
700 879
701 ret = ceph_osdc_start_request(&fsc->client->osdc, req, false); 880 if (aio_req) {
881 aio_req->total_len += len;
882 aio_req->num_reqs++;
883 atomic_inc(&aio_req->pending_reqs);
884
885 req->r_callback = ceph_aio_complete_req;
886 req->r_inode = inode;
887 req->r_priv = aio_req;
888 list_add_tail(&req->r_unsafe_item, &aio_req->osd_reqs);
889
890 pos += len;
891 iov_iter_advance(iter, len);
892 continue;
893 }
894
895 ret = ceph_osdc_start_request(req->r_osdc, req, false);
702 if (!ret) 896 if (!ret)
703 ret = ceph_osdc_wait_request(&fsc->client->osdc, req); 897 ret = ceph_osdc_wait_request(&fsc->client->osdc, req);
704 898
899 size = i_size_read(inode);
900 if (!write) {
901 if (ret == -ENOENT)
902 ret = 0;
903 if (ret >= 0 && ret < len && pos + ret < size) {
904 int zlen = min_t(size_t, len - ret,
905 size - pos - ret);
906 ceph_zero_page_vector_range(start + ret, zlen,
907 pages);
908 ret += zlen;
909 }
910 if (ret >= 0)
911 len = ret;
912 }
913
705 ceph_put_page_vector(pages, num_pages, false); 914 ceph_put_page_vector(pages, num_pages, false);
706 915
707 ceph_osdc_put_request(req); 916 ceph_osdc_put_request(req);
708 if (ret) 917 if (ret < 0)
709 break; 918 break;
710 pos += n;
711 written += n;
712 iov_iter_advance(from, n);
713 919
714 if (pos > i_size_read(inode)) { 920 pos += len;
715 check_caps = ceph_inode_set_size(inode, pos); 921 iov_iter_advance(iter, len);
716 if (check_caps) 922
923 if (!write && pos >= size)
924 break;
925
926 if (write && pos > size) {
927 if (ceph_inode_set_size(inode, pos))
717 ceph_check_caps(ceph_inode(inode), 928 ceph_check_caps(ceph_inode(inode),
718 CHECK_CAPS_AUTHONLY, 929 CHECK_CAPS_AUTHONLY,
719 NULL); 930 NULL);
720 } 931 }
721 } 932 }
722 933
723 if (ret != -EOLDSNAPC && written > 0) { 934 if (aio_req) {
935 if (aio_req->num_reqs == 0) {
936 kfree(aio_req);
937 return ret;
938 }
939
940 ceph_get_cap_refs(ci, write ? CEPH_CAP_FILE_WR :
941 CEPH_CAP_FILE_RD);
942
943 while (!list_empty(&aio_req->osd_reqs)) {
944 req = list_first_entry(&aio_req->osd_reqs,
945 struct ceph_osd_request,
946 r_unsafe_item);
947 list_del_init(&req->r_unsafe_item);
948 if (ret >= 0)
949 ret = ceph_osdc_start_request(req->r_osdc,
950 req, false);
951 if (ret < 0) {
952 BUG_ON(ret == -EOLDSNAPC);
953 req->r_result = ret;
954 ceph_aio_complete_req(req, NULL);
955 }
956 }
957 return -EIOCBQUEUED;
958 }
959
960 if (ret != -EOLDSNAPC && pos > iocb->ki_pos) {
961 ret = pos - iocb->ki_pos;
724 iocb->ki_pos = pos; 962 iocb->ki_pos = pos;
725 ret = written;
726 } 963 }
727 return ret; 964 return ret;
728} 965}
729 966
730
731/* 967/*
732 * Synchronous write, straight from __user pointer or user pages. 968 * Synchronous write, straight from __user pointer or user pages.
733 * 969 *
@@ -897,8 +1133,14 @@ again:
897 ceph_cap_string(got)); 1133 ceph_cap_string(got));
898 1134
899 if (ci->i_inline_version == CEPH_INLINE_NONE) { 1135 if (ci->i_inline_version == CEPH_INLINE_NONE) {
900 /* hmm, this isn't really async... */ 1136 if (!retry_op && (iocb->ki_flags & IOCB_DIRECT)) {
901 ret = ceph_sync_read(iocb, to, &retry_op); 1137 ret = ceph_direct_read_write(iocb, to,
1138 NULL, NULL);
1139 if (ret >= 0 && ret < len)
1140 retry_op = CHECK_EOF;
1141 } else {
1142 ret = ceph_sync_read(iocb, to, &retry_op);
1143 }
902 } else { 1144 } else {
903 retry_op = READ_INLINE; 1145 retry_op = READ_INLINE;
904 } 1146 }
@@ -916,7 +1158,7 @@ again:
916 pinned_page = NULL; 1158 pinned_page = NULL;
917 } 1159 }
918 ceph_put_cap_refs(ci, got); 1160 ceph_put_cap_refs(ci, got);
919 if (retry_op && ret >= 0) { 1161 if (retry_op > HAVE_RETRIED && ret >= 0) {
920 int statret; 1162 int statret;
921 struct page *page = NULL; 1163 struct page *page = NULL;
922 loff_t i_size; 1164 loff_t i_size;
@@ -968,12 +1210,11 @@ again:
968 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size && 1210 if (retry_op == CHECK_EOF && iocb->ki_pos < i_size &&
969 ret < len) { 1211 ret < len) {
970 dout("sync_read hit hole, ppos %lld < size %lld" 1212 dout("sync_read hit hole, ppos %lld < size %lld"
971 ", reading more\n", iocb->ki_pos, 1213 ", reading more\n", iocb->ki_pos, i_size);
972 inode->i_size);
973 1214
974 read += ret; 1215 read += ret;
975 len -= ret; 1216 len -= ret;
976 retry_op = 0; 1217 retry_op = HAVE_RETRIED;
977 goto again; 1218 goto again;
978 } 1219 }
979 } 1220 }
@@ -1052,7 +1293,7 @@ retry_snap:
1052 } 1293 }
1053 1294
1054 dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n", 1295 dout("aio_write %p %llx.%llx %llu~%zd getting caps. i_size %llu\n",
1055 inode, ceph_vinop(inode), pos, count, inode->i_size); 1296 inode, ceph_vinop(inode), pos, count, i_size_read(inode));
1056 if (fi->fmode & CEPH_FILE_MODE_LAZY) 1297 if (fi->fmode & CEPH_FILE_MODE_LAZY)
1057 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO; 1298 want = CEPH_CAP_FILE_BUFFER | CEPH_CAP_FILE_LAZYIO;
1058 else 1299 else
@@ -1088,8 +1329,8 @@ retry_snap:
1088 /* we might need to revert back to that point */ 1329 /* we might need to revert back to that point */
1089 data = *from; 1330 data = *from;
1090 if (iocb->ki_flags & IOCB_DIRECT) 1331 if (iocb->ki_flags & IOCB_DIRECT)
1091 written = ceph_sync_direct_write(iocb, &data, pos, 1332 written = ceph_direct_read_write(iocb, &data, snapc,
1092 snapc); 1333 &prealloc_cf);
1093 else 1334 else
1094 written = ceph_sync_write(iocb, &data, pos, snapc); 1335 written = ceph_sync_write(iocb, &data, pos, snapc);
1095 if (written == -EOLDSNAPC) { 1336 if (written == -EOLDSNAPC) {
@@ -1104,7 +1345,7 @@ retry_snap:
1104 iov_iter_advance(from, written); 1345 iov_iter_advance(from, written);
1105 ceph_put_snap_context(snapc); 1346 ceph_put_snap_context(snapc);
1106 } else { 1347 } else {
1107 loff_t old_size = inode->i_size; 1348 loff_t old_size = i_size_read(inode);
1108 /* 1349 /*
1109 * No need to acquire the i_truncate_mutex. Because 1350 * No need to acquire the i_truncate_mutex. Because
1110 * the MDS revokes Fwb caps before sending truncate 1351 * the MDS revokes Fwb caps before sending truncate
@@ -1115,7 +1356,7 @@ retry_snap:
1115 written = generic_perform_write(file, from, pos); 1356 written = generic_perform_write(file, from, pos);
1116 if (likely(written >= 0)) 1357 if (likely(written >= 0))
1117 iocb->ki_pos = pos + written; 1358 iocb->ki_pos = pos + written;
1118 if (inode->i_size > old_size) 1359 if (i_size_read(inode) > old_size)
1119 ceph_fscache_update_objectsize(inode); 1360 ceph_fscache_update_objectsize(inode);
1120 inode_unlock(inode); 1361 inode_unlock(inode);
1121 } 1362 }
@@ -1160,6 +1401,7 @@ out_unlocked:
1160static loff_t ceph_llseek(struct file *file, loff_t offset, int whence) 1401static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
1161{ 1402{
1162 struct inode *inode = file->f_mapping->host; 1403 struct inode *inode = file->f_mapping->host;
1404 loff_t i_size;
1163 int ret; 1405 int ret;
1164 1406
1165 inode_lock(inode); 1407 inode_lock(inode);
@@ -1172,9 +1414,10 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
1172 } 1414 }
1173 } 1415 }
1174 1416
1417 i_size = i_size_read(inode);
1175 switch (whence) { 1418 switch (whence) {
1176 case SEEK_END: 1419 case SEEK_END:
1177 offset += inode->i_size; 1420 offset += i_size;
1178 break; 1421 break;
1179 case SEEK_CUR: 1422 case SEEK_CUR:
1180 /* 1423 /*
@@ -1190,17 +1433,17 @@ static loff_t ceph_llseek(struct file *file, loff_t offset, int whence)
1190 offset += file->f_pos; 1433 offset += file->f_pos;
1191 break; 1434 break;
1192 case SEEK_DATA: 1435 case SEEK_DATA:
1193 if (offset >= inode->i_size) { 1436 if (offset >= i_size) {
1194 ret = -ENXIO; 1437 ret = -ENXIO;
1195 goto out; 1438 goto out;
1196 } 1439 }
1197 break; 1440 break;
1198 case SEEK_HOLE: 1441 case SEEK_HOLE:
1199 if (offset >= inode->i_size) { 1442 if (offset >= i_size) {
1200 ret = -ENXIO; 1443 ret = -ENXIO;
1201 goto out; 1444 goto out;
1202 } 1445 }
1203 offset = inode->i_size; 1446 offset = i_size;
1204 break; 1447 break;
1205 } 1448 }
1206 1449
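
Note: the new ceph_aio_request machinery above splits one direct-I/O iocb into several OSD requests and completes the iocb only when the last one finishes. The sketch below shows just that counting pattern in isolation, with hypothetical demo_* names rather than the ceph code: a shared counter ticked once per sub-request, cmpxchg() so the first error wins, and ki_complete() fired by whichever callback drops the count to zero.

#include <linux/atomic.h>
#include <linux/fs.h>
#include <linux/slab.h>

struct demo_aio {
        struct kiocb *iocb;
        atomic_t pending;       /* sub-requests still in flight */
        int error;              /* first failure wins */
        size_t total_len;
};

static void demo_subreq_done(struct demo_aio *aio, int rc)
{
        if (rc < 0)
                cmpxchg(&aio->error, 0, rc);

        if (!atomic_dec_and_test(&aio->pending))
                return;                 /* other sub-requests still running */

        aio->iocb->ki_complete(aio->iocb,
                               aio->error ? aio->error : (long)aio->total_len,
                               0);
        kfree(aio);
}

The submitter side takes one reference per sub-request (atomic_inc) before starting it and returns -EIOCBQUEUED to the caller, which is exactly what ceph_direct_read_write() does in the hunks above.
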
diff --git a/fs/ceph/inode.c b/fs/ceph/inode.c
index da55eb8bcffa..fb4ba2e4e2a5 100644
--- a/fs/ceph/inode.c
+++ b/fs/ceph/inode.c
@@ -548,7 +548,7 @@ int ceph_fill_file_size(struct inode *inode, int issued,
548 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 || 548 if (ceph_seq_cmp(truncate_seq, ci->i_truncate_seq) > 0 ||
549 (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) { 549 (truncate_seq == ci->i_truncate_seq && size > inode->i_size)) {
550 dout("size %lld -> %llu\n", inode->i_size, size); 550 dout("size %lld -> %llu\n", inode->i_size, size);
551 inode->i_size = size; 551 i_size_write(inode, size);
552 inode->i_blocks = (size + (1<<9) - 1) >> 9; 552 inode->i_blocks = (size + (1<<9) - 1) >> 9;
553 ci->i_reported_size = size; 553 ci->i_reported_size = size;
554 if (truncate_seq != ci->i_truncate_seq) { 554 if (truncate_seq != ci->i_truncate_seq) {
@@ -808,7 +808,7 @@ static int fill_inode(struct inode *inode, struct page *locked_page,
808 spin_unlock(&ci->i_ceph_lock); 808 spin_unlock(&ci->i_ceph_lock);
809 809
810 err = -EINVAL; 810 err = -EINVAL;
811 if (WARN_ON(symlen != inode->i_size)) 811 if (WARN_ON(symlen != i_size_read(inode)))
812 goto out; 812 goto out;
813 813
814 err = -ENOMEM; 814 err = -ENOMEM;
@@ -1549,7 +1549,7 @@ int ceph_inode_set_size(struct inode *inode, loff_t size)
1549 1549
1550 spin_lock(&ci->i_ceph_lock); 1550 spin_lock(&ci->i_ceph_lock);
1551 dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size); 1551 dout("set_size %p %llu -> %llu\n", inode, inode->i_size, size);
1552 inode->i_size = size; 1552 i_size_write(inode, size);
1553 inode->i_blocks = (size + (1 << 9) - 1) >> 9; 1553 inode->i_blocks = (size + (1 << 9) - 1) >> 9;
1554 1554
1555 /* tell the MDS if we are approaching max_size */ 1555 /* tell the MDS if we are approaching max_size */
@@ -1911,7 +1911,7 @@ int ceph_setattr(struct dentry *dentry, struct iattr *attr)
1911 inode->i_size, attr->ia_size); 1911 inode->i_size, attr->ia_size);
1912 if ((issued & CEPH_CAP_FILE_EXCL) && 1912 if ((issued & CEPH_CAP_FILE_EXCL) &&
1913 attr->ia_size > inode->i_size) { 1913 attr->ia_size > inode->i_size) {
1914 inode->i_size = attr->ia_size; 1914 i_size_write(inode, attr->ia_size);
1915 inode->i_blocks = 1915 inode->i_blocks =
1916 (attr->ia_size + (1 << 9) - 1) >> 9; 1916 (attr->ia_size + (1 << 9) - 1) >> 9;
1917 inode->i_ctime = attr->ia_ctime; 1917 inode->i_ctime = attr->ia_ctime;
diff --git a/include/linux/ceph/ceph_frag.h b/include/linux/ceph/ceph_frag.h
index 5babb8e95352..b827e066e55a 100644
--- a/include/linux/ceph/ceph_frag.h
+++ b/include/linux/ceph/ceph_frag.h
@@ -40,46 +40,11 @@ static inline __u32 ceph_frag_mask_shift(__u32 f)
40 return 24 - ceph_frag_bits(f); 40 return 24 - ceph_frag_bits(f);
41} 41}
42 42
43static inline int ceph_frag_contains_value(__u32 f, __u32 v) 43static inline bool ceph_frag_contains_value(__u32 f, __u32 v)
44{ 44{
45 return (v & ceph_frag_mask(f)) == ceph_frag_value(f); 45 return (v & ceph_frag_mask(f)) == ceph_frag_value(f);
46} 46}
47static inline int ceph_frag_contains_frag(__u32 f, __u32 sub)
48{
49 /* is sub as specific as us, and contained by us? */
50 return ceph_frag_bits(sub) >= ceph_frag_bits(f) &&
51 (ceph_frag_value(sub) & ceph_frag_mask(f)) == ceph_frag_value(f);
52}
53 47
54static inline __u32 ceph_frag_parent(__u32 f)
55{
56 return ceph_frag_make(ceph_frag_bits(f) - 1,
57 ceph_frag_value(f) & (ceph_frag_mask(f) << 1));
58}
59static inline int ceph_frag_is_left_child(__u32 f)
60{
61 return ceph_frag_bits(f) > 0 &&
62 (ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 0;
63}
64static inline int ceph_frag_is_right_child(__u32 f)
65{
66 return ceph_frag_bits(f) > 0 &&
67 (ceph_frag_value(f) & (0x1000000 >> ceph_frag_bits(f))) == 1;
68}
69static inline __u32 ceph_frag_sibling(__u32 f)
70{
71 return ceph_frag_make(ceph_frag_bits(f),
72 ceph_frag_value(f) ^ (0x1000000 >> ceph_frag_bits(f)));
73}
74static inline __u32 ceph_frag_left_child(__u32 f)
75{
76 return ceph_frag_make(ceph_frag_bits(f)+1, ceph_frag_value(f));
77}
78static inline __u32 ceph_frag_right_child(__u32 f)
79{
80 return ceph_frag_make(ceph_frag_bits(f)+1,
81 ceph_frag_value(f) | (0x1000000 >> (1+ceph_frag_bits(f))));
82}
83static inline __u32 ceph_frag_make_child(__u32 f, int by, int i) 48static inline __u32 ceph_frag_make_child(__u32 f, int by, int i)
84{ 49{
85 int newbits = ceph_frag_bits(f) + by; 50 int newbits = ceph_frag_bits(f) + by;
diff --git a/include/linux/ceph/messenger.h b/include/linux/ceph/messenger.h
index 71b1d6cdcb5d..8dbd7879fdc6 100644
--- a/include/linux/ceph/messenger.h
+++ b/include/linux/ceph/messenger.h
@@ -220,6 +220,7 @@ struct ceph_connection {
220 struct ceph_entity_addr actual_peer_addr; 220 struct ceph_entity_addr actual_peer_addr;
221 221
222 /* message out temps */ 222 /* message out temps */
223 struct ceph_msg_header out_hdr;
223 struct ceph_msg *out_msg; /* sending message (== tail of 224 struct ceph_msg *out_msg; /* sending message (== tail of
224 out_sent) */ 225 out_sent) */
225 bool out_msg_done; 226 bool out_msg_done;
@@ -229,7 +230,6 @@ struct ceph_connection {
229 int out_kvec_left; /* kvec's left in out_kvec */ 230 int out_kvec_left; /* kvec's left in out_kvec */
230 int out_skip; /* skip this many bytes */ 231 int out_skip; /* skip this many bytes */
231 int out_kvec_bytes; /* total bytes left */ 232 int out_kvec_bytes; /* total bytes left */
232 bool out_kvec_is_msg; /* kvec refers to out_msg */
233 int out_more; /* there is more data after the kvecs */ 233 int out_more; /* there is more data after the kvecs */
234 __le64 out_temp_ack; /* for writing an ack */ 234 __le64 out_temp_ack; /* for writing an ack */
235 struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2 235 struct ceph_timespec out_temp_keepalive2; /* for writing keepalive2
diff --git a/net/ceph/auth_x.c b/net/ceph/auth_x.c
index 10d87753ed87..9e43a315e662 100644
--- a/net/ceph/auth_x.c
+++ b/net/ceph/auth_x.c
@@ -152,7 +152,6 @@ static int process_one_ticket(struct ceph_auth_client *ac,
152 void *ticket_buf = NULL; 152 void *ticket_buf = NULL;
153 void *tp, *tpend; 153 void *tp, *tpend;
154 void **ptp; 154 void **ptp;
155 struct ceph_timespec new_validity;
156 struct ceph_crypto_key new_session_key; 155 struct ceph_crypto_key new_session_key;
157 struct ceph_buffer *new_ticket_blob; 156 struct ceph_buffer *new_ticket_blob;
158 unsigned long new_expires, new_renew_after; 157 unsigned long new_expires, new_renew_after;
@@ -193,8 +192,8 @@ static int process_one_ticket(struct ceph_auth_client *ac,
193 if (ret) 192 if (ret)
194 goto out; 193 goto out;
195 194
196 ceph_decode_copy(&dp, &new_validity, sizeof(new_validity)); 195 ceph_decode_timespec(&validity, dp);
197 ceph_decode_timespec(&validity, &new_validity); 196 dp += sizeof(struct ceph_timespec);
198 new_expires = get_seconds() + validity.tv_sec; 197 new_expires = get_seconds() + validity.tv_sec;
199 new_renew_after = new_expires - (validity.tv_sec / 4); 198 new_renew_after = new_expires - (validity.tv_sec / 4);
200 dout(" expires=%lu renew_after=%lu\n", new_expires, 199 dout(" expires=%lu renew_after=%lu\n", new_expires,
@@ -233,10 +232,10 @@ static int process_one_ticket(struct ceph_auth_client *ac,
233 ceph_buffer_put(th->ticket_blob); 232 ceph_buffer_put(th->ticket_blob);
234 th->session_key = new_session_key; 233 th->session_key = new_session_key;
235 th->ticket_blob = new_ticket_blob; 234 th->ticket_blob = new_ticket_blob;
236 th->validity = new_validity;
237 th->secret_id = new_secret_id; 235 th->secret_id = new_secret_id;
238 th->expires = new_expires; 236 th->expires = new_expires;
239 th->renew_after = new_renew_after; 237 th->renew_after = new_renew_after;
238 th->have_key = true;
240 dout(" got ticket service %d (%s) secret_id %lld len %d\n", 239 dout(" got ticket service %d (%s) secret_id %lld len %d\n",
241 type, ceph_entity_type_name(type), th->secret_id, 240 type, ceph_entity_type_name(type), th->secret_id,
242 (int)th->ticket_blob->vec.iov_len); 241 (int)th->ticket_blob->vec.iov_len);
@@ -384,6 +383,24 @@ bad:
384 return -ERANGE; 383 return -ERANGE;
385} 384}
386 385
386static bool need_key(struct ceph_x_ticket_handler *th)
387{
388 if (!th->have_key)
389 return true;
390
391 return get_seconds() >= th->renew_after;
392}
393
394static bool have_key(struct ceph_x_ticket_handler *th)
395{
396 if (th->have_key) {
397 if (get_seconds() >= th->expires)
398 th->have_key = false;
399 }
400
401 return th->have_key;
402}
403
387static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed) 404static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
388{ 405{
389 int want = ac->want_keys; 406 int want = ac->want_keys;
@@ -402,20 +419,18 @@ static void ceph_x_validate_tickets(struct ceph_auth_client *ac, int *pneed)
402 continue; 419 continue;
403 420
404 th = get_ticket_handler(ac, service); 421 th = get_ticket_handler(ac, service);
405
406 if (IS_ERR(th)) { 422 if (IS_ERR(th)) {
407 *pneed |= service; 423 *pneed |= service;
408 continue; 424 continue;
409 } 425 }
410 426
411 if (get_seconds() >= th->renew_after) 427 if (need_key(th))
412 *pneed |= service; 428 *pneed |= service;
413 if (get_seconds() >= th->expires) 429 if (!have_key(th))
414 xi->have_keys &= ~service; 430 xi->have_keys &= ~service;
415 } 431 }
416} 432}
417 433
418
419static int ceph_x_build_request(struct ceph_auth_client *ac, 434static int ceph_x_build_request(struct ceph_auth_client *ac,
420 void *buf, void *end) 435 void *buf, void *end)
421{ 436{
@@ -667,14 +682,26 @@ static void ceph_x_destroy(struct ceph_auth_client *ac)
667 ac->private = NULL; 682 ac->private = NULL;
668} 683}
669 684
670static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac, 685static void invalidate_ticket(struct ceph_auth_client *ac, int peer_type)
671 int peer_type)
672{ 686{
673 struct ceph_x_ticket_handler *th; 687 struct ceph_x_ticket_handler *th;
674 688
675 th = get_ticket_handler(ac, peer_type); 689 th = get_ticket_handler(ac, peer_type);
676 if (!IS_ERR(th)) 690 if (!IS_ERR(th))
677 memset(&th->validity, 0, sizeof(th->validity)); 691 th->have_key = false;
692}
693
694static void ceph_x_invalidate_authorizer(struct ceph_auth_client *ac,
695 int peer_type)
696{
697 /*
698 * We are to invalidate a service ticket in the hopes of
699 * getting a new, hopefully more valid, one. But, we won't get
700 * it unless our AUTH ticket is good, so invalidate AUTH ticket
701 * as well, just in case.
702 */
703 invalidate_ticket(ac, peer_type);
704 invalidate_ticket(ac, CEPH_ENTITY_TYPE_AUTH);
678} 705}
679 706
680static int calcu_signature(struct ceph_x_authorizer *au, 707static int calcu_signature(struct ceph_x_authorizer *au,
diff --git a/net/ceph/auth_x.h b/net/ceph/auth_x.h
index e8b7c6917d47..40b1a3cf7397 100644
--- a/net/ceph/auth_x.h
+++ b/net/ceph/auth_x.h
@@ -16,7 +16,7 @@ struct ceph_x_ticket_handler {
16 unsigned int service; 16 unsigned int service;
17 17
18 struct ceph_crypto_key session_key; 18 struct ceph_crypto_key session_key;
19 struct ceph_timespec validity; 19 bool have_key;
20 20
21 u64 secret_id; 21 u64 secret_id;
22 struct ceph_buffer *ticket_blob; 22 struct ceph_buffer *ticket_blob;
diff --git a/net/ceph/messenger.c b/net/ceph/messenger.c
index 9981039ef4ff..9cfedf565f5b 100644
--- a/net/ceph/messenger.c
+++ b/net/ceph/messenger.c
@@ -23,9 +23,6 @@
23#include <linux/ceph/pagelist.h> 23#include <linux/ceph/pagelist.h>
24#include <linux/export.h> 24#include <linux/export.h>
25 25
26#define list_entry_next(pos, member) \
27 list_entry(pos->member.next, typeof(*pos), member)
28
29/* 26/*
30 * Ceph uses the messenger to exchange ceph_msg messages with other 27 * Ceph uses the messenger to exchange ceph_msg messages with other
31 * hosts in the system. The messenger provides ordered and reliable 28 * hosts in the system. The messenger provides ordered and reliable
@@ -672,6 +669,8 @@ static void reset_connection(struct ceph_connection *con)
672 } 669 }
673 con->in_seq = 0; 670 con->in_seq = 0;
674 con->in_seq_acked = 0; 671 con->in_seq_acked = 0;
672
673 con->out_skip = 0;
675} 674}
676 675
677/* 676/*
@@ -771,6 +770,8 @@ static u32 get_global_seq(struct ceph_messenger *msgr, u32 gt)
771 770
772static void con_out_kvec_reset(struct ceph_connection *con) 771static void con_out_kvec_reset(struct ceph_connection *con)
773{ 772{
773 BUG_ON(con->out_skip);
774
774 con->out_kvec_left = 0; 775 con->out_kvec_left = 0;
775 con->out_kvec_bytes = 0; 776 con->out_kvec_bytes = 0;
776 con->out_kvec_cur = &con->out_kvec[0]; 777 con->out_kvec_cur = &con->out_kvec[0];
@@ -779,9 +780,9 @@ static void con_out_kvec_reset(struct ceph_connection *con)
779static void con_out_kvec_add(struct ceph_connection *con, 780static void con_out_kvec_add(struct ceph_connection *con,
780 size_t size, void *data) 781 size_t size, void *data)
781{ 782{
782 int index; 783 int index = con->out_kvec_left;
783 784
784 index = con->out_kvec_left; 785 BUG_ON(con->out_skip);
785 BUG_ON(index >= ARRAY_SIZE(con->out_kvec)); 786 BUG_ON(index >= ARRAY_SIZE(con->out_kvec));
786 787
787 con->out_kvec[index].iov_len = size; 788 con->out_kvec[index].iov_len = size;
@@ -790,6 +791,27 @@ static void con_out_kvec_add(struct ceph_connection *con,
790 con->out_kvec_bytes += size; 791 con->out_kvec_bytes += size;
791} 792}
792 793
794/*
795 * Chop off a kvec from the end. Return residual number of bytes for
796 * that kvec, i.e. how many bytes would have been written if the kvec
797 * hadn't been nuked.
798 */
799static int con_out_kvec_skip(struct ceph_connection *con)
800{
801 int off = con->out_kvec_cur - con->out_kvec;
802 int skip = 0;
803
804 if (con->out_kvec_bytes > 0) {
805 skip = con->out_kvec[off + con->out_kvec_left - 1].iov_len;
806 BUG_ON(con->out_kvec_bytes < skip);
807 BUG_ON(!con->out_kvec_left);
808 con->out_kvec_bytes -= skip;
809 con->out_kvec_left--;
810 }
811
812 return skip;
813}
814
793#ifdef CONFIG_BLOCK 815#ifdef CONFIG_BLOCK
794 816
795/* 817/*
@@ -1042,7 +1064,7 @@ static bool ceph_msg_data_pagelist_advance(struct ceph_msg_data_cursor *cursor,
1042 /* Move on to the next page */ 1064 /* Move on to the next page */
1043 1065
1044 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head)); 1066 BUG_ON(list_is_last(&cursor->page->lru, &pagelist->head));
1045 cursor->page = list_entry_next(cursor->page, lru); 1067 cursor->page = list_next_entry(cursor->page, lru);
1046 cursor->last_piece = cursor->resid <= PAGE_SIZE; 1068 cursor->last_piece = cursor->resid <= PAGE_SIZE;
1047 1069
1048 return true; 1070 return true;
@@ -1166,7 +1188,7 @@ static bool ceph_msg_data_advance(struct ceph_msg_data_cursor *cursor,
1166 if (!cursor->resid && cursor->total_resid) { 1188 if (!cursor->resid && cursor->total_resid) {
1167 WARN_ON(!cursor->last_piece); 1189 WARN_ON(!cursor->last_piece);
1168 BUG_ON(list_is_last(&cursor->data->links, cursor->data_head)); 1190 BUG_ON(list_is_last(&cursor->data->links, cursor->data_head));
1169 cursor->data = list_entry_next(cursor->data, links); 1191 cursor->data = list_next_entry(cursor->data, links);
1170 __ceph_msg_data_cursor_init(cursor); 1192 __ceph_msg_data_cursor_init(cursor);
1171 new_piece = true; 1193 new_piece = true;
1172 } 1194 }
@@ -1197,7 +1219,6 @@ static void prepare_write_message_footer(struct ceph_connection *con)
1197 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE; 1219 m->footer.flags |= CEPH_MSG_FOOTER_COMPLETE;
1198 1220
1199 dout("prepare_write_message_footer %p\n", con); 1221 dout("prepare_write_message_footer %p\n", con);
1200 con->out_kvec_is_msg = true;
1201 con->out_kvec[v].iov_base = &m->footer; 1222 con->out_kvec[v].iov_base = &m->footer;
1202 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) { 1223 if (con->peer_features & CEPH_FEATURE_MSG_AUTH) {
1203 if (con->ops->sign_message) 1224 if (con->ops->sign_message)
@@ -1225,7 +1246,6 @@ static void prepare_write_message(struct ceph_connection *con)
1225 u32 crc; 1246 u32 crc;
1226 1247
1227 con_out_kvec_reset(con); 1248 con_out_kvec_reset(con);
1228 con->out_kvec_is_msg = true;
1229 con->out_msg_done = false; 1249 con->out_msg_done = false;
1230 1250
1231 /* Sneak an ack in there first? If we can get it into the same 1251 /* Sneak an ack in there first? If we can get it into the same
@@ -1265,18 +1285,19 @@ static void prepare_write_message(struct ceph_connection *con)
1265 1285
1266 /* tag + hdr + front + middle */ 1286 /* tag + hdr + front + middle */
1267 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg); 1287 con_out_kvec_add(con, sizeof (tag_msg), &tag_msg);
1268 con_out_kvec_add(con, sizeof (m->hdr), &m->hdr); 1288 con_out_kvec_add(con, sizeof(con->out_hdr), &con->out_hdr);
1269 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base); 1289 con_out_kvec_add(con, m->front.iov_len, m->front.iov_base);
1270 1290
1271 if (m->middle) 1291 if (m->middle)
1272 con_out_kvec_add(con, m->middle->vec.iov_len, 1292 con_out_kvec_add(con, m->middle->vec.iov_len,
1273 m->middle->vec.iov_base); 1293 m->middle->vec.iov_base);
1274 1294
1275 /* fill in crc (except data pages), footer */ 1295 /* fill in hdr crc and finalize hdr */
1276 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc)); 1296 crc = crc32c(0, &m->hdr, offsetof(struct ceph_msg_header, crc));
1277 con->out_msg->hdr.crc = cpu_to_le32(crc); 1297 con->out_msg->hdr.crc = cpu_to_le32(crc);
1278 con->out_msg->footer.flags = 0; 1298 memcpy(&con->out_hdr, &con->out_msg->hdr, sizeof(con->out_hdr));
1279 1299
1300 /* fill in front and middle crc, footer */
1280 crc = crc32c(0, m->front.iov_base, m->front.iov_len); 1301 crc = crc32c(0, m->front.iov_base, m->front.iov_len);
1281 con->out_msg->footer.front_crc = cpu_to_le32(crc); 1302 con->out_msg->footer.front_crc = cpu_to_le32(crc);
1282 if (m->middle) { 1303 if (m->middle) {
@@ -1288,6 +1309,7 @@ static void prepare_write_message(struct ceph_connection *con)
1288 dout("%s front_crc %u middle_crc %u\n", __func__, 1309 dout("%s front_crc %u middle_crc %u\n", __func__,
1289 le32_to_cpu(con->out_msg->footer.front_crc), 1310 le32_to_cpu(con->out_msg->footer.front_crc),
1290 le32_to_cpu(con->out_msg->footer.middle_crc)); 1311 le32_to_cpu(con->out_msg->footer.middle_crc));
1312 con->out_msg->footer.flags = 0;
1291 1313
1292 /* is there a data payload? */ 1314 /* is there a data payload? */
1293 con->out_msg->footer.data_crc = 0; 1315 con->out_msg->footer.data_crc = 0;
@@ -1492,7 +1514,6 @@ static int write_partial_kvec(struct ceph_connection *con)
1492 } 1514 }
1493 } 1515 }
1494 con->out_kvec_left = 0; 1516 con->out_kvec_left = 0;
1495 con->out_kvec_is_msg = false;
1496 ret = 1; 1517 ret = 1;
1497out: 1518out:
1498 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con, 1519 dout("write_partial_kvec %p %d left in %d kvecs ret = %d\n", con,
@@ -1584,6 +1605,7 @@ static int write_partial_skip(struct ceph_connection *con)
1584{ 1605{
1585 int ret; 1606 int ret;
1586 1607
1608 dout("%s %p %d left\n", __func__, con, con->out_skip);
1587 while (con->out_skip > 0) { 1609 while (con->out_skip > 0) {
1588 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE); 1610 size_t size = min(con->out_skip, (int) PAGE_CACHE_SIZE);
1589 1611
@@ -2506,13 +2528,13 @@ more:
2506 2528
2507more_kvec: 2529more_kvec:
2508 /* kvec data queued? */ 2530 /* kvec data queued? */
2509 if (con->out_skip) { 2531 if (con->out_kvec_left) {
2510 ret = write_partial_skip(con); 2532 ret = write_partial_kvec(con);
2511 if (ret <= 0) 2533 if (ret <= 0)
2512 goto out; 2534 goto out;
2513 } 2535 }
2514 if (con->out_kvec_left) { 2536 if (con->out_skip) {
2515 ret = write_partial_kvec(con); 2537 ret = write_partial_skip(con);
2516 if (ret <= 0) 2538 if (ret <= 0)
2517 goto out; 2539 goto out;
2518 } 2540 }
@@ -2805,13 +2827,17 @@ static bool con_backoff(struct ceph_connection *con)
2805 2827
2806static void con_fault_finish(struct ceph_connection *con) 2828static void con_fault_finish(struct ceph_connection *con)
2807{ 2829{
2830 dout("%s %p\n", __func__, con);
2831
2808 /* 2832 /*
2809 * in case we faulted due to authentication, invalidate our 2833 * in case we faulted due to authentication, invalidate our
2810 * current tickets so that we can get new ones. 2834 * current tickets so that we can get new ones.
2811 */ 2835 */
2812 if (con->auth_retry && con->ops->invalidate_authorizer) { 2836 if (con->auth_retry) {
2813 dout("calling invalidate_authorizer()\n"); 2837 dout("auth_retry %d, invalidating\n", con->auth_retry);
2814 con->ops->invalidate_authorizer(con); 2838 if (con->ops->invalidate_authorizer)
2839 con->ops->invalidate_authorizer(con);
2840 con->auth_retry = 0;
2815 } 2841 }
2816 2842
2817 if (con->ops->fault) 2843 if (con->ops->fault)
@@ -3050,16 +3076,31 @@ void ceph_msg_revoke(struct ceph_msg *msg)
3050 ceph_msg_put(msg); 3076 ceph_msg_put(msg);
3051 } 3077 }
3052 if (con->out_msg == msg) { 3078 if (con->out_msg == msg) {
3053 dout("%s %p msg %p - was sending\n", __func__, con, msg); 3079 BUG_ON(con->out_skip);
3054 con->out_msg = NULL; 3080 /* footer */
3055 if (con->out_kvec_is_msg) { 3081 if (con->out_msg_done) {
3056 con->out_skip = con->out_kvec_bytes; 3082 con->out_skip += con_out_kvec_skip(con);
3057 con->out_kvec_is_msg = false; 3083 } else {
3084 BUG_ON(!msg->data_length);
3085 if (con->peer_features & CEPH_FEATURE_MSG_AUTH)
3086 con->out_skip += sizeof(msg->footer);
3087 else
3088 con->out_skip += sizeof(msg->old_footer);
3058 } 3089 }
3090 /* data, middle, front */
3091 if (msg->data_length)
3092 con->out_skip += msg->cursor.total_resid;
3093 if (msg->middle)
3094 con->out_skip += con_out_kvec_skip(con);
3095 con->out_skip += con_out_kvec_skip(con);
3096
3097 dout("%s %p msg %p - was sending, will write %d skip %d\n",
3098 __func__, con, msg, con->out_kvec_bytes, con->out_skip);
3059 msg->hdr.seq = 0; 3099 msg->hdr.seq = 0;
3060 3100 con->out_msg = NULL;
3061 ceph_msg_put(msg); 3101 ceph_msg_put(msg);
3062 } 3102 }
3103
3063 mutex_unlock(&con->mutex); 3104 mutex_unlock(&con->mutex);
3064} 3105}
3065 3106
@@ -3361,9 +3402,7 @@ static void ceph_msg_free(struct ceph_msg *m)
3361static void ceph_msg_release(struct kref *kref) 3402static void ceph_msg_release(struct kref *kref)
3362{ 3403{
3363 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref); 3404 struct ceph_msg *m = container_of(kref, struct ceph_msg, kref);
3364 LIST_HEAD(data); 3405 struct ceph_msg_data *data, *next;
3365 struct list_head *links;
3366 struct list_head *next;
3367 3406
3368 dout("%s %p\n", __func__, m); 3407 dout("%s %p\n", __func__, m);
3369 WARN_ON(!list_empty(&m->list_head)); 3408 WARN_ON(!list_empty(&m->list_head));
@@ -3376,12 +3415,8 @@ static void ceph_msg_release(struct kref *kref)
3376 m->middle = NULL; 3415 m->middle = NULL;
3377 } 3416 }
3378 3417
3379 list_splice_init(&m->data, &data); 3418 list_for_each_entry_safe(data, next, &m->data, links) {
3380 list_for_each_safe(links, next, &data) { 3419 list_del_init(&data->links);
3381 struct ceph_msg_data *data;
3382
3383 data = list_entry(links, struct ceph_msg_data, links);
3384 list_del_init(links);
3385 ceph_msg_data_destroy(data); 3420 ceph_msg_data_destroy(data);
3386 } 3421 }
3387 m->data_length = 0; 3422 m->data_length = 0;
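
Note: two of the messenger hunks above are pure list-API cleanups: the driver-local list_entry_next() macro becomes the generic list_next_entry(), and the open-coded release loop in ceph_msg_release() becomes list_for_each_entry_safe(). A small self-contained sketch of both helpers from <linux/list.h>, using a hypothetical demo_node type:

#include <linux/list.h>
#include <linux/slab.h>

struct demo_node {
        struct list_head links;
        int payload;
};

static struct demo_node *demo_following(struct demo_node *node)
{
        /* same as list_entry(node->links.next, struct demo_node, links) */
        return list_next_entry(node, links);
}

static void demo_release_all(struct list_head *head)
{
        struct demo_node *node, *next;

        /* _safe form: the current entry may be freed inside the loop */
        list_for_each_entry_safe(node, next, head, links) {
                list_del_init(&node->links);
                kfree(node);
        }
}
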
diff --git a/net/ceph/mon_client.c b/net/ceph/mon_client.c
index edda01626a45..de85dddc3dc0 100644
--- a/net/ceph/mon_client.c
+++ b/net/ceph/mon_client.c
@@ -364,10 +364,6 @@ static bool have_debugfs_info(struct ceph_mon_client *monc)
364 return monc->client->have_fsid && monc->auth->global_id > 0; 364 return monc->client->have_fsid && monc->auth->global_id > 0;
365} 365}
366 366
367/*
368 * The monitor responds with mount ack indicate mount success. The
369 * included client ticket allows the client to talk to MDSs and OSDs.
370 */
371static void ceph_monc_handle_map(struct ceph_mon_client *monc, 367static void ceph_monc_handle_map(struct ceph_mon_client *monc,
372 struct ceph_msg *msg) 368 struct ceph_msg *msg)
373{ 369{